Java 并发编程之 ThreadPoolExecutor 线程池源码解析
Java juc 面试 About 6,661 words构造函数
注意:只有在阻塞队列满的前提下才会创建非核心线程。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize 核心线程数
5, // maximumPoolSize 最大线程数
5, // keepAliveTime 非核心线程或者设置了核心线程超时时间的 最大空闲时间
TimeUnit.SECONDS, // timeUnit 空闲时间单位
new LinkedBlockingQueue<>(2), // workQueue 阻塞队列及容量
new ThreadFactory() { // threadFactory 自定义线程工厂类
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // handler 拒绝策略
);
线程池状态
ThreadPoolExecutor
使用int
的高3
位表示线程池状态,低29
位表示线程池数量。
状态名 | 高 3 位 | 接收新任务 | 阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | - |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
参数说明
从数值大小排序:TERMINATED
>TIDYING
>STOP
>SHUTDOWN
>RUNNING
这些信息存储在ctl
原子变量中,目的是将线程池状态与线程个数合二为一,这样就k可以用一次CAS
原子操作进行赋值。
rs
:running statue
wc
:working count
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
private static int ctlOf(int rs, int wc) { return rs | wc; }
execute 执行步骤
- 如果运行的线程数少于
corePoolSize
,尝试使用给定的Runnable
任务启动一个新线程作为其第一个任务。对addWorker
的调用以原子方式检查runState
和workerCount
,从而通过返回false
来防止在不应该添加线程时出现误报。 - 如果任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为自上次检查以来现有线程已死亡)或进入此方法后线程池关闭了。因此,我们重新检查状态,并在必要时在停止时回滚入队,如果没有则启动一个新线程。
- 如果任务不能排队,那么我们尝试添加一个新线程。如果它失败了,我们知道我们已经关闭或者阻塞队列已经饱和,因此拒绝该任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
添加工作线程
workerCount
不大于最大容量CAPACITY
或者不大于corePoolSize
或maximumPoolSize
时,计数加1
。
ReentrantLock
上锁后加入到HashSet
的Workers
集合,并把标志位workerAdded
改为true
表示添加成功。
ReentrantLock
解锁后开启线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程阻塞和回收
allowCoreThreadTimeOut || wc > corePoolSize
:判断当前工作线程是否超出核心线程,超出则需要回收线程,即不用阻塞队列的阻塞获取而用立刻返回的方法。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
Views: 2,059 · Posted: 2021-11-11
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓
Loading...