本文参考

Java线程池实现原理及其在美团业务中的实践 (qq.com)

硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理 | Throwable (throwx.cn)

池化思想

线程池运用了池化的思想,类似的还有连接池、对象池,具有以下的一些好处:

  • 减少线程创建、销毁的损耗
  • 更快的响应任务请求
  • 更好的管理线程以及监控线程

线程池创建方式

Executors

Executors类的四种JDK定义好的线程池:底层还是封装的ThreadPoolExecutor

主要的区别就在于线程数和队列的配置,无论是哪种都会有OOM的风险(线程数过多或者队列的任务过多)

  • SingleThreadPool:单实例的线程池image-20240817161956449
  • FixedThreadPool:SingleThreadPool的多线程版本image-20240817162010491
  • CachedThreadPool:不能把任务暂存到同步队列image-20240817162023025
  • ScheduledThreadPool:根据任务放入时间先后,存储任务到堆中,不断从堆顶取任务image-20240817162037530

ThreadPoolExecutor

顶层父接口Executor只定义了一个方法execute()用于执行任务,ExecutorService实现了Executor接口,并对它进行拓展,如submit()方法、shutdown()方法等,ThreadPoolExecutor是子类实现,有许多方法都是在ThreadPoolExecutor具体实现的。

image-20240817162307402

七大参数:

  • 核心线程数:
  • 最大线程数:
  • 非核心线程能够空闲的最长时间:
  • 时间单位:
  • 任务队列:一般为BlockingQueue的子类
    • ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
    • LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
    • SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
    • DelayedWorkQueue:内部是堆的结构,按照延迟的时间长短排序,还有扩容机制(1/2时扩容)
  • 线程工厂
  • 任务拒绝策略:抛异常一般是比较重要的任务,需要被感知;丢弃任务的话一般是非核心任务如日志
    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

image-20240511191444695

线程池提交任务

有execute()和submit()两种方式提交任务,sunmit()方法有返回值,可以通过Future.get()来获取异步任务的执行结果

1
2
3
4
5
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
1
void execute(Runnable command);

深入线程池工作流程

主要关注一下几个方面

  1. 线程池自身状态
  2. 任务管理
  3. 线程管理

线程池自身状态

线程池内部用一个AtomicInteger的变量ctl来维护线程池状态(RunState)和线程数量(WorkCount),类似的也可以用一个Integer保存一个小写字符串中哪些字母出现过。

1
2
3
4
5
6
7
AtomicInteger ctl = rs(高三位)+ wc(线程数量) 
// 获取线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池的工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根据工作线程数和线程池状态获取 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池有5个状态:

  • RUNNING:正常运行状态
  • SHUTDOWN(温和关机):不接受新的任务,可以处理队列还未处理的任务
  • STOP(强制关机):不接受新的任务,也不处理队列的任务,同时中断在执行任务的线程
  • TIDYING:所有任务终止,没有工作线程
  • TERMINATED:结束
状态 位图 十进制值 描述
RUNNING 111 -536870912 可以接收新的任务和执行任务队列中的任务
SHUTDOWN 000 0 不再接收新的任务,但是会执行任务队列中的任务
STOP 001 536870912 不再接收新的任务,也不会执行任务队列中的任务,中断所有执行中的任务
TIDYING 010 1073741824 所有任务已经终结,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated()
TERMINATED 011 1610612736 钩子方法terminated()执行完毕

任务管理

任务执行流程

image-20240412192936326

execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl
int c = ctl.get();
// 判断当前线程数是否 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 尝试添加一个有任务的新核心线程,添加成功直接返回
if (addWorker(command, true))
return;
// 添加失败再次获取ctl,因为ctl可能变化
c = ctl.get();
}
// 线程池是否正常状态,同时尝试把任务加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 添加成功需要double-check,看看当前线程池是否正常 && 是否需要创建新的非核心线程去执行队列内任务
int recheck = ctl.get();
// 如果线程池状态不正常,将任务从阻塞队列里面移除,启用拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 是否需要创建新的非核心线程去执行队列内任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列已满 && 当前线程数 < 最大线程数,尝试添加新的非核心线程,如果添加失败启用拒绝策略
else if (!addWorker(command, false))
reject(command);
}

任务缓冲

通过任务队列缓冲解耦任务和线程,实现经典的消费者-生产者模式,当核心线程都在工作,线程池先把任务添加到阻塞队列,当核心线程结束任务,会从阻塞队列拉取新的任务执行。

