本文主要探讨tomcat线程池以及dubbo线程池和JDK原生线程池的区别
SpringBoot内置tomcat线程池的实现类
1
| org.apache.catalina.core.StandardThreadExecutor
|
初始化方法
startInternal()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| protected void startInternal() throws LifecycleException { this.taskqueue = new TaskQueue(this.maxQueueSize); TaskThreadFactory tf = new TaskThreadFactory(this.namePrefix, this.daemon, this.getThreadPriority()); this.executor = new ThreadPoolExecutor(this.getMinSpareThreads(), this.getMaxThreads(), (long)this.maxIdleTime, TimeUnit.MILLISECONDS, this.taskqueue, tf); this.executor.setThreadRenewalDelay(this.threadRenewalDelay); this.taskqueue.setParent(this.executor); this.setState(LifecycleState.STARTING); }
|
请求执行方法
execute()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public void execute(Runnable command) { if (this.executor != null) { this.executor.execute(command); } else { throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); } }
public void execute(Runnable command) { this.submittedCount.incrementAndGet(); try { this.executeInternal(command); } catch (RejectedExecutionException var4) { if (getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue) getQueue(); if (!queue.force(command)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } else { submittedCount.decrementAndGet(); throw rx; } } }
private void executeInternal(Runnable command) { if (command == null) { throw new NullPointerException(); } int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; } c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) { reject(command); } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } else if (!addWorker(command, false)) { reject(command); } }
|
调整JDK对任务的处理办法
JDK对任务的处理办法是
- 先判断有没有到达核心线程数,没有就创建核心线程执行任务
- 否则放入队列,如果队列也满了,判断是否到达最大线程数,没有就创建非核心线程执行任务
- 否则执行拒绝策略
tomcat更换了处理办法,核心线程满了之后,如果任务数大于当前线程数,考虑先创建非核心线程,再放入队列
tomcat自定义TaskQueue,覆盖第2步 workQueue.offer(command)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Override public boolean offer(Runnable runnable) { if (parent==null) { return super.offer(o); } if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) { return super.offer(o); } if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) { return super.offer(o); } if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) { return false; } return super.offer(o); }
|
添加任务失败后先尝试把任务放进队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public boolean force(Runnable o) { if (parent == null || parent.isShutdown()) { throw new RejectedExecutionException(sm.getString("taskQueue.notRunning")); } return super.offer(o); }
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() == capacity) return false; enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
|
再来看看dubbo的线程池实现
Dubbo线程池的其中一个实现类
1
| org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor
|
初始化方法
1 2
| super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
请求执行方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); }
try { super.execute(command); } catch (RejectedExecutionException rx) { final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { throw new RejectedExecutionException("Queue capacity is full.", rx); } } catch (InterruptedException x) { throw new RejectedExecutionException(x); } } }
private void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; } c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) { reject(command); } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } else if (!addWorker(command, false)) { reject(command); } }
|
调整JDK对任务的处理办法
可以看到,只有两种可能,就是创建非核心线程,或者加入队列
那我们判断什么时候加入队列
- 当前线程数 == 最大线程数,这时不能再创建非核心线程,任务只能加入队列
- 提交线程数 < 当前线程数,说明有线程闲着,任务直接放进队列,让空闲线程拉取就行
其他情况下,一般来说,当前线程数不够提交线程数,基本就是创建非核心线程了
提交线程数的意思就是,当前正在执行任务的线程数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } int currentPoolThreadSize = executor.getPoolSize(); if (executor.getActiveCount() < currentPoolThreadSize) { return super.offer(runnable); }
if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; }
return super.offer(runnable); }
|
添加任务失败后先尝试把任务放进队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException("Executor is shutdown!"); } return super.offer(o, timeout, unit); }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final int c; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0L) return false; nanos = notFull.awaitNanos(nanos); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
|