Java 并发编程之 AQS ReentrantReadWriteLock 读写锁源码解析
Java juc AQS About 5,446 wordsRead Lock
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count. */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count. */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
写锁占state
低16
位,读锁占state
高16
位
sharedCount
:返回读锁的数量
exclusiveCount
:返回写锁的数量
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
public void lock() {
sync.acquireShared(1);
}
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 省略了计数
return 1;
}
return fullTryAcquireShared(current);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared
:如果写锁数量不等于0
且持有锁的线程不是当前线程,则返回-1
进行入队等待。如果读锁的数量未超出最大数量且CAS
累加成功(暂不考虑读锁阻塞)返回1
加锁成功继续执行代码。
读写锁就两种情况:-1
写锁占用着,读锁返回1
。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
如果tryAcquireShared
返回-1
则进入队列,添加节点为共享模式,同ReentrantLock
等其他AQS
类一样,将头节点的waitStatus
改为-1
表示唤醒时会unpark
头节点的next
节点的线程。
然后parkAndCheckInterrupt
将线程暂停。
setHeadAndPropagate
:在线程恢复运行后,还会去检查下一个节点是不是共享模式,如果是则将下一个节点也同样恢复运行。
Read Unlock
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock
public void unlock() {
sync.releaseShared(1);
}
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
// 省略了更改计数
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
tryReleaseShared
:将state
减去1
并判断是否等于0
。
doReleaseShared
:恢复后继节点线程的运行。
Write Lock
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
public void lock() {
sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
写锁的加锁流程与ReentrantLock
的加锁流程一致(排它锁)。不再重复。
Write Unlock
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() {
sync.release(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
写锁的解锁流程同样与ReentrantLock
的解锁流程一致。
Views: 1,480 · Posted: 2021-10-08
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓
Loading...