Java 并发编程之 AQS ReentrantLock await signal 源码解析

Java juc AQS 大约 4316 字

说明

ConditionObject中维护的队列称为条件队列

AQS中维护的Node队列称为同步队列

await

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    private Node addConditionWaiter() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }

        Node node = new Node(Node.CONDITION);

        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
}

final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}

addConditionWaiter

创建一个节点,加入到条件队列中。

isHeldExclusively:如果没有上锁就直接调用await方法,将直接抛出IllegalMonitorStateException异常。

新创建一个waitStatus等于-2CONDITION)的节点,如果firstWaiter为空,则将新创建的节点赋值给firstWaiter,反之,将新创建的节点赋值给lastWaiter节点的nextWaiter,并将lastWaiter置为新创建的节点。

fullyRelease

可能有锁重入的情况,所以全部释放锁,将状态值改为0,并且唤醒同步队列中的下一个节点去竞争锁(自身已经是抢得锁正在执行代码的节点,已经不在同步队列中了,竞争锁是可能是非公平锁)。

isOnSyncQueue

如果已经被唤醒,进入了同步队列了则返回true,正常await生成的NodewaitStatus都等于CONDITION,所以返回false,取反,进入while循环,暂停了当前线程。

signal

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

(firstWaiter = first.nextWaiter) == null

firstWaiter的下一个等待节点赋值给firstWaiter并且判空,如果为空,条件队列已经为空了,将lastWaiter也置为空,

firstWaiter = first.nextWaiter的作用是将原先的firstWaiter移除条件队列。

first.nextWaiter = null

将原先的等待节点的nextWaiter置为空,帮助GC

transferForSignal

将这个从条件队列移除的节点,转移到同步队列中。

!node.compareAndSetWaitStatus(Node.CONDITION, 0):如果发生了中断等情况,CAS失败,继续while条件的后续判断,判断的是条件队列的下一个节点是否能唤醒。

如果CAS成功,则enq(node)将队列放入到同步队列中,返回的对象是入队前的队尾节点。即当前node变量的prev前驱节点。

如果前驱节点已经取消了或者CAS置为SIGNAL失败后,直接恢复线程的运行。

然后返回true,取反,结束while循环,唤醒流程结束。

注意

ReentrantReadWriteLock中的读锁不能使用newCondition()方法,直接抛出了异常。(写锁可使用)

阅读 47 · 发布于 2021-10-07

————        END        ————

扫描下方二维码关注公众号和小程序↓↓↓

扫描二维码关注我
昵称:
随便看看 换一批