Java Atomic系列

并发环境下计数

需求


count++表面上看只有一个操作,其实在底层是有3个操作,读出-自增-写回
这3个操作在并发环境下会被打断,这种非原子操作造成了结果错误
这个原子操作问题可以被java.util.concurrent.atomic包通过CAS方式解决

数值类型原子性


以AtomicInteger实现为例,对int进行CAS原子化包装
静态初始化块初始化了unsafe对象,以及获取value属性的内存位置偏移值,为CAS做准备

AtomicInteger.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public class AtomicInteger extends Number implements java.io.Serializable {
// 获取Unsafe对象
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
// 通过反射获取要操作的字段,通过unsafe获取字段的地址
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//...
}

被CAS操作的字段要使用volatile保证可见性,如果是long还可以保证多字节赋值完整性

AtomicInteger.java
1
2
3
4
5
6
7
8
9
private volatile int value;

public final int get() {
return value;
}

public final void set(int newValue) {
value = newValue;
}

在原子操作方面,委派给Unsafe类实现相关功能
针对写入操作,提供直接写入和CAS操作先比较再写入

AtomicInteger.java
1
2
3
4
5
6
7
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

针对count++/count–这种先返回再增减的操作

AtomicInteger.java
1
2
3
4
5
6
7
8
9
10
11
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}

public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}

针对++count/–count这种先增减再返回的操作,实际上和上面那组方法一样,底层上都是getAndAddInt实现

AtomicInteger.java
1
2
3
4
5
6
7
8
9
10
11
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}

public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

对象类型原子性


基本类型中的boolean,int,long可以分别由AtomicBoolean,AtomicInteger和AtomicLong提供原子操作
如果是自定义的类,那么就需要AtomicReference负责,实现套路和上面基本一致,依然是volatile和unsafe协同实现
不同的是这里采用了泛型支持自定义类型,并且没有数值型的增减操作

AtomicReference.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public class AtomicReference<V> implements java.io.Serializable {
private volatile V value;
//...
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

@SuppressWarnings("unchecked")
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
//...
}

CAS仅限对单个变量进行原子操作,如果同时操作多个变量,难以实现,可以考虑将多个变量作为一个整体组成一个对象,分散操作转变为一次整体操作,把旧的对象整体替换成新对象,效果上实现多变量原子操作

ABA问题


CAS在比较时只能判断是不是相等,并一定能确认是不是被改变过。比如值从A改到B再改回A,如果CAS看到了A,它也不知道是否中间经历了B值。
在基本类型上,因为只是一个数值,很多情况下值相等即可,改没改变无关紧要。然而和基本类型不同,引用型在比较时可能出现问题。
按照普通的CAS逻辑,引用没变就认为对象没改过,但是实际上修改对象属性并不改变对象引用,CAS无法正确判断对象内部是否修改,这会导致更新丢失。

对象类型原子性和辅助标记


既然单纯从对象引用无法判断是否修改,解决方案就是增加额外的判断内容,比如版本号。
AtomicStampedReference内部除了对象引用reference,还绑定了一个数值stamp,形成Pair结构
比较时单纯的对象引用比较是不够的,同时进行数值比较。

AtomicStampedReference.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

private volatile Pair<V> pair;

// 同时比较引用和数值
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
Pair<V> current = pair;
return expectedReference == current.reference && expectedStamp == current.stamp &&
//没变或者替换
((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp)));
}

private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
//...
}

如果不想绑定数值,AtomicMarkableReference提供了绑定布尔值的结构,相当于一个简化版

AtomicMarkableReference.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class AtomicMarkableReference<V> {
private static class Pair<T> {
final T reference;
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}

private volatile Pair<V> pair;
//...
}

数组原子性


直接把数组当成一个引用,并不能保证每个数组元素的原子性
原子包提供AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray三个版本

以AtomicIntegerArray为例,不出意外,底层是个int数组,但是不同的是没有volatile,而是一个final修饰,这是因为目标是提供操作数组元素的原子性,而不是改变数组引用本身
每个操作都提供了数组索引,操作单个元素。这样单个元素就相当于AtomicInteger,可以进行各种操作

AtomicIntegerArray.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AtomicIntegerArray implements java.io.Serializable {
//final也可以保障可见性
private final int[] array;

//传入数组被复制,因此原子操作不会影响原数组
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}

public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
//...
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
}
//...
}

对象属性原子性


对象内可以包含一些volatile属性,为了保证单个属性可以进行原子操作,可以简单的把每个属性替换成原子对象版本
原子包内还提供另外一种实现方式,基于反射构建的属性更新器静态工具类,提供AtomicIntegerFieldUpdater,AtomicLongFieldUpdater和AtomicReferenceFieldUpdater三个版本

属性更新器是由静态方法创建,提供对象类型,属性类型,属性名三个参数。更新器只和类型有关,和具体对象实例无关,可以被复用
使用时传入操作对象和属性值即可,因为更新器已知该操作哪个属性

AtomicReferenceFieldUpdater.java
1
2
3
4
5
6
7
8
public abstract class AtomicReferenceFieldUpdater<T,V> {
@CallerSensitive
public static <U,W> AtomicReferenceFieldUpdater<U,W> newUpdater(Class<U> tclass, Class<W> vclass, String fieldName) {
return new AtomicReferenceFieldUpdaterImpl<U,W>(tclass, vclass, fieldName, Reflection.getCallerClass());
}
//...
public abstract boolean compareAndSet(T obj, V expect, V update);
}

比如更新Sample类中的value字段

Sample
1
2
3
4
5
6
7
8
9
class Sample{
volatile int value = 0;
}

public static void main(String[] args) {
Sample s = new Sample();
AtomicIntegerFieldUpdater<Sample> updater = AtomicIntegerFieldUpdater.newUpdater(Sample.calss, "value");
updater.compareAndSet(s,0,1);
}

Adder类


Java8新增的原子操作类,包括LongAdder和DoubleAdder两个版本。
AtomicLong和LongAdder两者功能类似,都可以用来计数,但是在高竞争下,LongAdder性能更好
性能区别在于实现原理不同,AtomicLong底层只维护一个变量,而LongAdder维护了一个动态大小的变量数组,分散了竞争

实现上LongAdder继承了Striped64,即把单一变量操作分散到变量数组的公共实现
大体流程是各线程选择数组不同元素操作,尽可能避免竞争,取值时再对数组结果进行汇总。对外界透明,感觉在操作单一变量

LongAdder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class LongAdder extends Striped64 implements Serializable {
//加减1作为特例
public void increment() {
add(1L);
}
public void decrement() {
add(-1L);
}

//累加多值
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//选取一个Cell,对其进行CAS操作
//选取过程是一个哈希处理,getProbe是线程相关的,即同一个线程操作同一个变量,减少竞争
if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

//因为是变量组,因此取值其实是相加汇总
public long longValue() {
return sum();
}

//求和是在base基础上,再遍历数组而成
//base变量用于无竞争条件下,数组是竞争条件下,结合起来
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
//...
}