本文主要探讨tomcat线程池以及dubbo线程池和JDK原生线程池的区别

SpringBoot内置tomcat线程池的实现类

1
org.apache.catalina.core.StandardThreadExecutor

初始化方法

startInternal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void startInternal() throws LifecycleException {
// 创建一个任务队列,容量为Integer.MAX_VALUE
// TaskQueue继承了LinkedBlockingQueue
this.taskqueue = new TaskQueue(this.maxQueueSize);

// 线程工厂
TaskThreadFactory tf = new TaskThreadFactory(this.namePrefix, this.daemon, this.getThreadPriority());

// new一个tomcat自定义的ThreadPoolExecutor,核心线程数25,最大线程数200,非核心线程销毁时间60s
this.executor = new ThreadPoolExecutor(this.getMinSpareThreads(), this.getMaxThreads(), (long)this.maxIdleTime, TimeUnit.MILLISECONDS, this.taskqueue, tf);

// 设置重建线程时间 1s
this.executor.setThreadRenewalDelay(this.threadRenewalDelay);

// 设置parent,关联线程池对象
this.taskqueue.setParent(this.executor);

// 设置线程池状态为START
this.setState(LifecycleState.STARTING);
}

请求执行方法

execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public void execute(Runnable command) {
if (this.executor != null) {
// 执行下面的ThreadPoolExecutor的execute()
this.executor.execute(command);
} else {
throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}

public void execute(Runnable command) {
// 这个字段统计的是队列里的任务和线程执行中的任务,原子类CAS自增
this.submittedCount.incrementAndGet();
try {
// 执行类似JDK的executeInternal()
this.executeInternal(command);
} catch (RejectedExecutionException var4) {
// 拒绝策略,线程达到最大线程数并且任务队列也满了
if (getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue) getQueue();
// force()尝试把这个线程放到队列
if (!queue.force(command)) {
// 放入队列失败,回滚提交计数
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}

private void executeInternal(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
int c = ctl.get();
// 当前线程数小于核心线程数,创建线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
// 尝试添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
// 任务队列添加失败则创建新的线程
else if (!addWorker(command, false)) {
reject(command);
}
}

调整JDK对任务的处理办法

JDK对任务的处理办法是

  1. 先判断有没有到达核心线程数,没有就创建核心线程执行任务
  2. 否则放入队列,如果队列也满了,判断是否到达最大线程数,没有就创建非核心线程执行任务
  3. 否则执行拒绝策略

tomcat更换了处理办法,核心线程满了之后,如果任务数大于当前线程数,考虑先创建非核心线程,再放入队列

tomcat自定义TaskQueue,覆盖第2步 workQueue.offer(command)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public boolean offer(Runnable runnable) {
// 调用 LinkedBlockingQueue 的 offer(),把任务放入队列
if (parent==null) {
return super.offer(o);
}
// 当前线程数到达最大线程数,不可能创建非核心线程,所以任务加入队列
if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
// 其他情况说明,可能可以创建非核心线程

// 提交任务数 < 当前线程的数量,线程可以直接复用,直接丢到队列让空闲的运行线程去拉去就行,没必要新建线程执行
if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {
return super.offer(o);
}
// 到这里说明任务数比当前线程数多了,又当前线程数 < 最大线程数,返回false,就去调用原生JDK去创建非核心线程了
if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {
return false;
}
// if we reached here, we need to add it to the queue
return super.offer(o);
}

添加任务失败后先尝试把任务放进队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public boolean force(Runnable o) {
// 线程池状态处于SHUTDOWN状态,抛异常
if (parent == null || parent.isShutdown()) {
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
}
// 调用 LinkedBlockingQueue 的 offer()
return super.offer(o);
}

// 把任务放进队列
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 当前在队列或者线程运行中的所有任务数量,和submittedCount一个东西
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
final int c;
final Node<E> node = new Node<E>(e);
// 获取锁,保证只有一个线程的任务可以放入队列
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() == capacity)
return false;
// 任务放入队列
enqueue(node);
// count++
c = count.getAndIncrement();
// 如果队列还有空位置,通过AQS内的Condition的signal()唤醒其他阻塞的线程来放入任务到队列中
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 不是太懂
if (c == 0)
signalNotEmpty();
return true;
}

再来看看dubbo的线程池实现

Dubbo线程池的其中一个实现类

1
org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor

初始化方法

1
2
// 调用了ThreadPoolExecutor的构造器
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

请求执行方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}

try {
// 调用原生JDK ThreadPoolExecutor的excute()
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
throw new RejectedExecutionException(x);
}
}
}

private void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
int c = ctl.get();
// 当前线程数小于核心线程数,创建线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
// 尝试添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
// 任务队列添加失败则创建新的线程
else if (!addWorker(command, false)) {
reject(command);
}
}

调整JDK对任务的处理办法

可以看到,只有两种可能,就是创建非核心线程,或者加入队列

那我们判断什么时候加入队列

  • 当前线程数 == 最大线程数,这时不能再创建非核心线程,任务只能加入队列
  • 提交线程数 < 当前线程数,说明有线程闲着,任务直接放进队列,让空闲线程拉取就行

其他情况下,一般来说,当前线程数不够提交线程数,基本就是创建非核心线程了

提交线程数的意思就是,当前正在执行任务的线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}

// currentPoolThreadSize是当前线程数
int currentPoolThreadSize = executor.getPoolSize();

// 提交线程数 < 当前线程数,有空闲线程,直接放入队列
if (executor.getActiveCount() < currentPoolThreadSize) {
return super.offer(runnable);
}

// 当前线程数 < 最大线程数,返回false,调用JDK的offer流程,创建非核心线程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}

// 当前线程数 == 最大线程数,加入队列
return super.offer(runnable);
}

添加任务失败后先尝试把任务放进队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
// 调用 LinkedBlockingQueue 的 offer()
return super.offer(o, timeout, unit);
}

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final int c;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 上可打断锁,保证每次只有一个线程可以任务放入队列
putLock.lockInterruptibly();
try {
// 队列满了,循环等待,直到任务可以入队,或者等待超时返回
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 任务放入队列
enqueue(node);
// count++
c = count.getAndIncrement();
// 如果队列还有空位置,通过AQS内的Condition的signal()唤醒其他阻塞的线程来放入任务到队列中
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}