Flow 响应式数据流深度解析
Flow 要解决的根本问题
上一篇文章我们深入剖析了协程的取消与异常处理机制,理解了 Job 状态机如何保证结构化并发的安全性。但协程解决的是"如何安全地执行一次异步任务",现实中更常见的问题是:如何处理随时间产生的多个值?
数据库查询的实时变更通知、用户输入框的击键事件序列、传感器的持续读数、分页加载的数据流——这些场景的共同特点是:数据不是一次性产生的,而是在时间轴上持续流淌。
传统的解法是 RxJava 的 Observable。它功能强大,但学习曲线极其陡峭:数百个操作符、线程调度器的 subscribeOn/observeOn 双轨模型、手动管理 CompositeDisposable……一条响应式链写错了调度器,就是隐蔽的线程安全 Bug,出了问题还难以调试。
Kotlin Flow 的核心设计目标是:在协程的结构化并发保障下,提供一套类型安全、可组合的异步数据流 API。它没有发明新的运行时——Flow 的每一个设计决策都深度集成到 suspend/Continuation 体系中。理解 Flow,就是理解协程生态的完整拼图。
Flow 的设计哲学:冷流的本质
Flow 接口与冷流语义
Flow 的核心接口极其简洁:
// kotlinx.coroutines.flow.Flow —— 核心接口
public interface Flow<out T> {
/**
* 接收流中的值。这是整个 Flow 体系唯一的终端触发点。
* 注意:collect 是 suspend 函数——流的消费必须在协程中进行。
*/
public suspend fun collect(collector: FlowCollector<T>)
}
// FlowCollector —— 值的接收者
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
两个接口,核心原语只有一个:collect。这个极简设计蕴含了整个冷流的哲学:
流在被收集之前,什么都不会发生。
调用 flow { emit(1) } 不会执行任何代码——它只是创建了一个持有"生产逻辑"的对象。只有收集者调用 collect 时,生产逻辑才会启动。每一个收集者都会触发一次独立的执行——如同每次泡茶都需要重新烧水,而不是从一个公共壶里分水。
与之对比的冰冷真相:RxJava 的 Observable 同样是冷流,但你必须手动管理 Disposable 的生命周期;而 Flow 的收集天然绑定在协程 Scope 里,Scope 取消时收集自动停止,永不泄漏。
SafeFlow:flow {} 构建器的内部实现
当你写下以下代码:
val myFlow: Flow<Int> = flow {
emit(1)
delay(100)
emit(2)
}
编译器和运行时实际上创建了一个 SafeFlow 实例:
// kotlinx.coroutines.flow 内部实现(简化)
private class SafeFlow<T>(
// 捕获用户定义的生产逻辑(整个 flow { } 块)
private val block: suspend FlowCollector<T>.() -> Unit
) : AbstractFlow<T>() {
// collect 调用时,才真正执行生产逻辑
override suspend fun collectSafely(collector: FlowCollector<T>) {
// 将 block 的 receiver 绑定为 collector
// emit() 的调用实际上是 collector.emit()
collector.block()
}
}
AbstractFlow 的 collect 方法在调用 collectSafely 之前,会将用户的 collector 包装成 SafeCollector:
// AbstractFlow.kt
public final override suspend fun collect(collector: FlowCollector<T>) {
// SafeCollector 是一个守卫层,强制执行两条不变式:
// 1. emit 必须在收集所在的 CoroutineContext 中调用(上下文一致性)
// 2. emit 不能并发调用(Sequential Emission 约束)
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
SafeCollector 的存在解释了为什么在 flow { } 块中使用 withContext 切换线程会抛出 IllegalStateException:
// ❌ 在 flow{ } 中切换上下文:违反上下文一致性约束
val badFlow = flow<Int> {
withContext(Dispatchers.IO) {
emit(1) // 💥 IllegalStateException: Flow invariant is violated
}
}
// ✅ 正确做法:用 flowOn 在流操作符层面切换上下文
val goodFlow = flow<Int> {
emit(1) // 在 flow{ } 块内 emit 都在同一 context
}.flowOn(Dispatchers.IO) // flowOn 负责切换上游的执行 context
SafeCollector 在每次 emit 时检查当前 CoroutineContext 是否与声明的一致,不一致则立即抛出。这不是运行时的心血来潮——这是强制执行 Flow 的顺序发射(Sequential Emission)契约。如果允许并发 emit,下游的收集者就必须处理并发安全,这与流的使用心智模型严重冲突。
Flow 与 Sequence 的同构关系
Flow 是异步的 Sequence。这不是比喻,而是结构上的同构:
| 对比维度 | Sequence<T> |
Flow<T> |
|---|---|---|
| 生产者 | SequenceScope.yield() |
FlowCollector.emit() |
| 消费触发 | iterator.next() 主动拉取 |
collect { } 订阅推送 |
| 执行特性 | 懒惰(Lazy)同步 | 懒惰(Lazy)异步 |
| 挂起支持 | ❌ 不支持 | ✅ emit 和 collect 均为 suspend |
| 结构化生命周期 | 无 | 绑定到 CoroutineScope |
Sequence 使用协程(SequenceScope 也是一种受限的 suspend 实现),但它的挂起只能暂停在 yield() 里;Flow 的生产者和消费者都可以在任意 suspend 点挂起,且它们可以在不同的协程(甚至不同的线程)上交替执行。这是 Flow 真正的威力所在。
Flow 操作符全景剖析
Flow 的所有中间操作符(map、filter、flatMapConcat 等)都遵循同一个实现模式:创建一个新的 Flow,在新 Flow 的 collect 实现中,收集上游 Flow 并对每个值做变换。
// map 操作符的简化实现——揭示所有中间操作符的共同模式
public fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R> = flow {
// collect 上游的 flow(this@ 是接收者,即上游 flow)
collect { value ->
// 对每个值应用变换,然后 emit 到下游
emit(transform(value))
}
}
这种模式的精妙之处:操作符链是纯惰性的——只有最终调用 collect 时,整条链才从下游向上游依次触发。没有中间缓存,没有预先计算——这与 Kotlin 的 Sequence 处理模式完全一致。
转换操作符:flatMap 家族的差异
// 三个 flatMap 变体的核心差异
val sourceFlow = flow { emit(1); delay(100); emit(2) }
// flatMapConcat:串行——等待内部 flow 完全结束,再处理下一个值
sourceFlow.flatMapConcat { value ->
flow {
emit("${value}a")
delay(200)
emit("${value}b")
}
}
// 输出顺序:1a → 1b → 2a → 2b(严格串行,内存安全)
// flatMapMerge:并行——同时收集所有内部 flow,结果交错
sourceFlow.flatMapMerge { value ->
flow {
emit("${value}a")
delay(200)
emit("${value}b")
}
}
// 输出顺序:1a → 2a → 1b → 2b(并行,速度最快但顺序不定)
// flatMapLatest:取最新——新值到达时取消上一个内部 flow
sourceFlow.flatMapLatest { value ->
flow {
emit("${value}a")
delay(200)
emit("${value}b") // 如果 value=2 在这里到达,此行被取消
}
}
// 输出顺序:1a → 2a → 2b(中间值 1b 被跳过)
选型原则:
| 场景 | 推荐操作符 |
|---|---|
| 顺序依赖(第 N 步依赖第 N-1 步结果) | flatMapConcat |
| 独立并行(多个 API 并发请求,都需要结果) | flatMapMerge |
| 取最新(搜索建议、实时过滤) | flatMapLatest |
组合操作符:zip vs combine 的语义差异
val nums = flow { emit(1); delay(300); emit(2); delay(300); emit(3) }
val strs = flow { emit("A"); delay(400); emit("B"); delay(400); emit("C") }
// zip:像拉链——必须等两条流各自提供一个值,才能组合一对
nums.zip(strs) { n, s -> "$n-$s" }.collect { println(it) }
// 输出:1-A(400ms), 2-B(800ms), 3-C(1200ms)
// 节奏由较慢的那条流决定。流结束时 zip 也结束。
// combine:取最新——任意一条流发射新值,立即与另一条流的"当前最新值"组合
nums.combine(strs) { n, s -> "$n-$s" }.collect { println(it) }
// 输出(大约):
// 1-A(400ms,strs 先到,等 nums)
// 2-A(300ms 后 nums 发射 2,strs 仍是 A)
// 2-B(100ms 后 strs 发射 B)
// 3-B(300ms 后 nums 发射 3)
// 3-C(100ms 后 strs 发射 C)
zip是"配对快递"——双方都必须准备好才能发货;combine是"实时报价"——任何一方有更新就立刻重新计算,另一方沿用上次的值。
combine 在 Android UI 开发中极为常见:当用户名输入框(usernameFlow)和密码输入框(passwordFlow)各自发射新值时,立刻计算"登录按钮是否可用"——而不需要两者同步更新。
过滤与去重操作符
// distinctUntilChanged:只有值真正改变时才向下游传递
// 对于 StateFlow 和数据库查询流尤其有用——避免重复渲染
flow { emit(1); emit(1); emit(2); emit(2); emit(1) }
.distinctUntilChanged()
.collect { println(it) } // 输出:1, 2, 1
// debounce:防抖——在静默期过后才发射
// 典型场景:搜索框,用户停止输入 300ms 后才发起请求
searchQueryFlow
.debounce(300L)
.flatMapLatest { query -> searchApi(query) }
.collect { results -> updateUI(results) }
// sample:节流——每隔固定时间取一个最新值
// 典型场景:传感器读数,每 100ms 刷新一次 UI,丢弃中间值
sensorFlow
.sample(100L)
.collect { value -> updateGauge(value) }
线程切换与 flowOn 的内部机制
flowOn 只影响上游
flow {
println("上游执行线程: ${Thread.currentThread().name}") // IO
emit(heavyComputation())
}
.map {
println("map 执行线程: ${Thread.currentThread().name}") // IO
it * 2
}
.flowOn(Dispatchers.IO) // ← 只影响 flowOn 左侧(上游)的所有操作
.collect { result ->
println("collect 执行线程: ${Thread.currentThread().name}") // Main
updateUI(result)
}
这是 Flow 与 RxJava 的一个重要设计差异:
| 工具 | 线程切换模型 |
|---|---|
| RxJava | subscribeOn 指定上游生产线程,observeOn 指定下游消费线程,两套机制 |
| Kotlin Flow | flowOn 只影响其上游,collect 在调用者线程执行,单一机制 |
flowOn 的设计更自然:你在哪里 collect,就在哪里消费。通过 flowOn 声明生产者应该在哪个调度器上运行,就像给生产环境贴标签——消费者不需要关心生产发生在哪里。
flowOn 的内部实现:隐式 Channel 桥接
flowOn 切换 Dispatcher 时,无法在同一个协程中完成——不同 Dispatcher 意味着不同线程。因此,flowOn 在内部创建了一个隐式的 Channel 缓冲区,将上游(IO 线程)和下游(Main 线程)桥接起来:
flowOn 内部架构图:
IO 线程 Main 线程
┌──────────────────────┐ ┌───────────────────────┐
│ 上游协程(生产者) │ │ 下游协程(消费者) │
│ │ │ │
│ flow { emit(x) } │──值──► │ Channel(默认 64) │──► collect { }
│ .map { } │ │ │
└──────────────────────┘ └───────────────────────┘
↑ │
└────────── 背压信号─────────────────┘
(Channel 满时,上游挂起,等待消费者消化)
当你在一条链上写了多个 flowOn 时,Kotlin 会进行操作符融合(Operator Fusion)——相邻的同类操作符合并成一个 Channel,避免多余的上下文切换开销。
背压处理:buffer、conflate、collectLatest
"背压"(Backpressure)是指生产者速度快于消费者速度时的处理策略。这是所有流式系统都必须面对的根本问题。
问题的根源
// 模拟:生产者每 100ms 发一个值,消费者每次处理需要 300ms
flow {
repeat(10) { i ->
emit(i)
delay(100)
}
}
.collect { value ->
delay(300) // 耗时处理
println("收到: $value")
}
// 问题:生产者积压了,等待消费者处理完才能继续发射
// 总耗时约 3000ms(10 * 300ms),生产者速度优势完全被浪费
默认的 Flow 是完全顺序的:emit 会等待 collect 处理完才返回。这保证了安全,但牺牲了吞吐量。当生产者远快于消费者时,有以下三种策略:
buffer:隔离生产与消费,允许生产者跑在前面
flow {
repeat(10) { i ->
emit(i)
delay(100)
}
}
.buffer(capacity = 5) // 分配一个容量为 5 的 Channel 做缓冲
.collect { value ->
delay(300)
println("收到: $value")
}
// 效果:生产者最多比消费者领先 5 个值
// 生产者不会因消费者慢而每次等待 300ms
buffer 的内部机制:它在上游和下游之间插入一个 Channel,启动一个独立协程运行上游,主协程继续运行消费者。当 Channel 满时(capacity = 5 时最多积压 5 个),上游协程自动挂起——这就是天然的背压信号:消费者的处理能力上限制约了生产速度,而不是生产者无限制地堆积数据。
buffer() 的默认容量(不传参数)是 Channel.BUFFERED,JVM 上默认为 64。
conflate:只保留最新值,丢弃中间值
flow {
repeat(10) { i ->
emit(i)
delay(100)
}
}
.conflate() // 等价于 buffer(capacity = 1, onBufferOverflow = DROP_OLDEST)
.collect { value ->
delay(300)
println("收到: $value") // 输出:0, 3, 6, 9(中间值被丢弃)
}
conflate 适用于只有最新状态有意义的场景——比如实时位置坐标、UI 状态帧。你不需要每一帧,只需要最新一帧。
collectLatest:新值到达时取消正在处理的旧值
flow {
repeat(5) { i ->
emit(i)
delay(100)
}
}
.collectLatest { value ->
// 当新值到来时,这里的 delay(300) 会被取消,重新启动处理新值
delay(300)
println("处理完成: $value") // 只有最后一个值能完整处理
}
// 输出:处理完成: 4(前 4 个全部被取消)
conflate 和 collectLatest 的核心区别:
| 策略 | 丢弃位置 | 机制 | 适用场景 |
|---|---|---|---|
conflate |
丢弃缓冲区中的旧值(生产侧丢弃) | Channel 溢出策略 | 只需最新状态快照 |
collectLatest |
取消正在进行的旧处理(消费侧取消) | 协程取消重启 | 处理耗时但结果必须基于最新值 |
conflate像快递驿站:满了就扔最老的包裹,腾出空间给新包裹。collectLatest像猫狗大战:新消息一来,工人立刻放下手里的旧工作,转身处理新消息。
StateFlow 深度解析
为什么 Google 推荐 StateFlow 取代 LiveData
LiveData 是 Jetpack 的早期解决方案,但它有几个根本性的局限:
| 痛点 | 描述 |
|---|---|
| 受 Android 框架绑定 | 只能在 LifecycleOwner(Activity/Fragment)中使用,无法在纯 Kotlin 模块测试 |
| 无法替代协程线程切换 | LiveData 没有 flowOn 等线程调度原语 |
| 无操作符能力 | 没有 map、filter、combine 等丰富的转换能力(Transformations.map 非常有限) |
| 初始值模糊 | LiveData 没有初始值的强制约束,新观察者可能收到 null |
StateFlow 解决了所有这些问题:它是一个热流(Hot Flow),持有当前状态,可以有多个收集者,与 Android 框架完全解耦,还拥有 Flow 的全套操作符。
MutableStateFlow 的线程安全实现:Flat Combining
MutableStateFlow 是 SharedFlow 的一个特化版,配置为 replay = 1,onBufferOverflow = DROP_OLDEST。它的核心是等式冲突(Equality-based Conflation):
val stateFlow = MutableStateFlow(0)
// 状态相同时,不触发下游收集
stateFlow.value = 0 // ← 等于当前值,下游不会收到任何通知
// 只有实际改变才触发
stateFlow.value = 1 // ← 下游收到 1
这个判断基于 Any.equals()——因此对于自定义数据类,equals 的实现至关重要:
// ⚠️ 注意:data class 的 equals 是结构相等
data class User(val name: String, val age: Int)
val userFlow = MutableStateFlow(User("Alice", 30))
userFlow.value = User("Alice", 30) // equals 返回 true → 下游不收通知
userFlow.value = User("Alice", 31) // equals 返回 false → 下游收到通知
并发更新的原子性:MutableStateFlow 使用了一种称为 Flat Combining 的无锁并发技术。内部维护一个偶数/奇数序列号(Sequence Counter):
StateFlow 内部更新协议:
序列号为偶数(quiescent状态):
线程 A 更新 value → 序列号变奇数(表示正在通知中)
→ 遍历所有订阅者,挂起中的 Continuation 排队恢复
→ 通知完成 → 序列号变偶数
序列号为奇数(正在通知):
线程 B 又来更新 value → 仅更新 value 字段,递增序列号
→ 线程 A 的通知循环检测到序列号变化,重新循环
→ 避免了"通知期间遗漏更新"的问题
这种机制保证了即使高频并发更新,订阅者也总是能看到最终一致的最新值。
多线程下安全地更新复合状态,必须用 update 扩展函数:
// ❌ 非原子的读-改-写(多线程下可能丢失更新)
stateFlow.value = stateFlow.value + 1
// ✅ 使用 update——内部用 CAS 循环保证原子性
stateFlow.update { currentValue ->
currentValue + 1 // 如果 CAS 失败(有竞争),自动重试
}
Android 中的 StateFlow 最佳实践
class LoginViewModel : ViewModel() {
// 私有可变 StateFlow——只在 ViewModel 内部可以修改
private val _uiState = MutableStateFlow<LoginUiState>(LoginUiState.Idle)
// 对外暴露不可变的 StateFlow
val uiState: StateFlow<LoginUiState> = _uiState.asStateFlow()
fun login(username: String, password: String) {
viewModelScope.launch {
_uiState.value = LoginUiState.Loading
try {
val user = withContext(Dispatchers.IO) {
authRepository.login(username, password)
}
_uiState.value = LoginUiState.Success(user)
} catch (e: Exception) {
if (e is CancellationException) throw e
_uiState.value = LoginUiState.Error(e.message ?: "未知错误")
}
}
}
}
// Fragment 中收集——必须用 repeatOnLifecycle 防止后台收集
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { state ->
updateUI(state)
}
}
}
为什么不能用
lifecycleScope.launch { viewModel.stateFlow.collect { } }?
因为lifecycleScope在 ActivityonDestroy时才取消,但 Activity 在onStop后界面已不可见——这段时间内 Flow 仍在后台更新 UI,浪费资源,甚至触发崩溃(View可能已经不可用)。repeatOnLifecycle(STARTED)在onStop时取消收集,onStart时重新启动——精确匹配 UI 的可见窗口。
SharedFlow 深度解析
三个参数的精确语义
MutableSharedFlow<T>(
replay: Int = 0, // 重放缓存大小
extraBufferCapacity: Int = 0, // 额外缓存容量
onBufferOverflow: BufferOverflow = SUSPEND // 缓冲满时的策略
)
SharedFlow 的内部缓冲区是一个统一的环形数组,总容量 = replay + extraBufferCapacity:
SharedFlow 内部缓冲区示意(replay=2, extraBufferCapacity=3):
┌────────────────────────────────────────────────────┐
│ 环形数组,总容量 = 2 + 3 = 5 │
│ │
│ [旧值] [旧值] [新值] [新值] [新值] │
│ └─────────────┘ └──────────────────┘ │
│ replay 区域 extraBuffer 区域 │
│ (新订阅者重放) (给慢速消费者缓冲) │
└────────────────────────────────────────────────────┘
replay 参数:新的订阅者 collect 时,会立即收到最近 replay 个值。这对需要"不错过历史"的场景至关重要:
val events = MutableSharedFlow<Event>(replay = 3)
// 发射 5 个事件
events.emit(Event.A)
events.emit(Event.B)
events.emit(Event.C)
events.emit(Event.D)
events.emit(Event.E)
// 新订阅者:立刻收到最近 3 个(C, D, E),然后等待后续事件
scope.launch {
events.collect { event -> handleEvent(event) }
// 立即触发:C → D → E,然后等待...
}
extraBufferCapacity 参数:给发射者提供"喘息空间"。即使消费者还没准备好,发射者也不会立刻挂起:
// replay=0, extraBufferCapacity=64:发射者最多比消费者领先 64 个值
val eventBus = MutableSharedFlow<Event>(
replay = 0,
extraBufferCapacity = 64,
onBufferOverflow = BufferOverflow.DROP_OLDEST // 满了丢最旧的
)
onBufferOverflow 参数:当 replay + extraBufferCapacity 个槽都被占满时的策略:
| 策略 | 行为 | 适用场景 |
|---|---|---|
SUSPEND(默认) |
emit 挂起,等待消费者 |
不允许丢失任何值 |
DROP_OLDEST |
丢弃最旧的值 | 只需最新状态,旧值无用 |
DROP_LATEST |
丢弃当前正在发射的新值 | 生产者优先级低于已有数据 |
零缓冲约束:当
replay = 0且extraBufferCapacity = 0时,onBufferOverflow必须是SUSPEND——逻辑上不存在缓冲区,自然无法选择"丢弃"策略。
用 SharedFlow 构建事件总线
SharedFlow 是替代 EventBus 和 LiveData 单次事件的理想方案:
// 应用级别的事件总线——替代 EventBus
object AppEventBus {
// replay=0:不重放历史事件(点击事件不应重复触发)
// extraBufferCapacity=1024:高吞吐,不阻塞发射者
// DROP_OLDEST:极端情况下丢弃最旧的事件
private val _events = MutableSharedFlow<AppEvent>(
replay = 0,
extraBufferCapacity = 1024,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
// 对外暴露只读的 SharedFlow
val events: SharedFlow<AppEvent> = _events.asSharedFlow()
// tryEmit 是非挂起的,适合在非协程上下文发送事件
fun publish(event: AppEvent) {
_events.tryEmit(event)
}
// 如果在协程中发送,用 emit(遵守背压)
suspend fun publishSuspend(event: AppEvent) {
_events.emit(event)
}
}
// 发射事件(任意线程)
AppEventBus.publish(AppEvent.UserLoggedOut)
// 收集事件(在 ViewModel 或 Fragment 中)
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
AppEventBus.events
.filterIsInstance<AppEvent.UserLoggedOut>()
.collect {
navigateToLogin()
}
}
}
为什么不用 StateFlow 做事件总线?
StateFlow 有两个问题:1) 它的 equals 去重会"吞掉"相同内容的事件(如用户连续点击两次"刷新");2) 它的 replay = 1 会让新订阅者重复收到上一个历史事件。对于一次性事件,应使用 SharedFlow(replay=0)。
Channel vs Flow:何时用哪个
Channel 是 Flow 的底层支撑,但它们面向不同的使用场景。混淆这两者是 Kotlin 并发 Bug 的常见来源。
Channel 的本质:点对点通信原语
Channel 是协程间通信的工具——类似于 Go 语言的 channel 或 Java 的 BlockingQueue(但是非阻塞的)。它的关键特性是:值被某个消费者取走后,其他消费者不会再看到这个值。
val channel = Channel<Int>(capacity = 5)
// 生产者协程
launch {
repeat(5) { i ->
channel.send(i) // 通道满时挂起
}
channel.close() // 必须手动关闭!否则消费者永远等待
}
// 消费者协程
launch {
for (value in channel) { // receiveAsFlow() 或 for 循环消费
println("收到: $value")
}
}
Channel 与 Flow 的本质差异一张表说清楚:
| 维度 | Channel |
Flow |
|---|---|---|
| 温度 | 热(Hot),独立于消费者存在 | 冷(Cold),每次 collect 重新执行 |
| 通信模式 | 点对点(Unicast)——一个值只被一个消费者消费 | 广播(Multicast)——每个收集者收到完整的独立流 |
| 生命周期 | 手动管理(必须 close()) |
自动绑定到 CoroutineContext |
| 操作符 | 无内置操作符(需手动实现) | 丰富的操作符链 |
| 适用层次 | 底层并发通信 | 业务逻辑和 UI 层 |
决策框架
你的需求是什么?
│
├── 在两个协程之间传递任务/数据(点对点通信)?
│ └── 用 Channel(Producer-Consumer 模式)
│
├── 处理随时间产生的值(有顺序、需要变换)?
│ ├── 每个收集者需要独立、完整的数据序列?
│ │ └── 用普通 Flow(冷流)
│ │
│ ├── 多个收集者共享同一数据,且数据代表状态?
│ │ └── 用 StateFlow(持有最新状态,新订阅者立即收到)
│ │
│ └── 多个收集者共享事件流,不代表持久状态?
│ └── 用 SharedFlow(可配置重放和缓冲策略)
│
└── 需要在 Flow 中进行并发 emit(冲破 SafeCollector 的顺序约束)?
└── 用 channelFlow { } 构建器
channelFlow:桥接并发与流
当你需要在 flow { } 块中并发发射值(比如同时发起多个请求并合并结果),普通的 flow { } 会抛出 IllegalStateException。这时用 channelFlow:
// channelFlow 内部使用 Channel,允许并发 emit
fun fetchFromMultipleSources(): Flow<Result> = channelFlow {
// 并发发起三个请求,无论哪个先完成都向下游发射结果
launch {
val result = fetchFromCache()
send(result) // 用 send 代替 emit(因为是 Channel 发送)
}
launch {
val result = fetchFromNetwork()
send(result)
}
launch {
val result = fetchFromLocalDB()
send(result)
}
// channelFlow 会自动等待所有 launch 完成后关闭 channel
}
channelFlow是冷的(每次 collect 重新执行),但内部send是并发安全的——它突破了SafeCollector的顺序约束,代价是引入了 Channel 的缓冲和调度开销。
操作符的性能优化:融合与短路
操作符融合
多个相邻的 Channel 类操作符(flowOn、buffer、conflate)会被融合成一个 Channel,避免不必要的中间缓冲:
flow { ... }
.buffer(10)
.flowOn(Dispatchers.IO)
// 内部优化:不会创建两个 Channel,而是融合为一个配置了 IO dispatcher 且容量为 10 的 Channel
take 操作符与协程取消的结合
// 只收集前 3 个值,然后自动停止(触发协程取消)
flow {
repeat(1_000_000) {
emit(it) // 发射第 3 个后,flow 被取消,循环立即停止
}
}
.take(3)
.collect { println(it) }
// 输出:0, 1, 2
take 的实现依赖协程取消机制:取到足够数量后,下游在内部抛出 FlowCollectionException(一种特殊的 CancellationException),上游中断执行,整个 Flow 收集干净结束——不会真正执行 100 万次循环。
Flow 测试策略
在单元测试中收集 Flow 需要特别注意生命周期问题:
// 推荐:使用 turbine 库(专为 Flow 测试设计)
// 或者使用 toList() 收集有限流
@Test
fun `stateFlow should emit loading then success`() = runTest {
val viewModel = LoginViewModel(testRepository)
// 使用 backgroundScope 让热流在测试完成后自动取消
val states = mutableListOf<LoginUiState>()
backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
viewModel.uiState.collect { states.add(it) }
}
viewModel.login("alice", "password123")
// 推进虚拟时间(runTest 提供的能力)
advanceUntilIdle()
assertEquals(LoginUiState.Idle, states[0])
assertEquals(LoginUiState.Loading, states[1])
assertTrue(states[2] is LoginUiState.Success)
}
本章小结
本文从 SafeFlow 的源码出发,完整剖析了 Kotlin Flow 的设计哲学与内部机制:
| 知识点 | 核心结论 |
|---|---|
| 冷流语义 | flow { } 只在 collect 时执行,每个收集者独立触发一次完整执行 |
| SafeCollector | 强制顺序发射约束(Sequential Emission),禁止在 flow { } 内 withContext 切换线程 |
| 操作符实现 | 每个操作符创建一个新 Flow,在其 collect 中收集上游——纯惰性求值,无中间缓存 |
| flowOn | 只影响上游操作符;内部用隐式 Channel 桥接不同 Dispatcher;相邻操作符可融合 |
| 背压三策略 | buffer 用容量限流;conflate 只保留最新值(DROP_OLDEST);collectLatest 取消旧处理 |
| StateFlow | SharedFlow(replay=1)的特化;等式冲突避免重复通知;Flat Combining 保证并发安全 |
| SharedFlow 缓冲区 | 统一环形数组,总容量 = replay + extraBufferCapacity;三种溢出策略分别对应不同需求 |
| Channel vs Flow | Channel 是点对点通信原语(热/Unicast/手动管理);Flow 是声明式处理管道(冷/Broadcast/自动管理);channelFlow 桥接并发发射需求 |
Flow 是 Kotlin 协程生态的最终形态——它将 suspend 的非阻塞特性、结构化并发的生命周期管理、以及函数式流处理的表达力,融合成一套一致的、可组合的 API。掌握了 Flow,你就掌握了以声明式方式描述任意异步数据管道的能力。