EventBus源码分析

什么是EventBus

EventBus是一个使用”观察者模式”的、松耦合的开源框架,能够帮助我们简化代码、松依赖、加速开发。

EventBus基于观察者模式中的发布-订阅(Publish-Subscribe)模式(事件总线),与传统的观察者模式
不同,在传统的观察者模式下,我们更多是处于订阅者的身份,数据的发布大多数情况下我们是不可控的,我
们一般只能触发数据开始发送,而EventBus则将发布(post)和订阅的能力都交由我们自己处理;
当然,EventBus同样拥有观察者模式的优点,就是发布者和订阅者是解耦的。

1、订阅者注册时,会用两个HashMap来保存其信息,分别是根据事件类型(key)保存订阅信息
(订阅者 - 订阅方法)的subscriptionsByEventType和根据订阅者(key)保存其对应的所有订阅事件的
typesBySubscriber;

2、解析订阅者信息时,有两种方式,第一种是配置了EventBusAnnotationProcessor的,则在编译时
会扫描对应的@Subscribe注解,将所有订阅方法的信息解析生成到配置的EventBusIndex类文件中,后面
直接根据订阅者类名从该EventBusIndex类中拿到其对应的SubscriberMethod信息,即编译时解析;另一
中方式则是通过反射,在订阅者订阅时,动态解析获取其中的订阅方法信息;第一种的效率要远高于第二种,
但是配置起来更加繁琐,必须在Gradle中引入注解解析器,配置注解生成类,还必须手动初始化配置
EventBusIndex类;

3、在添加订阅者订阅方法信息时,借用HashMap特点去重的特点(不允许存在相同key),进行两重检查(
1、以事件类型为key进行检查;2、以方法签名(方法名+事件类型)为key进行检查,允许添加父子类中相同签
名的方法);

4、sticky事件实现原理:创建一个ConcurrentHashMap,以事件类型为key,用来保存所有的Sticky事件
。在调用postSticky时,就将事件添加到该Map中。在扫描添加订阅者的订阅方法信息(Subscription)时,
如果订阅方法是sticky的,则检查Map中是否有保存订阅方法订阅的sticky事件,有则立刻发送给该订阅方法。

5、在进行事件发送时,根据事件类型从subscriptionsByEventType中拿到对应的订阅信息Subscription
,根据Subscription中保存的订阅者和订阅方法信息,利用不同线程的Poster,在指定线程上使用反射调
起订阅方法;

6、解除订阅的过程实际就是操作订阅者注册时,添加的两个HashMap:subscriptionsByEventType和
typesBySubscriber,分别从typesBySubscriber中移除订阅者订阅的所有事件,从
subscriptionsByEventType中移除订阅者的订阅事件的Subscription;

image

image

源码分析

EventBus实例创建

使用懒汉单例创建:

1
2
3
4
5
6
7
8
9
10
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}

