Dart 异步编程:事件循环、Future、Stream 与 Isolate
Dart 的并发模型
Dart 是单线程的。这听起来像限制,实际上是设计:单线程消除了多数竞态条件,代码推理更简单。但单线程如何处理 IO 等耗时操作?靠事件循环。
事件循环
Dart 的事件循环可以类比成一家餐厅的运转:你(主线程)只有一个身体,但不是每道菜都得亲自盯着做——下单给厨房(注册异步操作),然后继续招呼其他客人,等厨房喊「好了」(事件完成)再去取菜(回调执行)。
两个队列
Dart 内部有两个任务队列:
┌─────────────────────────────────────────┐
│ 主执行栈(Main) │
│ 当前正在执行的同步代码 │
└────────────────────┬────────────────────┘
│ 主栈为空时,按优先级取任务
┌────────────▼────────────┐
│ MicroTask Queue │ ← 高优先级(Future.then、scheduleMicrotask)
│ (每轮事件前先清空) │
└────────────┬────────────┘
│ MicroTask 为空时
┌────────────▼────────────┐
│ Event Queue │ ← 普通优先级(IO、Timer、用户输入)
│ (一次取一个事件) │
└─────────────────────────┘
关键规律:
- 主栈执行完毕后,先清空所有 MicroTask,再执行一个 Event
- 每执行完一个 Event,再次先清空 MicroTask,然后取下一个 Event
Future.then的回调放入 MicroTask;Timer、IO 放入 Event Queue
执行顺序示例
void main() {
print('1'); // 同步,立即执行
Future(() => print('4')); // Event Queue
Future.microtask(() => print('2')); // MicroTask
Future.value(3).then((v) => print(v)); // MicroTask(.then 放 MicroTask)
print('5'); // 同步,立即执行
}
// 输出顺序:1 → 5 → 2 → 3 → 4
// 解析:
// 同步:1, 5
// 主栈清空后,清 MicroTask:2(microtask),3(Future.value.then)
// 再取 Event:4
Future
Future<T> 代表一个未来某时刻会产生 T 类型值的异步操作,类似 JavaScript 的 Promise。
基本用法
// 创建 Future
Future<String> fetchUser(int id) async {
final response = await http.get(Uri.parse('/users/$id'));
return response.body;
}
// 消费 Future
fetchUser(1)
.then((data) => print('Got: $data'))
.catchError((e) => print('Error: $e'))
.whenComplete(() => print('Done'));
// 或者用 await(更推荐)
try {
final data = await fetchUser(1);
print('Got: $data');
} catch (e) {
print('Error: $e');
}
async/await 的本质
async/await 是语法糖,编译器会把它转换成 .then 链:
// 你写的
Future<String> load() async {
final a = await step1();
final b = await step2(a);
return b;
}
// 编译器理解为(简化版)
Future<String> load() {
return step1().then((a) {
return step2(a).then((b) {
return b;
});
});
}
理解这个转换,就能理解为什么 await 是「让出控制权」而不是「阻塞线程」——它只是把后续代码包装进了 .then,主线程继续跑事件循环。
Future.wait 并发执行
// ❌ 串行:总耗时 = 2s + 3s = 5s
final a = await fetchA(); // 2s
final b = await fetchB(); // 3s
// ✅ 并发:总耗时 = max(2s, 3s) = 3s
final [a, b] = await Future.wait([fetchA(), fetchB()]);
Future.wait 同时启动所有 Future,等待所有完成才返回。任意一个失败就整体失败。
Stream
如果 Future 是「一个快递」,那 Stream 就是「快递订阅服务」:可以持续收到多个数据,直到流结束或取消订阅。
Future: ●────────────────────→ [单个值]
Stream: ●──●──●──●──●──────→ X [多个值,最终结束或出错]
两种 Stream
// 单订阅 Stream(默认):只能被订阅一次
Stream<int> countStream() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
yield i; // yield 逐个发出值
}
}
// 广播 Stream:可以被多次订阅
final broadcastStream = someStream.asBroadcastStream();
async* 与 yield
async* 创建异步生成器,yield 逐个发出值:
Stream<String> watchFile(String path) async* {
while (true) {
await Future.delayed(Duration(seconds: 1));
final content = await File(path).readAsString();
yield content; // 每秒发出一次文件内容
}
}
// 监听
await for (final content in watchFile('config.txt')) {
print('File changed: $content');
}
StreamController
手动控制 Stream 的发送:
final controller = StreamController<int>();
// 发送数据
controller.add(1);
controller.add(2);
controller.addError(Exception('oops'));
controller.close(); // 流结束
// 订阅
controller.stream.listen(
(data) => print('Data: $data'),
onError: (e) => print('Error: $e'),
onDone: () => print('Stream closed'),
);
Stream 操作符
Stream 支持类似 Rx 的操作符:
Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5]);
numbers
.where((n) => n % 2 == 0) // 过滤偶数
.map((n) => n * 10) // 转换
.timeout(Duration(seconds: 5)) // 超时
.listen(print); // 输出:20, 40
Isolate:真正的并行
Dart 单线程解决了竞态,但 CPU 密集型任务(图像处理、JSON 解析大文件)仍会阻塞主线程,导致 UI 卡顿。解决方案是 Isolate。
Isolate 是 Dart 的「轻量级进程」:拥有独立的内存堆,不共享内存,通过消息传递通信。不能共享内存意味着不会有竞态,但也意味着传递大对象时有序列化开销。
Main Isolate Worker Isolate
│ │
│──── SendPort ─────────▶│
│ (发消息) │ 执行计算
│◀──── SendPort ─────────│
│ (收结果) │
│ │
UI 不卡顿 CPU 密集工作
使用 compute(推荐)
Flutter 提供了 compute 函数,封装了 Isolate 的创建/销毁:
// 必须是顶层函数或静态方法(Isolate 不能传递闭包)
List<int> _sortHeavy(List<int> data) {
data.sort();
return data;
}
// 在 UI 线程调用,实际在新 Isolate 执行
final sorted = await compute(_sortHeavy, largeList);
手动管理 Isolate
import 'dart:isolate';
Future<void> runInIsolate() async {
final receivePort = ReceivePort();
final isolate = await Isolate.spawn(
_worker,
receivePort.sendPort, // 把接收端口传给 worker
);
// 等待结果
final result = await receivePort.first;
print('Result: $result');
isolate.kill();
receivePort.close();
}
// worker 函数:在独立 Isolate 中运行
void _worker(SendPort sendPort) {
// 执行耗时计算
int sum = List.generate(1000000, (i) => i).reduce((a, b) => a + b);
sendPort.send(sum); // 发回结果
}
Isolate.run(Dart 2.19+)
// 更简洁的一次性 Isolate
final result = await Isolate.run(() {
// 在新 Isolate 中执行,结果自动传回
return expensiveComputation();
});
总结对比
| 机制 | 适用场景 | 是否并行 | 内存共享 |
|---|---|---|---|
| Future/await | IO 操作、网络请求 | ❌(并发非并行) | ✅ |
| Stream | 持续数据流、WebSocket | ❌(并发非并行) | ✅ |
| Isolate/compute | CPU 密集型计算 | ✅(真并行) | ❌ |
并发 vs 并行:Future 是并发(交替执行,还是单线程),Isolate 是并行(真正同时执行,多线程/核心)。