RxJava Core Principles: Reactive Streams and Operator Source Code Parsing
In Android development, asynchronous operations and event handling are everywhere. From traditional Callback interfaces to Handler/Looper and then AsyncTask, developers have kept looking for more elegant ways to handle complex asynchronous data flows. With its powerful reactive programming model and rich set of operators, RxJava has become a go-to tool for asynchronous tasks.
However, merely knowing how to call map and flatMap is not enough to write robust code. RxJava has a steep learning curve, and its internals often feel like magic to beginners. This article looks beneath RxJava's API surface and examines how it works internally as a "reactive stream."
What is RxJava?
Summarized in one sentence: RxJava is an asynchronous operation library based on event streams.
It models everything as a "Stream" of data and uses an extended observer pattern to drive the production and consumption of events.
Why Do We Need RxJava?
In complex business scenarios, we frequently encounter situations such as:
- After Request A succeeds, use A's result to initiate Request B.
- Simultaneously initiate Requests C and D; wait for both to return, then merge the results to refresh the UI.
- During the processing of these requests, exceptions can happen at any time, requiring unified error handling.
With traditional Callbacks, such code quickly descends into "Callback Hell": a nested structure that keeps expanding sideways and is hard to read and maintain. With its fluent API and rich set of operators, RxJava flattens complex asynchronous logic into a linear code structure.
Core Metaphor: The Industrial Assembly Line
The most fitting metaphor for understanding RxJava is an industrial production assembly line:
- Observable: The starting point of the assembly line, responsible for continuously producing raw materials (emitting events).
- Observer: The endpoint of the assembly line, responsible for receiving the packaged final product and consuming it.
- Operator: Individual processing stations in the middle of the assembly line. For instance, map is responsible for painting raw materials, while filter is responsible for discarding defective semi-finished goods.
- subscribe: The switch that starts the assembly line. Before the switch is pressed, the assembly line is static; once pressed, raw materials flow from the origin through the processing stations all the way to the end.
Core Skeleton: Extended Observer Pattern
The core skeleton of RxJava is the observer pattern, but it introduces more concepts than the traditional observer pattern:
- onNext(T): Emits a normal data item.
- onError(Throwable): Emits an error signal. Once invoked, the data stream terminates.
- onComplete(): Emits a completion signal, ending the data stream normally. onError and onComplete are mutually exclusive: a stream ends with at most one of them.
- onSubscribe(Disposable): A callback invoked when the subscription is established. The Disposable lets you cut off the assembly line.
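Taken together, these callbacks form a small protocol: onSubscribe comes first, then any number of onNext calls, then at most one terminal onError or onComplete. The sketch below writes that rule as a regular expression over recorded signal names. It uses a hypothetical helper, ObservableProtocol, which is not an RxJava class. A check like this can be useful for verifying the log of a hand-written test observer.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of RxJava: validates a recorded sequence of
// signal names against the Observable protocol
//   onSubscribe, then onNext zero or more times, then at most one of onError/onComplete.
final class ObservableProtocol {
    private static final Pattern GRAMMAR =
            Pattern.compile("onSubscribe( onNext)*( (onError|onComplete))?");

    private ObservableProtocol() { }

    static boolean isValid(List<String> signals) {
        // Join the names with spaces so the whole sequence is matched at once
        return GRAMMAR.matcher(String.join(" ", signals)).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid(List.of("onSubscribe", "onNext", "onNext", "onComplete"))); // true
        System.out.println(isValid(List.of("onSubscribe", "onError", "onNext")));              // false: onNext after a terminal signal
        System.out.println(isValid(List.of("onSubscribe", "onComplete", "onError")));          // false: two terminal signals
    }
}
```

The check covers only the order of signals, not their payloads or threading. It is a reading aid, not a full contract validator.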
Let's start with basic creation and subscription to see what happens underneath:
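// RxJava 2.x imports
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        // Producer side: emit one item, then signal normal completion
        emitter.onNext("Hello");
        emitter.onComplete();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) { }   // keep d to cancel the stream later

    @Override
    public void onNext(String s) {
        System.out.println(s);                  // prints "Hello"
    }

    @Override
    public void onError(Throwable e) { }        // real code should handle errors here

    @Override
    public void onComplete() { }
});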
Underlying Mechanism of create
When you call Observable.create, it actually returns an ObservableCreate object, which saves the ObservableOnSubscribe you passed in (i.e., the logic for producing data).
// RxJava source, simplified (the null check on source is omitted)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
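At this point no data has been emitted: create only assembles the pipeline by storing the emission logic. That logic runs each time an Observer subscribes, and it runs separately for each subscriber. This is what makes an Observable built this way a cold stream.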
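To make this laziness concrete, here is a minimal sketch in plain Java; it is not RxJava's code. The hypothetical LazySource class only stores the emission logic in create. It runs that logic when subscribe is called, and runs it again for every new subscriber, which is the cold behavior described above. A Consumer stands in for both ObservableOnSubscribe and the emitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Observable.create, not RxJava code.
// The "emission logic" plays the role of ObservableOnSubscribe: it receives a
// Consumer (a stripped-down emitter) and pushes values into it.
final class LazySource<T> {
    private final Consumer<Consumer<T>> emissionLogic;
    private int runs = 0; // how many times the emission logic has executed

    private LazySource(Consumer<Consumer<T>> emissionLogic) {
        this.emissionLogic = emissionLogic;
    }

    static <T> LazySource<T> create(Consumer<Consumer<T>> emissionLogic) {
        // Assembly only: the logic is stored, nothing is emitted yet
        return new LazySource<>(emissionLogic);
    }

    void subscribe(Consumer<T> observer) {
        // Each subscription runs the emission logic again, for this subscriber only
        runs++;
        emissionLogic.accept(observer);
    }

    int runs() { return runs; }
}

final class ColdDemo {
    public static void main(String[] args) {
        LazySource<String> source = LazySource.create(emitter -> emitter.accept("Hello"));
        System.out.println(source.runs()); // 0: create alone emits nothing

        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        source.subscribe(first::add);
        source.subscribe(second::add);

        System.out.println(source.runs());        // 2: one run per subscriber
        System.out.println(first + " " + second); // [Hello] [Hello]
    }
}
```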
Execution Flow of subscribe
When subscribe(Observer) is invoked, the assembly line starts. Here is Observable.subscribe, simplified (null checks and exception handling omitted):
public final void subscribe(Observer<? super T> observer) {
    // 1. Apply the global RxJavaPlugins hook (returns the observer unchanged by default)
    observer = RxJavaPlugins.onSubscribe(this, observer);
    // 2. Core: delegate to subscribeActual, implemented by each subclass
    subscribeActual(observer);
}
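This split between a final subscribe and an abstract subscribeActual is the Template Method pattern. The sketch below mimics that shape with hypothetical classes, SketchObservable and SingleValueSource, and uses a Consumer in place of Observer. Its hook only logs and returns the observer unchanged, loosely modeling the default behavior of RxJavaPlugins.onSubscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical sketch of the Template Method shape of Observable.subscribe.
// A Consumer stands in for Observer; none of these classes exist in RxJava.
abstract class SketchObservable<T> {
    // Records hook invocations so the demo can show the hook ran
    static final List<String> HOOK_LOG = new ArrayList<>();

    // Loose model of RxJavaPlugins.onSubscribe: by default it returns the observer unchanged
    private static <V> Consumer<? super V> onSubscribeHook(SketchObservable<V> source,
                                                           Consumer<? super V> observer) {
        HOOK_LOG.add("hook saw " + source.getClass().getSimpleName());
        return observer;
    }

    // final: every subclass goes through the same entry point...
    public final void subscribe(Consumer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        observer = onSubscribeHook(this, observer);
        subscribeActual(observer);
    }

    // ...and only supplies its own subscription logic here
    protected abstract void subscribeActual(Consumer<? super T> observer);
}

// Hypothetical subclass that emits one stored value to each subscriber
final class SingleValueSource<T> extends SketchObservable<T> {
    private final T value;

    SingleValueSource(T value) { this.value = value; }

    @Override
    protected void subscribeActual(Consumer<? super T> observer) {
        observer.accept(value);
    }
}

final class TemplateDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new SingleValueSource<>("Hello").subscribe(received::add);
        System.out.println(received);                  // [Hello]
        System.out.println(SketchObservable.HOOK_LOG); // [hook saw SingleValueSource]
    }
}
```

Making subscribe final guarantees that shared steps such as the null check and the hook run for every operator. Subclasses cannot bypass them.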
In subscribeActual of ObservableCreate:
@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 1. Create the emitter, passing in the target Observer
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 2. Trigger the onSubscribe callback
    observer.onSubscribe(parent);
    try {
        // 3. Execute the developer-defined emission logic
        source.subscribe(parent);
    } catch (Throwable ex) {
        // Exceptions thrown by the emission logic are routed to onError
        parent.onError(ex);
    }
}
What happened?
CreateEmitter plays the role of a "middleman". When the custom logic calls emitter.onNext("Hello"), it is actually calling CreateEmitter.onNext. CreateEmitter first checks its state: it turns a null item into an onError, and it drops the item if the subscription has been disposed (which also happens automatically after onError or onComplete). It then passes the data unchanged to the real Observer.onNext("Hello").
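These middleman checks can be sketched in a few lines. The sketch is a simplified, single-threaded model, not the actual CreateEmitter, and it rests on several assumptions:

- SketchEmitter and MiniObserver are hypothetical names.
- Disposable is reduced to a boolean flag.
- The global error handler that RxJava uses for late errors is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical three-callback observer used only by this sketch
interface MiniObserver<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

// Simplified, single-threaded model of the emitter's middleman role (not RxJava's CreateEmitter)
final class SketchEmitter<T> {
    private final MiniObserver<? super T> downstream;
    private boolean disposed; // stands in for the Disposable state

    SketchEmitter(MiniObserver<? super T> downstream) {
        this.downstream = downstream;
    }

    boolean isDisposed() { return disposed; }

    void dispose() { disposed = true; }

    void onNext(T t) {
        if (t == null) { // null items are turned into an error instead of being forwarded
            onError(new NullPointerException("onNext called with null"));
            return;
        }
        if (!disposed) {
            downstream.onNext(t); // forwarded unchanged
        }
    }

    void onError(Throwable e) {
        if (disposed) {
            return; // RxJava would hand this late error to a global handler; omitted here
        }
        try {
            downstream.onError(e);
        } finally {
            dispose(); // a terminal signal ends the stream
        }
    }

    void onComplete() {
        if (disposed) {
            return;
        }
        try {
            downstream.onComplete();
        } finally {
            dispose();
        }
    }
}

final class EmitterDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObserver<String> observer = new MiniObserver<String>() {
            @Override public void onNext(String s) { log.add("onNext " + s); }
            @Override public void onError(Throwable e) { log.add("onError " + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        SketchEmitter<String> emitter = new SketchEmitter<>(observer);
        emitter.onNext("Hello");
        emitter.onComplete();
        emitter.onNext("World");                       // dropped: already terminated
        emitter.onError(new RuntimeException("late")); // dropped as well
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onNext Hello, onComplete]
    }
}
```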
The Magic of Operators: The Ultimate Application of the Decorator Pattern
The true power of RxJava lies in operators. Why can map change data types in the middle? Why can a long chain of calls process data layer by layer?
The core idea behind them is the Decorator Pattern.
Source Code Parsing of the map Operator
Suppose we have this code snippet:
Observable.create(...) // Returns ObservableCreate (denoted as O1)
.map(s -> s.length()) // Returns ObservableMap (denoted as O2)
.subscribe(observer); // Passing in the real Observer (denoted as Obs1)
When we call map:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    // Wraps the current Observable (O1) and the transformation function mapper into a new ObservableMap
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
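Assembly can be modeled without any emission at all. In this hypothetical sketch, map returns a new node that holds a reference to its upstream and the stored function, mirroring new ObservableMap<T, R>(this, mapper). The names AssemblyNode, CreateNode and MapNode are illustrative, not RxJava classes. Because the upstream node is never modified, the same O1 can feed two independent chains.

```java
import java.util.function.Function;

// Hypothetical assembly-only model (not RxJava classes): operators build a
// linked chain of nodes, and nothing is emitted at this stage.
abstract class AssemblyNode<T> {
    abstract String describe();

    // Like Observable.map: wrap the current node (this) and the mapper in a NEW node
    <R> AssemblyNode<R> map(Function<? super T, ? extends R> mapper) {
        return new MapNode<>(this, mapper);
    }
}

final class CreateNode<T> extends AssemblyNode<T> {
    @Override
    String describe() { return "Create"; }
}

final class MapNode<T, R> extends AssemblyNode<R> {
    final AssemblyNode<T> source;                  // the upstream being decorated
    final Function<? super T, ? extends R> mapper; // stored now, applied only at emission time

    MapNode(AssemblyNode<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    String describe() { return "Map(" + source.describe() + ")"; }
}

final class AssemblyDemo {
    public static void main(String[] args) {
        AssemblyNode<String> o1 = new CreateNode<>();
        AssemblyNode<Integer> o2 = o1.map(String::length);
        AssemblyNode<String> o3 = o2.map(n -> "len=" + n);
        AssemblyNode<Integer> other = o1.map(String::hashCode); // o1 reused by a second chain

        System.out.println(o3.describe());    // Map(Map(Create))
        System.out.println(other.describe()); // Map(Create)
        System.out.println(o1.describe());    // Create: the upstream itself is unchanged
    }
}
```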
Here's the critical point: when we invoke subscribe(Obs1) on O2, the Observable returned at the end of the chain, the flow is as follows:
- Bottom-Up Subscription Construction: calling O2.subscribe(Obs1) enters O2, an ObservableMap, whose subscribeActual is implemented as:

@Override
public void subscribeActual(Observer<? super U> t) {
    // O2 wraps the downstream Obs1 in a MapObserver (denoted as Obs2),
    // then subscribes Obs2 to the upstream O1
    source.subscribe(new MapObserver<T, U>(t, function));
}

Next, O1.subscribe(Obs2) is called. In a longer chain, every operator repeats this wrap-and-subscribe step, so the call keeps moving upstream until it reaches the initial ObservableCreate (here, O1 itself).
- Top-Down Data Emission: ObservableCreate begins to emit data by invoking Obs2.onNext(data). How is onNext implemented in Obs2 (i.e., MapObserver)? Simplified (operator fusion and the null check on the mapper result are omitted):

@Override
public void onNext(T t) {
    if (done) return;
    U v;
    try {
        // 1. Apply the mapper function we passed in, e.g., s -> s.length()
        v = mapper.apply(t);
    } catch (Throwable ex) {
        // On failure, dispose the upstream and route the error downstream
        fail(ex);
        return;
    }
    // 2. Pass the transformed result to the next Observer downstream (Obs1)
    downstream.onNext(v);
}
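The fail(ex) branch deserves a closer look. Once the mapper throws, the MapObserver marks itself done, forwards the error, and ignores anything that arrives afterwards; in RxJava, fail also disposes the upstream. The sketch below models this with hypothetical types, DownstreamObserver and SketchMapObserver. One difference from RxJava: java.util.function.Function cannot throw checked exceptions, while RxJava's own Function can, so the sketch catches RuntimeException only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical observer type for this sketch only
interface DownstreamObserver<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

// Simplified model of MapObserver's error path (not RxJava's code)
final class SketchMapObserver<T, U> implements DownstreamObserver<T> {
    private final DownstreamObserver<? super U> downstream;
    private final Function<? super T, ? extends U> mapper;
    private boolean done; // set once a terminal signal has been sent downstream

    SketchMapObserver(DownstreamObserver<? super U> downstream, Function<? super T, ? extends U> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return; // ignore items after the stream has failed or completed
        }
        U v;
        try {
            v = mapper.apply(t);
        } catch (RuntimeException ex) { // java.util.function.Function cannot throw checked exceptions
            fail(ex);
            return;
        }
        downstream.onNext(v);
    }

    // RxJava's fail() also disposes the upstream; this sketch has no upstream handle to cancel
    private void fail(Throwable ex) {
        onError(ex);
    }

    @Override
    public void onError(Throwable e) {
        if (done) {
            return;
        }
        done = true;
        downstream.onError(e);
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        downstream.onComplete();
    }
}

final class MapFailDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        DownstreamObserver<Integer> obs1 = new DownstreamObserver<Integer>() {
            @Override public void onNext(Integer n) { log.add("onNext " + n); }
            @Override public void onError(Throwable e) { log.add("onError " + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        Function<String, Integer> mapper = s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty string");
            }
            return s.length();
        };
        SketchMapObserver<String, Integer> obs2 = new SketchMapObserver<>(obs1, mapper);
        obs2.onNext("Hello"); // mapped to 5
        obs2.onNext("");      // mapper throws, error goes downstream
        obs2.onNext("World"); // ignored: done is already true
        obs2.onComplete();    // ignored as well
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onNext 5, onError empty string]
    }
}
```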
Visualizing the Flow
Through the source code parsing above, we can draw the following execution flow diagram:
sequenceDiagram
participant App as Developer (calls subscribe)
participant O2 as ObservableMap
participant O1 as ObservableCreate
participant Obs2 as MapObserver
participant Obs1 as Downstream Real Observer
Note over App, O1: 1. Subscription Process (Bottom-Up)
App->>O2: subscribe(Obs1)
O2->>O2: Wraps Obs1 yielding Obs2
O2->>O1: source.subscribe(Obs2)
O1->>O1: Starts emission logic
Note over O1, App: 2. Emission Process (Top-Down)
O1->>Obs2: onNext("Hello")
Obs2->>Obs2: length = mapper.apply("Hello")
Obs2->>Obs1: downstream.onNext(5)
Summary:
Every time an operator is invoked, RxJava will generate a new Observable wrapping the old Observable.
When subscribe is invoked, the subscription action flows upstream from the very end of the chain, and each layer will generate a new Observer wrapping the downstream Observer.
When data starts emitting, the data stream flows downstream from the very top, undergoing processing at each layer's Observer, finally reaching the developer-defined Observer.
This is what RxJava's so-called "onion model" or "Russian nesting doll" mechanism refers to.
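The whole onion can be reproduced in a toy implementation. The hypothetical Chain class below supports only onNext, with no errors, completion, or disposal. It logs each step, so the log shows the subscription travelling from map up to the source, then the items travelling back down through filter and map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy Observable (not RxJava): onNext only, no errors, completion, or disposal.
abstract class Chain<T> {
    static final List<String> LOG = new ArrayList<>();

    abstract void subscribe(Consumer<? super T> observer);

    static <T> Chain<T> fromItems(List<T> items) {
        return new Chain<T>() {
            @Override
            void subscribe(Consumer<? super T> observer) {
                LOG.add("subscribe reaches source");
                for (T item : items) { // top-down: push each item into the outermost wrapper
                    LOG.add("source emits " + item);
                    observer.accept(item);
                }
            }
        };
    }

    Chain<T> filter(Predicate<? super T> predicate) {
        Chain<T> upstream = this;
        return new Chain<T>() {
            @Override
            void subscribe(Consumer<? super T> downstream) {
                LOG.add("subscribe passes filter");
                // Wrap the downstream observer, then subscribe the wrapper upstream
                upstream.subscribe(item -> {
                    if (predicate.test(item)) {
                        downstream.accept(item);
                    } else {
                        LOG.add("filter drops " + item);
                    }
                });
            }
        };
    }

    <R> Chain<R> map(Function<? super T, ? extends R> mapper) {
        Chain<T> upstream = this;
        return new Chain<R>() {
            @Override
            void subscribe(Consumer<? super R> downstream) {
                LOG.add("subscribe passes map");
                upstream.subscribe(item -> downstream.accept(mapper.apply(item)));
            }
        };
    }
}

final class OnionDemo {
    static List<Integer> run() {
        List<Integer> result = new ArrayList<>();
        Chain.fromItems(List.of("a", "bb", "ccc"))
                .filter(s -> s.length() > 1)
                .map(String::length)
                .subscribe(result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());     // [2, 3]
        System.out.println(Chain.LOG); // subscribe steps (map, filter, source), then emissions
    }
}
```

After OnionDemo.run(), the log starts with the three subscribe steps in the order map, filter, source, followed by the emissions. The result is [2, 3] because the filter drops "a".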
Conclusion
RxJava is not merely an asynchronous library; it provides a complete reactive programming paradigm built on functional-style operators.
- What: An asynchronous processing framework based on event streams.
- How: It uses a highly decoupled Observer Pattern as the skeleton, combined with the Decorator Pattern, so that operators can be nested to any depth to process data. Subscription wraps layer by layer from bottom to top, while data is processed step by step from top to bottom.
- Why this way: This design breaks complex asynchronous operations down into single-responsibility "processing stations" (operators). Whether it's data transformation (map), filtering (filter), or merging (zip), each operator plugs into the assembly line as another node, which removes the need for deeply nested callbacks.
Once we grasp RxJava's onion-like architecture, we can trace the data flow clearly in our heads whenever operators don't combine as expected or events go missing. So far, however, the whole assembly line runs in "the same workshop", the thread that calls subscribe. In real projects, we usually want network requests to run in a background workshop and the results to be delivered back to the foreground workshop (the main thread).
This brings us to another powerful RxJava feature: Schedulers. In the next article, we will dissect how thread switching works under the hood in RxJava.