序列(Sequence)的惰性求值原理
问题的根源:集合操作链的"瀑布代价"
在前一篇文章《Kotlin 集合框架的设计与实现》中,我们提到了集合操作链的一个性能陷阱:每个中间操作都会立刻遍历全部元素,并创建一个新的中间集合。
以一个典型场景为例:从十万条用户记录中找出前 5 个符合条件的用户名(全大写)。
val result = users // 10 万条记录
.filter { it.isActive } // ① 遍历 10 万条,新建 ArrayList(假设 6 万条)
.map { it.name.uppercase() } // ② 遍历 6 万条,新建 ArrayList(6 万条)
.take(5) // ③ 从 6 万条中取前 5 个
最终我们需要的只有 5 条数据,但程序实际执行了:
- 第 ①步:遍历 10 万次
isActive检查 → 分配一块能装约 6 万元素的ArrayList - 第 ②步:遍历 6 万次字符串转换 → 再分配一块 6 万元素的
ArrayList - 第 ③步:从 6 万元素中取前 5 个
绝大多数工作都是徒劳的。这就是及早求值(Eager Evaluation) 的代价:它无法感知下游操作的意图,每一步都必须把当前阶段的工作做到 100% 才会停下来。
Sequence 就是为了解决这个问题而生的。它采取了截然相反的策略:惰性求值(Lazy Evaluation)——不到最后一刻决不动手。
Sequence 的核心哲学:纵向流动,而非横向扫描
理解 Sequence 的关键,在于理解两种不同的元素处理顺序。
用一个流水线工厂的比喻来说:
集合的"批处理"模式:把所有原材料先全部做完"打磨"工序,堆成一座山;再把这座山全部做完"抛光"工序,堆成另一座山;最后才从这座山上挑出 5 件成品。中间堆起来的两座山是纯粹的浪费。
序列的"流水线"模式:第一块原材料进来,先"打磨",再"抛光",如果它符合要求就直接进入成品箱;然后第二块原材料进来,重复这个过程。装满 5 件成品后,流水线立刻停机,剩余原材料根本不需要碰。
用技术语言来描述这两种模式:
【集合的横向处理(Horizontal Processing)】
原始数据: [A, B, C, D, E, F, G, H ...]
↓ filter(全量扫描)
中间集合1: [A, C, E, G ...] ← 新分配内存
↓ map(全量扫描)
中间集合2: [a, c, e, g ...] ← 新分配内存
↓ take(2)
最终结果: [a, c]
【序列的纵向处理(Vertical Processing)】
原始数据: [A, B, C, D, E, F, G, H ...]
↓
A → filter → 通过 → map → a → take → 计数1
B → filter → 过滤 ✗
C → filter → 通过 → map → c → take → 计数2 → 停机!
最终结果: [a, c]
每个元素一次性穿过整条操作链,中间不存储任何临时结果。
Sequence 的实现原理:装饰器链
接口定义:和 Iterable 如出一辙
Sequence<T> 的接口定义极其简单:
// kotlin/sequences/Sequences.kt
public interface Sequence<out T> {
public operator fun iterator(): Iterator<T>
}
和 Iterable<T> 几乎一样,都只有一个 iterator() 方法。那么,为什么行为会截然不同?
差别不在接口定义,而在契约(Contract):
Iterable的iterator()被期望返回一个已有集合的迭代器。Sequence的iterator()被设计为按需产生元素的迭代器,只有当外部调用next()时,才真正去计算并返回下一个元素。
中间操作:惰性包装,构造器链
当你在 Sequence 上调用 filter { } 时,没有任何元素被处理。发生的事情是:返回一个新的 Sequence 实例,这个实例包裹(wrap)了原来的序列,并把 predicate lambda 存储起来,留待将来使用。
这正是**装饰器模式(Decorator Pattern)**的经典应用。
以 filter 为例,查看 Kotlin 标准库源码:
// kotlin/sequences/Sequences.kt(简化)
public fun <T> Sequence<T>.filter(predicate: (T) -> Boolean): Sequence<T> {
return FilteringSequence(this, true, predicate)
}
// FilteringSequence 是一个内部包装类
internal class FilteringSequence<T>(
private val sequence: Sequence<T>, // 持有对上游序列的引用
private val sendWhen: Boolean = true,
private val predicate: (T) -> Boolean // 存储 predicate,稍后使用
) : Sequence<T> {
override fun iterator(): Iterator<T> = object : Iterator<T> {
val iterator = sequence.iterator() // 向上游索要 Iterator
var nextState: Int = -1 // -1: 未确定, 0: 没有下一个, 1: 有下一个
var nextItem: T? = null
private fun calcNext() {
while (iterator.hasNext()) {
val item = iterator.next() // 从上游拉一个元素
if (predicate(item) == sendWhen) { // 检查 predicate
nextItem = item
nextState = 1
return
}
}
nextState = 0 // 上游耗尽
}
override fun next(): T {
if (nextState == -1) calcNext()
// ...返回 nextItem
}
override fun hasNext(): Boolean {
if (nextState == -1) calcNext()
return nextState == 1
}
}
}
map 操作也是类似的道理:
public fun <T, R> Sequence<T>.map(transform: (T) -> R): Sequence<R> {
return TransformingSequence(this, transform)
}
internal class TransformingSequence<T, R>(
private val sequence: Sequence<T>,
private val transformer: (T) -> R
) : Sequence<R> {
override fun iterator(): Iterator<R> = object : Iterator<R> {
val iterator = sequence.iterator()
override fun next(): R = transformer(iterator.next()) // 拉上游元素,即时变换
override fun hasNext(): Boolean = iterator.hasNext()
}
}
当你写下一条操作链:
list.asSequence()
.filter { it > 3 }
.map { it * it }
.take(2)
实际上在内存中构建了一条包装器链:
TakeSequence
└── TransformingSequence (map)
└── FilteringSequence (filter)
└── IterableSequence (来自 asSequence())
└── 原始 List
此时,没有任何元素被处理过。整个链只是一堆存储了 lambda 的小对象。
终端操作:触发"拉动"
当你调用 toList()、first()、forEach() 等**终端操作(Terminal Operation)**时,真正的工作才开始。
val result = sequence.toList()
toList() 内部调用了最外层(TakeSequence)的 iterator(),而 TakeSequence.iterator() 又调用了 TransformingSequence.iterator(),一直递归到最底层的 IterableSequence。每次外层调用 hasNext() / next(),请求就像水波一样向上游传递,直到最底层的 List 真正地拿出一个元素,然后这个元素逐层向下游流过整条操作链。
用拉动(Pull)这个词来描述非常形象:终端操作"拉动"了整条链,而不是由源头"推送"数据。这与 Java 8 Stream 的 push 模型有细微差异(虽然 Kotlin Sequence 和 Java Stream 在语义上都是 pull-based,但实现机制各有侧重)。
asSequence() 背后的 ConstrainedOnceSequence
将一个集合转成序列最常用的方式是 asSequence():
val seq = listOf(1, 2, 3).asSequence()
其实现如下:
// kotlin/collections/Collections.kt
public fun <T> Iterable<T>.asSequence(): Sequence<T> {
return Sequence { this.iterator() }
}
这里用了一个 SAM 转换,Sequence { this.iterator() } 实际上是:
object : Sequence<T> {
override fun iterator(): Iterator<T> = this@asSequence.iterator()
}
每次调用 iterator() 都会从原始 Iterable 获取一个全新的 Iterator,所以这个序列可以被多次迭代——每次都重新遍历底层集合。
而 sequence { ... } 构建器产生的序列则不同,它通常只能被迭代一次(因为协程状态机只能向前推进,不能回头)。如果你尝试对它迭代两遍,会得到一个异常。Kotlin 用 ConstrainedOnceSequence 来包装它,确保二次迭代时抛出 IllegalStateException,而不是产生难以追查的静默错误行为。
构建自定义序列的两种方式
generateSequence:函数式风格
generateSequence 提供了一种简洁的方式来定义序列的"下一个元素"规则:
// 从 1 开始,每次乘以 2,到超过 1000 时停止
val powersOfTwo = generateSequence(1) { prev ->
if (prev < 512) prev * 2 else null // 返回 null 时序列终止
}
println(powersOfTwo.toList()) // [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
它的实现原理是维护一个内部状态变量,每次调用 next() 时执行 nextFunction,把结果存储为新的当前值:
// 简化版实现逻辑
internal class GeneratorSequence<T : Any>(
private val getInitialValue: () -> T?,
private val getNextValue: (T) -> T?
) : Sequence<T> {
override fun iterator(): Iterator<T> = object : Iterator<T> {
var nextItem: T? = null
var nextState: Int = -1 // 未计算
private fun calcNext() {
nextItem = if (nextState == -1)
getInitialValue() // 第一次:取种子值
else
nextItem?.let { getNextValue(it) } // 后续:用 nextFunction 计算
nextState = if (nextItem == null) 0 else 1
}
// ... hasNext / next 实现
}
}
generateSequence 不依赖协程,是纯粹的同步实现。它适合"每个元素只依赖前一个元素"的简单场景:斐波那契数列(简单版)、等差数列、链表遍历等。
sequence { yield() }:协程驱动的状态机
对于无法用"上一个 → 下一个"简单函数描述的复杂序列,可以使用 sequence 构建器:
// 生成斐波那契数列(无限序列)
val fibonacci = sequence {
var a = 0
var b = 1
while (true) {
yield(a) // ← 暂停,把 a 交给消费方
val next = a + b
a = b
b = next
// 消费方请求下一个元素时,这里继续执行
}
}
println(fibonacci.take(10).toList()) // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
这里隐藏着一个编译器的魔法:yield 是一个挂起函数(suspending function)。sequence { ... } 内部使用协程机制,将整个 lambda 编译成一个状态机。
每次执行到 yield(value) 时:
- 当前函数状态(局部变量
a、b等)被保存到状态机对象中。 - 将
value返回给消费方(即调用iterator.next()的地方)。 - 消费方处理完这个值,下次再调用
next()时,状态机从保存的位置恢复执行。
这和前置文章《Kotlin 协程的工作原理》中描述的 CPS(续延传递风格)转换如出一辙。本质上,sequence { } 是协程机制在同步场景下的一次精妙复用。
【sequence builder 执行流程】
消费方: iterator.next()
↓ 恢复协程
序列体: yield(a=0) → 暂停,返回 0 给消费方
...
消费方: iterator.next()
↓ 恢复协程
序列体: a=1, b=1, yield(a=1) → 暂停,返回 1
...
重要约束:
sequence { }内部运行在一个**受限协程作用域(SequenceScope)**中。你只能调用yield和yieldAll,不能调用任意挂起函数(如delay、网络请求等)。这是刻意的设计:序列是同步的,如果需要异步产生数据,应该使用Flow(参见《Flow 响应式数据流深度解析》)。
用 yieldAll 简化嵌套结构
yieldAll 可以一次性产出一个 Iterable、Iterator 或另一个 Sequence 的全部元素:
// 遍历树结构,产出所有叶子节点值(深度优先)
val treeNodes = sequence {
fun visit(node: TreeNode) {
if (node.isLeaf) {
yield(node.value) // 产出叶子
} else {
for (child in node.children) {
yieldAll(generateSequence { /* 递归 */ null }) // 简化示意
// 实际可以直接 yieldAll(visit sequence)
}
}
}
visit(root)
}
这种写法用看似顺序的命令式代码表达了深度优先遍历的逻辑,不需要手动维护栈。
何时用 Sequence,何时不用
适合用 Sequence 的场景
1. 操作链长 + 数据量大
链式操作步骤越多、数据集越大,Sequence 比集合节省的内存和时间就越可观。一般来说,数据量超过数千条、操作链超过 3 步时,转用 Sequence 是合理的选择。
2. 存在短路操作
first()、find()、any()、takeWhile()、take(n) 这类操作一旦满足条件就会停止遍历。Sequence 的惰性特性使这类"早停"操作效率极高:
// 在百万条记录中找第一个符合条件的——Sequence 找到即停
val hit = hugeList.asSequence()
.filter { it.isValid() }
.map { it.toDto() }
.firstOrNull { it.priority > 8 } // 找到第一个就停下来
3. 无限序列
集合无法表示无限数据,但 Sequence 可以轻松建模无限流,结合 take(n) 来获取前 N 个:
// 无限质数序列(仅示意)
val primes = generateSequence(2) { n ->
var next = n + 1
while (true) {
if ((2 until next).none { next % it == 0 }) return@generateSequence next
next++
}
next
}
println(primes.take(10).toList()) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
不适合用 Sequence 的场景
1. 数据量小
每个 filter { }、map { } 调用都会创建一个新的 FilteringSequence / TransformingSequence 对象,并在终端操作时建立迭代器链。这条链的反射式调用有固定开销。当集合只有几十、几百个元素时,这些开销可能反超创建几个小 ArrayList 的开销(GC 友好,JIT 易优化)。
// ❌ 对小集合使用 Sequence——引入不必要开销
val small = listOf(1, 2, 3, 4, 5)
.asSequence()
.filter { it > 2 }
.map { it * 2 }
.toList()
// ✅ 小集合直接用集合操作——inline 函数内联后效率更高
val small = listOf(1, 2, 3, 4, 5)
.filter { it > 2 }
.map { it * 2 }
2. 需要有状态的操作
sorted()、distinct()、groupBy() 等操作本质上需要"看到所有元素"才能完成计算——sorted() 不知道最后一个元素就不知道第一个该排第几。这类操作即使在 Sequence 中,也会在内部触发全量遍历(变成"状态型操作"节点),惰性优化在这里失效。
// 注意:sorted() 是有状态操作,它会强制遍历所有元素
val result = list.asSequence()
.filter { it.isActive }
.sorted() // ← 这里会等所有 filter 后的元素都拿到,才开始排序
.take(5)
.toList()
如果操作链中有状态型操作,需要在设计时评估:惰性带来的收益是否还能抵消 Sequence 包装的额外开销。
字节码层面:Sequence 操作 vs 集合操作
对比以下两段 Kotlin 代码的字节码产物,可以更直观地看清差异。
集合版本(通过 filter):
// Kotlin 源码
val result = list.filter { it > 3 }
// 反编译字节码中的等效 Java(filter 是 inline 函数,lambda 被内联)
List result = new ArrayList();
for (Integer item : list) {
if (item > 3) { // ← lambda 被内联为 if 判断
result.add(item);
}
}
filter 是 inline 函数,lambda 在编译期被内联,没有额外的函数对象分配,整个操作变成了一个简单的循环。这就是小集合用 Kotlin 集合操作比 Stream 快的原因之一。
Sequence 版本(通过 filter):
// Kotlin 源码
val seq = list.asSequence().filter { it > 3 }
// 反编译字节码等效 Java
Sequence seq = new FilteringSequence(
CollectionsKt.asSequence(list), // ← 包装原始 List
true,
new Function1<Integer, Boolean>() { // ← lambda 作为对象传入构造器
@Override
public Boolean invoke(Integer it) {
return it > 3;
}
}
);
// 此时没有任何迭代发生,seq 只是一个装好 lambda 的 FilteringSequence 对象
Sequence 的中间操作不是 inline 的(因为中间操作返回一个新对象,需要把 lambda 存下来,无法内联),所以 lambda 会变成一个函数对象实例。这就是 Sequence 相比集合操作有一定固定开销的字节码层原因。
Sequence vs Java Stream:设计哲学的分野
Kotlin 的 Sequence 和 Java 8 的 Stream 都提供了惰性计算集合管道,但在设计哲学上有清晰的分野:
| 维度 | Kotlin Sequence | Java Stream |
|---|---|---|
| 多平台 | ✅ JVM / JS / Native 统一抽象 | ❌ 仅 JVM |
| 并行处理 | ❌ 无内置并行 | ✅ .parallelStream() |
| 基本类型优化 | ❌ 需要装箱 | ✅ IntStream / LongStream 避免装箱 |
| 空值处理 | ✅ Kotlin 原生空安全 | ▶ Optional<T> 包装(更繁琐) |
| API 统一性 | ✅ 与集合操作同名,.asSequence() 无缝切换 |
❌ 独立 API 体系,.stream() 切入 |
| lambda 内联 | ❌ 中间操作 lambda 无法内联 | ❌ 同样无法内联 |
| 终端操作后可复用 | ✅ 大多数 Sequence 可多次迭代 | ❌ Stream 一旦终端操作后不可复用 |
| 调试友好 | ▶ 一般 | ▶ IntelliJ/Java 11+ 有专门调试支持 |
选型建议:
- 绝大多数场景:优先用 Kotlin
Sequence,API 简洁,与集合操作无缝切换,Kotlin null 安全集成。 - 需要并行处理大数据集:切换到
list.stream().parallel()(Java Stream 的parallelStream是 Sequence 无法比拟的优势)。 - 大量原始类型运算(如信号处理、数值计算):Java Stream 的
IntStream避免了 Kotlin Sequence 对基本类型的装箱开销。
常见陷阱:Sequence 的"一次性"问题
sequence { } 构建器产生的序列只能被迭代一次。这是因为它的内部状态机在首次遍历后就已推进至末尾,无法倒带:
val seq = sequence {
yield(1)
yield(2)
yield(3)
}
println(seq.toList()) // [1, 2, 3] ✅
println(seq.toList()) // 抛出 IllegalStateException: This sequence can be consumed only once.
Sequence 的文档明确说明:某些序列实现可能限制只能迭代一次。在设计 API 时,如果需要一个可重复消费的惰性序列,应当使用 generateSequence 或 asSequence()(从 Iterable 转换),而非 sequence { } 构建器。
另一个常见陷阱是在 Sequence 上调用有状态操作后误以为还是惰性的:
val result = list.asSequence()
.filter { it > 0 }
.sorted() // ← 有状态操作:强制全量遍历,惰性在此中断
.take(5) // ← 此处的 take 只对 sorted 后的全量结果生效
sorted() 被调用时,Sequence 被迫收集所有 filter 后的元素并排序,结果变成一个普通集合再继续。如果你的目标是"过滤后取最小的 5 个",应该考虑使用 minOfN 算法或 sortedBy 结合后续操作,或者分析有状态操作是否真的必要。
总结
Sequence 通过装饰器链 + 惰性迭代器实现了真正的按需计算:
- 中间操作(
filter、map、take等)不执行任何元素处理,而是将自身和 lambda 封装成一个新的Sequence包装器,层层叠加形成处理链。 - 终端操作(
toList、first、forEach等)触发外层Sequence的iterator(),请求沿链向上传递,底层Iterator一次只生产一个元素,逐个穿过整条链。 - 构建方式:
asSequence()让现有集合惰性化;generateSequence用函数式方式定义元素规则;sequence { yield() }借助协程状态机实现任意复杂的元素生成逻辑。 - 适用边界:数据量大、操作链长、存在短路操作或需要无限序列时,
Sequence显著优于集合操作链;数据量小时,集合操作的inline内联往往更快。 Sequence解决的是同步惰性计算问题;异步场景(如网络数据流)则属于Flow的领地——尽管sequence { yield() }复用了协程机制,但它的本质仍是同步阻塞的。