EventBus Source Code Architecture and Kernel Principles
In the previous article, we discussed the core concepts and usage of EventBus. We know that behind its simple register, post, and unregister APIs lies an elegant architecture capable of high-performance event routing and thread scheduling.
But how exactly does it work? How does it map events to subscribers without creating a bottleneck? How does it achieve lock-free posting using ThreadLocal? And how are the thread modes (MAIN, ASYNC) implemented internally without reinventing the wheel?
In this article, we will dissect the source code of EventBus 3.x, analyzing its registration data structures, its dispatch pipeline, and its thread-switching mechanics.
I. Core Data Structures: The Two Maps
At the heart of EventBus are two internal registries (Maps) that maintain the relationships between Subscriptions, Event Types, and Subscriber objects. Understanding these is the key to understanding the entire framework.
public class EventBus {
// 1. Event Routing Table: EventType -> List of Subscriptions
// Used during post(Event) to find all targets that need this event.
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 2. Unregistration Table: Subscriber Object -> List of EventTypes it listens to
// Used during unregister(Subscriber) to quickly clean up subscriptionsByEventType.
private final Map<Object, List<Class<?>>> typesBySubscriber;
}
A Subscription is a wrapper object that pairs the subscriber instance with the target method:
final class Subscription {
final Object subscriber; // e.g., the specific MainActivity instance
final SubscriberMethod subscriberMethod; // e.g., onMessage(MessageEvent)
volatile boolean active; // Failsafe flag to prevent race conditions during unregister
}
public class SubscriberMethod {
final Method method; // The java.lang.reflect.Method object
final ThreadMode threadMode; // MAIN, ASYNC, etc.
final Class<?> eventType; // MessageEvent.class
final int priority;
final boolean sticky;
}
Design Insight:
Why is subscriptionsByEventType's value a CopyOnWriteArrayList?
Event distribution (post) happens constantly and concurrently across multiple threads (reads). Registration and unregistration happen less frequently (writes). CopyOnWriteArrayList fits this read-heavy, write-rare pattern: iteration runs over an immutable snapshot of the backing array and takes no lock, while each write copies the array under a lock. Writes are therefore expensive, but they are rare here.
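To make the snapshot behavior concrete, here is a minimal sketch that is independent of EventBus. The class name SnapshotIterationDemo and the string "subscribers" are placeholders invented for this illustration. It removes an element while the list is being iterated, which is roughly what happens when a subscriber unregisters during dispatch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of EventBus.
class SnapshotIterationDemo {

    /** Iterates the list while removing an element mid-loop; returns what the loop visited. */
    static List<String> dispatchWhileUnregistering(List<String> subscribers) {
        List<String> visited = new ArrayList<>();
        for (String s : subscribers) {
            visited.add(s);
            if (s.equals("A")) {
                subscribers.remove("B"); // simulates an unregister during dispatch
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> cow = new CopyOnWriteArrayList<>(Arrays.asList("A", "B", "C"));
        // The iterator works on the snapshot taken when the loop started.
        System.out.println(dispatchWhileUnregistering(cow)); // [A, B, C]
        // The removal is visible to anyone who reads the list afterwards.
        System.out.println(cow);                             // [A, C]
    }
}
```

The loop still visits "B" even though it was removed halfway through, which is exactly why Subscription carries the active flag: a snapshot can contain a subscription that has since been unregistered. A plain ArrayList would throw ConcurrentModificationException in the same situation.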
II. The Registration Phase: Building the Index
When EventBus.getDefault().register(this) is called, the framework must discover all methods annotated with @Subscribe and populate the two core Maps.
2.1 Discovering Methods
// EventBus.java
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 1. Find all @Subscribe methods for this class
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
// 2. Thread-safe subscription phase
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
The heavy lifting is delegated to SubscriberMethodFinder. It checks an internal METHOD_CACHE first. If it's a cache miss, it parses the class.
// SubscriberMethodFinder.java
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// Check Cache
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// Determine strategy: APT Index or Reflection
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass); // APT Index route
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation");
} else {
// Populate cache
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
Optimization via Object Pooling:
Whether using reflection or the APT index, the finder uses a FindState object to track the class hierarchy and method validation. Because a FindState is needed for every class that misses METHOD_CACHE, EventBus keeps a small fixed-size array (FIND_STATE_POOL) and reuses the instances, which cuts allocations during registration-heavy phases.
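The pooling idea can be sketched in a few lines. This is not the EventBus code: ScratchState, ScratchStatePool, and the capacity of 4 are assumptions chosen for the example. The point is only the shape of the technique, a fixed array whose slots are handed out and refilled under a lock.

```java
// Hypothetical sketch of a small fixed-size object pool; all names are illustrative.
class ScratchState {
    final StringBuilder buffer = new StringBuilder();

    /** Resets the instance so that no data leaks into its next use. */
    void recycle() {
        buffer.setLength(0);
    }
}

class ScratchStatePool {
    private static final int POOL_SIZE = 4; // assumed capacity for this sketch
    private final ScratchState[] pool = new ScratchState[POOL_SIZE];

    /** Takes a pooled instance if one is available, otherwise allocates a new one. */
    ScratchState obtain() {
        synchronized (pool) {
            for (int i = 0; i < POOL_SIZE; i++) {
                ScratchState state = pool[i];
                if (state != null) {
                    pool[i] = null;
                    return state;
                }
            }
        }
        return new ScratchState();
    }

    /** Clears the instance and stores it in the first free slot; drops it if the pool is full. */
    void release(ScratchState state) {
        state.recycle();
        synchronized (pool) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (pool[i] == null) {
                    pool[i] = state;
                    return;
                }
            }
        }
    }
}
```

A bounded array keeps the pool from growing without limit: when all slots are taken, surplus instances are simply left to the garbage collector.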
2.2 Constructing the Maps
Once the methods are found, subscribe() maps the relationships:
// EventBus.java
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 1. Update subscriptionsByEventType (Forward Index)
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber already registered to event " + eventType);
}
}
// Insert based on priority
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 2. Update typesBySubscriber (Reverse Index)
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 3. Handle Sticky Events immediately if applicable (Covered later)
// ...
}
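The two bookkeeping steps above, priority-ordered insertion and the reverse index, can be tried out in isolation. The following is a simplified sketch under stated assumptions: MiniRegistry and its Entry type are invented for this example, handlers are identified by a name instead of a reflected Method, and duplicate detection and sticky handling are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified registry; not the EventBus implementation.
class MiniRegistry {
    static final class Entry {
        final Object subscriber;
        final String name;
        final int priority;

        Entry(Object subscriber, String name, int priority) {
            this.subscriber = subscriber;
            this.name = name;
            this.priority = priority;
        }
    }

    // Forward index: event type -> entries, highest priority first.
    private final Map<Class<?>, CopyOnWriteArrayList<Entry>> byEventType = new HashMap<>();
    // Reverse index: subscriber -> event types it registered for.
    private final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();

    synchronized void subscribe(Object subscriber, Class<?> eventType, String name, int priority) {
        CopyOnWriteArrayList<Entry> entries =
                byEventType.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>());
        int size = entries.size();
        for (int i = 0; i <= size; i++) {
            // Strict ">" keeps entries of equal priority in registration order.
            if (i == size || priority > entries.get(i).priority) {
                entries.add(i, new Entry(subscriber, name, priority));
                break;
            }
        }
        typesBySubscriber.computeIfAbsent(subscriber, k -> new ArrayList<>()).add(eventType);
    }

    synchronized void unregister(Object subscriber) {
        // The reverse index tells us which lists to touch, so no full scan is needed.
        List<Class<?>> types = typesBySubscriber.remove(subscriber);
        if (types == null) {
            return;
        }
        for (Class<?> type : types) {
            List<Entry> entries = byEventType.get(type);
            if (entries != null) {
                entries.removeIf(e -> e.subscriber == subscriber);
            }
        }
    }

    /** Returns handler names for an event type in delivery order. */
    synchronized List<String> deliveryOrder(Class<?> eventType) {
        List<String> names = new ArrayList<>();
        List<Entry> entries = byEventType.get(eventType);
        if (entries != null) {
            for (Entry e : entries) {
                names.add(e.name);
            }
        }
        return names;
    }
}
```

Registering handlers with priorities 0, 10, 5, and 10 (in that order) yields the delivery order 10, 10, 5, 0, with the two priority-10 handlers kept in the order they were registered.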
III. The Posting Phase: Delivering the Event
The post() operation is designed for maximum throughput. A critical architectural choice here is the use of ThreadLocal to achieve lock-free event queueing.
3.1 ThreadLocal Posting State
// EventBus.java
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
public void post(Object event) {
// 1. Get the state strictly bound to the CURRENT executing thread
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
// 2. Enqueue the event
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 3. Drain the queue
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
Why ThreadLocal?
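Without ThreadLocal, every thread calling post() would have to share one event queue and take a lock on it. Giving each thread its own PostingThreadState and eventQueue means the queue bookkeeping (enqueue, the isPosting check, the drain loop) needs no synchronization, so threads do not contend while posting. The dispatch path is not entirely lock-free, though: in the 3.x source, postSingleEventForEventType() briefly synchronizes on the EventBus instance to look up the subscription list, and the iteration that follows runs on the CopyOnWriteArrayList snapshot without a lock.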
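The isPosting flag has a second effect that is easy to miss: a post() issued from inside a subscriber is appended to the current thread's queue and delivered only after the running handler returns, so delivery never recurses. Here is a minimal sketch of that behavior. MiniPostingBus and MiniPostingBusDemo are hypothetical names, and the bus has a single handler and no thread switching.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature bus: one handler, one queue per posting thread.
class MiniPostingBus {
    private static final class PostingState {
        final List<Object> queue = new ArrayList<>();
        boolean isPosting;
    }

    // Each thread lazily gets its own PostingState; no lock guards the queue.
    private final ThreadLocal<PostingState> state = ThreadLocal.withInitial(PostingState::new);
    private final Consumer<Object> handler;

    MiniPostingBus(Consumer<Object> handler) {
        this.handler = handler;
    }

    void post(Object event) {
        PostingState s = state.get();
        s.queue.add(event);
        if (!s.isPosting) {
            s.isPosting = true;
            try {
                while (!s.queue.isEmpty()) {
                    handler.accept(s.queue.remove(0));
                }
            } finally {
                s.isPosting = false;
            }
        }
        // If this thread is already draining, the outer loop will pick the event up.
    }
}

class MiniPostingBusDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniPostingBus[] bus = new MiniPostingBus[1];
        bus[0] = new MiniPostingBus(event -> {
            log.add("start " + event);
            if (event.equals("A")) {
                bus[0].post("B"); // nested post: queued, not delivered recursively
            }
            log.add("end " + event);
        });
        bus[0].post("A");
        return log; // [start A, end A, start B, end B]
    }
}
```

The handler for "A" finishes before "B" is delivered, which keeps the call stack flat and the per-thread delivery order predictable.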
3.2 Polymorphism Resolution
When you post a UserEvent (assuming UserEvent extends BaseEvent), EventBus also routes it to methods that subscribe to BaseEvent, to any interface the event implements, or to Object.
// EventBus.java
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) { // Default is true
// Traverse the class hierarchy to find all superclasses and interfaces
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// ... handling NoSubscriberEvent
}
lookupAllEventTypes() employs an internal caching mechanism (eventTypesCache) because walking the class inheritance tree is computationally expensive to do on every post.
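The hierarchy walk itself is short. The sketch below is a reconstruction of the idea, not a verbatim copy of the library: EventTypeLookup, CACHE, and the three Sketch* event types are names made up for this example. It collects the class, each superclass, and every interface reachable from them, and caches the result per event class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative event hierarchy; these types are assumptions for the example.
interface SketchTagged {}
class SketchBaseEvent implements SketchTagged {}
class SketchUserEvent extends SketchBaseEvent {}

// Hypothetical helper showing a cached class-hierarchy walk.
class EventTypeLookup {
    private static final Map<Class<?>, List<Class<?>>> CACHE = new HashMap<>();

    static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (CACHE) {
            List<Class<?>> eventTypes = CACHE.get(eventClass);
            if (eventTypes == null) {
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                while (clazz != null) {
                    eventTypes.add(clazz);
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    clazz = clazz.getSuperclass();
                }
                CACHE.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }

    /** Adds interfaces and, recursively, the interfaces they extend, skipping duplicates. */
    private static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
        for (Class<?> iface : interfaces) {
            if (!eventTypes.contains(iface)) {
                eventTypes.add(iface);
                addInterfaces(eventTypes, iface.getInterfaces());
            }
        }
    }

    public static void main(String[] args) {
        // Order: SketchUserEvent, SketchBaseEvent, SketchTagged, Object
        System.out.println(lookupAllEventTypes(SketchUserEvent.class));
    }
}
```

A second lookup for the same class returns the cached list, so the reflection calls run once per event type for the lifetime of the process.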
3.3 Dispatching to the Appropriate Thread
postSingleEventForEventType() retrieves the CopyOnWriteArrayList<Subscription> from our mapping and iterates through it, passing the event to postToSubscription().
// EventBus.java
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// Direct invocation on the current thread
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
// Hop to the main thread
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// Hop to the background thread
backgroundPoster.enqueue(subscription, event);
} else {
// Already in background, invoke directly
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// Always dispatch to an isolated thread pool
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
IV. Thread Switching Architecture: The Posters
How does EventBus actually switch threads? It relies on three specific components: HandlerPoster, BackgroundPoster, and AsyncPoster.
All three queue their work as PendingPost objects, which are recycled through a simple object pool.
4.1 PendingPost and the Object Pool
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Keep the pool to a max size of 10000
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
This is another allocation-saving optimization. High-frequency posting across threads would otherwise create a new wrapper object for every delivery and add GC pressure, so EventBus recycles PendingPost instances instead.
4.2 HandlerPoster (To Main Thread)
HandlerPoster extends Android's native Handler, bound to the Main Looper.
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private boolean handlerActive;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// Send an empty message to trigger handleMessage on the Main Thread
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
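public void handleMessage(Message msg) {
// Runs on Main Thread
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again under the lock to avoid racing with enqueue()
pendingPost = queue.poll();
if (pendingPost == null) {
// Queue is drained: go idle until the next enqueue()
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
// To avoid blocking the UI thread for too long,
// stop once about 10ms have passed and continue in a new message.
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}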
}
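Notice the time-slicing mechanism: if HandlerPoster has 50 events in the queue, executing them all back-to-back might block the UI thread for 100ms and cause dropped frames. handleMessage() therefore works within a budget (maxMillisInsideHandleMessage = 10ms). Once the budget is used up, it sends itself a new message and returns, so other messages already waiting on the main Looper, including input and drawing work, can run before the remaining events are processed. The budget is checked between subscriber calls, so it limits a batch of events, not a single slow subscriber.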
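The budget logic can be studied without Android. The sketch below is an approximation under stated assumptions: TimeSlicedDrainer is a hypothetical class, it is single-threaded, and the clock is injected as a LongSupplier so the demo can use a fake clock instead of SystemClock. A return value of true stands in for "send another message to yourself".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical, Android-free sketch of a time-boxed drain loop (not thread-safe).
class TimeSlicedDrainer {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final LongSupplier clockMillis;
    private final long budgetMillis;

    TimeSlicedDrainer(LongSupplier clockMillis, long budgetMillis) {
        this.clockMillis = clockMillis;
        this.budgetMillis = budgetMillis;
    }

    void enqueue(Runnable task) {
        queue.add(task);
    }

    /**
     * Runs tasks until the queue is empty or the budget is used up.
     * Returns true when work remains and another pass should be scheduled.
     */
    boolean drainOnce() {
        long started = clockMillis.getAsLong();
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
            // The budget is checked after each task, never in the middle of one.
            if (clockMillis.getAsLong() - started >= budgetMillis) {
                return !queue.isEmpty();
            }
        }
        return false;
    }

    /** Demo with a fake clock: five tasks that each "take" 4 ms against a 10 ms budget. */
    static List<Integer> demoTasksPerPass() {
        long[] now = {0};
        int[] ran = {0};
        TimeSlicedDrainer drainer = new TimeSlicedDrainer(() -> now[0], 10);
        for (int i = 0; i < 5; i++) {
            drainer.enqueue(() -> { now[0] += 4; ran[0]++; });
        }
        List<Integer> perPass = new ArrayList<>();
        boolean more = true;
        while (more) {
            int before = ran[0];
            more = drainer.drainOnce();
            perPass.add(ran[0] - before);
        }
        return perPass; // [3, 2]
    }
}
```

The first pass runs three tasks (the third one pushes the elapsed time to 12 ms) and the second pass runs the remaining two, which shows that the budget is a soft limit: a pass can overshoot by the duration of its last task.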
4.3 BackgroundPoster and AsyncPoster (To Worker Threads)
BackgroundPoster: Implements Runnable. It uses a volatile boolean executorRunning flag. If it's already running, it just enqueues the event. Only one background task iterates over the queue synchronously, preventing thread explosion for lightweight DB/Disk operations.
AsyncPoster: Implements Runnable. It doesn't check state; every enqueue() immediately submits a new task to EventBus's shared ExecutorService (CachedThreadPool), ensuring concurrent heavy computations.
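The difference between the two posters comes down to when a task is handed to the executor. The following sketch illustrates both policies with hypothetical classes (SerialDrainPoster and PerTaskPoster are not EventBus names). It makes two simplifying assumptions: the running flag is a plain boolean guarded by the object's lock instead of a volatile field, and work items are plain Runnables instead of PendingPost objects.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical sketch of the "one drainer at a time" policy.
class SerialDrainPoster implements Runnable {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final Executor executor;
    private boolean running; // guarded by "this"

    SerialDrainPoster(Executor executor) {
        this.executor = executor;
    }

    void enqueue(Runnable task) {
        synchronized (this) {
            queue.add(task);
            if (!running) {
                // No drainer is active, so start exactly one.
                running = true;
                executor.execute(this);
            }
        }
    }

    @Override
    public void run() {
        while (true) {
            Runnable task;
            synchronized (this) {
                task = queue.poll();
                if (task == null) {
                    // Queue is empty: the next enqueue() will start a new drainer.
                    running = false;
                    return;
                }
            }
            try {
                task.run(); // executed outside the lock, one task at a time, in FIFO order
            } catch (RuntimeException e) {
                System.err.println("task failed: " + e); // keep draining in this sketch
            }
        }
    }
}

// Hypothetical sketch of the "one executor task per event" policy.
class PerTaskPoster {
    private final Executor executor;

    PerTaskPoster(Executor executor) {
        this.executor = executor;
    }

    void enqueue(Runnable task) {
        executor.execute(task); // tasks may run concurrently and in any order
    }
}
```

With a cached thread pool as the executor, the first policy occupies at most one pool thread however many events arrive, while the second can use as many threads as there are events in flight.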
V. Sticky Events Implementation
The EventBus class maintains a thread-safe map for sticky events:
// Map: EventType -> Event Instance
private final Map<Class<?>, Object> stickyEvents;
When you call postSticky(event):
public void postSticky(Object event) {
synchronized (stickyEvents) {
// Cache the latest instance of this exact event class
stickyEvents.put(event.getClass(), event);
}
// Then deliver it normally to current subscribers
post(event);
}
The magic happens during registration. Let's look back at subscribe():
// EventBus.java -> subscribe()
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Iterate through all cached sticky events
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// Is the cached event assignable to the method's requested type?
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// Immediately deliver it to the newly registered subscriber
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
When a component calls register(this), EventBus detects if any @Subscribe methods are marked sticky=true. If so, it immediately queries the stickyEvents map and fires postToSubscription explicitly for that single subscriber, effectively "replaying" the history.
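The matching rule used during replay is worth isolating, because isAssignableFrom is easy to read backwards: the subscriber's parameter type must be a supertype of (or equal to) the cached event's class. Below is a minimal sketch with a hypothetical StickyStore class. It only computes which cached events a new subscriber would receive and leaves delivery and thread modes out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the sticky-event cache and its replay rule.
class StickyStore {
    // Keeps only the most recent event per concrete event class.
    private final Map<Class<?>, Object> stickyEvents = new ConcurrentHashMap<>();

    void postSticky(Object event) {
        stickyEvents.put(event.getClass(), event);
    }

    /** Returns the cached events that a new subscriber for eventType should receive. */
    List<Object> replayFor(Class<?> eventType, boolean eventInheritance) {
        List<Object> result = new ArrayList<>();
        if (eventInheritance) {
            for (Map.Entry<Class<?>, Object> entry : stickyEvents.entrySet()) {
                // True when the cached event's class is eventType or a subtype of it.
                if (eventType.isAssignableFrom(entry.getKey())) {
                    result.add(entry.getValue());
                }
            }
        } else {
            Object event = stickyEvents.get(eventType); // exact class match only
            if (event != null) {
                result.add(event);
            }
        }
        return result;
    }
}
```

After postSticky("hello"), postSticky(42), and postSticky("world"), a subscriber for CharSequence receives only "world" (the later String replaced the earlier one), and a subscriber for Object receives both cached events when eventInheritance is on and none when it is off.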
VI. High-Performance Architecture Summary
EventBus may look like a simple observer pattern wrapper, but its internals reveal a meticulous focus on Android performance metrics (Memory, GC, Frame Rates).
| Design Goal | EventBus Source Code Solution |
|---|---|
| Method Discovery Overhead | APT Index generation + METHOD_CACHE |
| Lock-Free Event Dispatch | ThreadLocal state management (PostingThreadState) |
| Concurrency Read vs Write | CopyOnWriteArrayList for the routing table |
| Mitigating GC Thrashing | Custom Object Pools (PendingPost and FindState) |
| Main Thread Blocking | 10ms Time-slicing within HandlerPoster.handleMessage |
| Race Conditions on Unregister | volatile Subscription.active flag, cleared on unregister |
EventBus's core is less than 2000 lines of code across roughly 10 classes. Studying it provides deep, practical insights into Java concurrency primitives, ThreadLocal patterns, and memory-conscious Android development.