Thread Pool Under the Hood & Source Code Analysis
In the previous article, we explored the parameters and execution flow of ThreadPoolExecutor from a high-level perspective. In this article, we will dive deep into the source code to uncover the mysteries of how a thread pool operates internally.
Before reading the source code, we can imagine the thread pool as a taxi company. The company has a few core elements:
- The Boss's Ledger: Records the company's current operating status (open or closed) and how many drivers are currently on duty. (ctl variable)
- Drivers: The people who actually do the work, constantly taking orders and driving. (Worker class)
- Dispatch Center: Responsible for receiving customer orders and assigning them to drivers or putting them into a waiting queue. (execute() method)
- Dispatch Logic: Where does a driver go for the next order after finishing one? What happens if there are no more orders? (getTask() and runWorker() methods)
1. The Boss's Ledger: The ctl Variable
A thread pool needs to manage two extremely important states:
- Running State (runState): For example, RUNNING (operating normally), SHUTDOWN (stopped taking new tasks), etc.
- Worker Count (workerCount): How many worker threads are currently in the thread pool.
In a concurrent environment, storing them in two separate variables would require a lock to keep them consistent whenever a state transition occurs. To avoid that cost, Doug Lea (the author of JUC) packed both into the same 32-bit int variable, ctl: the high 3 bits hold the run state and the low 29 bits hold the worker count.
// Initial state of ctl: RUNNING and worker count is 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29 bits
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29 - 1 = 536,870,911 (about 537 million)
// runState is stored in the high-order 3 bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Methods for packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // Get high 3 bits
private static int workerCountOf(int c) { return c & CAPACITY; } // Get low 29 bits
private static int ctlOf(int rs, int wc) { return rs | wc; } // Combine state and count
Why Design It This Way?
The core purpose of this design is lock-free state updates. By combining two logical states into a single physical variable, the thread pool can use the CAS operations of AtomicInteger to read and update the run state and worker count together as one atomic value, without taking a lock for those updates.
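To make the packing concrete, here is a minimal, self-contained sketch that reuses the constants shown above and applies a CAS update to the packed value. The class name CtlPackingDemo and the helper tryIncrementWorkerCount are hypothetical names chosen for illustration; they are not part of ThreadPoolExecutor, which performs its own count updates inside addWorker.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;    // low 29 bits all set
    static final int RUNNING = -1 << COUNT_BITS;          // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;          // high bits 000

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Hypothetical helper: increment the worker count only while the pool
    // is RUNNING. State and count are validated and updated in one CAS.
    static boolean tryIncrementWorkerCount(AtomicInteger ctl) {
        for (;;) {
            int c = ctl.get();
            if (runStateOf(c) != RUNNING || workerCountOf(c) >= CAPACITY)
                return false;
            if (ctl.compareAndSet(c, c + 1))
                return true;
            // CAS failed: another thread changed ctl, so re-read and retry
        }
    }

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        tryIncrementWorkerCount(ctl);
        tryIncrementWorkerCount(ctl);
        int c = ctl.get();
        System.out.println("running=" + (runStateOf(c) == RUNNING)
                + " workers=" + workerCountOf(c));

        // Move to SHUTDOWN while preserving the count, again with one CAS
        ctl.compareAndSet(c, ctlOf(SHUTDOWN, workerCountOf(c)));
        System.out.println("increment after shutdown: "
                + tryIncrementWorkerCount(ctl));
    }
}
```

Because the state check and the count update go through the same compareAndSet, a concurrent transition to SHUTDOWN makes the CAS fail and the loop re-evaluate, which is the property two separate variables could not give without a lock.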
2. Dispatch Center: Submitting Tasks with execute()
When we call the execute() method to submit a task, it is equivalent to sending an order to the dispatch center. The dispatch center will strictly follow the "three-step" strategy mentioned in the previous article.
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Step 1: If worker threads < core pool size
    if (workerCountOf(c) < corePoolSize) {
        // Directly try to add a core thread (Worker) to execute the task
        if (addWorker(command, true))
            return; // Exit if successful
        c = ctl.get(); // Re-read ctl if addition failed due to concurrency
    }
    // Step 2: If core threads are full and the pool is RUNNING, try enqueuing the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Double-check: If the pool is no longer RUNNING after enqueuing, remove the task and execute the reject policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If there are no threads in the pool (e.g., corePoolSize is 0, or core threads were reclaimed)
        // Add a non-core thread to consume from the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: If the queue is full (or the pool is no longer RUNNING), try adding a non-core thread
    else if (!addWorker(command, false))
        // addWorker fails when maximumPoolSize is reached or the pool is shutting down: execute the reject policy
        reject(command);
}
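There is an elegant double-check here: after the task is successfully enqueued, the thread pool state is checked one more time. Why? Because there is a time gap between checking the state and enqueuing the task, and during this gap another thread may have called shutdown() or shutdownNow(). Without the recheck, a task could slip into the queue after the pool has stopped accepting work and, if the workers have already exited, sit there without ever being executed. The same recheck also covers the case where the task was enqueued but no worker is left to run it, in which case a new worker is started with no first task.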
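The three steps can be observed from the outside with the public ThreadPoolExecutor API. The sketch below is only an illustration under chosen assumptions: a pool with corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, and tasks that block on a latch so that nothing completes while we look. The class name ExecuteStepsDemo and the method observe are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteStepsDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected}.
    static int[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int[] seen = new int[4];
        try {
            pool.execute(blocker);               // Step 1: starts a core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);               // Step 2: core is full, enqueue
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);               // Step 3: queue is full, add a non-core thread
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);           // Queue and pool both full
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                     // default AbortPolicy throws
            }
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

The fourth submission is rejected because no RejectedExecutionHandler is passed to the constructor, so the default AbortPolicy applies.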
3. The Driver: Design of the Worker Class
The addWorker() method, repeatedly called inside execute(), actually creates a Worker object internally and starts the thread bound to it.
The design of the Worker class is very clever:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread; // The physical thread bound to this Worker
    Runnable firstTask; // The first task assigned upon initialization (if any)

    Worker(Runnable firstTask) {
        setState(-1); // Initialize AQS state, prevent interrupts before starting
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // Create thread using factory
    }

    public void run() {
        runWorker(this); // Enter the main loop once the thread starts
    }

    // AQS methods used to implement a non-reentrant lock
    protected boolean tryAcquire(int unused) { ... }
    protected boolean tryRelease(int unused) { ... }
}
Why Must Worker Inherit From AQS?
You might ask: Since a lock is needed, why not just use the built-in ReentrantLock?
The core difference is: ReentrantLock is a reentrant lock, while Worker implements a non-reentrant lock.
In a thread pool, we need a mechanism to determine whether a thread is currently executing a task (busy).
- Worker holding the lock = executing a task = busy.
- Worker not holding the lock = waiting for a task = idle.
If a reentrant lock were used, this check would break whenever a task itself calls a pool-control method. For example, setCorePoolSize() may call interruptIdleWorkers(), which tries to acquire each Worker's lock with tryLock() to decide whether that worker is idle. If the call comes from a task running on a worker thread, that thread already holds its own Worker lock; a reentrant lock would let the second acquisition succeed, so the worker that is actually busy would be treated as idle and interrupted. A non-reentrant lock draws a strict boundary: a lock held by a running task can never be acquired a second time, so a busy worker is never interrupted as an idle one.
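The difference is easy to reproduce in isolation. The following is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer, compared against ReentrantLock. SimpleMutex and the two helper methods are hypothetical names; the tryAcquire and tryRelease bodies are one plausible way to write such a lock and are not copied from the elided Worker methods above.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

final class NonReentrantLockDemo {
    // State 0 = unlocked, 1 = locked. No owner check, so no reentrancy.
    static final class SimpleMutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // already held, even if held by the caller itself
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock() { release(1); }
    }

    // Same thread acquires twice: ReentrantLock allows it.
    static boolean reentrantSecondTryLock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            boolean again = lock.tryLock();
            if (again)
                lock.unlock(); // undo the nested acquisition
            return again;
        } finally {
            lock.unlock();
        }
    }

    // Same thread acquires twice: the AQS mutex refuses the second attempt.
    static boolean mutexSecondTryLock() {
        SimpleMutex mutex = new SimpleMutex();
        if (!mutex.tryLock())
            throw new IllegalStateException("first acquisition should succeed");
        try {
            return mutex.tryLock();
        } finally {
            mutex.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("ReentrantLock second tryLock: " + reentrantSecondTryLock());
        System.out.println("SimpleMutex second tryLock:   " + mutexSecondTryLock());
    }
}
```

With the mutex, a failed tryLock() from any thread, including the owner, reliably means "this worker is busy", which is the signal the idle-worker interruption logic depends on.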
4. Taking Orders & Working: runWorker() and getTask()
The Worker's run() method directly calls runWorker(), which is the core loop of every thread's lifecycle in the thread pool.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // Allow interrupts (change state from -1 to 0)
    boolean completedAbruptly = true; // Mark whether exited due to exception
    try {
        // Core loop: Continuously fetch tasks to execute.
        // getTask() is a blocking method to fetch tasks from the queue.
        while (task != null || (task = getTask()) != null) {
            w.lock(); // Lock before executing the task, marking itself as "busy"
            // Ensure that if the pool is in a STOP state, the thread is interrupted
            if ((runStateAtLeast(ctl.get(), STOP) || ... ) && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // Extension point for subclasses
                Throwable thrown = null;
                try {
                    task.run(); // Actually execute the business code we submitted
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } finally {
                    afterExecute(task, thrown); // Extension point for subclasses
                }
            } finally {
                task = null;
                w.completedTasks++; // Accumulate completed task count
                w.unlock(); // Task done, unlock, mark as "idle"
            }
        }
        completedAbruptly = false; // Exited loop normally (getTask returned null)
    } finally {
        // Cleanup work when thread exits
        processWorkerExit(w, completedAbruptly);
    }
}
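The afterExecute hook in the loop above is where a subclass can see an exception thrown by a task. As a hedged illustration, the sketch below overrides afterExecute in an anonymous ThreadPoolExecutor subclass and records the Throwable it receives. AfterExecuteDemo and capture are hypothetical names, and the silent uncaught-exception handler is only there to keep the console clean.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AfterExecuteDemo {
    // Runs one failing task and returns the message seen by afterExecute.
    static String capture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Worker threads that swallow the rethrown exception quietly
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), quiet) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seen.set(t);      // non-null when task.run() threw
                done.countDown();
            }
        };

        try {
            // execute(), not submit(): the exception escapes task.run()
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        Throwable t = seen.get();
        return t == null ? null : t.getMessage();
    }

    public static void main(String[] args) {
        System.out.println("afterExecute saw: " + capture());
    }
}
```

Note that this applies to tasks passed to execute(). With submit(), the task is wrapped in a FutureTask that stores the exception, so afterExecute receives null for the Throwable and the failure surfaces from Future.get() instead.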
How Are Threads Reclaimed? Look at getTask()
If threads in the thread pool cannot grab tasks for a long time, how are they reclaimed based on keepAliveTime? The secret lies in the getTask() method.
private Runnable getTask() {
    boolean timedOut = false; // Mark whether the last fetch from the queue timed out
    for (;;) {
        int c = ctl.get();
        // Check pool state: if SHUTDOWN and queue is empty, or STOP, return null to let the thread die
        if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 'timed' determines if this thread is subject to a lifespan limit
        // If core thread timeout is enabled, or current threads > corePoolSize, fetching tasks requires a timeout
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Determine if thread needs to be destroyed
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null; // Returning null ends the while loop in runWorker, killing the thread
            continue;
        }
        try {
            // Depending on 'timed', do a blocking wait or a timed blocking wait
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // Return null if timeout
                workQueue.take(); // Wait indefinitely until task available
            if (r != null)
                return r; // Successfully fetched task, return to execute it
            // Reaching here means poll timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
Here, the features of BlockingQueue are fully utilized:
- If the thread is not subject to timeout limits (e.g., it is a core thread and core thread timeout is not enabled), it calls workQueue.take(). This puts the thread in a blocking state, consuming no CPU, until a new task is enqueued to wake it up.
- If the thread is subject to timeout limits, it calls workQueue.poll(keepAliveTime, TimeUnit). If it still hasn't grabbed a task after this period, poll returns null. On the next iteration of the loop in getTask, detecting timedOut == true, it will return null to runWorker, thereby ending the thread's lifecycle.
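Both branches can be seen from the outside with a small experiment. The sketch below runs one short task on a single-thread pool and then watches getPoolSize(), once with allowCoreThreadTimeOut enabled (the timed poll path) and once without (the take path). KeepAliveDemo and poolSizeAfterIdle are hypothetical names, and the 50 ms keep-alive and the wait limits are arbitrary values chosen to keep the run short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {
    // Runs one task, then waits up to waitMillis for the pool to empty.
    // Returns the pool size observed at the end (-1 if interrupted).
    static int poolSizeAfterIdle(boolean allowCoreTimeout, long waitMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { }); // starts the single core thread
            long deadline = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(waitMillis);
            // Poll until the idle worker has been reclaimed or time is up
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
                Thread.sleep(10);
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow(); // interrupt a worker still parked in take()
        }
    }

    public static void main(String[] args) {
        // Timed path: the idle worker times out in poll() and exits
        System.out.println("with core timeout:    " + poolSizeAfterIdle(true, 5000));
        // Untimed path: the worker stays parked in take()
        System.out.println("without core timeout: " + poolSizeAfterIdle(false, 300));
    }
}
```

The first call should report an empty pool once the keep-alive has elapsed, while the second keeps its core thread for the whole wait, since a thread blocked in take() has no timeout to trigger its exit.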
Summary
The low-level design of ThreadPoolExecutor reflects performance considerations under high concurrency everywhere:
- Bitwise Operations & State Packing: A single AtomicInteger combines the running state and thread count, so both can be updated atomically without a lock.
- Double-Check & CAS: Throughout execute, addWorker, and getTask, CAS retries and double-checks are used to handle race conditions.
- Clever Use of AQS for Non-Reentrant Locks: By implementing a non-reentrant lock, Worker cleanly separates "working" and "idle" states, which makes dynamic parameter adjustments and graceful shutdowns safe.
- Synergy with BlockingQueue: Keeping threads alive and reclaiming them relies on BlockingQueue's take and timed poll methods.
Once you understand these source code details, you will feel very confident when addressing questions like "why are core threads reclaimed?", "how to modify thread pool parameters dynamically?", or "how does a thread pool actually catch exceptions?".