在上一篇的文章中,简单的讲了线程池的创建和使用,本文则会深入源码对线程池的实现进行分析。
Table of contents
准备知识
ThreadPoolExecutor 类中有一个名为 ctl 的原子类型的整数,一共 32 位,高 3 位用于表示线程池的状态,低 29 位用于记录当前线程池中的线程数,所以理论上线程池中的最大线程数为 536870911 个,完全够了。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
本文中会提到的 ThreadPoolExecutor 中的关键方法
public void execute(Runnable command) { /*...*/ }
private boolean addWorker(Runnable firstTask, boolean core) { /*...*/ }
final void runWorker(Worker w) { /*...*/ }
private Runnable getTask() { /*...*/ }
以及关键对象
private final BlockingQueue<Runnable> workQueue;
private final HashSet<Worker> workers = new HashSet<Worker>();
任务提交和执行过程
当我们创建好了一个线程池,并提交一个任务的时候,线程池内部是怎么工作的呢?
(这里假设线程池处于 Running 状态,工作线程数量也未达到 corePoolSize)
首先从 ThreadPoolExecutor 类中的 execute 方法入手
public void execute(Runnable command) {
// ...
int c = ctl.get();
// 1.
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.
else if (!addWorker(command, false))
// 4.
reject(command);
}
简单来说,提交任务时
如果线程池的工作线程数小于 corePoolSize,则新建一个核心线程来执行任务
如果达到了 corePoolSize 则将任务放入阻塞队列
队列满了的话,会新建非核心线程来执行,直到线程数达到 maximumPoolSize
如果总的线程数大于 maximumPoolSize,则会拒绝当前任务。
这里 addWorker 是关键,看看具体的实现
private boolean addWorker(Runnable firstTask, boolean core) {
// 1. 第一部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 主要为了干这里的事情
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// 2. 第二部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建 Worker,封装线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加进入 HashSet
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 这里调用了 start 方法哦
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
上面大部分代码是在做校验和检查,做的事情其实很简单,主要分成两个部分:
第一部分是使用 CAS 的方式更新线程数量,这里会对线程池状态,已有的线程数量进行判断,会返回 false;
第二部分是新建一个 Worker,然后添加进 workers 集合,这里通过加锁保证同步,然后调用线程的 start 方法开始执行任务。
上面的 Worker 类,是 ThreadPoolExecutor 中的一个内部类,用于封装线程,最后也是通过 Worker 来执行任务的,在 ThreadPoolExecutor 类中使用了一个 HashSet 保存所有的 Worker。
Worker 类如下所示(省略了暂时不用关心的内容)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
//...
// 构造函数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// ...
}
该类持有一个 Thread 变量,在构造函数执行时,由线程工厂创建。
同时持有一个 Runnable 变量,也就是提交的任务。
在上面的 addWorker 方法中,调用了 Worker 类持有的 thread 的 start 方法,所以接下来就是 Worker 的 run 方法会被新建的线程执行,最后是执行了 runWorker 方法,接着往下看
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 这里是关键点,判断当前任务或者去队列中拿任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 这里执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这里我们看到了最终执行任务的地方,整个任务的提交和执行的流程就完整了。
从上面的代码,还可以看出线程池有一个扩展点,就是 beforeExecute 方法和 afterExecute 方法,可以实现传递一些 ThreadLocal 对象,让线程池执行某些任务的时候可以获取到指定的 ThreadLocal 对象。
线程的复用 回到我们文章开头的疑问,当线程执行完当前任务后,线程为什么没有被销毁呢?
从上面的代码可以看出,整个代码是在一个 while 循环中,任务执行完后,会掉用 getTask 方法获取线程,接下来就看看该方法内部的实现:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 这里是关键
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
代码中注释了关键点,线程会去阻塞队列拿任务,如果拿到了就会返回并执行,如果没拿到会一直等待任务,这就是线程运行完任务,没有被回收的原因。
前面在讲 execute 方法的时候,当线程数达到了 corePoolSize 的时候,会将任务放入队列,如下
public void execute(Runnable command) {
// ...
// 这里将任务放入队列
if (isRunning(c) && workQueue.offer(command)) {
// ...
}
// ...
}
任务被放入队列后,会唤醒等待的线程去执行任务,这样整个流程就对应上了。
好了,整个流程就是这样的。