RxJava 线程调度机制与背压原理深剖
在上一篇文章中,我们探究了 RxJava 响应式流的基础骨架和操作符的“洋葱模型”。但在那个模型中,从数据的生产、加工到消费,全部发生在调用 subscribe() 的同一个线程中。
然而,Android 开发的黄金法则是:UI 线程不执行耗时操作,子线程不更新 UI。RxJava 之所以能在 Android 开发中一骑绝尘,绝不仅仅因为其优雅的链式调用,更在于它极其简单的线程切换能力。
只需两行代码,就能让复杂的异步任务在不同线程间穿梭自如:
Observable.create(...)
.subscribeOn(Schedulers.io()) // 指定生产数据在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定消费数据在主线程
.subscribe(observer);
这两行代码背后的原理是什么?为什么业界常说“多次调用 subscribeOn 只有第一次有效,而多次调用 observeOn 每次都有效”?本文将从源码层面为你彻底揭开 RxJava 线程调度的底牌,并探讨响应式流中不可回避的“背压(Backpressure)”问题。
线程调度的核心隐喻:跨国物流网络
我们可以将 RxJava 的线程调度比作一个跨国物流网络:
subscribeOn决定了商品在哪个国家的**工厂(线程)**生产。一旦工厂确定了,不管中间经过多少调度员,商品的产地已经不可更改。observeOn则决定了商品在运输途中,换乘哪种交通工具(线程)。在到达消费者手中之前,商品可以多次换乘(比如先走海运,再换卡车,最后换电动车派送)。
深度剖析 subscribeOn
subscribeOn 的作用是指定上游 Observable 产生数据和执行操作所在的线程。
源码追踪
当你调用 .subscribeOn(Schedulers.io()) 时,RxJava 会将当前的 Observable 包装进一个 ObservableSubscribeOn 中。我们直接看它的 subscribeActual 方法:
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 1. 创建包装的 Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 2. 触发下游的 onSubscribe 回调
observer.onSubscribe(parent);
// 3. 核心:不直接调用 source.subscribe(),而是把它扔到指定的线程去执行!
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
来看看这个 SubscribeTask 是什么:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 在新线程中,执行上游的订阅逻辑
source.subscribe(parent);
}
}
为什么多次 subscribeOn 只有“第一次”有效?
这个经典的架构陷阱,其实是一个伪命题。准确地说,是位置最靠上的 subscribeOn 决定了源头执行的线程。
回想一下上一篇文章中的“自下而上的订阅逻辑”。假设我们写了这样的代码:
Observable.create(...) // O1
.subscribeOn(Schedulers.io()) // O2
.subscribeOn(Schedulers.computation()) // O3
.subscribe(observer);
执行流程:
subscribe动作自下而上传递。先到达O3,O3发现需要切换到 Computation 线程,于是它开启一个 Computation 线程,在这个新线程里执行上游的O2.subscribe。O2的subscribe动作此时运行在 Computation 线程。接着,O2发现需要切换到 IO 线程,于是它又开启一个 IO 线程,在这个新线程里执行上游的O1.subscribe。O1.subscribe(也就是我们写的create里面的发射逻辑)最终运行在哪里?毫无疑问,运行在最后一次切换的 IO 线程里!
虽然 Computation 线程确实被创建并执行了极短的一瞬间(用于向上订阅),但真正干活的业务逻辑(上游的数据生产)是被最顶层的 subscribeOn 拦截并分配到 IO 线程的。
深度剖析 observeOn
observeOn 的作用是指定其下游操作符和 Observer 接收数据所在的线程。
源码追踪
调用 observeOn 会生成 ObservableObserveOn。它的原理比 subscribeOn 复杂得多,因为数据是源源不断流下来的,不能简单地用 Runnable 把每个数据丢到线程池里,那样会造成线程资源的极度浪费和乱序。
RxJava 使用了无锁并发队列(SpscLinkedArrayQueue) + 排水模型(Drain Loop)。
看 ObservableObserveOn 内部的 ObserveOnObserver:
@Override
public void onNext(T t) {
if (done) return;
// 1. 上游发来的数据,不直接传给下游,而是先塞进队列里
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 2. 调度执行
schedule();
}
void schedule() {
// 确保同一时刻只有一个 worker 在干活
if (getAndIncrement() == 0) {
// 扔到指定的线程去执行 run 方法
worker.schedule(this);
}
}
当指定的线程(比如 Android 主线程)开始执行时,会调用 run,进而执行著名的 drainNormal():
void drainNormal() {
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
// ...省略部分代码...
for (;;) {
// 在目标线程里,通过死循环不断从队列里取数据
T v = q.poll();
boolean empty = v == null;
if (empty) {
break;
}
// 交给下游去消费
a.onNext(v);
}
}
为什么 observeOn 多次调用每次都有效?
理解了 drainNormal,这个问题迎刃而解。
数据流自上而下传递。每经过一个 observeOn,当前线程的数据就会被塞进这个 observeOn 的队列中,然后触发对应 Scheduler 的线程去队列里取数据,并交给下一层。因此,每调用一次 observeOn,数据流就会实打实地跨越一次线程边界,切换一次“交通工具”。
流量灾难:背压(Backpressure)机制的诞生
当我们享受 RxJava 带来的丝滑异步体验时,可能会遇到一个致命问题。
假设上游在 IO 线程读取数据库,每秒发出 10000 个事件;而下游在 UI 线程渲染界面,每秒只能处理 10 个事件。
根据前面分析的 observeOn 原理,上游发出的数据会全部涌入 SpscLinkedArrayQueue 缓冲队列中。
结果是什么? 队列无限膨胀,最终导致 OOM (Out Of Memory)。 这就好比水管的一端拼命注水,另一端出水极慢,最终水管一定会爆裂。
这就是在 RxJava 1.x 时代困扰无数开发者的背压问题(Backpressure):即下游处理能力跟不上上游发送能力时,由于异步导致的数据积压问题。
RxJava 2 的变革:Flowable
为了从根本上解决背压问题,RxJava 2 进行了大刀阔斧的重构,将没有背压支持的 Observable 和拥有背压支持的类彻底剥离,引入了 Flowable。
在 Flowable 中,观察者(Subscriber)不再是被动接收,而是拥有了请求权限(request)。
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 上游根据下游的请求量发数据
for (int i = 0; i < 10000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER) // 必须指定背压策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 告诉上游:我先处理 10 个,你不要发多了
s.request(10);
}
@Override
public void onNext(Integer integer) {
// 消费数据
}
// ...
});
背压策略 (BackpressureStrategy)
如果在构建 Flowable 时,上游就是无法控制生产速度(比如用户的疯狂点击、传感器的硬件回调),该怎么办?这就需要 BackpressureStrategy 兜底:
ERROR:如果积压超过了默认的水位线(通常是 128),直接抛出MissingBackpressureException,快速失败。DROP:既然下游处理不过来,那上游新产生的数据我直接丢弃掉,保全大局。LATEST:丢弃中间的数据,只保留最新的一条。这在 UI 刷新中非常有用,用户只关心最新状态。BUFFER:像 RxJava 1.x 一样无限制缓存,不抛出异常。如果上游真的无穷无尽,还是会 OOM。
总结
subscribeOn的本质:包装向上的订阅动作(source.subscribe),将其放置在新线程中执行。由最上层的调用决定源头线程。observeOn的本质:内部持有一个并发队列,将上游发来的数据放入队列,并唤醒目标线程循环从队列中取数据,发送给下游。每一次调用都会真实地完成一次线程跃迁。- 背压问题:异步上下游速率不匹配导致的积压。
Observable不支持背压,适合少量事件,如单一网络请求;Flowable通过Subscription.request(n)实现响应式拉取支持背压,辅以丢弃、只保留最新等策略,适合海量数据流的控制。
彻底理解了 RxJava 的线程模型与背压机制,你便能游刃有余地穿梭于 Android 错综复杂的线程迷宫之中,写出既丝滑又不会发生 OOM 的工业级应用代码。