构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
EventBus(EventBusBuilder builder) {
// key:订阅的事件,value:订阅这个事件的所有订阅者集合
// private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
subscriptionsByEventType = new HashMap<>();

// key:订阅者对象(Activity/Fragment),value:这个订阅者订阅的事件集合(上面集合的key)
// private final Map<Object, List<Class<?>>> typesBySubscriber;
typesBySubscriber = new HashMap<>();

// 粘性事件 key:粘性事件的class对象, value:事件对象
// private final Map<Class<?>, Object> stickyEvents;
stickyEvents = new ConcurrentHashMap<>();

// 事件主线程处理(继承自Handler)
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

// 事件Background同步处理(后台线程)
// 采用Executors.newCachedThreadPool()来后台处理
backgroundPoster = new BackgroundPoster(this);

// 事件异步线程处理(和BackgroundPoster一样,也用Executors.newCachedThreadPool
// 处理异步事件,但是多个事件是异步的)
asyncPoster = new AsyncPoster(this);

indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;

//订阅者订阅函数的信息存储和查找类
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;

//是否支持事件继承(存在继承关系的事件,父类是否也要受到通知)
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

CopyOnWriteArrayList解决了多线程并发的问题,在添加时,会拷贝原数组的到新的数组中,在添加
时,在新的数组中进行操作,并且多线程使用锁机制,而读取时,没有加锁(根据数组是否处于添加状态,可能
读取到的是旧数组,也可能是新数组,所以读取实时性差),适合读多写少的场景,不适合数据量巨大的
场景(拷贝太耗性能).

ConcurrentHashMap是高性能的并发HashMap,相比于HashTable更加高效。不同于HashTable全局
只有一把锁的情况(多线程并发的情景下,抢不到锁的线程需要等待其他线程释放锁,会先被挂起),
ConcurrentHashMap采用的是分段锁(所有数据根据Hash运算分到不同段,一个段分配一把锁,这样在多线
程并发的情景下,不同线程访问到不同的数据段时,不需要等待锁的释放,减少不必要的等待和上下文切换开
销)。

注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void register(Object subscriber) {
// 首先获得订阅者的class对象
Class<?> subscriberClass = subscriber.getClass();
// 通过subscriberMethodFinder来找到订阅者订阅了哪些事件.返回一个SubscriberMethod对象的List,SubscriberMethod
// 里包含了这个方法的Method对象,以及将来响应订阅是在哪个线程的ThreadMode,订阅的事件类型eventType,订阅的优
// 先级priority,是否接收粘性sticky事件的boolean值.
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//订阅
subscribe(subscriber, subscriberMethod);
}
}
}
findSubscriberMethods

该方法用来查找到订阅者订阅的事件(订阅方法的第一个参数类型)、线程、是否sticky、优先级等信息封装到
SubscriberMethod对象中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先从METHOD_CACHE取看是否有缓存,key:保存订阅者的类名,value:保存类中订阅的方法数据
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

// 是否忽略注解器生成的MyEventBusIndex类(强制使用注解的方式)
if (ignoreGeneratedIndex) {
// 利用反射来读取订阅者中的订阅方法信息
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 从注解器生成的MyEventBusIndex类中获得订阅类的订阅方法信息
// 注解生成器需要额外的配置在gradle中
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 保存进METHOD_CACHE缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

findSubscriberMethods中包含了两种解析获取订阅者信息的方法:

1、findUsingReflection(subscriberClass);

这种方法通过反射的方式,获取订阅者类中订阅者的相关信息;这种方式也是EventBus2.0的做法,效率较低;

2、findUsingInfo(subscriberClass);

这种方式通过引入一个注解解析器(EventBusAnnotationProcessor),在编译时期动态生成一个注解解析
类(一般配置为MyEventBusIndex),来解析订阅者的注解;

这是EventBus3.0新引入的特性,由于是编译时生成,所以运行时不需要耗费额外的性能去解析注解,
但是相应的,编译的周期将会变长。
EventBusAnnotationProcessor并不是EventBus3.0标配的,需要额外配置引入;

编译期解析注解

在编译时,注解处理器会去扫描源码中的注解,然后交给对应的注解处理器进行处理,比如EventBusAnnotationProcessor注册了处理Subscribe注解,在编译时,扫描到的注解就会交给它处
理,EventBusAnnotationProcessor检查注解信息是否合法(订阅方法、订阅者及父类修饰权限,
方法是否静态等),然后通过IO输出对应的Java文件(build文件夹下)–MyEventBusIndex.java

如果引入了EventBusAnnotationProcessor,则注解将在编译器被解析进一个新生成的类中,该类的类名
可以由我们配置:

image

在annotationProcessorOptions配置了生成的类名,配置之后会自动生成该类,我们可以直接使用!

假设使用官方Demo中的MyEventBusIndex来命名;

这种方式需要在第一个使用EventBus时,先对EventBus进行配置(一般在Application中完成配置):

1
2
3
4
5
6
7
8
9
10
11
// 这种方式生成一个我们配置的EventBus,当时我们必须将其保存为全局变量,
// 以后每次使用时,使用给实例,比较麻烦
EventBus mEventBus = EventBus.builder()
.addIndex(new MyEventBusIndex())
.build();

// 这种方式将我们配置的EventBus实例代替了全局的默认单例,以后使用EventBus.getDefault()
// 获取到的都是我们配置的EventBus,推荐!
EventBus mEventBus = EventBus.builder()
.addIndex(new MyEventBusIndex())
.installDefaultEventBus();

MyEventBusIndex在配置完gradle后,就会生成,其中的内容在编译期动态填充:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* This class is generated by EventBus, do not edit.
*/
public class MyEventBusIndex implements SubscriberInfoIndex {
// 保存订阅者的类名和订阅信息
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
// 在编译器找到对应的订阅者类名称和其中对应的注解信息
// 保存订阅者类名、是否检查父类订阅已经注解的信息进SimpleSubscriberInfo中
// SubscriberMethodInfo中保存了订阅事件的方法名、订阅的事件类、订阅方法被调用的线程、优先级已
// 及是否sticky(具体看SubscriberMethodInfo类实现)
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscriberClassEventBusAsync.class,
true, new SubscriberMethodInfo[]{
new SubscriberMethodInfo("onEventAsync", TestEvent.class, ThreadMode.ASYNC),
}));
// 在编译器时,类名已经是确定了,可以直接拿到
putIndex(new SimpleSubscriberInfo(TestRunnerActivity.class, true, new SubscriberMethodInfo[]{
new SubscriberMethodInfo("onEventMainThread", TestFinishedEvent.class, ThreadMode.MAIN),
}));
}

