Table of contents
线程池的创建
线程池最主要的类是 ThreadPoolExecutor 这个类,该类的继承图如下所示:
ThreadPoolExecutor --> AbstractExecutorService --> ExecutorService --> Executor
其中 Executor 和 ExecutorService 接口为线程池重要的接口。
构造函数
ThreadPoolExecutor 类有 4 个构造函数,其中包含全部传入参数的构造函数如下:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//...
}
一共 7 参数:
- corePoolSize: 线程池中保持的线程数量,即使它们处于空闲状态。
- maximumPoolSize: 线程池中允许存在的最大的线程数量。
- keepAliveTime: 当线程池中的数量超过 corePoolSize 时,这是空闲状态下的线程等待进入终止状态的最长时间。
- unit: keepAliveTime 的时间单位。
- workQueue: 用于保存等待被执行的任务的队列。
- threadFactory: 创建新的线程池时使用的线程工厂。
- handler: 线程池中的线程全被占用以及等待队列被占满时,用于处理后续提交的任务的处理器。
其他的构造函数分别为上面这个构造函数的重载形式,使用了默认的 threadFactory 或者 handler,不再一一细说。
线程工厂
默认的线程工厂为 Executors 中的内部类 DefaultThreadFactory,通常情况下使用该工厂创建线程即可。
当然也可以自己实现,只需要实现 ThreadFactory 接口,可以自定义线程的命名,优先级,线程组,是否为守护进程等。
拒绝执行处理器
在 ThreadPoolExecutor 内部提供了 4 个内置的处理器,其中默认是使用的 AbortPolicy
- CallerRunsPolicy: 该策略是当任务被拒绝时,直接使用调用线程执行该任务。
- AbortPolicy: 任务被拒绝时,抛出 RejectedExecutionException 异常。
- DiscardPolicy: 任务被拒绝时,不做任何处理。
- DiscardOldestPolicy: 任务被拒绝时,让等待队列的最前面的任务出列并抛弃,然后再次提交本次任务。
自己实现 RejectedExecutionHandler 接口,实现 rejectedExecution 方法可以自定义任务拒绝策略。
Executors 类
为了方便线程池的使用,Executors 类还提供了创建几种典型线程池的工厂方法
- newFixedThreadPool: 固定线程数的线程池,即 maximumPoolSize 等于 corePoolSize。
- newSingleThreadExecutor: 只有一个线程的线程池。
- newCachedThreadPool: corePoolSize 为 0,即线程池中所有线程只要空闲 60 秒都会进入终止状态。
线程池的状态
首先说明的是线程池的状态,需要与线程的状态区分开来。
本文使用的 Java 版本为 1.8.0_171,源码中提供的线程池的状态为以下 5 种
RUNNING: 可以接受新的任务并且会执行排队中的任务
SHUTDOWN: 不会接受新的任务,但是会执行已经提交的任务
STOP: 不会接受新的任务,也不会执行排队中的任务,并且会中断正在执行中的任务
TIDYING: 所有任务已经终止,worker 数量为 0,当状态过渡到 tidying 时,会执行 terminated() 函数。
TERMINATED: terminated() 函数执行完成
线程池的生命周期并不一定会经过每个状态,整个状态的转移过程可能为下面的情况之一
RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()
SHUTDOWN -> TIDYING
When both queue and pool are empty
STOP -> TIDYING
When pool is empty
TIDYING -> TERMINATED
When the terminated() hook method has completed
线程池的使用
讲解了所有的概念,下面来讲解一下一个线程池常规的工作流程。
首先创建一个线程池,corePoolSize 为 5,maximumPoolSize 为 8,keepAliveTime 为 60 秒,阻塞队列使用的是 ArrayBlockingQueue,队列大小为 20,使用默认的线程工厂,并且使用的拒绝策略处理器不做任何事情,直接丢弃任务。
ExecutorService pool = new ThreadPoolExecutor(
5, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
然后向线程池提交任务。
for (int i = 0; i < 100; i++) {
pool.execute(() -> {
try {
Thread.sleep(2000);
Thread.currentThread().getName();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
当我们提交任务的时候,线程池中会依次创建新的线程直到线程数量到 corePoolSize,后续的任务会被放入等待队列中,当队列排满后,线程池会判断线程数量是否达到了 maximumPoolSize,若没有,则新建线程来处理任务。注意此时新线程处理的是当前的任务,而不是排在队列头的任务。
当新建的线程数达到了 maximumPoolSize,就不会再继续新建线程,此时如果队列依然是满的,则会开始拒绝任务,这里我使用的拒绝策略是直接丢弃,不做任何处理,可以根据业务需要自定义拒绝策略。
当 shutdown() 函数被调用,线程池就进入 SHUTDOWN 状态,不会接收新的任务,但是会将已经提交的任务运行完毕。
pool.shutdown();
等到所有已提交的任务运行完成,线程池简单的使用流程就结束了。
有返回值的任务
在上面的例子中提交的任务就是让线程睡眠两秒钟,再打印一下线程名称,不需要任何的返回值。
但是在实际的使用中,很多任务是有返回值的,而 ExecutorService 提供了这样的接口,即 submit 方法,传入一个 Callable 对象,并返回一个 Future 对象。这样就可以实现对线程池任务的返回值进行获取了,如下:
Future<String> future = pool.submit(() -> Thread.currentThread().getName());
String result = future.get();
Future 的 get() 方法会阻塞当前线程,直到获取到运行线程的返回值为止。
批量提交任务
结合 Java8 的流的特性,使用 invokeAll() 方法可以进行批量任务的提交以及结果的获取。
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);
executor.shutdown();
线程池的常规操作就到这里,下次讲解内部的具体实现。