RxJava Thread Scheduling Mechanism and Backpressure Principles Deep Dive
In the previous article, we explored the foundational skeleton of RxJava reactive streams and the "onion model" of operators. But in that model, everything from data production and processing to consumption occurred on the exact same thread where subscribe() was invoked.
However, the golden rule of Android development is: The UI thread must not execute time-consuming operations, and worker threads must not update the UI. The reason RxJava leads the pack in Android development is not merely due to its elegant chained invocations, but more so its incredibly simple thread-switching capabilities.
With just two lines of code, complex asynchronous tasks can shuttle effortlessly between different threads:
Observable.create(...)
    .subscribeOn(Schedulers.io())               // Specify that data production happens on an IO thread
    .observeOn(AndroidSchedulers.mainThread())  // Specify that data consumption happens on the main thread
    .subscribe(observer);
What are the principles behind these two lines of code? And why is there a common industry saying that "calling subscribeOn multiple times only takes effect the first time, whereas calling observeOn takes effect every time"? This article dissects RxJava's thread scheduling at the source-code level and then discusses the unavoidable "Backpressure" problem in reactive streams.
Core Metaphor of Thread Scheduling: Multinational Logistics Network
We can compare RxJava's thread scheduling to a multinational logistics network:
- subscribeOn determines in which country's factory (thread) the product is manufactured. Once the factory is confirmed, no matter how many dispatchers are involved in between, the product's origin cannot be changed.
- observeOn determines which transportation vehicle (thread) the product transfers to in transit. Before reaching the consumer, the product can transfer multiple times (e.g., first by sea, then onto a truck, and finally delivered by an electric scooter).
Deep Dive into subscribeOn
The role of subscribeOn is to specify the thread where the upstream Observable produces data and executes operations.
Source Code Tracking
When you invoke .subscribeOn(Schedulers.io()), RxJava wraps the current Observable into an ObservableSubscribeOn. Let's directly examine its subscribeActual method:
@Override
public void subscribeActual(final Observer<? super T> observer) {
    // 1. Create the wrapping Observer
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    // 2. Trigger the downstream onSubscribe callback
    observer.onSubscribe(parent);
    // 3. Core: do not call source.subscribe() directly; hand it to the specified thread to execute!
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Let's see what this SubscribeTask is:
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        // Execute the upstream subscription logic on the new thread
        source.subscribe(parent);
    }
}
Why does only the first subscribeOn take effect?
This classic interview claim is actually imprecise. To be exact, the subscribeOn positioned closest to the source (the topmost one in the chain) determines the thread on which the source executes.
Recall the "bottom-up subscription logic" from the previous article. Suppose we write code like this:
Observable.create(...)                        // O1
    .subscribeOn(Schedulers.io())             // O2
    .subscribeOn(Schedulers.computation())    // O3
    .subscribe(observer);
Execution Flow:
- The subscribe action propagates bottom-up and reaches O3 first. O3 discovers it needs to switch to the Computation scheduler, so it spawns a Computation thread and executes the upstream O2.subscribe on that new thread.
- O2's subscribe action is now running on the Computation thread. O2 in turn discovers it needs to switch to the IO scheduler, so it spawns an IO thread and executes the upstream O1.subscribe on that new IO thread.
- Where does O1.subscribe (i.e., the emission logic we wrote inside create) ultimately run? Undoubtedly, on the IO thread of the final switch!
Although the Computation thread did run for an extremely brief moment (it was used only to subscribe upward), the actual business logic (upstream data production) was intercepted by the topmost subscribeOn and dispatched to the IO thread.
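The wrapping described above can be sketched in plain Java without RxJava at all. In this toy model (SubscribeOnDemo and its thread names are illustrative, not RxJava source), each "subscribeOn" layer is just a Runnable that hops to its own thread and then subscribes upstream, so the topmost layer's thread ends up doing the real work:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A plain-Java sketch (not RxJava source) of how nested subscribeOn calls
// compose: each layer wraps the upstream's subscribe in a hop to its own
// thread, so the topmost layer's thread performs the final, real work.
public class SubscribeOnDemo {
    public static String runNested() {
        final String[] producerThread = new String[1];
        final CountDownLatch latch = new CountDownLatch(1);
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "computation"));
        try {
            // O1: the source's subscribe logic (where data production happens)
            Runnable o1Subscribe = () -> {
                producerThread[0] = Thread.currentThread().getName();
                latch.countDown();
            };
            // O2 = O1.subscribeOn(io): hop to the io thread, then subscribe upstream
            Runnable o2Subscribe = () -> io.execute(o1Subscribe);
            // O3 = O2.subscribeOn(computation): hop to computation, then subscribe upstream
            Runnable o3Subscribe = () -> computation.execute(o2Subscribe);

            o3Subscribe.run();   // subscribe() propagates bottom-up from here
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
        return producerThread[0];   // the topmost subscribeOn's thread wins
    }

    public static void main(String[] args) {
        System.out.println(runNested()); // prints "io"
    }
}
```

The "computation" thread does execute briefly (it runs o2Subscribe), but the data production itself lands on the "io" thread, matching the flow above.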
Deep Dive into observeOn
The role of observeOn is to specify the thread where its downstream operators and Observer receive data.
Source Code Tracking
Calling observeOn generates an ObservableObserveOn. Its principle is considerably more complex than subscribeOn's, because data flows downstream continuously; you cannot simply wrap every item in a Runnable and toss it into a thread pool, as that would waste thread resources and deliver items out of order.
RxJava utilizes a Lock-Free Concurrent Queue (SpscLinkedArrayQueue) + Drain Loop Model.
Look inside ObservableObserveOn's internal ObserveOnObserver:
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    // 1. Data emitted from upstream is not passed directly downstream,
    //    but first placed into a queue
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    // 2. Schedule execution
    schedule();
}

void schedule() {
    // Ensure only one drain task is in flight at any moment
    if (getAndIncrement() == 0) {
        // Hand this task to the specified thread, which will invoke run()
        worker.schedule(this);
    }
}
When the specified thread (e.g., Android Main Thread) starts executing, it invokes run, which subsequently executes the famous drainNormal():
void drainNormal() {
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;
    // ... part of the code omitted ...
    for (;;) {
        // On the target thread, keep polling data from the queue in a loop
        T v = q.poll();
        boolean empty = v == null;
        if (empty) {
            break;
        }
        // Hand the item to the downstream for consumption
        a.onNext(v);
    }
}
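The queue + drain-loop model can be condensed into a small self-contained sketch. SimpleDrain below is an illustrative class of my own, not RxJava source: producers enqueue items, and a work-in-progress counter guarantees at most one drain loop runs on the worker at a time, mirroring the getAndIncrement() == 0 check above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// A stripped-down sketch (not RxJava source) of the queue + drain-loop model
// behind ObserveOnObserver.
public class SimpleDrain<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
    private final Executor worker;        // stands in for the target Scheduler
    private final Consumer<T> downstream;

    public SimpleDrain(Executor worker, Consumer<T> downstream) {
        this.worker = worker;
        this.downstream = downstream;
    }

    public void onNext(T t) {
        queue.offer(t);   // 1. buffer the item instead of passing it on directly
        schedule();       // 2. wake up the worker
    }

    private void schedule() {
        // Only the caller that bumps wip from 0 schedules a drain, so at most
        // one drain loop is ever in flight at a time
        if (wip.getAndIncrement() == 0) {
            worker.execute(this::drain);
        }
    }

    private void drain() {
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                downstream.accept(v);   // consume on the worker thread
            }
            // If onNext raced in while we were draining, loop again
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    // Demo: a same-thread "worker" makes the behavior easy to observe
    public static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        SimpleDrain<Integer> d = new SimpleDrain<>(Runnable::run, received::add);
        d.onNext(1);
        d.onNext(2);
        d.onNext(3);
        return received;  // items arrive downstream in order: [1, 2, 3]
    }
}
```

Using a single-threaded Executor for the worker instead of Runnable::run is exactly the thread switch observeOn performs: items cross the queue once and are consumed on the worker's thread, in order.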
Why does observeOn take effect every time it is called?
Once you understand drainNormal, the answer falls out naturally.
Data flows from top to bottom. Every time it passes through an observeOn, the data is placed from the current thread into that observeOn's queue, and the thread of the corresponding Scheduler is then woken to poll the queue and pass data to the next layer. Therefore, every observeOn call makes the data stream cross a genuine thread boundary, switching the "transportation vehicle" once.
Traffic Disaster: The Birth of the Backpressure Mechanism
While we enjoy the silky-smooth asynchronous experience brought by RxJava, we might encounter a fatal flaw.
Suppose the upstream reads a database on an IO thread, emitting 10,000 events per second, while the downstream renders UI on the main thread and can process only 10 events per second.
According to the observeOn principles analyzed above, all data emitted by the upstream will surge into the SpscLinkedArrayQueue buffer.
What is the result? The queue expands infinitely, ultimately causing an OOM (Out Of Memory). This is exactly like forcefully pumping water into one end of a pipe while the other end trickles slowly; the pipe is bound to burst eventually.
This is the Backpressure problem that plagued countless developers in the RxJava 1.x era: the problem of data backlog caused by asynchrony when the downstream's processing capacity cannot keep up with the upstream's emission capacity.
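The arithmetic of the backlog can be made concrete with a toy illustration (BacklogDemo is illustrative, not RxJava code): one round of a producer that is 1,000x faster than its consumer leaves nearly everything stuck in the unbounded buffer.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// A toy illustration (not RxJava code): a producer 1,000x faster than the
// consumer leaves an ever-growing backlog in an unbounded buffer.
public class BacklogDemo {
    public static int backlogAfterOneRound() {
        Queue<Integer> buffer = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 10_000; i++) {
            buffer.offer(i);            // fast producer: 10,000 events
        }
        for (int i = 0; i < 10; i++) {
            buffer.poll();              // slow consumer: only 10 processed
        }
        return buffer.size();           // 9,990 items stuck in memory
    }

    public static void main(String[] args) {
        System.out.println(backlogAfterOneRound()); // prints 9990
    }
}
```

Repeat this every second and the backlog grows by roughly 9,990 items per second until the heap is exhausted.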
RxJava 2's Revolution: Flowable
To resolve backpressure at the root, RxJava 2 underwent a drastic refactoring: it completely separated Observable (which lacks backpressure support) from the backpressure-capable types by introducing Flowable.
In Flowable, the consumer (now called Subscriber) no longer passively receives; it is granted the authority to request data via Subscription.request(n).
Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        // The emitter exposes emitter.requested() so a well-behaved upstream
        // can honor downstream demand; here we emit blindly and rely on the
        // BUFFER strategy to absorb the excess
        for (int i = 0; i < 10000; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.BUFFER) // A backpressure strategy must be specified
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Tell the upstream: deliver 10 items for now, no more.
                // Unless request(n) is called again later, nothing further
                // will be delivered downstream.
                s.request(10);
            }

            @Override
            public void onNext(Integer integer) {
                // Consume data (keep a reference to the Subscription and
                // call request(n) here to pull more)
            }
            // ...
        });
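The essence of this "reactive pull" can be reduced to one idea: the producer consults outstanding demand before emitting, instead of pushing unconditionally. The sketch below is a toy model of that idea (ReactivePullDemo is illustrative, not RxJava code):

```java
import java.util.ArrayList;
import java.util.List;

// A toy model (not RxJava code) of reactive pull: the producer checks
// outstanding demand before emitting, instead of pushing unconditionally.
public class ReactivePullDemo {
    public static List<Integer> produceWithDemand(int totalAvailable, long requested) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < totalAvailable && delivered.size() < requested; i++) {
            delivered.add(i);   // emit only while demand remains
        }
        return delivered;
    }

    public static void main(String[] args) {
        // 10,000 items available upstream, but the consumer requested only 10
        System.out.println(produceWithDemand(10_000, 10).size()); // prints 10
    }
}
```

With request(10) as in the example above, only 10 of the 10,000 available items ever cross the boundary; the rest stay with the producer instead of piling up in a queue.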
Backpressure Strategy (BackpressureStrategy)
When constructing a Flowable, what if the upstream fundamentally cannot control its production speed (e.g., frantic user clicks, or hardware callbacks from sensors)? That is where BackpressureStrategy comes in as a fallback:
- ERROR: If the upstream emits beyond what the downstream has requested (the default buffer watermark is 128), it throws a MissingBackpressureException directly, failing fast.
- DROP: Since the downstream cannot keep up, newly produced upstream data is simply discarded to protect the rest of the pipeline.
- LATEST: Discards intermediate data and keeps only the most recent item. This is extremely useful for UI refreshes, where users only care about the latest state.
- BUFFER: Caches without limit and never throws, just like RxJava 1.x. If the upstream truly never ends, it will still OOM.
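The LATEST strategy in particular can be modeled with a single-slot buffer that newer items overwrite (LatestDemo is a toy illustration, not RxJava code):

```java
import java.util.concurrent.atomic.AtomicReference;

// A toy model (not RxJava code) of the LATEST strategy: a one-slot buffer
// that new items overwrite, so a slow consumer only ever sees the newest value.
public class LatestDemo {
    public static int lastSeen() {
        AtomicReference<Integer> slot = new AtomicReference<>();
        for (int i = 0; i < 100; i++) {
            slot.set(i);            // fast producer keeps overwriting the slot
        }
        return slot.get();          // slow consumer catches up: only 99 survives
    }

    public static void main(String[] args) {
        System.out.println(lastSeen()); // prints 99
    }
}
```

This is why LATEST suits UI state: intermediate values 0..98 are stale the moment 99 arrives, so discarding them loses nothing the user cares about.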
Conclusion
- The essence of subscribeOn: it wraps the upward subscription action (source.subscribe) and runs it on a new thread. The source's thread is determined by the topmost invocation.
- The essence of observeOn: it holds an internal concurrent queue, places data received from upstream into that queue, and wakes the target thread to loop, poll the queue, and pass data downstream. Every invocation genuinely performs a thread switch.
- The backpressure problem: a backlog caused by mismatched upstream/downstream rates across threads. Observable does not support backpressure and suits small event volumes such as single network requests; Flowable implements backpressure-aware reactive pull via Subscription.request(n), supplemented by strategies like DROP and LATEST, making it suitable for taming massive data streams.
Thoroughly understanding RxJava's thread models and backpressure mechanisms will allow you to navigate effortlessly through Android's intricate thread mazes, writing industrial-grade application code that is both silky smooth and immune to OOMs.