深入 EventBus 源码探究

文章目录
  1. 1. register()
  2. 2. subscribe()
  3. 3. post()
  4. 4. unregister()
  5. 5. 小结
  6. 6. 参考文献

学问积年而成,而每日不自知。

上一篇对 Android EventBus 的使用作了简单的小结,我们也知道,EventBus 一直以来也很受开发者的欢迎。EventBus 自 3.0 版本开始,事件的订阅从方法名换成了注解的方式。本篇将解析 EventBus 的源码,以深入理解该框架的实现。

下面,将通过 EventBus 的使用流程来分析其调用流程,从熟悉的使用方法入手,深入到 EventBus 的实现内部,并理解其实现原理。

register()

先从宏观角度把握,看下面的 register() 流程图:

看 register() 方法的源码如下:

1
2
3
4
5
6
7
8
9
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

通过 subscriberMethodFinder 来找到订阅者订阅的事件,返回一个 SubscriberMethod 对象的 List,而 SubscriberMethod 类包含了这个方法的 Method 对象、将来响应订阅的线程模式 ThreadMode、订阅的事件类型 eventType、订阅的优先级 priority 以及是否接收 sticky 事件的 boolean 值。

subscribe()

上文提到的 SubscriberMethod 里包含了所有需要接下来执行 subscribe() 的信息。代码中,有 SubscriberMethodFinder 类的对象,对于 SubscriberMethodFinder,即用来查找和缓存订阅者响应函数信息的类。EventBus 3.0 版本中,提供了一个 EventBusAnnotationProcessor 注解处理器,以在编译期通过 @Subscribe 注解并解析,处理其中包含的信息,生成 Java 类来保存所有订阅者关于订阅的信息,比在运行时通过反射来获取这些订阅者信息的速度快很多。

看 EventBus 项目里的 EventBusPerformance,编译后在 build 文件夹里找到个可以自定义的 MyEventBusIndex 类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* This class is generated by EventBus, do not edit.
*/

public class MyEventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscriberClassEventBusAsync.class,
true, new SubscriberMethodInfo[]{
new SubscriberMethodInfo("onEventAsync", TestEvent.class, ThreadMode.ASYNC),
}));

putIndex(new SimpleSubscriberInfo(TestRunnerActivity.class, true, new SubscriberMethodInfo[]{
new SubscriberMethodInfo("onEventMainThread", TestFinishedEvent.class, ThreadMode.MAIN),
}));
}

private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}

@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}

代码中,使用一个静态的 HashMap 即 SUBSCRIBER_INDEX 来保存订阅类的信息,其中包含订阅类的 Class 对象、是否需要检查父类和订阅方法信息的 SubscriberMethodInfo 的数组。其中,SubscriberMethodInfo 数组中保存了订阅方法的方法名、订阅的事件类型、线程模式、是否接收 sticky 事件和优先级 priority,即 register() 所有需要的信息。

此外,有一个可选配置 EventBus 时,通过 EventBusBuilder:

1
eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();

将编译生成的 MyEventBusIndex 配置进去,就可以在 SubscriberMethodFinder 类中直接查找出订阅类的信息,不再利用注解判断。

SubscriberMethodFinder 类中也提供了由注解获取订阅类信息的方法,其中,findSubscriberMethods() 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先从 METHOD_CACHE 取,看是否有缓存,key:订阅类,value:订阅方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 是否忽视注解器生成的 MyEventBusIndex 类
if (ignoreGeneratedIndex) {
// 反射读取订阅类中的订阅方法
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 从注解器生成的 MyEventBusIndex 类中获得订阅类中的订阅方法信息
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

其中,findUsingInfo() 方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

查找所说的 MyEventBusIndex 类中的信息,转换成 List,从而获得订阅类中相关订阅函数的各种信息。再看上面的 findUsingReflection() 方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 做订阅方法的校验和保存
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 反射获取订阅方法信息
findUsingReflectionInSingleClass(findState);
// 查找父类的订阅方法
findState.moveToSuperclass();
}
// 返回 SubscriberMethod 的 List
return getMethodsAndRelease(findState);
}

通过 FindState 类做订阅方法的校验和保存,再看其中的 prepareFindState() 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}

