Java 并发编程之 ThreadPoolExecutor 线程池源码解析

Java juc 大约 6661 字

构造函数

注意:只有在阻塞队列满的前提下才会创建非核心线程。

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2, // corePoolSize 核心线程数
        5, // maximumPoolSize 最大线程数
        5, // keepAliveTime 非核心线程或者设置了核心线程超时时间的 最大空闲时间
        TimeUnit.SECONDS, // timeUnit 空闲时间单位
        new LinkedBlockingQueue<>(2), // workQueue 阻塞队列及容量
        new ThreadFactory() { // threadFactory 自定义线程工厂类
            private final AtomicInteger mThreadNum = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
                System.out.println(t.getName() + " has been created");
                return t;
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy() // handler 拒绝策略
);

线程池状态

ThreadPoolExecutor使用int的高3位表示线程池状态,低29位表示线程池数量。

状态名 高 3 位 接收新任务 阻塞队列任务 说明
RUNNING 111 Y Y -
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 - - 终结状态

参数说明

从数值大小排序:TERMINATED>TIDYING>STOP>SHUTDOWN>RUNNING

这些信息存储在ctl原子变量中,目的是将线程池状态与线程个数合二为一,这样就k可以用一次CAS原子操作进行赋值。

rsrunning statue

wcworking count

ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))

private static int ctlOf(int rs, int wc) { return rs | wc; }

execute 执行步骤

  1. 如果运行的线程数少于corePoolSize,尝试使用给定的Runnable任务启动一个新线程作为其第一个任务。对addWorker的调用以原子方式检查runStateworkerCount,从而通过返回false来防止在不应该添加线程时出现误报。
  2. 如果任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为自上次检查以来现有线程已死亡)或进入此方法后线程池关闭了。因此,我们重新检查状态,并在必要时在停止时回滚入队,如果没有则启动一个新线程。
  3. 如果任务不能排队,那么我们尝试添加一个新线程。如果它失败了,我们知道我们已经关闭或者阻塞队列已经饱和,因此拒绝该任务。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

添加工作线程

workerCount不大于最大容量CAPACITY或者不大于corePoolSizemaximumPoolSize时,计数加1

ReentrantLock上锁后加入到HashSetWorkers集合,并把标志位workerAdded改为true表示添加成功。

ReentrantLock解锁后开启线程。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

线程阻塞和回收

allowCoreThreadTimeOut || wc > corePoolSize:判断当前工作线程是否超出核心线程,超出则需要回收线程,即不用阻塞队列的阻塞获取而用立刻返回的方法。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
阅读 75 · 发布于 2021-11-11

————        END        ————

扫描下方二维码关注公众号和小程序↓↓↓

扫描二维码关注我
昵称:
随便看看 换一批