Skip to content
Go back

Java 线程池介绍

Updated:  at  07:25 PM

Table of contents

Open Table of contents

线程池的创建

线程池最主要的类是 ThreadPoolExecutor 这个类,该类的继承图如下所示:

ThreadPoolExecutor --> AbstractExecutorService --> ExecutorService --> Executor

其中 Executor 和 ExecutorService 接口为线程池重要的接口。

构造函数

ThreadPoolExecutor 类有 4 个构造函数,其中包含全部传入参数的构造函数如下:

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    //...
}

一共 7 参数:

其他的构造函数分别为上面这个构造函数的重载形式,使用了默认的 threadFactory 或者 handler,不再一一细说。

线程工厂

默认的线程工厂为 Executors 中的内部类 DefaultThreadFactory,通常情况下使用该工厂创建线程即可。

当然也可以自己实现,只需要实现 ThreadFactory 接口,可以自定义线程的命名,优先级,线程组,是否为守护进程等。

拒绝执行处理器

在 ThreadPoolExecutor 内部提供了 4 个内置的处理器,其中默认是使用的 AbortPolicy

自己实现 RejectedExecutionHandler 接口,实现 rejectedExecution 方法可以自定义任务拒绝策略。

Executors 类

为了方便线程池的使用,Executors 类还提供了创建几种典型线程池的工厂方法

线程池的状态

首先说明的是线程池的状态,需要与线程的状态区分开来。

本文使用的 Java 版本为 1.8.0_171,源码中提供的线程池的状态为以下 5 种

RUNNING: 可以接受新的任务并且会执行排队中的任务

SHUTDOWN: 不会接受新的任务,但是会执行已经提交的任务

STOP: 不会接受新的任务,也不会执行排队中的任务,并且会中断正在执行中的任务

TIDYING: 所有任务已经终止,worker 数量为 0,当状态过渡到 tidying 时,会执行 terminated() 函数。

TERMINATED: terminated() 函数执行完成

线程池的生命周期并不一定会经过每个状态,整个状态的转移过程可能为下面的情况之一

RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()

(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()

SHUTDOWN -> TIDYING
When both queue and pool are empty

STOP -> TIDYING
When pool is empty

TIDYING -> TERMINATED
When the terminated() hook method has completed

线程池的使用

讲解了所有的概念,下面来讲解一下一个线程池常规的工作流程。

首先创建一个线程池,corePoolSize 为 5,maximumPoolSize 为 8,keepAliveTime 为 60 秒,阻塞队列使用的是 ArrayBlockingQueue,队列大小为 20,使用默认的线程工厂,并且使用的拒绝策略处理器不做任何事情,直接丢弃任务。

ExecutorService pool = new ThreadPoolExecutor(
    5, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20),
    Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

然后向线程池提交任务。

for (int i = 0; i < 100; i++) {
    pool.execute(() -> {
        try {
            Thread.sleep(2000);
            Thread.currentThread().getName();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

当我们提交任务的时候,线程池中会依次创建新的线程直到线程数量到 corePoolSize,后续的任务会被放入等待队列中,当队列排满后,线程池会判断线程数量是否达到了 maximumPoolSize,若没有,则新建线程来处理任务。注意此时新线程处理的是当前的任务,而不是排在队列头的任务。

当新建的线程数达到了 maximumPoolSize,就不会再继续新建线程,此时如果队列依然是满的,则会开始拒绝任务,这里我使用的拒绝策略是直接丢弃,不做任何处理,可以根据业务需要自定义拒绝策略。

当 shutdown() 函数被调用,线程池就进入 SHUTDOWN 状态,不会接收新的任务,但是会将已经提交的任务运行完毕。

pool.shutdown();

等到所有已提交的任务运行完成,线程池简单的使用流程就结束了。

有返回值的任务

在上面的例子中提交的任务就是让线程睡眠两秒钟,再打印一下线程名称,不需要任何的返回值。

但是在实际的使用中,很多任务是有返回值的,而 ExecutorService 提供了这样的接口,即 submit 方法,传入一个 Callable 对象,并返回一个 Future 对象。这样就可以实现对线程池任务的返回值进行获取了,如下:

Future<String> future = pool.submit(() -> Thread.currentThread().getName());

String result = future.get();

Future 的 get() 方法会阻塞当前线程,直到获取到运行线程的返回值为止。

批量提交任务

结合 Java8 的流的特性,使用 invokeAll() 方法可以进行批量任务的提交以及结果的获取。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    () -> "task1",
    () -> "task2",
    () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

executor.shutdown();

线程池的常规操作就到这里,下次讲解内部的具体实现。



Previous Post
Java 类加载机制详解
Next Post
Git 操作初体验