private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}

@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}

流程

1、findUsingInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 根据订阅者的信息调用getSubscriberInfo进行查找FindState的信息
// FindState封装了订阅这的信息:subscriberInfo
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

2、getSubscriberInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private SubscriberInfo getSubscriberInfo(FindState findState) {
// 如果FindState中已经保存过SubscriberInfo,则直接取出
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
// 没有则进入SubscriberInfoIndex集合中进行查找
// 这个SubscriberInfoIndex实际就是我们配置EventBus时,调用addIndex添加进来的
// MyEventBusIndex,调用其getSubscriberInfo可以拿到编译期生成的注册信息
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}

3、回到findUsingInfo中,此时subscriberInfo已经取到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// 此处使用数组作为池来缓存FindState,默认创建了4个FindState,当需要新的FindState时,
// 用新的引用指向其中一个数组元素,然后让数组元素置空;
// 这样做的目的是为了避免重复创建FindState实例,因为FindState对象较大
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 根据订阅者的信息调用getSubscriberInfo进行查找FindState的信息
// FindState封装了订阅这的信息:subscriberInfo
findState.subscriberInfo = getSubscriberInfo(findState);
// 如果没有subscriberInfo,则改用反射的方式来获取订阅者信息
if (findState.subscriberInfo != null) {
// 从subscriberInfo中解析出订阅者信息,再添加进findState.subscriberMethods集合中
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
// 将findState.clazz重置为当前类的父类,再次查找订阅信息
findState.moveToSuperclass();
}
// 只需要SubscriberMethod信息,findState清空,回收回池中
return getMethodsAndRelease(findState);
}

运行期反射注解

运行期反射的方式来获取订阅者的配置信息,这种方式是EventBus2.0的做法,相对来说,效率更低,因为
在程序运行期间,需要耗费额外的性能来解析订阅者;

1、findUsingReflection

1
2
3
4
5
6
7
8
9
10
11
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 流程大致与编译期的方式相同
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 唯一的区别在这里,在这个方法中进行了反射操作,来解析订阅者信息
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

