

Java Concurrency: AQS ReentrantReadWriteLock Source Code Analysis

Java juc AQS · about 5,446 characters

Read Lock

public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** Returns the number of shared holds represented in count. */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** Returns the number of exclusive holds represented in count. */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

}

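The write lock uses the low 16 bits of state, and the read lock uses the high 16 bits.

sharedCount: returns the number of read-lock holds.

exclusiveCount: returns the number of write-lock holds.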
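To make the bit layout concrete, here is a small sketch that copies the constants and the two helper methods from Sync and applies them to a hand-built state value. The class name StateLayoutDemo and the pack helper are made up for illustration and are not part of the JDK.

```java
// Illustrative only: mirrors the Sync constants to show how one int holds both counts.
class StateLayoutDemo {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    /** Hypothetical helper: packs a read count and a write count into one state value. */
    static int pack(int readHolds, int writeHolds) {
        return (readHolds << SHARED_SHIFT) | writeHolds;
    }

    public static void main(String[] args) {
        int state = pack(3, 2);                           // 3 read holds, 2 write holds
        System.out.println(Integer.toHexString(state));   // 30002
        System.out.println(sharedCount(state));           // 3
        System.out.println(exclusiveCount(state));        // 2

        // Acquiring one more read hold adds SHARED_UNIT and leaves the low 16 bits alone.
        int afterRead = state + SHARED_UNIT;
        System.out.println(sharedCount(afterRead));       // 4
        System.out.println(exclusiveCount(afterRead));    // 2
    }
}
```

This is why the read path adds SHARED_UNIT rather than 1: adding 1 would change the write count in the low half instead.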

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
public void lock() {
    sync.acquireShared(1);
}

@ReservedStackAccess
protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // hold-count bookkeeping omitted
        return 1;
    }
    return fullTryAcquireShared(current);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

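tryAcquireShared: if the write-lock count is nonzero and the thread holding it is not the current thread, it returns -1 and the caller is enqueued to wait. Otherwise, if the reader does not need to block (readerShouldBlock is ignored here for simplicity), the read count is below MAX_COUNT, and the CAS that adds SHARED_UNIT to state succeeds, it returns 1: the read lock is acquired and the caller continues. If any of these conditions fails, it falls through to fullTryAcquireShared, which retries in a loop.

Put simply, a read-lock attempt has two outcomes: -1 when another thread holds the write lock, and 1 when the read lock is granted.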
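The owner check can be observed through the public API. The sketch below is an assumption-laden illustration: it uses readLock().tryLock(), which goes through a separate non-queueing method rather than tryAcquireShared, but applies the same "write lock held by another thread" test. The class name ReadWhileWriteHeldDemo is made up.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWhileWriteHeldDemo {
    /** Returns {ownerGotRead, otherGotRead}, both measured while this thread holds the write lock. */
    static boolean[] run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        try {
            // Same thread: the exclusive owner is the current thread, so the read is granted.
            boolean ownerGotRead = lock.readLock().tryLock();
            if (ownerGotRead) lock.readLock().unlock();

            // Different thread: the write lock belongs to someone else, so the read is refused.
            AtomicBoolean otherGotRead = new AtomicBoolean();
            Thread t = new Thread(() -> {
                if (lock.readLock().tryLock()) {
                    otherGotRead.set(true);
                    lock.readLock().unlock();
                }
            });
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return new boolean[] { ownerGotRead, otherGotRead.get() };
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("owner got read lock: " + r[0]);   // true
        System.out.println("other got read lock: " + r[1]);   // false
    }
}
```

With lock() instead of tryLock(), the second thread would not get false back; it would be enqueued and parked as described in the next part.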

// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

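If tryAcquireShared returns -1, the thread joins the queue as a node in shared mode. As with ReentrantLock and other AQS-based classes, shouldParkAfterFailedAcquire sets the predecessor's waitStatus to -1 (SIGNAL); for the first waiter the predecessor is the head node. SIGNAL means that on release the thread of that node's next node will be unparked.

parkAndCheckInterrupt then parks the thread.

setHeadAndPropagate: once the thread is woken and acquires the lock, its node becomes the new head. It then checks whether the next node is also in shared mode and, if so, calls doReleaseShared so that node's thread is woken as well.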
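The propagation effect can be seen from outside: readers queued behind a writer all get in after a single write unlock. The following is a sketch under stated assumptions; the class name ReaderPropagationDemo, the reader count, and the 5-second timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReaderPropagationDemo {
    /** Returns the read-lock count observed once every queued reader holds the lock. */
    static int run(int readers) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch allIn = new CountDownLatch(readers);
        CountDownLatch done = new CountDownLatch(1);

        lock.writeLock().lock();                  // forces every reader into the queue
        for (int i = 0; i < readers; i++) {
            Thread t = new Thread(() -> {
                lock.readLock().lock();           // enqueues in shared mode and parks
                try {
                    allIn.countDown();
                    done.await();                 // keep the read lock until the check is done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.readLock().unlock();
                }
            });
            t.setDaemon(true);
            t.start();
        }
        try {
            while (lock.getQueueLength() < readers) {
                Thread.sleep(1);                  // wait until all readers are enqueued
            }
            lock.writeLock().unlock();            // one release; readers wake each other in turn
            if (!allIn.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("not every reader was woken");
            }
            return lock.getReadLockCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            done.countDown();                     // let the readers release and exit
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3));               // 3
    }
}
```

The writer's unlock only unparks the first queued reader; the remaining readers are woken through the chain of setHeadAndPropagate calls.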

Read Unlock

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock
public void unlock() {
    sync.releaseShared(1);
}

@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
    // hold-count updates omitted
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

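tryReleaseShared: subtracts SHARED_UNIT from state in a CAS loop, which lowers the read count by one, and returns true only when the resulting state is 0, that is, when no read or write holds remain.

doReleaseShared: wakes the thread of the head's successor node.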
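The "only the last release frees the lock" rule can be checked in a single thread. This sketch uses writeLock().tryLock() as a probe for whether state has reached 0; it is an illustration with a made-up class name (LastReaderReleaseDemo), and the probe does not go through the wait queue.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LastReaderReleaseDemo {
    /** Returns whether a write tryLock succeeded after 0, 1, and 2 read unlocks. */
    static boolean[] run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        boolean[] result = new boolean[3];

        lock.readLock().lock();
        lock.readLock().lock();            // two read holds: state is 2 * SHARED_UNIT
        result[0] = tryWrite(lock);

        lock.readLock().unlock();          // state is SHARED_UNIT, still nonzero
        result[1] = tryWrite(lock);

        lock.readLock().unlock();          // state is 0: this release returns true
        result[2] = tryWrite(lock);
        return result;
    }

    /** Probes the write lock without blocking and releases it again on success. */
    static boolean tryWrite(ReentrantReadWriteLock lock) {
        boolean acquired = lock.writeLock().tryLock();
        if (acquired) lock.writeLock().unlock();
        return acquired;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);   // false false true
    }
}
```

Note that the first two probes fail even though the reader is the same thread: a read hold cannot be upgraded to a write hold.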

Write Lock

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
public void lock() {
    sync.acquire(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

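The queueing logic in acquire is the same as in ReentrantLock (exclusive mode), so it is not repeated here. Only tryAcquire differs, because it must also refuse the lock while any read holds exist.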
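As a quick check of the exclusive, reentrant behavior, the sketch below takes the write lock twice in one thread and lets a second thread probe it with tryLock(). The class name WriteLockExclusiveDemo is made up, and tryLock() is used only so the second thread returns instead of parking.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteLockExclusiveDemo {
    /** Returns {write hold count after two lock() calls, 1 if another thread got the write lock, else 0}. */
    static int[] run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        lock.writeLock().lock();                   // reentrant: same owner, count becomes 2
        try {
            int holds = lock.getWriteHoldCount();

            AtomicBoolean otherGotWrite = new AtomicBoolean();
            Thread t = new Thread(() -> {
                if (lock.writeLock().tryLock()) {  // refused: another thread owns the lock
                    otherGotWrite.set(true);
                    lock.writeLock().unlock();
                }
            });
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return new int[] { holds, otherGotWrite.get() ? 1 : 0 };
        } finally {
            lock.writeLock().unlock();             // one unlock per lock() call
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("write holds: " + r[0]);         // 2
        System.out.println("other thread got it: " + r[1]); // 0
    }
}
```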

Write Unlock

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() {
    sync.release(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

The write lock's release flow likewise matches ReentrantLock's.
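The handoff performed by release and unparkSuccessor can be sketched with two threads: one holds the write lock, the other parks in lock() until the holder unlocks. The class name WriteUnlockHandoffDemo and the 5-second timeout are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteUnlockHandoffDemo {
    /** Returns true if the waiter got the lock only after the holder unlocked. */
    static boolean run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch acquired = new CountDownLatch(1);

        lock.writeLock().lock();
        Thread waiter = new Thread(() -> {
            lock.writeLock().lock();               // enqueues in exclusive mode and parks
            try {
                acquired.countDown();
            } finally {
                lock.writeLock().unlock();
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        try {
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);                   // wait until the waiter is in the queue
            }
            boolean gotItEarly = acquired.getCount() == 0;   // lock is still held here
            lock.writeLock().unlock();             // release wakes the queued waiter
            boolean gotItAfter = acquired.await(5, TimeUnit.SECONDS);
            return !gotItEarly && gotItAfter;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());                 // true
    }
}
```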

239 views · Published 2021-10-08


https://github.com/fendoudebb
