

Java Concurrency and AQS: CyclicBarrier Source Code Analysis


Overview

The difference between CyclicBarrier and CountDownLatch is that a CyclicBarrier can be reused, whereas a CountDownLatch cannot be used again once its count reaches zero.
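The contrast can be illustrated with a small sketch. The class and method names here (BarrierReuseDemo, tripBarrier, exhaustLatch) are hypothetical and chosen only for this example; a single-party barrier is assumed so that everything runs on one thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of the JDK or the original post.
public class BarrierReuseDemo {

    // Trips a single-party barrier `rounds` times and returns how often the barrier action ran.
    public static int tripBarrier(int rounds) throws Exception {
        int[] trips = {0};
        CyclicBarrier barrier = new CyclicBarrier(1, () -> trips[0]++);
        for (int i = 0; i < rounds; i++) {
            barrier.await(); // with one party, every call trips the barrier again
        }
        return trips[0];
    }

    // Counts a latch down to zero, then calls countDown() once more.
    public static long exhaustLatch() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // no effect: the count stays at zero and cannot be reset
        latch.await();     // returns immediately from now on
        return latch.getCount();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("barrier trips: " + tripBarrier(3));
        System.out.println("latch count: " + exhaustLatch());
    }
}
```

The same barrier instance trips three times, while the latch stays at zero because CountDownLatch offers no way to restore its count.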

await

CyclicBarrier uses a ReentrantLock and a Condition to implement synchronization and waiting.

// java.util.concurrent.CyclicBarrier#await()
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

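nextGeneration: resets the count, wakes up all threads waiting in await, and installs a new Generation.

When count reaches 0, the Runnable passed to the CyclicBarrier constructor is invoked. If that Runnable throws an uncaught exception, the count is reset, all threads waiting in await are woken up, and the barrier is marked as broken; any subsequent call to await then throws BrokenBarrierException. For example: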
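As a rough illustration of how dowait's return value and nextGeneration interact, the sketch below runs two generations on one two-party barrier and records the arrival indexes returned by await. The class name ArrivalIndexDemo and its run method are hypothetical; the indexes are sorted because which thread arrives last is not deterministic.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK or the original post.
public class ArrivalIndexDemo {

    // Runs `generations` rounds on one two-party barrier and returns the
    // sorted arrival indexes seen in each round.
    public static String run(int generations) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        StringBuilder seen = new StringBuilder();
        try {
            // await() returns the index computed by dowait (--count).
            Callable<Integer> arrive = () -> barrier.await();
            for (int i = 0; i < generations; i++) {
                Future<Integer> first = pool.submit(arrive);
                Future<Integer> second = pool.submit(arrive);
                int[] indexes = {first.get(), second.get()};
                Arrays.sort(indexes); // arrival order varies between runs
                seen.append(Arrays.toString(indexes)).append(' ');
            }
        } finally {
            pool.shutdown();
        }
        return seen.toString().trim();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(2)); // prints "[0, 1] [0, 1]"
    }
}
```

In each generation the first arriver sees index 1 and the last arriver sees index 0. The second round yields the same pair because nextGeneration sets count back to parties.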

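import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierRunnableExceptionDemo {

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier cyclicBarrier = null;
        try {
            // The barrier action throws an unchecked ArithmeticException.
            cyclicBarrier = new CyclicBarrier(1, () -> {
                int i = 1 / 0;
            });
            // With a single party, this call trips the barrier and runs the action.
            cyclicBarrier.await();
        } catch (Exception e) {
            System.out.println(e);
            // The barrier is now broken, so this call throws BrokenBarrierException.
            cyclicBarrier.await();
        }
    }

}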
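
A broken barrier is not necessarily lost for good: CyclicBarrier also exposes isBroken() and reset(), and reset() starts a fresh, unbroken generation. The sketch below is one possible way to observe this. The class name BrokenBarrierResetDemo and the failOnce flag are hypothetical, and the barrier action is assumed to fail only on its first run.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK or the original post.
public class BrokenBarrierResetDemo {

    public static String run() throws Exception {
        AtomicBoolean failOnce = new AtomicBoolean(true);
        CyclicBarrier barrier = new CyclicBarrier(1, () -> {
            // Fails on the first trip only, succeeds afterwards.
            if (failOnce.getAndSet(false)) {
                throw new IllegalStateException("barrier action failed");
            }
        });
        StringBuilder log = new StringBuilder();
        try {
            barrier.await(); // the action throws, so dowait calls breakBarrier()
        } catch (IllegalStateException e) {
            log.append("action threw, broken=").append(barrier.isBroken());
        }
        try {
            barrier.await(); // rejected because the current generation is broken
        } catch (BrokenBarrierException e) {
            log.append("; await rejected");
        }
        barrier.reset(); // installs a new, unbroken generation
        log.append("; after reset broken=").append(barrier.isBroken());
        log.append("; index=").append(barrier.await());
        return log.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

After reset() the barrier can be awaited again. Whether resetting is appropriate in a real program depends on whether the other parties can safely retry.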
Views: 1129 · Published on 2021-10-13


https://github.com/fendoudebb