2、findUsingReflectionInSingleClass

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
// 通过反射获取订阅者的所有方法
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
// 获取方法的修饰权限
int modifiers = method.getModifiers();
// 修饰权限是public,并且不是static,abstract、synthetic等
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 拿到方法的所有参数的类型
Class<?>[] parameterTypes = method.getParameterTypes();
// 判断方法是否只有一个参数
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 如果是订阅方法
if (subscribeAnnotation != null) {
// 拿到订阅方法的第一个参数类型,即订阅者订阅的事件类型
Class<?> eventType = parameterTypes[0];
// 校验订阅方法的订阅的事件类型是否在同一个订阅者中已经订阅了,没有这返回true,
// 如果已经已经订阅,则校验两个订阅方法是否有相同的方法签名;
if (findState.checkAdd(method, eventType)) {
// 解析@Subscribe注解上的信息,拿到被调用线程、调用优先级、是否sticky信息
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 如果订阅方法的修饰权限存在问题,则直接抛出异常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

checkAdd方法是一级检查,根据事件类型来检查同一订阅者是否有重复的订阅方法的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
// 通常情况下,一个订阅者中不会同时存在两个方法订阅相同的事件!
// 根据事件类型来检查,查看该订阅者是否已经有方法订阅了该事件
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
// 如果已经有方法订阅了该事件,检查是否是相同的方法
if (existing instanceof Method) {
// 检查是否是重复添加的订阅方法,有可能存在父子类继承的关系,则是允许的,否则
// 抛出异常
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// 说明两个订阅方法存在继承关系,添加任意对象替代method,
// 下次遇到这种情况,直接对比方法签名即可
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
// 因为前面的方法已经限制了订阅方法参数个数为1个,所以不存在通过控制参数个数来重载的
// 订阅方法的情况,只要方法名或事件类型存在不同,则可以添加为订阅方法
return checkAddWithMethodSignature(method, eventType);
}
}

checkAddWithMethodSignature是二级检查,根据方法前面来进行重复订阅方法的检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
// 构造方法签名
String methodKey = methodKeyBuilder.toString();
// 拿到订阅方法类型
Class<?> methodClass = method.getDeclaringClass();
// 根据HashMap的特点,检查是否已经有相同方法签名的订阅方法
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
// 没有,或者两个方法存在父子类继承关系
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// 此时两个订阅方法重复了
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}

sticky事件:
EventBus中可以发送Sticky事件,这种事件在发送之后会被缓存起来,以后如果有订阅者订阅了sticky事件,
则缓存起来的sticky事件会立刻发送给订阅者;

1
2
3
4
5
6
7
8
// 移除sticky事件
MessageEvent stickyEvent = EventBus.getDefault().getStickyEvent(MessageEvent.class);
// Better check that an event was actually posted before
if(stickyEvent != null) {
// "Consume" the sticky event
EventBus.getDefault().removeStickyEvent(stickyEvent);
// Now do something with it
}

订阅subscribe

在获取到订阅者的信息之后,接下来需要同步的将信息注册进EventBus中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 订阅操作必须位于同步代码块中
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 获取订阅的事件类型
Class<?> eventType = subscriberMethod.eventType;
// 创建一个订阅类
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 检查对应事件类型的订阅类是否已经存在了
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
// 一个事件类型可能有多个订阅者,这里定义一个集合来保存所有的订阅者,然后,再根据
// 事件类型保存到HashMap中
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

// 根据订阅者订阅时设置的优先级,插入到集合中,集合索引越小,优先级越高
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

// 检查订阅者的订阅的事件集合是否已经存在,不存在则创建一个保存订阅者订阅的所有事件的集合
// 以订阅者为key,保存到typesBySubscriber HashMap中
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 将当前订阅事件保存到订阅者订阅的所有事件的集合中
subscribedEvents.add(eventType);