任务拒绝

当线程池的阻塞队列满了,并且线程数达到最大值,有新任务到来的时候,就会执行相应的拒绝策略,有4种常见的拒绝策略:

  • AbortPolicy抛出RejectedExecutionException异常,默认的策略,当出现问题容易让开发人员知道
  • DiscardPolicy丢弃任务,无异常抛出,适合一些边缘的业务
  • DiscardOldestPolicy丢弃队列中最早的任务,重新提交新任务
  • CallerRunsPolicy交给调用者线程执行,适合让所有任务都执行的情况

线程管理

Worker线程

Worker是ThreadPoolExecutor的一个内部类,继承了AQS并且实现了Runnable接口。

工作线程可以是创建的时候被分配了任务,也可以是空闲的时候通过getTask()去阻塞队列拉取任务。

1
2
3
4
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread; // Worker持有的线程
Runnable firstTask; // 初始化的任务,可以为null
}

线程创建

线程池通过addWorker()方法增加线程,需要先判断线程池状态,以及当前线程数,才会新建线程,新建线程的时候可能会给线程分配任务。

addWorker(Runnable firstTask, boolean core)

每个Worker内部持有一个线程,addWorker方法创建了一个Worker工作者,并且放入HashSet的容器中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 这个for就是在尝试把线程数 + 1,不重要
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

// 下面正式开始创建启动新线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 用内部类Worker包装了新线程和任务
w = new Worker(firstTask);
/* Worker的构造器,里面把state设置为-1,AQS为独占模式,并且没有可重入功能
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
*/
final Thread t = w.thread;
if (t != null) {
// 这里需要获取锁,才能访问修改workers和largestPoolSize
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把当前worker加入worker池(HashSet)
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 添加线程回滚
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

线程回收

线程回收一般是指非核心线程的回收,线程池内部维护了一个线程引用的Hash表,当非核心线程在限定的存活时间内没有获取到新任务,Hash表会删除指向该线程的引用,在GC的时候就会回收该线程。

线程执行

在线程创建启动的时候,会调用Worker重写的run()方法,核心就是下面的runWorker()方法

空闲线程会在一个while循环里面不断地尝试获取队列的任务执行,对于核心线程是不限时的,而对于非核心线程是限时的。

runWorker(Worker w)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 把线程state从新建状态-1改为预备状态0,从而允许线程中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果Worker有任务则直接执行任务,否则循环获取队列中的任务
while (task != null || (task = getTask()) != null) {
// 禁止线程中断,此时state == 1
w.lock();
// double-check线程池正常 && 线程没有被打断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();

try {
// 钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 调用任务的run方法,正在执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法
afterExecute(task, thrown);
}
} finally {
// 清空task临时变量,这个很重要,否则while会死循环执行同一个task
task = null;
// 执行完成后自增当前工作线程执行的任务数量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 执行回收工作线程的逻辑
processWorkerExit(w, completedAbruptly);
}
}

getTask()

线程循环获取队列的任务,根据timed决定如何拉取任务,poll()是带超时时间的,take()是阻塞等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Runnable getTask() {
// 上次poll()是否超时,一般只有非核心线程才会poll(),一般都是take()阻塞
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 线程状态不正常 && 阻塞队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

// 再次获取工作线程数
int wc = workerCountOf(c);

// Are workers subject to culling?
// allowCoreThreadTimeOut代表是否允许核心线程用poll(),默认为false
// timed一般就是指当前是否有非核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 拉取任务失败,返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 根据timed决定如何拉取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 到这里说明拉取任务失败,超时了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

线程池最佳实践

线程池的参数不好配置,特别是线程数要根据不同的场景(CPU密集N+1和IO密集2N)和机器的CPU核心数合理配置。

是否可以有比线程池更好的方案?

是否有可供参考的合理的参数配置方案?

动态化线程池配置

  • 关注核心参数:corePoolSize、maximumPoolSize、workQueue,并发性的场景主要是两种:
    • 并行执行子任务,提高响应速度。这种情况下,应该使用同步队列;
    • 并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,也要注意核心线程数不能太小,速度会太慢
  • 参数可动态修改:通过线程池提供的set方法可以动态修改参数
  • 增加线程池监控:当发生了抛出RejectedExecutionException异常,或者是线程数达到了阈值(阈值根据最大线程数设定)