EventBus 源码架构与内部实现
在上一篇文章中,我们学习了 EventBus 的设计思想与核心 API 用法。我们知道了三步就可以让组件通信——register、@Subscribe、post。但这个极简的 API 表面之下,隐藏着一个精心设计的高性能事件分发引擎。
本文将从 EventBus 的源码出发,逐层剖析以下核心问题:
register()是如何"发现"订阅方法的?反射扫描与编译时索引分别是怎么工作的?post()是如何将事件从一个线程安全地投递到另一个线程的?- "粘性事件"的缓存与回放机制,在源码中是怎样实现的?
- EventBus 使用了哪些"隐藏的"性能优化手段?
本文基于 EventBus 3.3.1(最新稳定版)的源码进行分析。
一、 整体架构:一张地图
在深入任何单个流程之前,我们先建立一个 EventBus 的全局心智模型。
┌───────────────────────────────────────────────────────────────────┐
│ EventBus │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 核心数据结构 │ │
│ │ │ │
│ │ subscriptionsByEventType: Map<Class, List<Subscription>> │ │
│ │ typesBySubscriber: Map<Object, List<Class>> │ │
│ │ stickyEvents: Map<Class, Object> │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────┐ ┌──────────────────────────────────────┐ │
│ │ 注册引擎 │ │ 分发引擎 │ │
│ │ │ │ │ │
│ │ SubscriberMethod│ │ currentPostingThreadState (TLS) │ │
│ │ Finder │ │ ┌──────────┐ ┌───────────────────┐ │ │
│ │ ┌─────────┐ │ │ │ Handler │ │ BackgroundPoster │ │ │
│ │ │反射扫描 │ │ │ │ Poster │ │ │ │ │
│ │ ├─────────┤ │ │ ├──────────┤ ├───────────────────┤ │ │
│ │ │编译时索引│ │ │ │ AsyncPoster │ │ │
│ │ └─────────┘ │ │ └──────────────────────────────────┘ │ │
│ └─────────────────┘ └──────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────┘
EventBus 本质上由两大引擎组成:
- 注册引擎(SubscriberMethodFinder):负责在注册时发现订阅者的
@Subscribe方法 - 分发引擎(Poster 体系):负责在发布事件时,根据线程模式将事件投递到正确的线程
中间的桥梁是三个核心数据结构:
subscriptionsByEventType:事件类型 → 订阅关系列表(正向索引,用于 post)typesBySubscriber:订阅者对象 → 事件类型列表(反向索引,用于 unregister)stickyEvents:事件类型 → 最新的粘性事件快照
二、 注册流程:从 register() 到方法发现
2.1 register() 方法的源码解析
当我们调用 EventBus.getDefault().register(this) 时,发生了什么?
// EventBus.java
public void register(Object subscriber) {
// 1. 获取订阅者的 Class 对象
Class<?> subscriberClass = subscriber.getClass();
// 2. 【核心】通过 SubscriberMethodFinder 查找该类中所有的 @Subscribe 方法
List<SubscriberMethod> subscriberMethods =
subscriberMethodFinder.findSubscriberMethods(subscriberClass);
// 3. 在同步块中逐个建立订阅关系
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
整个 register 过程可以拆为两大步骤:发现(找到所有订阅方法)和绑定(将方法与事件类型关联到数据结构中)。
2.2 方法发现:SubscriberMethodFinder
SubscriberMethodFinder 是 EventBus 的"侦察兵"。它负责回答一个问题:这个类(及其父类)中,哪些方法被 @Subscribe 标注了?
// SubscriberMethodFinder.java
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 第一道缓存:如果之前已经扫描过这个类,直接返回缓存结果
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
// 路径 A:纯反射扫描
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 路径 B:优先使用编译时索引,索引缺失时降级为反射
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 写入缓存,同一个类后续不再重复扫描
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
这里的设计体现了一个重要的性能优化思路:双路查找 + 结果缓存。
findSubscriberMethods(SubscriberClass)
│
├── 命中 METHOD_CACHE? ──→ 是 → 直接返回
│
└── 未命中
│
├── ignoreGeneratedIndex = true
│ └── findUsingReflection() ← 纯反射
│
└── ignoreGeneratedIndex = false
└── findUsingInfo() ← 优先索引,降级反射
│
├── 索引中找到了? → 使用索引数据
└── 索引中没有? → 对当前类执行反射扫描
路径 A:反射扫描的源码
当没有设置编译时索引时,EventBus 会使用 Java 反射来"手动翻阅"类的方法表:
// SubscriberMethodFinder.java
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// getDeclaredMethods() 比 getMethods() 更快,
// 因为它不需要向上遍历父类链。
// 对于 Activity 这样的"胖类",性能差异显著。
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// 降级方案:某些旧设备上 getDeclaredMethods 会抛出 NoClassDefFoundError
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true; // getMethods 已经包含父类方法,标记跳过
}
for (Method method : methods) {
int modifiers = method.getModifiers();
// 条件1:方法必须是 public
// 条件2:方法不能是 abstract、static、bridge 或 synthetic(编译器合成方法)
if ((modifiers & Modifier.PUBLIC) != 0
&& (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
// 条件3:方法只有一个参数
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
// 条件4:方法被 @Subscribe 注解标注
Class<?> eventType = parameterTypes[0]; // 参数类型即事件类型
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(
method, eventType, threadMode,
subscribeAnnotation.priority(),
subscribeAnnotation.sticky()
));
}
}
}
}
}
}
一个合格的订阅方法必须满足四个"硬条件":
| 条件 | 描述 | 不满足会怎样 |
|---|---|---|
public |
方法必须是公开的 | 反射调用 method.invoke() 会抛 IllegalAccessException |
| 非 abstract/static | 方法必须有具体实现、属于实例 | 无法对抽象方法或静态方法进行有效的事件投递 |
| 1 个参数 | 参数类型用于标识该方法订阅的事件类型 | EventBus 无法判定这个方法关心哪种事件 |
@Subscribe |
必须有注解标记 | EventBus 不会扫描未标注的方法 |
路径 B:编译时索引的查找逻辑
当我们通过 EventBus.builder().addIndex(MyEventBusIndex()) 注入了编译时索引,findUsingInfo 方法会优先从索引中查询:
// SubscriberMethodFinder.java
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
// 从当前类向上逐层遍历父类链
while (findState.clazz != null) {
// 尝试从索引中获取该类的订阅信息
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
// 索引命中:直接取出预编译好的 SubscriberMethod 数组
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 索引未命中(该类不在索引覆盖范围内):降级为反射
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
这段代码的精妙之处在于:它不是"全有或全无"的——索引覆盖了的类用索引,没覆盖到的类降级为反射。这保证了即使某些类不在索引中(比如动态加载的类),EventBus 也不会崩溃。
FindState 对象池
注意 prepareFindState() 方法——它不是每次都 new FindState(),而是从一个固定大小为 4 的对象池中复用:
private static final int POOL_SIZE = 4;
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null; // 取出后置空
return state;
}
}
}
return new FindState(); // 池子空了才创建新对象
}
这是一个典型的轻量级对象池设计。FindState 内部持有多个 List 和 Map,每次 register 都创建新的会产生大量短命对象,加重 GC 压力。通过对象池复用,可以显著减少不必要的内存分配。用完后调用 findState.recycle() 清空内部状态,还回池中。
2.3 建立订阅关系:subscribe() 方法
方法被发现后,subscribe() 方法将它们"登记"到 EventBus 的核心数据结构中:
// EventBus.java — 必须在 synchronized 块中调用
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// ====== 步骤 1:更新正向索引 subscriptionsByEventType ======
CopyOnWriteArrayList<Subscription> subscriptions =
subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 防重复注册:同一个对象不能注册两次
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass()
+ " already registered to event " + eventType);
}
}
// 按优先级插入,高优先级排前面
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size
|| subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// ====== 步骤 2:更新反向索引 typesBySubscriber ======
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// ====== 步骤 3:处理粘性事件 ======
if (subscriberMethod.sticky) {
if (eventInheritance) {
// 如果开启了事件继承,需要遍历所有已缓存的粘性事件
// 检查是否有该事件类型的子类型也匹配
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
让我们用一幅图来展示注册完成后的数据结构状态:
假设 MainActivity 有两个订阅方法:
@Subscribe fun onLogin(LoginEvent) priority=5
@Subscribe(sticky=true) fun onTheme(ThemeEvent) priority=1
注册后的数据结构:
subscriptionsByEventType (正向索引):
┌──────────────────┬──────────────────────────────────────────┐
│ LoginEvent.class │ → [Subscription(MainActivity, onLogin)] │
├──────────────────┼──────────────────────────────────────────┤
│ ThemeEvent.class │ → [Subscription(MainActivity, onTheme)] │
└──────────────────┴──────────────────────────────────────────┘
typesBySubscriber (反向索引):
┌──────────────────┬─────────────────────────────────────┐
│ mainActivity实例 │ → [LoginEvent.class, ThemeEvent.class]│
└──────────────────┴─────────────────────────────────────┘
为什么需要"反向索引" typesBySubscriber? 因为 unregister(subscriber) 时,我们需要快速知道"这个订阅者关心哪些事件类型",然后逐一从 subscriptionsByEventType 中移除。如果没有反向索引,就需要遍历整个 subscriptionsByEventType 的所有 Key-Value 对——这在事件类型很多时性能不可接受。
为什么使用 CopyOnWriteArrayList? 因为 post() 在遍历订阅者列表时(读操作),可能同时有其他线程在执行 register() 或 unregister()(写操作)。CopyOnWriteArrayList 在写入时会复制一个新数组,不影响正在进行的读遍历,天然支持读多写少的并发场景。
三、 注销流程:unregister() 的清理逻辑
// EventBus.java
public synchronized void unregister(Object subscriber) {
// 1. 通过反向索引,快速获取该订阅者关心的所有事件类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 2. 逐一从正向索引中移除
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 3. 从反向索引中彻底移除该订阅者
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING,
"Subscriber to unregister was not registered before: "
+ subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false; // 标记为失活
subscriptions.remove(i);
i--;
size--;
}
}
}
}
注意 subscription.active = false 这行代码。为什么要先标记为失活再从列表中移除?因为在并发场景下,可能正有另一个线程在遍历 subscriptions 并准备向这个 Subscription 投递事件。标记为 active = false 后,投递方法 invokeSubscriber(PendingPost) 中会检查这个标志:
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) { // ← 这里检查!已注销的就跳过
invokeSubscriber(subscription, event);
}
}
这是一种经典的安全失效(Fail-Safe) 设计——即使 unregister 和事件投递之间存在竞态条件,也不会导致向已销毁的对象投递事件。
四、 事件分发流程:从 post() 到订阅方法调用
4.1 post() 方法的源码解析
// EventBus.java
public void post(Object event) {
// 1. 获取当前线程的 PostingThreadState(ThreadLocal 提供线程隔离)
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
// 2. 将事件加入当前线程的事件队列
eventQueue.add(event);
// 3. 防止递归重入
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true; // 标记"我正在投递"
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 4. 循环处理队列中的所有事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 5. 清理状态
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
这里有两个非常值得学习的设计:
设计 1:ThreadLocal + 事件队列
PostingThreadState 通过 ThreadLocal 存储,每个线程都有自己独立的事件队列和状态:
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>(); // 事件队列
boolean isPosting; // 是否正在投递(防重入)
boolean isMainThread; // 当前是否在主线程
Subscription subscription; // 当前正在处理的订阅关系
Object event; // 当前正在处理的事件
boolean canceled; // 是否被取消
}
这就像每个柜台服务员(线程)都有自己独立的待办任务栏(eventQueue)。即使多个服务员同时在工作,他们各自的任务栏互不干扰。这种设计彻底消除了
post()方法中对全局锁的需求——因为每个线程只操作自己的队列。
设计 2:isPosting 防重入
isPosting 标志位用于处理一个边界情况:如果在 @Subscribe 方法内部又调用了 post(),新事件只会被加入队列,不会立即触发新一轮的 while 循环。当前次 while 循环会在处理完当前事件后,自然地拿到新入队的事件并处理。
线程A调用 post(Event1):
isPosting = true
取出 Event1 → 分发 → 订阅方法中又调用了 post(Event2)
Event2 被加入 eventQueue,但 isPosting 已经是 true,直接返回
Event1 处理完毕 → while 循环继续 → 取出 Event2 → 分发
队列空了 → isPosting = false
4.2 事件类型匹配:事件继承机制
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
// 查找该事件类型的所有父类和接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 只匹配精确类型
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
// 没有任何订阅者关心这个事件
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class) {
post(new NoSubscriberEvent(this, event)); // 发布一个"无人接收"的元事件
}
}
}
eventInheritance 默认为 true,意味着 EventBus 支持事件类型的多态匹配。如果你发布了一个 LoginSuccessEvent extends BaseEvent,那么订阅 BaseEvent 的方法也会收到这个事件。
lookupAllEventTypes() 的实现是递归遍历父类链和接口树,并将结果缓存:
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass(); // 向上遍历父类链
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
lookupAllEventTypes(LoginSuccessEvent.class) 的遍历路径:
LoginSuccessEvent → BaseUserEvent → BaseEvent → Object
↓
Serializable (接口)
结果: [LoginSuccessEvent, BaseUserEvent, Serializable, BaseEvent, Object]
性能权衡: 事件继承机制提供了灵活性,但每次 post 都需要遍历父类链(虽然有缓存)。如果你不需要事件多态,可以通过 EventBus.builder().eventInheritance(false) 关闭它,获得更好的性能。
4.3 线程切换核心:postToSubscription() 与 Poster 体系
这是 EventBus 最核心的分发逻辑——根据订阅方法声明的 ThreadMode,选择不同的投递策略:
private void postToSubscription(Subscription subscription, Object event,
boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 同步直调:在当前线程直接调用,零开销
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// 已在主线程:同步直调
invokeSubscriber(subscription, event);
} else {
// 不在主线程:通过 HandlerPoster 入队,切换到主线程
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
// 无论当前线程,始终入队排队执行
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// 在主线程发布:切到后台线程执行
backgroundPoster.enqueue(subscription, event);
} else {
// 已在后台线程:同步直调
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 始终异步执行,提交到线程池
asyncPoster.enqueue(subscription, event);
break;
}
}
我们可以把这个逻辑理解为一个智能路由器:
postToSubscription(event, isMainThread)
│
├─ POSTING ──────→ invokeSubscriber() [零切换,直接调用]
│
├─ MAIN ─────────→ isMainThread?
│ ├─ 是 → invokeSubscriber() [同步]
│ └─ 否 → HandlerPoster [切到主线程]
│
├─ MAIN_ORDERED ─→ HandlerPoster [始终排队]
│
├─ BACKGROUND ───→ isMainThread?
│ ├─ 是 → BackgroundPoster [切到后台]
│ └─ 否 → invokeSubscriber() [同步]
│
└─ ASYNC ────────→ AsyncPoster [始终异步]
4.4 三大 Poster 的实现
HandlerPoster:主线程投递器
HandlerPoster 的核心原理就是利用 Android 的 Handler + Looper 消息机制。它继承自 Handler,关联到主线程的 Looper,将事件封装到 Message 中发送。
其工作流程如下:
子线程调用 post(event):
↓
HandlerPoster.enqueue(subscription, event):
→ 将 event 封装为 PendingPost 入队
→ 发送一个空 Message 到主线程的 MessageQueue
↓
主线程的 Looper 取出 Message:
→ HandlerPoster.handleMessage() 被调用
→ 从 PendingPost 队列中取出事件
→ 调用 eventBus.invokeSubscriber(pendingPost)
HandlerPoster 的一个巧妙设计是它在 handleMessage() 中不是只处理一个事件就返回,而是在一个 while 循环中批量处理队列中的事件,只有当处理时间超过 maxMillisInsideHandleMessage(默认 10ms)时才主动让出主线程,防止阻塞 UI 渲染。
BackgroundPoster:后台串行投递器
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning; // 是否已有任务在执行
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 只在没有线程在处理时,才向线程池提交新任务
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
// 阻塞等待队列中的下一个事件(最多等1秒)
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll(); // double-check
if (pendingPost == null) {
executorRunning = false; // 队列确实空了,退出
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING,
Thread.currentThread().getName() + " was interrupted", e);
}
} finally {
executorRunning = false;
}
}
}
关键特性:串行执行。
BackgroundPoster 始终最多只有一个 Runnable 在线程池中执行。当新事件入队时:
- 如果
executorRunning == true(已有线程在跑run()循环),只入队不提交新任务——正在运行的while循环会自然拿到新入队的事件 - 如果
executorRunning == false(没有线程在跑),才提交一个新的run()任务
这保证了所有 BACKGROUND 模式的事件按入队顺序串行执行,不会出现并发问题。但代价是:一个慢的订阅方法会阻塞后续所有 BACKGROUND 事件的处理。
AsyncPoster:异步并行投递器
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
// 每次入队都提交一个新的 Runnable 到线程池
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
与 BackgroundPoster 形成鲜明对比:AsyncPoster 的 enqueue() 每次都向线程池提交一个新任务。每个事件都有独立的线程来执行。
BackgroundPoster vs AsyncPoster:
BackgroundPoster (串行):
线程池 Thread-1: [Event1 → Event2 → Event3] ← 一个线程顺序处理
AsyncPoster (并行):
线程池 Thread-1: [Event1]
线程池 Thread-2: [Event2] ← 每个事件独占一个线程
线程池 Thread-3: [Event3]
4.5 PendingPost 对象池
PendingPost 是 Poster 体系中传递事件数据的载体。EventBus 为它实现了一个链表式无锁对象池:
final class PendingPost {
// 对象池:用单链表实现的LIFO池
private final static List<PendingPost> pendingPostPool = new ArrayList<>();
Object event;
Subscription subscription;
PendingPost next; // 队列中的下一个节点
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
// 从池中取出一个并复用
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
// 池子空了,创建新的
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// 控制池大小,防止内存膨胀
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
这是一个高频调用路径上的重要优化。每次 post 事件分发到 Poster 时都需要一个 PendingPost 对象,如果每次都 new,在高频事件场景下会产生大量临时对象,给 GC 带来压力。
4.6 最终调用:反射 invoke
无论经过了怎样的线程切换和队列分发,事件最终都会到达 invokeSubscriber():
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 通过反射调用订阅者的方法
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
是的,最终的方法调用依然是通过 Method.invoke() 反射完成的。编译时索引只优化了"发现阶段"(register 时找到方法),投递阶段仍然需要反射来调用。
五、 粘性事件的源码级实现
5.1 发布粘性事件
public void postSticky(Object event) {
synchronized (stickyEvents) {
// 步骤 1:将事件存入粘性缓存(覆盖同类型的旧事件)
stickyEvents.put(event.getClass(), event);
}
// 步骤 2:像普通事件一样分发给当前已注册的订阅者
// 注意:先 put 再 post,确保如果订阅者在回调中调用 removeStickyEvent,
// 它能拿到正确的粘性事件
post(event);
}
stickyEvents 是一个 ConcurrentHashMap<Class<?>, Object>,Key 是事件的 Class,Value 是该类型最新的事件实例。
5.2 注册时回放粘性事件
回看 subscribe() 方法的最后一段:
if (subscriberMethod.sticky) {
if (eventInheritance) {
// 遍历所有已缓存的粘性事件
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 检查:缓存中的事件类型,是否"是" 订阅方法期望的事件类型(或其子类)?
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
流程总结:
register(subscriber):
→ 发现方法 A: @Subscribe(sticky=true) fun onTheme(ThemeEvent)
→ subscribe(subscriber, A):
→ 建立正向/反向索引(和普通事件一样)
→ 检查 stickyEvents 中是否有 ThemeEvent(或其子类)
→ 如果有 → 立即调用 checkPostStickyEventToSubscription()
→ 等价于对这个新订阅者执行一次 postToSubscription()
六、 高性能设计总结
通过源码分析,我们可以归纳出 EventBus 在性能方面的多项精心设计:
6.1 缓存体系
| 缓存 | 位置 | 作用 |
|---|---|---|
METHOD_CACHE |
SubscriberMethodFinder |
同一个类只需扫描一次,后续注册直接命中缓存 |
eventTypesCache |
EventBus |
事件类型的父类/接口链只计算一次 |
FIND_STATE_POOL |
SubscriberMethodFinder |
FindState 对象复用,减少 GC |
PendingPost Pool |
PendingPost |
事件投递载体复用,减少高频分配 |
6.2 并发设计
| 机制 | 组件 | 作用 |
|---|---|---|
synchronized (this) |
subscribe() / unregister() |
保证注册和注销的原子性 |
CopyOnWriteArrayList |
subscriptionsByEventType 的 Value |
读多写少场景的无锁读取 |
ThreadLocal |
PostingThreadState |
post() 路径上完全无锁 |
volatile |
BackgroundPoster.executorRunning |
保证可见性 |
subscription.active |
Subscription |
安全失效机制,防竞态投递 |
6.3 类设计
classDiagram
class EventBus {
-Map~Class, CopyOnWriteArrayList~Subscription~~ subscriptionsByEventType
-Map~Object, List~Class~~ typesBySubscriber
-Map~Class, Object~ stickyEvents
-SubscriberMethodFinder subscriberMethodFinder
-Poster mainThreadPoster
-Poster backgroundPoster
-Poster asyncPoster
+register(Object subscriber)
+unregister(Object subscriber)
+post(Object event)
+postSticky(Object event)
}
class SubscriberMethodFinder {
-ConcurrentHashMap METHOD_CACHE
-List~SubscriberInfoIndex~ subscriberInfoIndexes
+findSubscriberMethods(Class) List~SubscriberMethod~
-findUsingInfo(Class) List~SubscriberMethod~
-findUsingReflection(Class) List~SubscriberMethod~
}
class Subscription {
+Object subscriber
+SubscriberMethod subscriberMethod
+boolean active
}
class SubscriberMethod {
+Method method
+Class eventType
+ThreadMode threadMode
+int priority
+boolean sticky
}
class Poster {
<<interface>>
+enqueue(Subscription, Object)
}
class HandlerPoster {
-PendingPostQueue queue
+handleMessage(Message)
}
class BackgroundPoster {
-PendingPostQueue queue
-volatile boolean executorRunning
+run()
}
class AsyncPoster {
-PendingPostQueue queue
+run()
}
class PendingPost {
+Object event
+Subscription subscription
+PendingPost next
+obtainPendingPost()$
+releasePendingPost()$
}
EventBus --> SubscriberMethodFinder : 使用
EventBus --> Subscription : 管理
Subscription --> SubscriberMethod : 包含
EventBus --> Poster : 持有
Poster <|.. HandlerPoster
Poster <|.. BackgroundPoster
Poster <|.. AsyncPoster
HandlerPoster --> PendingPost : 使用
BackgroundPoster --> PendingPost : 使用
AsyncPoster --> PendingPost : 使用
七、 完整生命周期:一个事件从诞生到被处理
让我们用一个完整的时序图来总结 EventBus 的核心流程:
编译时 运行时
│ │
▼ │
┌─────────────────────────┐ │
│ @Subscribe 注解处理器 │ │
│ 生成 SubscriberInfoIndex │ │
└───────────┬─────────────┘ │
│ ▼
│ ┌──────────────────────────────┐
│ │ register(subscriber) │
│ │ ├→ findSubscriberMethods() │
▼ │ │ ├→ 查 METHOD_CACHE │
索引数据 ───────────────────→│ │ ├→ 查 索引 / 反射 │
│ │ └→ 写入 METHOD_CACHE │
│ └→ subscribe(method) │
│ ├→ 写 subscriptionsByEventType │
│ ├→ 写 typesBySubscriber│
│ └→ 粘性事件回放 │
└──────────────────────────────┘
┌──────────────────────────────┐
│ post(event) │
│ ├→ ThreadLocal.get() │
│ │ → PostingThreadState │
│ ├→ 入队 eventQueue │
│ ├→ postSingleEvent() │
│ │ ├→ lookupAllEventTypes()│
│ │ └→ postSingleEventForEventType() │
│ │ └→ 遍历 subscriptions │
│ │ └→ postToSubscription() │
│ │ ├→ POSTING: invoke │
│ │ ├→ MAIN: Handler │
│ │ ├→ BG: BackgroundPoster │
│ │ └→ ASYNC: AsyncPoster │
│ └→ invokeSubscriber() │
│ └→ method.invoke() │
└──────────────────────────────┘
八、 总结
通过本文的源码级分析,我们可以看到 EventBus 虽然表面上只是一个"注册-发布-接收"的简单框架,但其内部设计中蕴含了大量值得学习的工程实践:
| 设计目标 | 源码中的解决方案 |
|---|---|
| 方法发现性能 | 编译时注解处理器 + METHOD_CACHE + FindState 对象池 |
| 事件分发无锁 | ThreadLocal(PostingThreadState) |
| 读多写少的并发安全 | CopyOnWriteArrayList(订阅者列表) |
| 高频分配的 GC 压力 | PendingPost 对象池 + FindState 对象池 |
| 注销时的竞态安全 | Subscription.active 标志位 |
| 灵活的线程切换 | Poster 接口 + 三种实现(Handler / BackgroundPoster / AsyncPoster) |
| 事件类型的多态匹配 | lookupAllEventTypes 递归遍历 + eventTypesCache 缓存 |
EventBus 的源码规模不大(核心类不到 10 个,总代码量约 2000 行),但它涵盖了反射、注解处理器、ThreadLocal、CopyOnWriteArrayList、对象池、Handler 机制等多个 Java/Android 核心知识点。阅读它的源码,既是学习设计模式的最佳实践,也是深入理解 Android 线程模型的绝佳入口。