“Ten men banded together in love can do what ten thousand separately would fail in.”
1. 为什么需要使用线程池?
在实际使用中,线程是很占用系统资源的,如果对线程管理不善很容易导致系统问题。因此,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有如下好处:
降低资源消耗。通过复用已经存在的线程和降低线程关闭的次数来尽可能地来降低系统性能消耗;提升系统响应速度。通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度;提高线程的可管理性。线程是稀缺资源如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此,需要使用线程池来管理线程。
2. 线程池的工作原理
当一个并发任务提交给线程池,线程池分配线程去执行任务的过程如下图所示:

从图中我们可以看出,线程池执行所提交的任务过程主要有这样几个阶段:
- 先判断线程池中
核心线程池所有的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。否则,如果核心线程池里的线程都在执行任务,则进入下一个流程。 - 判断当前
阻塞队列是否已经满,如果未满,则将提交的任务放置在阻塞队列中;否则,则进入第3步; - 判断
线程池中所有的线程是否都在执行任务,如果没有,则创建一个新的线程来执行任务,否则,则交给饱和策略进行处理。
3. 线程池的创建
创建线程池主要是由ThreadPoolExecutor类来完成,ThreadPoolExecutor类有许多重载的构造方法,我们通过参数最多的构造函数来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor的构造方法为:
1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//...
}
下面对参数进行说明:
- corePoolSize: 表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到corePoolSize,则会创建新的线程来执行所提交的任务,
即使当前核心线程池有空闲的线程。如果当前核心线程池的线程个数已经达到了corePoolSize,则不再重新创建线程。如果调用了线程池的prestartCoreThread或者prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动。 - maximumPoolSize: 表示线程池能创建线程的最大个数,如果阻塞队列已满时,并且当前线程池线程个数没有超过maximumPoolSize的话,就会创建新的线程来执行任务。
- keepAliveTime: 空闲线程存活时间。如果当前线程池的个数已经超过了corePoolSize,并且线程空闲时间超过了keepAliveTime的话,就会将这些空闲线程销毁,这样可以尽可能降低系统资源消耗。
- unit: 时间单位。为keepAliveTime指定的时间单位。
- workQueue: 阻塞队列。用于保存任务的阻塞队列,可以使用
ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue。 - threadFactory: 创建线程的工程类。可以通过指定线程工厂为每个创建出来的线程设置有意义的名字,如果出现并发问题,也方便查找问题原因。
- handle: 饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况,并抛出RejectedExecutionException异常。
- AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecutionException异常;
- CallerRunsPolicy: 只用调用者所在线程来执行任务;
- DiscardPolicy: 不处理直接丢弃掉任务;
- DiscardOldestPolicy: 丢弃掉阻塞队列中存放时间最久的任务,执行当前任务。
线程池执行逻辑
通过ThreadPoolExecutor创建线程后,提交任务后执行过程是怎样的,下面直接来看看源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//如果线程池的线程个数小于corePoolSize,则创建新线程执行当前任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程个数大于corePoolSize或者创建线程失败,则将任务存放在阻塞队列workQueue中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果当前任务无法存放进入阻塞队列,则创建新的线程来执行任务,如果addWorker返回false,则调用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
下图给出了ThreadPoolExecutor的execute方法的执行示意图:

execute方法执行逻辑有这样几种情况:
- 如果当前运行的线程少于corePoolSize,则会创建新的线程来执行新的任务;
- 如果运行的线程个数等于或者大于corePoolSize,则会将提交的任务存放到阻塞队列workQueue之中;
- 如果当前workQueue队列已经满了的话,则会创建新的线程来执行任务;
- 如果线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理。
需要注意的是,线程池的设计思想就是使用了核心线程池corePoolSize,阻塞队列workQueue和线程池maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在并发框架中都会使用。
4. 线程池的关闭
关闭线程池,可以通过shutdown和shutdownNow这两个方法。它们的原理都是遍历线程池中所有的线程,然后依次中断线程。shutdown和shutdownNow还是有不一样的地方:
- shutdownNow首先将线程池的状态设置为STOP,然后尝试
停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表; - shutdown只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
可以看出shutdown方法会将正在执行的任务继续执行完,而shutdownNow会直接中断正在执行的任务。调用了这两个方法的任意一个,isShutdown方法都会返回true,当所有的线程都关闭成功,才表示线程池成功关闭,这时调用isTerminated方法才会返回true.
5.合理配置线程池参数
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- 任务的优先级:高、中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU的个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
并且,阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。
6. 源码分析
我们首先来看一下线程池的类图:

