Java OpenResty Spring Spring Boot MySQL Redis MongoDB PostgreSQL Linux Android Nginx 面试 小程序 Arthas JVM AQS juc Kubernetes Docker 诊断工具


Java 并发编程之 AQS ReentrantLock 非公平锁源码解析

Java juc AQS 大约 6141 字

说明

本文基于Java11

部分源码

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            // 将新节点的 prev 字段设置为老的 node 节点
            node.setPrevRelaxed(oldTail);
            // 将 tail 字段设置为 node 
            if (compareAndSetTail(oldTail, node)) {
                // CAS 设置成功后,将老的 node 节点的 next 字段设置为新的节点
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

private final void initializeSyncQueue() {
    Node h;
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return TAIL.compareAndSet(this, expect, update);
}


/*
     +------+  prev +-----+       +-----+
head |      | <---- |     | <---- |     |  tail
     +------+       +-----+       +-----+
*/
static final class Node {
    static final Node EXCLUSIVE = null;
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;

    final void setPrevRelaxed(Node p) {
        // Unsafe/VarHandle set prev
        PREV.set(this, p);
    }
}

tryAcquire()尝试获取锁,返回了true说明上锁成功,失败后进行acquireQueued入队操作。

入队操作前先进行addWaiter初始化等待节点。

ReentrantLock中的非公平锁的实现。

// java.util.concurrent.locks.ReentrantLock.Sync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

getState()获取当前锁的状态赋值给变量c

如果c0表示没有人上锁使用CAS修改状态值竞争锁并且设置独占锁线程为当前线程,返回trueCAS失败返回false准备入等待队列。

如果c不为0判断是否是当前线程持有了锁,如果是,则锁重入,设置新的状态值,ReentrantLock最大的锁重入次数是Integer的最大值。

加入等待队列时head节点为哨兵节点,哨兵节点的next字段指向需要入队的新节点,tail节点指向入队的新节点。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

正式加入队列前,还会将节点的前一个节点的waitStatus0改为-1

ReentrantLocklock()方法为不可能打断模式,外部调用Threadinterrupte()方法,只是会再次进行acquireQueued抢夺锁,但基本都是再次进入LockSupportpark()方法。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放锁时,将Syncstate改为0(暂时不考虑重入的情况),将头节点的waitStatue改为0,并unpark头节点的下一个节点。

当释放锁后,acquireQueued中原先停在了parkAndCheckInterrupt代码,被unpark后继续执行,for循环中已经是前驱节点是头节点了,在tryAcquire抢得锁后,将当前节点设置为头节点,意味着上锁成功,继续执行锁中的代码。

阅读 1626 · 发布于 2021-10-04

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb

扫描下方二维码关注公众号和小程序↓↓↓

扫描二维码关注我
昵称:
随便看看 换一批