线程池底层原理与源码剖析
在上一篇文章中,我们从宏观上了解了 ThreadPoolExecutor 的参数和执行流程。本文我们将深入到源码层面,揭开线程池内部运行的神秘面纱。
阅读源码之前,我们可以先将线程池想象成一家出租车公司。公司有几个核心要素:
- 老板的账本:记录公司当前营业状态(营业中还是已停业),以及当前有多少名司机在岗。(
ctl变量) - 司机:真正干活的人,他们不断地接单、跑活。(
Worker类) - 调度中心:负责接收客户订单,并分配给司机或者放入等待队列。(
execute()方法) - 派单逻辑:司机跑完一单后,去哪里接下一单?没单子了怎么办?(
getTask()和runWorker()方法)
一、老板的账本:ctl 变量
线程池需要管理两个极其重要的状态:
- 运行状态(runState):比如 RUNNING(正常营业)、SHUTDOWN(停止接客)等。
- 线程数量(workerCount):当前线程池里有多少个工作线程。
在并发环境下,如果用两个独立的变量来分别保存它们,当发生状态流转时,必须加重量级锁才能保证两个变量的原子更新。为了追求极致性能,Doug Lea(JUC 的作者)将这两个状态打包到了同一个 32 位的 int 变量 ctl 中。
// ctl 初始状态:RUNNING 且线程数为 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29 位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 约 5 亿
// 运行状态保存在高 3 位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 拆解和组装 ctl 的方法
private static int runStateOf(int c) { return c & ~CAPACITY; } // 取高 3 位
private static int workerCountOf(int c) { return c & CAPACITY; } // 取低 29 位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 将状态和数量合并
为什么这么设计?
这种设计的核心目的是无锁并发。通过将两个逻辑状态合并成一个物理变量,线程池就可以直接使用 AtomicInteger 的 CAS 操作来同时更新运行状态和线程数量,从而避免了加锁的开销。
二、调度中心:execute() 提交任务
当我们调用 execute() 方法提交任务时,就相当于给调度中心发了一个订单。调度中心会严格按照我们在上篇文章中提到的“三步走”战略来处理。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 第 1 步:如果工作线程数 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 直接尝试添加一个核心线程(Worker)来执行任务
if (addWorker(command, true))
return; // 添加成功就结束
c = ctl.get(); // 并发下添加失败,重新获取 ctl
}
// 第 2 步:如果核心线程满了,且线程池还在 RUNNING,尝试把任务放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查:如果入队后线程池不是 RUNNING 了,把任务移出队列并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果池里没有线程了(比如 corePoolSize 设为 0,或者核心线程被回收了)
// 添加一个非核心线程去消费队列
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第 3 步:如果队列也满了,尝试添加非核心线程
else if (!addWorker(command, false))
// 如果非核心线程也加不进去(达到了 maximumPoolSize),执行拒绝策略
reject(command);
}
这里有一个精妙的**双重检查(Double-Check)**机制:在任务成功放入队列后,又检查了一次线程池的状态。为什么?因为在判断状态和任务入队这两个步骤之间,存在时间差。在这个时间差内,线程池可能被其他线程调用了 shutdown()。如果不做复查,这个任务就会永远停留在队列里,得不到执行。
三、司机:Worker 类的设计
在 execute() 中反复调用的 addWorker() 方法,内部实际上是创建了一个 Worker 对象,并启动它绑定的线程。
Worker 类的设计非常巧妙:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // Worker 绑定的物理线程
Runnable firstTask; // 初始化时分配的第一个任务(如果有的话)
Worker(Runnable firstTask) {
setState(-1); // 初始化 AQS 状态,禁止在启动前被中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 利用工厂创建线程
}
public void run() {
runWorker(this); // 线程启动后,进入主循环
}
// AQS 方法,用于实现非重入锁
protected boolean tryAcquire(int unused) { ... }
protected boolean tryRelease(int unused) { ... }
}
为什么 Worker 要继承 AQS?
你可能会问:既然需要锁,为什么不直接使用内置的 ReentrantLock 呢?
核心区别在于:ReentrantLock 是可重入锁,而 Worker 实现的是一把非重入锁。
在线程池中,我们需要一种机制来判断一个线程当前是否正在执行任务(忙碌)。
- 获取到锁的 Worker = 正在执行任务 = 忙碌。
- 未获取到锁的 Worker = 正在等待获取任务 = 空闲。
如果使用可重入锁,那么在 setCorePoolSize() 等方法尝试动态调整参数、中断空闲线程时,如果它恰好被同一个线程嵌套调用,可重入锁就会放行,导致本不该被中断的“正在干活”的线程被误杀。不可重入的锁可以严格划清界限:正在执行任务的线程绝不能获取第二遍锁,也不允许被当作空闲线程中断。
四、接单与跑活:runWorker() 和 getTask()
Worker 的 run() 方法直接调用了 runWorker(),这就是线程池中每一个线程生命周期的核心循环。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断(将 state 从 -1 变为 0)
boolean completedAbruptly = true; // 标记是否因为异常退出
try {
// 核心循环:不断地拿任务执行。
// getTask() 是去队列里拿任务的阻塞方法。
while (task != null || (task = getTask()) != null) {
w.lock(); // 执行任务前加锁,标记自己"正在忙碌"
// 确保线程池是 STOP 状态时,线程一定被中断
if ((runStateAtLeast(ctl.get(), STOP) || ... ) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 提供给子类的扩展点
Throwable thrown = null;
try {
task.run(); // 真正执行我们提交的业务代码
} catch (RuntimeException x) {
thrown = x; throw x;
} finally {
afterExecute(task, thrown); // 提供给子类的扩展点
}
} finally {
task = null;
w.completedTasks++; // 统计完成的任务数
w.unlock(); // 任务完成,解锁,标记为"空闲"
}
}
completedAbruptly = false; // 正常退出循环(getTask 返回 null)
} finally {
// 线程退出时的清理工作
processWorkerExit(w, completedAbruptly);
}
}
线程是如何被回收的?看 getTask()
如果线程池里的线程一直拿不到任务,它们是如何根据 keepAliveTime 被回收的呢?秘密全在 getTask() 方法里。
private Runnable getTask() {
boolean timedOut = false; // 标记上一次从队列拿任务是否超时
for (;;) {
int c = ctl.get();
// 检查线程池状态,如果是 SHUTDOWN 且队列空,或 STOP,则返回 null 让线程死掉
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed 变量决定这个线程是不是有寿命限制的
// 如果开启了核心线程超时,或者当前线程数大于核心线程数,那么获取任务就需要超时时间
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 判断是否需要销毁线程
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null; // 返回 null 意味着 runWorker 的 while 循环结束,线程死亡
continue;
}
try {
// 根据 timed 决定是阻塞等待拿任务,还是带超时的阻塞等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时拿不到就返回 null
workQueue.take(); // 死等,直到拿到任务
if (r != null)
return r; // 成功拿到任务,回去执行
// 执行到这里,说明 poll 超时了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在这里,BlockingQueue 的特性被利用得淋漓尽致:
- 如果该线程不受超时限制(例如它是核心线程,且没开启核心线程超时),就调用
workQueue.take()。这会让线程处于阻塞状态,不占用 CPU,直到有新任务入队唤醒它。 - 如果该线程受超时限制,就调用
workQueue.poll(keepAliveTime, TimeUnit)。如果超过这个时间还是没拿到任务,poll会返回 null。getTask下一轮循环时检测到timedOut == true,就会返回 null 给runWorker,从而结束线程的生命周期。
总结
ThreadPoolExecutor 的底层设计处处体现着高并发下的性能考量:
- 位运算与状态打包:用一个
AtomicInteger组合运行状态和线程数量,无锁化保证状态更新的原子性。 - Double-Check 与 CAS:在
execute、addWorker、getTask的各个流程中,充分利用了无锁并发的思想和双重检查机制应对竞态条件。 - 巧用 AQS 实现非重入锁:通过实现非重入锁,清晰地切分了“工作”与“空闲”两种状态,从而安全地实现了动态调参和优雅停机。
- 阻塞队列的完美配合:将线程的管理(存活与销毁)完全依托于
BlockingQueue的take和带超时的poll的特性。
弄懂了这些源码细节,你在处理诸如“为什么核心线程也会被回收”、“如何动态修改线程池参数”、“线程池到底是怎么捕获异常的”等问题时,将会胸有成竹。