6.1 Executor框架接口
Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将“任务提交”与“任务如何运行”分离开的机制。
J.U.C中有三个Executor接口:
- Executor:一个运行新任务的简单接口;
- ExecutorService: 扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;
- ScheduleExecutorService: 扩展了ExecutorService。支持Future和定期执行任务。
下面来简要分析一下这三个接口:
Executor接口
1
2
3
public interface Executor {
void execute(Runnable command);
}
Executor接口只有一个execute方法,用来替代通常创建或启动线程的方法,例如,使用Thread来创建并启动线程的代码如下:
1
2
Thread t = new Thread();
t.start();
而使用Executor来启动线程执行任务的代码如下:
1
2
Thread t = new Thread();
executor.execute(t);
对于不同的Executor实现,execute()方法可能是创建一个新的线程并立即启动,也有可能是使用已有的工作线程来运行传入的任务,也可能是根据设置线程池的容量或者阻塞队列的容量来决定是否要将传入的线程放入到阻塞队列中或者拒绝接收传入的线程。
ExecutorService接口
ExecutorService接口继承至Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成Future的方法。其增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。
ScheduledExecutorService接口
ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。
6.2 ThreadPoolExecutor分析
ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。我们来看看AbstractExecutiorService的实现。
AbstractExecutorService 抽象类派生自 ExecutorService 接口,然后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。
这个抽象类实现了 invokeAny 方法和 invokeAll 方法,这里的两个 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。
定义于最上层接口 Executor中的 void execute(Runnable command) 由于不需要获取结果,不会进行 FutureTask 的包装。
“需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。”
下面,我将一行一行源码地来分析这个类,跟着源码来看看其实现吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
// 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 提交任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交给执行器执行,execute 方法由具体的子类来实现(FutureTask 间接实现了Runnable 接口。)
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
// 此方法目的:将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
// 第二个参数timed代表是否设置超时机制,超时时间为第三个参数
// 如果 timed 为 true,同时超时了还没有一个线程返回结果,那么抛出 TimeoutException 异常
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
// 任务数
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// ExecutorCompletionService 不是一个真正的执行器,参数 this 才是真正的执行器
// 它对执行器进行了包装,每个任务结束后,将结果保存到内部的一个 completionQueue 队列中
// 这也是为什么这个类的名字里面有个 Completion 的原因吧。
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
// 用于保存异常信息,此方法如果没有得到任何有效的结果,那么我们可以抛出最后得到的一个异常
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
// 首先先提交一个任务,后面的任务到下面的 for 循环一个个提交
futures.add(ecs.submit(it.next()));
--ntasks;
// 正在执行的任务数(提交的时候 +1,任务结束的时候 -1)
int active = 1;
for (;;) {
// ecs 上面说了,其内部有一个 completionQueue 用于保存执行完成的结果
// BlockingQueue 的 poll 方法不阻塞,返回 null 代表队列为空
Future<T> f = ecs.poll();
// 为 null,说明刚刚提交的第一个线程还没有执行完成
// 在前面先提交一个任务,加上这里做一次检查,也是为了提高性能
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 这里的 active == 0,说明所有的任务都执行失败,那么这里是 for 循环出口
else if (active == 0)
break;
// 这里也是 else if。这里说的是,没有任务了,但是设置了超时时间,这里检测是否超时
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
// 这里是 else。说明,没有任务需要提交,但是池中的任务没有完成,还没有超时(如果设置了超时)
// take() 方法会阻塞,直到有元素返回,说明有任务结束了
else
f = ecs.take();
}
/*
* 我感觉上面这一段并不是很好理解,这里简单说下。
* 1. 首先,这在一个 for 循环中,我们设想每一个任务都没那么快结束,
* 那么,每一次都会进到第一个分支,进行提交任务,直到将所有的任务都提交了
* 2. 任务都提交完成后,如果设置了超时,那么 for 循环其实进入了“一直检测是否超时”
这件事情上
* 3. 如果没有设置超时机制,那么不必要检测超时,那就会阻塞在 ecs.take() 方法上,
等待获取第一个执行结果
* 4. 如果所有的任务都执行失败,也就是说 future 都返回了,
但是 f.get() 抛出异常,那么从 active == 0 分支出去
*/
// 有任务结束了
if (f != null) {
--active;
try {
// 返回执行结果,如果有异常,都包装成ExecutionException
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 方法退出之前,取消其他的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
// 执行所有的任务,返回任务结果。
// 先不要看这个方法,我们先想想,其实我们自己提交任务到线程池,也是想要线程池执行所有的任务
// 只不过,我们是每次 submit 一个任务,这里以一个集合作为参数提交
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
// 包装成 FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 提交任务
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
// 这是一个阻塞方法,直到获取到值,或抛出了异常
// 这里有个小细节,其实 get 方法签名上是会抛出 InterruptedException 的
// 可是这里没有进行处理,而是抛给外层去了。此异常发生于还没执行完的任务被取消了
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
// 这个方法返回,不像其他的场景,返回 List<Future>,其实执行结果还没出来
// 这个方法返回是真正的返回,任务都结束了
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
// 带超时的 invokeAll
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
到这里,我们发现,这个抽象类包装了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它们都没有真正开启线程来执行任务,它们都只是在方法内部调用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法还没出现,需要等具体执行器来实现这个最重要的部分,这里我们要说的就是 ThreadPoolExecutor 类了。我们来具体地分析一下:
1. 重要字段
1
2
3
4
5
6
7
8
9
10
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分的信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount),这里可以看出,使用了Integer类型来保存,高3位保存runState,低29位保存workCount。COUNT_BITS就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workCount的上限值,大约是5亿。
下面再介绍一下线程池的运行状态。线程池一共有五种状态,分别是:
- RUNNING: -1«COUNT_BITS,即高3位为1,低29位为0,能接受新提交的任务,并且也能够处理阻塞队列中的任务;
- SHUTDOWN: 0«COUNT_BITS,即高3位为0,低29位为0,关闭状态,不再接受新提交的任务,但是却可以继续处理阻塞队列中已经保存的任务。在线程池处于RUNNING状态时,调用shutdown()方法会使得线程池进入到该状态。(finalize()方法在执行过程中也会调用shutdown()方法进入该状态);
- STOP: 1«COUNT_BITS,即高3位为001,低29位为0,不能接受新任务,也不能处理队列中的任务,会中断正在处理任务的线程。在线程池处于RUNNING或SHUTDOWN状态时,调用shutdownNow()方法会使线程池进入到该状态;
- TIDYING: 2«COUNT_BITS,即高3位为010,低29位为0,如果所有的任务都已经终止了,workerCount(有效线程数)为0,线程池进入该状态后会调用terminated()方法进入TERMINATED状态。
- TERMINATED: 3«COUNT_BITS,即高3位为011,低29位为0,在terminated()方法执行完后进入该状态,默认terminated()方法中什么也没有做。进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或者TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
下面为线程池的转换过程:

2. ctl的相关方法
这里还有几个对ctl进行计算的方法:
1
2
3
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- runStateOf: c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态。
- workerCountOf: c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量,即获取活动线程数。
- ctlOf: 参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl,即获取运行状态和活动线程数的值。
3. 构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法中的字段含义如下:(前面已经介绍过,这里再回顾一下)
- corePoolSize: 核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:
- 如果运行的线程少于corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果线程池中的线程数量大于等于corePoolSize且小于maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
- 如果设置的corePoolSize和maximumPoolSize相同,则创建的线程池大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从WorkQueue中取任务并处理;
- 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务。
所以,任务提交时,判断的顺序是corePoolSize -> workQueue -> maximumPoolSize
- maximumPoolSize: 最大线程数量。
- workQueue: 等待队列,当任务提交时如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列。当我们提交一个新的任务到线程池之后,线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,其中主要有以下几种处理方式:
- 直接切换: 这种方式常用的队列就是SynchronousQueue,其为一个不存储元素的阻塞队列。每个插入操作必须等待另外一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
- 使用无界队列: 一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不起作用了。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入到等待队列。
- 使用有界队列: 一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能降低资源的消耗,但同时这种方式也使得线程池的调度变得更困难,因为线程池和队列的容量都是有限的值,所以想要使线程池处理任务的吞吐率达到一个相对合理的范围,又想使得线程调度相对简单,并且还要尽可能降低线程池对资源的消耗,就需要合理的设置这两个数量。
- 如果想要降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等),可以设置较大的队列容量和较小的线程池容量,但这样也会降低线程处理任务的吞吐量。
- 如果提交的任务经常发生阻塞,那么可以考虑通过调用setMaximumPoolSize()方法来重新设定线程池的容量。
- 如果队列的容量设置较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但是如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
- keepAliveTime: 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime。
- threadFactory: 它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory()来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
- handler: 它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务。
4. 源码解读之执行流程
我们先从线程池的任务提交方法execute()开始阅读,从execute()我们会发现线程池执行的核心方法是addWorker(),在addWorker()中我们发现启动线程调用了start()方法,调用start()方法之后会执行Worker类的run()方法,run里面调用runWorker(),运行程序的关键在于getTask()方法,getTask()方法之后就是此线程的关闭,整个线程池的工作流程也就完成了。
4.1 execute()方法
execute()方法用来提交任务,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl记录着runState和workerCounter
int c = ctl.get();
/*
* workerCount方法取出低29位的值,表示当前活动的线程数;
* 如果当前活动线程数小于corePoolSize,则新建一个线程放入到线程池中;
* 并把任务添加到该线程
*/
if (workerCountOf(c) < corePoolSize) {
// addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是根据maximumPooLSize来判断;
// 如果为true,根据corePoolSize来判断;
// 如果为false,则根据maximumPoolSize来判断
if (addWorker(command, true))
return;
// 如果添加失败,重新获取ctl值
c = ctl.get();
}
/*
* 如果当前线程池是运行状态并且任务添加到队列中成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl的值
int recheck = ctl.get();
// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,这时候需要移除该command
// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法,这里传入的参数表示:
* 1. 第一个参数为null,表示在线程池中创建一个线程,但是不去启动;
* 2. 第二个参数为false,将线程池的有限数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 如果判断workerCount大于0,则直接返回,在workCount中新增的command会在将来的某个时刻被执行
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是Running状态,但是workerCount >= corePoolSize并且workQueue已满
* 这时,再次调用addWorker方法,但是第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
简单的来说,在执行execute()方法时如果状态一直是RUNNING时的执行过程如下所示:
- 如果
workerCount < corePoolSize,则创建并启动一个新的线程来执行新提交的任务; - 如果
workerCount >= corePoolSize,且线程池的阻塞队列未满,则将任务添加到阻塞队列中; - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池的阻塞队列已经满,则创建并启动一个新的线程来执行新提交的任务; - 如果
workerCount >= maximumPoolSize, 并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。
这里需要注意一下addWorker(null,false);,也就是创建一个线程,但是并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null,false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
execute的执行流程如下所示:

代码执行逻辑如下图所示:

4.2 addWorker(Runnable firstTask, boolean core)方法
在execute()方法源码中,我们看到了addWorker()方法,其主要工作是在线程池中创建一个新的线程并执行。其中,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 自旋进行线程状态check
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 这个if进行判断:
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
* 接着判断以下3个条件,只要1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但是却可以继续阻塞处理阻塞队列中已经保存的任务
* 2. firstTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况,这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false
* 因为队列中已经没有任务了,不需要再添加线程了
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取工作线程数
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制29个1),返回false;
// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
// 如果为false则根据maximumPoolSize来比较
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试CAS增加workerCount,如果成功,则跳出第一个for循环(因为retry: 是标记第一个for循环的,是用来操纵外层的for循环)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已经被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 代码执行到这里,说明worker的数量成功加1,则可以进行worker的构造过程
// worker是否已经启动
boolean workerStarted = false;
// 是否将这个worker添加到workers这个hashSet中
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程,worker内部真正用来执行任务的线程,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs 是RUNNING状态或者rs是SHUTDOWN状态并且firstTask是null,向线程池中添加线程。
// 因为在SHUTDOWN时不会再添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker里面的thread可不能是已经启动了的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到workers这个hashSet中
workers.add(w);
int s = workers.size();
// largeestPoolSize用于记录workers中的个数的最大值
// 因为workers是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
// 若线程没有启动,做一些清理工作,若前面 workCount 加了 1,将其减掉
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
注意一下这里的t.start()这个语句,启动时会调用Worker类中的run()方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
addWorker方法有4种传参的方式:
- addWorker(command,true)
- addWorker(command,false)
- addWorker(null,false)
- addWorker(null,true)
在execute()方法中使用了前3种,结合这个核心方法进行以下分析:
- 第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
- 第二个: 当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
- 第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
- 第四个: 这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行。
addWorker的执行流程如下所示:
- 判断线程池当前是否为可以添加worker线程的状态,可以则继续下一步,不可以return false:
- 线程池状态 > shutdown,可能为stop、tidying、terminated,不能添加worker线程
- 线程池状态 == shutdown,firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务;
- 线程池状态 == shutdown,firstTask == null,workQueue为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取task,而workQueue为空,说明添加无任务线程已经没有意义。
- 线程池当前线程数量是否超过上限(coolPoolSize或maximumPoolSize),超过了return false,没超过则对workerCount+1,继续下一步
- 在线程池的ReentrantLock保证下,向Workers Set中添加新创建的worker实例,添加完成后解锁,并启动worker线程,如果这一切都成功了,return true,如果新添加worker进入Set失败或者启动失败,调用addWorkerFailed逻辑。

4.3 Worker类
线程池中的每一个线程被封装成为一个Worker对象,ThreadPool维护的其实就是一组Worker对象,我们来看一下Worker的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
* Worker类大体上管理着运行线程的中断状态 和 一些指标
* Worker类投机取巧的继承了AbstractQueuedSynchronizer来简化在执行任务时的获取、释放锁
* 这样防止了中断在运行中的任务,只会唤醒(中断)在等待从workQueue中获取任务的线程
* 解释:为什么不直接执行execute(command)提交的command,而要在外面包一层Worker呢??
* 主要是为了控制中断,用什么控制??
* 用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁
* 只有在等待从workQueue中获取任务getTask()时才能中断
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 利用ThreadFactory和worker这个Runnable创建的线程对象
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 设置AQS的同步状态,private volatile int state,是一个计数器,大于0代表锁已经被获取
// 在调用runWorker()前,禁止interrupt中断,在interruptIfStarted()方法中会判断getState() >= 0
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 根据当前worker创建一个线程对象,当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,而是调用当前worker.run()时调用firstTask.run()
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和Thread属性:firstTask用它保存传入的任务;thread是在调用构造方法的时候通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也是一个线程,所以一个Worker对象在启动的时候会调用Worker类中国的run()方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire()方法,它是不允许重入的,而ReentrantLock是允许重入的:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行shutdown方法或者tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池汇总的线程是否是空闲状态;
- 之所以设置不可重入,
是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时可以再次获取锁(重入)。给出解释如下所示:- setCorePoolSize()时可能会interruptIdleWorkers(),在对一个线程interrupt时会要w.tryLock();
- 如果可重入,就可能会在对线程池操作的方法中中断线程。类似的方法还有setMaximumPoolSize()、setKeppAliveTime()、allowCoreThreadTimeOut()、shutdown()。
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。此外,在构造方法中执行了setState(-1),把state变量设置为-1,这是因为AQS默认的state是0,如果刚刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断。
查看上面的tryAcquire()方法我们可以看到,其是根据state是否为0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0.
4.4 runWorker()方法
在Worker类中的run()方法调用了runWorker方法来执行任务,其代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// worker线程启动后调用,while循环(即自旋)不断地从等待队列获取任务并执行
// worker初始化后,可以指定firstTask,那么第一个任务也就可以不需要从队列中获取
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
//允许中断,new Worker()是设置state == -1,此处是调用worker类的tryRelease()方法,将state设置为0,而InterruptIfStarted()只有state >= 0才允许调用中断
w.unlock(); // allow interrupts
// 是否因为异常退出循环
boolean completedAbruptly = true;
try {
// 如果task为空,则通过getTask来获取任务
while (task != null || (task = getTask()) != null) {
//上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的worker
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/*
* clearInterruptsForTaskRun操作
* 确保只有在线程stoping时,才会设置中断标示,否则清除中断标示
* 1. 如果线程池状态 >= stop,且当前线程没有设置中断状态,wt.interrupt()
* 2. 如果一开始判断线程池状态< stop,但是Thread.Interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程状态是否>=stop
* 是,再次设置中断标示,wt.interrupt()
* 否, 不做操作,清除中断标示后进行后续步骤
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前(钩子函数,子类实现)
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后,也是一个钩子方法,将 task 和异常作为参数,留给需要的子类实现
afterExecute(task, thrown);
}
} finally {
//置空task,准备getTask下一个任务
task = null;
//累加完成的任务数
w.completedTasks++;
//释放掉worker的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 到这里,需要执行线程关闭
// 1.说明getTask返回null,也就是说,这个worker的使命结束了,执行关闭
// 2. 任务执行过程中发生可异常:
// 2.1 已经在代码中处理了将workCount减1,这个在getTask方法中细说
// 2.2 workCount没有处理,所以需要在processWorkerExit中处理
processWorkerExit(w, completedAbruptly);
}
}
这里需要说明一下第一个if判断,其目的是:
- 如果线程池正在停止,那么要保证当前线程是中断状态;
- 如果不是的话,则要保证当前线程不是中断状态。
这里就要考虑在执行if语句期间可能也执行了shutdownNow方法,shutdownNow方法会将状态设置为STOP,回顾一下STOP状态:
不能接受新任务,也不能处理队列中的任务,会中断正在处理任务的线程。在线程池处于RUNNING或SHUTDOWN状态时,调用shutdownNow()方法会使线程池进入到该状态。
STOP状态要中断线程池中的所有线程,而这里Thread.interrupted()用来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。
总结一下runWorker方法的执行过程:
- Worker线程启动后,通过Worker类的run()方法调用runWorker(this)
- 执行任务之前,首先worker.unlock(),将AQS的state置为0,允许中断当前worker线程
- 开始执行firstTask,调用task.run(),在执行任务前会上锁worker.lock(),在执行完任务后会解锁,为了防止在任务运行时被线程池一些中断操作中断
- 在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法
- 无论在beforeExecute()、task.run()、afterExecute()发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程
- 如正常执行完当前task后,会通过getTask()从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程
代码里的beforeExecute()方法和afterExecute()方法在ThreadPoolExecutor类中是空实现的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。

4.5 getTask()方法
getTask()方法用来从阻塞队列中获取任务,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/* 此方法有三种可能
* 1. 阻塞直到获取任务返回。默认CorePoolSize之内的线程是不会被回收的,它们会一直等待任务
* 2. 超时退出。keepAliveTime起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
* 3. 如果发生了以下条件,必须返回null:
* 1. 池中有大于maximumPoolSize个workers存在(通过调用setMaximumPoolSize进行设置)
* 2. 线程池处于SHUTDOWN,而且workQueue是空的,前面说了,这种情况不接收新的任务
* 3. 线程池处于STOP,不仅不接收新的线程,连workQueue中的线程也不再执行
*/
private Runnable getTask() {
//timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
// 1. rs >= STOP,线程是否正在STOP;
// 2. 阻塞队列是否为空
// 如果满足上述条件,则将workerCount减去1并返回null。
// 因为如果当前线程池状态的值是SHUTDOWM或以上时,不允许再向阻塞队列中添加任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed变量用于判断是否需要进行超时控制
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,则返回重试
* 如果wc == 1,也就说明当前线程是线程池中唯一的一个线程了
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞知道队列不为空
* poll() - 使用 LockSupport.parkNanos(this, nanosTimeout) 挂起一段时间,interrupt()时不会抛异常,但会有中断响应
* take() - 使用 LockSupport.park(this) 挂起,interrupt()时不会抛异常,但会有中断响应
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
/*
* blockingQueue的take()阻塞使用LockSupport.park(this)进入wait状态的,对LockSupport.park(this)进行interrupt不会抛异常,但还是会有中断响应
* 但AQS的ConditionObject的await()对中断状态做了判断,会报告中断状态 reportInterruptAfterWait(interruptMode)
* 就会上抛InterruptedException,在此处捕获,重新开始循环
* 如果是由于shutdown()等操作导致的空闲worker中断响应,在外层循环判断状态时,可能return null
*/
timedOut = false; // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
}
}
}
这里最重要的部分是第二个if判断,目的是控制线程池内的有效线程数量。由上文的分析我们可以知道,在执行execute方法的时候,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满的时候,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
我们来总结一下执行流程:
- 首先判断是否可以满足从workQueue中获取任务的条件,不满足return null:
- 线程池是否满足:
- shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了
- stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回
- 线程数量是否超过maximumPoolSize 或 获取任务是否超时:
- 线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功不会超过maximumPoolSize
- 如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize
- 线程池是否满足:
- 如果满足获取任务条件,根据是否需要定时获取调用不同方法:
- workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
- workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
- 在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。
4.6 processWorkerExit()方法
直接看代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 1、worker数量-1
* 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
* 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
/**
* 2、从Workers Set中移除worker
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
/**
* 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
* 主要是判断线程池是否满足终止的状态
* 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
* 没有线程了,更新状态为tidying->terminated
*/
tryTerminate();
/*
* 4、是否需要增加worker线程
* 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//添加一个没有firstTask的worker
//只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态
addWorker(null, false);
}
}
processWorkerExit(Worker w, boolean completedAbruptly)的两个参数:
- worker: 表示要结束的worker
- completedAbruptly; 是否突然完成(是否因为异常退出)
执行流程如下所示:
- worker数量是否减去1:
- 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
- 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
- 从Workers Set中移除worker,删除时需要上锁mainlock
- tryTerminate():在对线程池有负效益的时候,都需要“尝试终止”线程池,大致逻辑为,判断线程池是否满足终止的状态:
- 如果状态满足,但线程池中还有线程,尝试对其发出中断响应,使其能够进入退出流程;
- 没有线程了,更新状态为tidying->terminated
- 是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程。根据线程池状态是running或shutdown:
- 如果当前线程是突然终止的,addWorker()
- 如果当前线程不是突然终止的,但是当前线程数量小于要维护的线程数量,addWorker()
所以如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持这个corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程。
至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute()方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask()获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如下图所示:

备注:tryTerminate()方法源码分析将在下一节进行详细介绍。
5. 源码分析之线程池终止
终止线程池主要有两个方法:shutdown()和shutdownNow()。
- shutdown()后线程池将变成SHUTDOWN状态,此时不接收新任务,但是会处理完正在运行的和在阻塞队列中等待处理的任务。
- shutdownNow()后线程池将变成STOP状态,此时不接收新任务,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程。
5.1 shutdown()
shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* 开始一个有序的关闭,在关闭中,之前提交的任务会被执行(包含正在执行的,在阻塞队列中的),但新任务会被拒绝
* 如果线程池已经shutdown,调用此方法不会有附加效应
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
* 当前方法不会等待之前提交的任务执行结束,可以使用awaitTermination()
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 判断调用者是否拥有权限shutdown线程池
checkShutdownAccess();
// CAS+循环设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
shutdown()方法执行流程:
- 上锁,
mainLock是线程池的主锁,是可重入锁,当要操作workers Set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时也需要先获取mainLock; - 判断调用者是否有权限shutdown线程池;
- 使用CAS操作将线程池状态设置为SHUTDOWN,shutdown之后不再接收新任务
- 中断所有空闲线程(interruptIdleWorkers())
- onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理
- 解锁
- 尝试终止线程池(tryTerminate())
可以看到shutdown()方法最重要的几个步骤是:更新线程池状态为shutdown、中断所有空闲线程、tryTerminated()尝试终止线程池。
那么什么是空闲线程?,interruptIdleWorkers()是怎么中断空闲线程的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续
/*
* onlyOne如果为true,最多interrupt一个worker
* 只有当终止流程已经开始,但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用
* (终止流程已经开始指的是:shutdown状态 且 workQueue为空,或者 stop状态)
* 在这种情况下,最多有一个worker被中断,为了传播shutdown信号,以免所有的线程都在等待
* 为保证线程池最终能终止,这个操作总是中断一个空闲worker
* 而shutdown()中断所有空闲worker,来保证空闲线程及时退出
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers()首先会获取mainLock锁,因为要迭代worker Set,在中断每个worker前,需要做两个判断:
- 线程是否已经被中断,是就什么也不做
- worker.tryLock()是否成功
第二个判断比较重要,因为Worker类除了实现了可执行Runnable,也继承了AQS,本身也是一把锁。tryLock()调用了Worker自身实现的tryAcquire()方法,这也是AQS规定子类需要实现的尝试获取锁的方法:
1
2
3
4
5
6
7
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
tryAcquire()先尝试将AQS的state从0–>1,返回true代表上锁成功,并设置当前线程为锁的拥有者。可以看到compareAndSetState(0, 1)只尝试了一次获取锁,且不是每次state+1,而是0–>1,说明锁不是可重入的。
但是为什么要worker.tryLock()获取worker的锁呢?
其实,这就是Worker类存在的价值之一,控制线程中断。在runWorker()方法中每次获取到task,task.run()之前都需要worker.lock()上锁,运行结束后解锁,即正在运行任务的工作线程都是上了worker锁的。我们在调用interruptIdleWorkers()中断之前需要先tryLock()获取worker锁,意味着正在运行的worker不能中断,因为worker.tryLock()失败,且锁是不可重入的。故shutdown()只有对能获取到worker锁的空闲线程(即正在从workQueue中getTask(),此时worker没有加锁)发送中断信号。
为此,我们也可以将worker划分为:
- 空闲worker: 正在从workQueue阻塞队列中获取任务的worker
- 运行中worker: 正在task.run()执行任务的worker
正阻塞在getTask()获取任务的worker在被中断后,会抛出InterruptedException,不再阻塞获取任务。捕获中断异常后,将继续循环到getTask()最开始的判断线程池状态的逻辑,当线程池是shutdown状态,且workQueue.isEmpty时,return null,进行worker线程退出逻辑。
某些情况下,interruptIdleWorkers()时多个worker正在运行,不会对其发出中断信号,假设此时workQueue也不为空,那么当多个worker运行结束后,会到workQueue阻塞获取任务,获取到的执行任务,没获取到的,如果还是核心线程,会一直workQueue.take()阻塞住,线程无法终止,因为workQueue已经空了,且shutdown后不会在接收新任务了。那么,在遇到这种情况的时候,这就需要在shutdown()之后,还可以发出中断信号。
Doug Lea大神巧妙的在所有可能导致线程池产生终止的地方安插了tryTerminated()方法,尝试去执行线程池终止的逻辑,并在其中判断线程池已经进入了终止流程,没有任务等待执行了,但线程池还有线程,中断唤醒一个空闲线程。
shutdown()方法的最后也调用了tryTerminated()方法,下面看看这个方法的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*
* 在以下情况下将线程池变为TERMINATED终止状态:
* 1. shutdown 且正在运行的worker和workerQueue队列都empty
* 2. stop 且没有正在运行的worker
*
* 这个方法必须在任何可能导致线程池终止的情况下被调用:
* 减少worker的数量、shutdown时从queue中移除任务等。
*/
final void tryTerminate() {
// 这个for循环主要是和进入关闭线程池操作的CAS判断结合使用的
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时,直接返回:
* 1. RIUNNING,因为还在运行中,不能停止
* 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了
* 3. SHUTDOWN并且等待队列非空,这时候要执行完workQueue中的task
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果线程数量不为0,则中断一个空闲的工作线程,并返回
/* 因为只有shutdown状态,且workqueue为空,或者stop状态才能够执行到这一步
*如果此时线程池中还有线程(正在执行任务,正在等待任务)
* 中断唤醒一个正在等任务的空闲worker
* 唤醒后再次判断线程池状态,会return null,进入processWorkerExit()流程
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); //中断workers集合中的空闲任务,参数为true,只中断一个
return;
}
// 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated方法默认什么都不做,留给子类实现
terminated();
} finally {
// 设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用了 等待线程池终止的线程 awaitTermination()
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminated()执行流程:
- 判断线程池是否需要进入终止流程(只有当shutdown状态 + workQueue.isEmpty或者STOP状态,才需要)
- 判断线程池中是否还有线程,有则执行interruptIdleWorkers(ONLY_ONE)方法,进行中断一个空闲线程(
正是这个逻辑可以再次发出中断信号,中断阻塞在获取任务的线程); - 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没了,开始terminated。在这个阶段,会先上锁,将线程池设置为Tidying状态,之后调用需子类实现的terminated(),最后设置线程池为terminated状态,并唤醒所有等待线程池终止这个Condition的线程。
5.2 shutdownNow()
shutdownNow()方法与shutdown()方法类似,不同的地方在于:
- 设置状态为STOP;
- 中断所有工作线程,无论是否是空闲的;
- 取出阻塞队列中没有被执行的任务并返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
* 这个任务列表是从任务队列中排出(删除)的
*
* 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
*
* 除了尽力尝试停止运行中的任务,没有任何保证
* 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断调用者是否有权限shutdown线程池
checkShutdownAccess();
// CAS+循环设置线程池状态为stop
advanceRunState(STOP);
// 中断所有线程,包括正在运行任务的
interruptWorkers();
//将workQueue中的元素放入一个List并返回,取出队列中没有被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
// 返回workQueue中未执行的任务
return tasks;
}
shutdownNow() 和 shutdown()的大体流程相似,差别是:
- 将线程池更新为stop状态
- 调用 interruptWorkers() 中断所有线程,包括正在运行的线程
- 将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务。
来看看interruptWorkers()方法的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// ThreadPoolExecutor.Worker.interruptIfStarted()
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
interruptWorkers() 很简单,循环对所有worker调用 interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()。需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束。
5.3 awaitTermination()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
awaitTermination()方法是等待线程池终止的方法,其返回值有两种:
- true: 线程池终止
- false: 超过timeout的时间
在发出一个shutdown()请求后,在以下3种情况发生之前,awaitTermination()都会被阻塞:
- 所有任务完成执行
- 达到超时时间
- 当前线程被中断
1
private final Condition termination = mainLock.newCondition();
awaitTermination() 循环的判断线程池是否terminated终止 或 是否已经超过超时时间,然后通过termination这个Condition阻塞等待一段时间。
termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待。在阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):
- 如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出;
- 如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败
- 如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞
故终止线程池并需要知道其是否终止可以用如下方式:
1
2
3
4
5
6
7
8
9
executorService.shutdown();
try{
while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for terminate");
}
}
catch (InterruptedException e) {
//中断处理
}
7. 线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用:
- getTaskCount: 线程池已经执行的和未执行的任务总数;
- getCompletedTaskCount: 线程池已完成的任务数量,该值小于等于taskCount;
- getLargestPoolSize: 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
- getPoolSize: 线程池当前的线程数量。
- getActiveCount: 当前线程池中正在执行任务的线程数量。
通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。
8. Executors类
Exectors工厂类提供了线程池的初始化接口,主要有如下几种:
8.1 newFixedThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
初始化生成一个固定大小的线程,其中corePoolSize == maximumPoolSize,即最大线程数设置与核心线程数相等。此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程)使用LinkedBlockingQueue作为阻塞队列,无界队列。
过程分析: 刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。(为此,它是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源)
8.2 newSingleThreadExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
初始化的时候生成只有一个线程的固定线程池,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交的任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。
8.3 newCachedThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池。
- 初始化一个可以缓存的线程池,默认缓存60s,线程池的线程数可以达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
- 和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。
Attention:使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。
这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。下面针对于该线程池,我们来具体地分析一下过程:
过程分析:直接来看execute()方法吧,鉴于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中,由于采用了 SynchronousQueue,所以如果是第一个任务提交的时候,offer 方法肯定会返回 false,因为此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来创建第一个 worker。之后再提交任务的话,取决于是否有空闲下来的线程对任务进行接收,如果有,会进入到第二个 if 语句块中,否则就是和第一个任务一样,进到最后的 else if 分支创建新线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int c = ctl.get();
// corePoolSize 为 0,所以不会进到这个 if 分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// offer 如果有空闲线程刚好可以接收此任务,那么返回 true,否则返回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
SynchronousQueue 是一个比较特殊的 BlockingQueue,其本身不储存任何元素,它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;如果是相反模式,则配对成功,从当前队列中取队头节点
8.4 newScheduledThreadPool
1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
注意:
ScheduledExecutorService#scheduleAtFixedRate() 指的是“以固定的频率”执行,period(周期)指的是两次成功执行之间的时间。比如,scheduleAtFixedRate(command, 5, 2, second),第一次开始执行是5s后,假如执行耗时1s,那么下次开始执行是7s后,再下次开始执行是9s后。
而ScheduledExecutorService#scheduleWithFixedDelay()指的是“以固定的延时”执行,delay(延时)指的是一次执行终止和下一次执行开始之间的延迟。还是上面的例子,scheduleWithFixedDelay(command, 5, 2, second),第一次开始执行是5s后,假如执行耗时1s,执行完成时间是6s后,那么下次开始执行是8s后,再下次开始执行是11s后。
9. 总结
9.1 线程池有哪些关键属性?
- corePoolSize 到 maximumPoolSize 之间的线程会被回收,当然 corePoolSize 的线程也可以通过设置而得到回收(allowCoreThreadTimeOut(true))。
- workQueue 用于存放任务,添加任务的时候,如果当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务。
- keepAliveTime 用于设置空闲时间,如果线程数超出了 corePoolSize,并且有些线程的空闲时间超过了这个值,会执行关闭这些线程的操作。
- rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况,默认有抛出 RejectedExecutionException 异常、忽略任务、使用提交任务的线程来执行此任务和将队列中等待最久的任务删除,然后提交此任务这四种策略,默认为抛出异常。
9.2 线程池中的线程创建时机?
- 如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
- 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
- 如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
注意:如果将队列设置为无界队列,那么线程数达到 corePoolSize 后,其实线程数就不会再增长了。因为后面的任务直接往队列塞就行了,此时 maximumPoolSize 参数就没有什么意义。
9.3 任务执行过程中发生异常怎么处理?
如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。
9.4 什么时候会执行拒绝策略?
- workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行。
- workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。
参考文章:
Java线程池ThreadPoolExecutor使用和分析
深入理解Java线程池:ThreadPoolExecutor