Java OpenResty Spring Spring Boot MySQL Redis MongoDB PostgreSQL Linux Android Nginx 面试 小程序 Arthas JVM AQS juc Kubernetes Docker 诊断工具


Java 并发编程之 AQS ReentrantLock await signal 源码解析

Java juc AQS 大约 4316 字

说明

ConditionObject中维护的队列称为条件队列

AQS中维护的Node队列称为同步队列

await

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    private Node addConditionWaiter() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }

        Node node = new Node(Node.CONDITION);

        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
}

final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}

addConditionWaiter

创建一个节点,加入到条件队列中。

isHeldExclusively:如果没有上锁就直接调用await方法,将直接抛出IllegalMonitorStateException异常。

新创建一个waitStatus等于-2CONDITION)的节点,如果firstWaiter为空,则将新创建的节点赋值给firstWaiter,反之,将新创建的节点赋值给lastWaiter节点的nextWaiter,并将lastWaiter置为新创建的节点。

fullyRelease

可能有锁重入的情况,所以全部释放锁,将状态值改为0,并且唤醒同步队列中的下一个节点去竞争锁(自身已经是抢得锁正在执行代码的节点,已经不在同步队列中了,竞争锁是可能是非公平锁)。

isOnSyncQueue

如果已经被唤醒,进入了同步队列了则返回true,正常await生成的NodewaitStatus都等于CONDITION,所以返回false,取反,进入while循环,暂停了当前线程。

signal

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

(firstWaiter = first.nextWaiter) == null

firstWaiter的下一个等待节点赋值给firstWaiter并且判空,如果为空,条件队列已经为空了,将lastWaiter也置为空,

firstWaiter = first.nextWaiter的作用是将原先的firstWaiter移除条件队列。

first.nextWaiter = null

将原先的等待节点的nextWaiter置为空,帮助GC

transferForSignal

将这个从条件队列移除的节点,转移到同步队列中。

!node.compareAndSetWaitStatus(Node.CONDITION, 0):如果发生了中断等情况,CAS失败,继续while条件的后续判断,判断的是条件队列的下一个节点是否能唤醒。

如果CAS成功,则enq(node)将队列放入到同步队列中,返回的对象是入队前的队尾节点。即当前node变量的prev前驱节点。

如果前驱节点已经取消了或者CAS置为SIGNAL失败后,直接恢复线程的运行。

然后返回true,取反,结束while循环,唤醒流程结束。

注意

ReentrantReadWriteLock中的读锁不能使用newCondition()方法,直接抛出了异常。(写锁可使用)

阅读 1822 · 发布于 2021-10-07

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb

扫描下方二维码关注公众号和小程序↓↓↓

扫描二维码关注我
昵称:
随便看看 换一批