// 检查订阅的方法是否是sticky
if (subscriberMethod.sticky) {
// EventBus中事件默认是可以继承的
if (eventInheritance) {
// sticky事件在Post的时候就会被保存到stickyEvents集合(ConcurrentHashMap)中,
// 发送事件之前,先加载所有的sticky事件是否有与当前事件存在继承关系的,有则
// 发送该子类或父类事件
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
// sticky事件在被订阅时立即会发送,因为这种事件发送之后会被缓存起来,
// 当有新的订阅者时,就会立即把缓存的事件发送给订阅者
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

image

发送事件post

1、post

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 由于同一线程是同步的,不同线程拿到的是不同的PostingThreadState,所以同一时间拿到的eventQueue
// 不可能存在1个以上的事件
public void post(Object event) {
// 获取当前线程保存的发送状态信息(PostingThreadState是一个静态类,保存了当前线程的订阅事件队列
// 订阅信息类等信息,每一个线程都有单独的一个PostingThreadState,保存在ThreadLocal中)
PostingThreadState postingState = currentPostingThreadState.get();
// 拿到订阅事件队列,将发送的事件添加进去
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

// 如果当前线程不处于发送事件的状态
if (!postingState.isPosting) {
// 保存当前线程是否为主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
// 标记为发送状态
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循环订阅事件队列,不断单个发送事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 发送事件结束
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

2、postSingleEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 找到发送事件的类型
Class<?> eventClass = event.getClass();
// 用来判断是否找到事件的订阅者
boolean subscriptionFound = false;
// 如果需要考虑订阅事件的父类或接口的订阅者
if (eventInheritance) {
// 找到当前订阅事件的所有父类和实现接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 只要当前事件类和一个父类或接口中有一个找到了其订阅者,则算找到订阅者
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 不考虑订阅事件的父类和接口
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 没有找到事件的订阅者
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
// 如果没有找到事件的订阅者,并且设置了sendNoSubscriberEvent为true,则会发送
// NoSubscriberEvent事件
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

3、postSingleEventForEventType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
// 拿到在subscribe时保存的订阅了对应事件的所有订阅信息集合(订阅者、订阅的方法)
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 遍历每一个订阅信息实例,使用PostingThreadState来保存信息
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
// 分发过程是否被中断
boolean aborted = false;
try {
// 分发事件给订阅者
postToSubscription(subscription, event, postingState.isMainThread);
// 优先级高的订阅者在接收到事件后可以调用cancelEventDelivery来取消事件
// 的分发因为这个过程是循环(线性)进行的,后面的订阅者将被取消接收
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

4、postToSubscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 判断订阅者设置的被调用线程
switch (subscription.subscriberMethod.threadMode) {
// 哪个线程发送事件,就在对应的线程上调用订阅者方法
// 注意如果是主线程,不能进行耗时操作
case POSTING:
invokeSubscriber(subscription, event);
break;
// 不管在哪个线程发送事件,始终在主线程调用订阅方法
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
// 在这些Poster中最终也是调用了eventBus.invokeSubscriber(pendingPost)
mainThreadPoster.enqueue(subscription, event);
}
break;
// 如果事件不是在主线程发送的,则订阅方法会在相同的线程中被调用
// 如果是在主线程,则会启动一个唯一的后台线程去处理,如果事件有多个时,则会用队列管理这些
// 事件,依次发送
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
// 不管是在哪个线程发送事件,始终会开启一个新的线程来调用订阅方法
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

5、invokeSubscriber

1
2
3
4
5
6
7
8
9
10
11
12
// 这个方法真正执行了订阅方法的调用
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 采用了反射的方式(类似于动态代理),调用订阅者的订阅方法
// 这种方式调用方法,比用传统方式调用要更耗时
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

image

解除订阅unregister

解除订阅实际就是从原先订阅是保存进的集合中清除对应的订阅者即可;

1、unregister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized void unregister(Object subscriber) {
// 拿到当前订阅者订阅的所有事件的集合(typesBySubscriber在subscribe时创建的)
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 解除订阅者订阅的所有事件
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 移除对应的订阅者
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

2、unsubscribeByEventType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 拿到所有订阅该事件的订阅信息集合(每个订阅信息中保存了对应的订阅者,已经订阅的方法信息
// 同样,subscriptionsByEventType也是在subscribe是创建)
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
// 遍历,移除订阅者对应的订阅信息
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
// 一般情况下,同一订阅者(同一Activity/Fragment)不会同时有两个订阅方法订阅
// 同一事件,但不能排除有这种可能
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

参考 : https://www.jianshu.com/p/f057c460c77es