Java 并发编程之 AQS Semaphore 源码解析
Java juc AQS 大约 3810 字init
// java.util.concurrent.Semaphore.Sync#Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
}
Semaphore
初始化时将AQS
的state
设置为构造方法中的值。
acquire
// java.util.concurrent.Semaphore#acquire(int)
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// java.util.concurrent.Semaphore.Sync#nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
Semaphore
判断是否需要进入同步队列的条件比较宽松,如果剩余许可小于0
了,不进行CAS
操作,直接进行入队操作,如果还是没有获取到就进行阻塞。
此处很多逻辑与ReentrantLock
、ReentrantReadWriteLock
中的读锁相似不再赘述。
release
// java.util.concurrent.Semaphore#release(int)
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// java.util.concurrent.Semaphore.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
此处的doReleaseShared
释放锁流程与ReentrantReadWriteLock
中的读锁释放一样。恢复了一个节点运行后,在setHeadAndPropagate
中,会继续调用doReleaseShared
,释放连续的共享锁。
阅读 1517 · 发布于 2021-10-11
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb扫描下方二维码关注公众号和小程序↓↓↓

昵称:
随便看看
换一批
-
Git 删除指定文件的所有记录阅读 486
-
OpenResty 中的几种防止 SQL 注入的方法阅读 5421
-
微信小程序添加公众号关注组件阅读 3797
-
Alpine Linux 安装 Docker阅读 3379
-
Java 环境变量 JDK_JAVA_OPTIONS 与 JAVA_TOOL_OPTIONS 差别阅读 567
-
Linux 命令之 Vim 显示行号阅读 2005
-
Docker 部署 KeyCloak阅读 825
-
Nginx 配置之 worker_processes阅读 4392
-
使用 kind 在 Docker 中部署 Kubernetes阅读 589
-
Spring Boot RabbitMQ Execution of Rabbit message listener failed阅读 7816