通过 FIND_STATE_POOL 静态数组来保存 FindState 对象,复用 FindState,避免重复创建过多的对象。最终,其是通过 findUsingReflectionInSingleClass() 以获取相关订阅方法的具体信息,findUsingReflectionInSingleClass() 方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
// 反射得到方法数组
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 判断是否只有一个事件参数
if (parameterTypes.length == 1) {
// 获取注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 校验添加方法
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

至此,订阅类的所有 SubscriberMethod 保存完毕,再通过上面的 getMethodsAndRelease() 返回 List,下面重点看 subscribe() 方法的实现,register()中有 subscribe(subscriber, subscriberMethod),其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 从 subscriptionsByEventType 里检查是否添加过 Subscription 的 List,是则抛出异常
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
// EventBus 存储方法
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
// 由优先级 priority 来添加 Subscription 对象,优先级越高,List 的插入位置越
// 靠前
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
// 将订阅的对象和订阅的事件保存到 typesBySubscriber 中
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

if (subscriberMethod.sticky) {
// 是否分发订阅了响应事件类中父类事件的方法
if (eventInheritance) {
// 须考虑事件类型子类中存在的粘性事件 sticky events,要注意的是,有大量的 sticky events
// 时,迭代所有的事件或许效率不高,因此,应该改变其数据结构为更有效率的查找,比如,用一个额外
// 的 map 存储父类的子类:Class -> List<Class>。
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

至此,上面的 register() 流程图全部走完,涉及到了 register() 和 subscribe() 方法。核心的要点即是,扫描了所有的方法,将匹配的方法最终保存在 subscriptionsByEventType(Map,key:eventType;value:CopyOnWriteArrayList) 中,eventType 是方法参数的 Class,Subscription 中保存着 subscriber、subscriberMethod(method,threadMode 和 eventType) 和 priority。

post()

register() 时,将方法存在 subscriptionsByEventType 中,post() 会去 subscriptionsByEventType 中取方法,再调用。同样,在翻看源码前,看 post() 的流程图:

看 post() 方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void post(Object event) {
// 获取当前线程的 Posting 状态
PostingThreadState postingState = currentPostingThreadState.get();
// 获取当前线程的事件队列
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
// 判断当前是否 UI 线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
// 发送单个事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

再看 currentPostingThreadState 的实现如下:

1
2
3
4
5
6
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};

其实现是一个包含了 PostingThreadState 的 ThreadLocal 对象。其中,ThreadLocal 是一个线程内部的数据存储类,由其可以在指定的线程中存储数据,而这段数据不会与其他线程共享,其内部原理即通过生成一个自身包裹的泛型对象的数组,不同的线程赋予不同的数组索引值,如此,每个线程通过 get() 方法获取的时候,取到的是自己线程对应的数据。这里,取到的是每个线程的 PostingThreadState 状态,接下来看发送单个事件的 postSingleEvent() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// 是否触发订阅了事件 (eventClass) 的父类,以及接口类的响应方法
if (eventInheritance) {
// 查找 eventClass 类所有的父类及接口,用于匹配
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// post 单个
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
// 发送 NoSubscriberEvent 事件,若须处理,自行接收
post(new NoSubscriberEvent(this, event));
}
}
}

显然,postSingleEventForEventType() 方法里进行事件的分发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
// 获取订阅事件的 subscription 列表
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 分发给订阅者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

看 postToSubscription() 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

眼熟?是的!Android EventBus 使用小结对这 4 种 ThreadMode 均作过总结。上述代码即从 subscriptionsByEventType 里获得所有订阅该事件的 subscription 列表,而后,通过 postToSubscription() 方法分发事件,最后在 postToSubscription() 里通过不同的 ThreadMode 在不同的线程里调用订阅者的方法。看 invokeSubscriber() 方法的源码如下:

1
2
3
4
5
6
7
8
9
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

即通过反射调用订阅者的订阅函数,并把 event 对象作为参数传入。至此,post() 的大致流程也结束了。

unregister()

最后一步,注销掉,看 unregister() 方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
// 分别解除每个订阅了的事件类型
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

再看 unsubscribeByEventType() 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
// 依次取消订阅
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

小结

EventBus 里使用注解处理器,预处理获取订阅信息,其将订阅者的方法缓存到METHOD_CACHE里避免重复查找。其最大的作用即是“通讯”,在同进程中进行不同线程间的通讯,简化了 Handler 的操作。但是,在不同进程之间,该通信方法便无法继续,只能借助广播来完成,是由于不同进程中无法维持一个事件单列。至此,深入 EventBus 的源码探究完毕,EventBus 的使用小结和源码探究两篇文章到此结束。

本人才疏学浅,如有疏漏错误之处,望读者中有识之士不吝赐教,谢谢。

1
Email: [email protected] / WeChat: Wolverine623

您也可以关注我个人的微信公众号码农六哥第一时间获得博客的更新通知,或后台留言与我交流

参考文献

1.http://p.codekk.com/blogs/detail/54cfab086c4761e5001b2538

2.http://blog.csdn.net/lmj623565791/article/details/40920453

3.http://t.cn/RGaG7AG