Java 并发编程之 LongAdder 源码解析
Java juc About 9,061 words基本用法
public class LongAdderDemo {
public static void main(String[] args) {
LongAdder adder = new LongAdder();
adder.increment();
System.out.println(adder.sum());
}
}
源码解析
cells和base是LongAdder的父类Striped64中的原子成员变量。
cells 未初始化情况
// java.util.concurrent.atomic.LongAdder
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
(cs = cells) != null:将cells赋值给cs局部变量并判断是否为空。首次进行add操作cells未初始化,为空,进行下一个判断。
!casBase(b = base, b + x):将base赋值给b局部变量(base是long类型的成员变量初始值为0),与b+x进行cas比较并交换,如果成功则add方法执行结束,没有交换成功则进入if方法体。
cs == null:首次add肯定为空,进入if方法体。
longAccumulate:执行Striped64中的长整形累加方法。
// java.util.concurrent.atomic.Striped64
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// ...
}
static final int getProbe() {
return (int) THREAD_PROBE.get(Thread.currentThread());
}
private static final VarHandle THREAD_PROBE;
static {
THREAD_PROBE = l.findVarHandle(Thread.class, "threadLocalRandomProbe", int.class);
}
// java.lang.Thread
@jdk.internal.vm.annotation.Contended("tlr")
int threadLocalRandomProbe;
getProbe():获取当前线程的Probe哈希值,如果等于0,则用ThreadLocalRandom.current()方法初始化,并再次获取Probe的值赋值给h。
// java.util.concurrent.atomic.Striped64
transient volatile int cellsBusy;
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
// ...
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {//...}
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base
else if (casBase(v = base, (fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}
final boolean casCellsBusy() {
return CELLSBUSY.compareAndSet(this, 0, 1);
}
第一次进入longAccumulate方法时,cells还没创建,所以不走第一个if判断。
cellsBusy:CAS自旋锁的标志位,用于Cells数组创建或扩容。初始为0。
cells == cs:在第一个if判断时已经赋值,此处再次判断是为了防止其他线程已经初始化完成了。
casCellsBusy():自旋锁上锁,成功就进行cells数组初始化。
cells == cs:再次判断是否已经被初始化了,防止在CAS期间有改动。
rs[h & 1] = new Cell(x):默认创建的cells数组长度为2,第一个Cell元素放置在线程的Probe值&1的位置(&1等于%2,对2取模)
finally中解锁。
casBase(v = base, (fn == null):如果cellsBusy已经被抢占或者cells == cs被改动或者casCellsBusy()抢占失败则进行对base成员变量的cas赋值,失败则再次循环,成功则方法longAccumulate执行结束(意味着LongAdder中的add方法也执行结束)
cells 初始化完成情况
// java.util.concurrent.atomic.LongAdder
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
((cs = cells) != null:cells数据已经初始化完毕了,所以每次都会进入if语句体。
(m = cs.length - 1) < 0:初始化的cells数组长度为2,此时m=1,所以此条件也不成立。
(c = cs[getProbe() & m]) == null:判断Probe&m索引上的Cell是否为空,为空则进行longAccumulate。(此处m=1等于判断0或1索引的Cell)
!(uncontended = c.cas(v = c.value, v + x)):如果上一步取到的Cell对象不为空,则在Cell上进行CAS赋值,赋值失败则将uncontended置为false,并进行longAccumulate。
// java.util.concurrent.atomic.Striped64
transient volatile int cellsBusy;
static final int NCPU = Runtime.getRuntime().availableProcessors();
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
// ...
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
break done;
}
} finally {
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// ...
}
collide:为真表示最后一个槽位都不为空了,即数组所有索引位置上的元素都不为空了。
(c = cs[(n - 1) & h]) == null:此时n等于数组长度等于2,同样是取0或1索引位置的Cell元素,如果为空,则上自旋锁,new出一个Cell对象赋值到该索引上。
!wasUncontended:LongAdder中进行的CAS失败了的所以contended是false,改为true并且调用advanceProbe更换计算索引位置的h进行下一次循环。
c.cas(v = c.value...:对索引位置上的Cell元素进行CAS赋值,成功则方法整体运行完成,失败则进入下一个if判断。
n >= NCPU || cells != cs:数组的长度超出了CPU个数,或者发生了扩容,则将collide该为false。调用advanceProbe更换计算索引位置的h进行下一次循环。
!collide:上述条件都不成立(已经循环了几次了),则说明数组位置都有元素了,将标志位置为真。调用advanceProbe更换计算索引位置的h进行下一次循环。
cellsBusy == 0 && casCellsBusy():上自旋锁,对Cell数组进行扩容,每次扩容的大小为2的n次方。扩容完成将collide置为false,继续循环。
sum 方法
sum计算的是当前cells和base的快照的总和,在并发环境下调用此方法,等到的值不一定准确。改方法适合在所有竞争线程都执行完成后的获取总数。
// java.util.concurrent.atomic.LongAdder
public long sum() {
Cell[] cs = cells;
long sum = base;
if (cs != null) {
for (Cell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}
高并发场景
在高并发场景下,Cell数组已经扩容到CPU个数的长度后。更多的执行的情况如下,删去了不会执行到的代码和永远不成立的代码。
// java.util.concurrent.atomic.LongAdder
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null) {
boolean uncontended = true;
// 随机获取一个 Cell 并赋值给 c ,c 肯定不为空,进行 || 运算
// 对 c 进行 CAS 赋值,失败进入 longAccumulate,uncontended = false
if ((c = cs[getProbe() & m]) == null || !(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
// java.util.concurrent.atomic.Striped64
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h; // 随机数,取索引用
// 高并发场景下该字段已经没有用处
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
// 数组一定不为空,所以一直进这个 if 条件
if ((cs = cells) != null && (n = cs.length) > 0) {
// 随机获取一个 Cell 并赋值给 c ,c 肯定不为空,进入 else if 判断
if ((c = cs[(n - 1) & h]) == null) {}
// 上面 LongAdder 的 add 方法中执行中带入的 uncontended = false,取反,赋值为 true
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 再次 CAS 赋值 cell 的值,成功就退出循环,结束方法
else if (c.cas(v = c.value, (fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
// 上面 CAS 不成功后,每次都会进入此 if 条件中
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
// 每次进入 if 条件没有 break 掉的都要执行更换随机数的操作
h = advanceProbe(h);
}
}
}
备注
LongAdder的reset和sumThenReset方法都不应该用在并发场景下。
————        END        ————
Give me a Star, Thanks:)
https://github.com/fendoudebb/LiteNote扫描下方二维码关注公众号和小程序↓↓↓