并发工具类
mediumCountDownLatchCyclicBarrierSemaphoreCompletableFuture并发工具
JDK 提供了一系列开箱即用的并发工具类,用于解决线程间的协调与通信问题。不需要自己用 wait/notify 造轮子——理解这些工具的适用场景,直接用就好。
CountDownLatch(倒计数门闩)
一次性的:一个(或多个)线程等待其他 N 个线程完成后再继续。
主线程:await() 阻塞
│
┌──────────┼────────────┐
│ │ │
线程A 线程B 线程C
│ │ │
countDown() countDown() countDown()
│ │ │
└──────────┼────────────┘
│
计数归零,主线程继续
典型场景
CountDownLatch latch = new CountDownLatch(3);
// 启动 3 个工作线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
doWork();
} finally {
latch.countDown(); // 完成后计数减 1
}
}).start();
}
latch.await(); // 主线程阻塞,直到计数为 0
System.out.println("所有任务完成");
// 也可以设置超时
latch.await(10, TimeUnit.SECONDS);
注意:CountDownLatch 是一次性的,计数归零后不能重置。如果需要重复使用,用 CyclicBarrier。
实际应用
- 并行初始化:等待多个服务初始化完成后启动主服务
- 并发测试:所有线程准备就绪后同时开始(反向用法)
- 分批处理:等待一批任务完成后处理下一批
// 反向用法:模拟并发——所有线程等待统一信号后同时执行
CountDownLatch startSignal = new CountDownLatch(1);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
startSignal.await(); // 所有线程在这里等待
doStressTest();
}).start();
}
startSignal.countDown(); // 一声令下,100 个线程同时执行
CyclicBarrier(循环栅栏)
N 个线程互相等待,全部到齐后一起继续。可以重复使用。
线程A ──────► 到达栅栏 ──── 等待 ────┐
线程B ──────► 到达栅栏 ──── 等待 ────┤ 全部到齐
线程C ──────► 到达栅栏 ──── 等待 ────┘ │
▼
(可选)执行 barrierAction
│
▼
所有线程同时继续
← 栅栏自动重置,可再次使用 →
用法
// 3 个线程到齐后执行汇总
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("全部到齐,执行汇总");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
doPartialWork();
barrier.await(); // 等待其他线程
// 所有线程从这里继续
}).start();
}
CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 等待方向 | 一个线程等待 N 个线程 | N 个线程互相等待 |
| 可重用 | ❌ 一次性 | ✅ 自动重置 |
| 回调 | 无 | 支持 barrierAction |
| countDown 与 await | 分离(不同线程) | 合一(同一线程) |
| 实现 | 基于 AQS 共享模式 | 基于 ReentrantLock + Condition |
Semaphore(信号量)
控制同时访问某个资源的线程数量。像停车场一样——总共 N 个车位,满了就排队等。
Semaphore semaphore = new Semaphore(3); // 3 个许可
// 每个线程获取一个许可
semaphore.acquire(); // 获取许可,若无许可则阻塞
try {
accessResource(); // 最多 3 个线程同时执行
} finally {
semaphore.release(); // 释放许可
}
典型应用
- 限流:控制对数据库连接池、API 的并发访问数
- 资源池:控制同时使用的资源数量
- 互斥锁:
new Semaphore(1)等同于互斥锁(但信号量没有"所有者"概念,任何线程都可以 release)
// 限制数据库并发连接数为 10
Semaphore dbSemaphore = new Semaphore(10);
public Connection getConnection() throws InterruptedException {
dbSemaphore.acquire();
try {
return dataSource.getConnection();
} catch (Exception e) {
dbSemaphore.release(); // 获取失败也要释放
throw e;
}
}
public void releaseConnection(Connection conn) {
conn.close();
dbSemaphore.release();
}
Exchanger(交换器)
两个线程在同步点交换数据。相对冷门,但在特定场景很有用。
Exchanger<String> exchanger = new Exchanger<>();
// 线程 A
new Thread(() -> {
String dataA = "来自 A 的数据";
String fromB = exchanger.exchange(dataA); // 阻塞等待 B
System.out.println("A 收到: " + fromB);
}).start();
// 线程 B
new Thread(() -> {
String dataB = "来自 B 的数据";
String fromA = exchanger.exchange(dataB); // 阻塞等待 A
System.out.println("B 收到: " + fromA);
}).start();
典型场景:双缓冲区——生产者填一个缓冲区,消费者读另一个缓冲区,填满/读完后交换。
CompletableFuture(JDK 8+)
Future 的增强版,支持链式调用和组合操作,是 Java 中异步编程的核心工具。
Future 的局限
Future<String> future = executor.submit(() -> fetchData());
String result = future.get(); // 阻塞!
// Future 不支持回调,不能组合多个异步任务
CompletableFuture 基本用法
// 创建异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return fetchFromDB(); // 在 ForkJoinPool.commonPool() 中执行
});
// 链式处理结果(不阻塞)
future
.thenApply(data -> process(data)) // 转换结果
.thenAccept(result -> save(result)) // 消费结果
.exceptionally(ex -> { // 异常处理
log.error("失败", ex);
return null;
});
核心 API 分类
创建
CompletableFuture.supplyAsync(() -> "有返回值", executor);
CompletableFuture.runAsync(() -> doWork(), executor);
CompletableFuture.completedFuture("已完成的值");
转换(thenApply 系列)
future.thenApply(s -> s.toUpperCase()); // 同步转换
future.thenApplyAsync(s -> s.toUpperCase()); // 异步转换
消费(thenAccept / thenRun 系列)
future.thenAccept(s -> System.out.println(s)); // 消费结果
future.thenRun(() -> System.out.println("完成")); // 不关心结果
组合(两个任务的关系)
// 串行:A 完成后用 A 的结果创建 B
future.thenCompose(a -> fetchDetail(a)); // flatMap 语义
// 并行:A 和 B 都完成后合并结果
futureA.thenCombine(futureB, (a, b) -> a + b);
// A 或 B 任一完成(取最快的)
futureA.applyToEither(futureB, result -> result);
批量操作
// 等待所有完成
CompletableFuture.allOf(f1, f2, f3).join();
// 等待任一完成
CompletableFuture.anyOf(f1, f2, f3).join();
实际案例:并行调用多个服务
public UserProfile getUserProfile(long userId) {
// 并行调用三个服务
CompletableFuture<User> userFuture =
CompletableFuture.supplyAsync(() -> userService.getById(userId));
CompletableFuture<List<Order>> orderFuture =
CompletableFuture.supplyAsync(() -> orderService.getByUserId(userId));
CompletableFuture<Integer> scoreFuture =
CompletableFuture.supplyAsync(() -> creditService.getScore(userId));
// 等待全部完成并合并
return userFuture.thenCombine(orderFuture, (user, orders) -> {
UserProfile profile = new UserProfile(user);
profile.setOrders(orders);
return profile;
}).thenCombine(scoreFuture, (profile, score) -> {
profile.setCreditScore(score);
return profile;
}).join(); // 阻塞等待最终结果
}
三个远程调用并行执行,总耗时 ≈ max(三个调用耗时),而非三者之和。
异常处理
future
.handle((result, ex) -> {
if (ex != null) {
return "默认值"; // 有异常用默认值
}
return result;
})
.whenComplete((result, ex) -> {
// 类似 finally,无论成功失败都执行
log.info("任务完成: {}", result);
});
| 方法 | 有异常时 | 无异常时 | 返回值 |
|---|---|---|---|
exceptionally |
执行,提供默认值 | 不执行 | 有 |
handle |
执行 | 也执行 | 有 |
whenComplete |
执行 | 也执行 | 无(不改变结果) |
Phaser(JDK 7+)
CyclicBarrier 的增强版,支持动态调整参与者数量和分阶段执行。
Phaser phaser = new Phaser(3); // 3 个初始参与者
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 第 1 阶段
doPhase1();
phaser.arriveAndAwaitAdvance(); // 等待其他线程
// 第 2 阶段
doPhase2();
phaser.arriveAndAwaitAdvance();
// 退出
phaser.arriveAndDeregister(); // 动态减少参与者
}).start();
}
工具类选型指南
| 需求 | 推荐工具 |
|---|---|
| 等待 N 个任务完成 | CountDownLatch |
| N 个线程互相等待(可重复) | CyclicBarrier |
| 限制并发数 | Semaphore |
| 异步编程 + 链式调用 | CompletableFuture |
| 两个线程交换数据 | Exchanger |
| 动态参与者 + 分阶段同步 | Phaser |
| 定时/周期任务 | ScheduledExecutorService |
生产环境核心踩坑点
| 问题 | 答案要点 |
|---|---|
| CountDownLatch 和 CyclicBarrier 的区别? | 前者一对多/一次性,后者多对多/可重复 |
| Semaphore 的作用? | 控制并发数,限流 |
| CompletableFuture 有哪些核心方法? | supplyAsync, thenApply, thenCombine, allOf, exceptionally |
| CompletableFuture 默认用什么线程池? | ForkJoinPool.commonPool(),I/O 任务应指定自定义线程池 |
| allOf 和 anyOf 的区别? | allOf 等全部完成,anyOf 等任一完成 |