“I don’t let myself down, because I have met you in my most gorgeous age.”
引言
Java从JDK 1.5开始提供了java.util.concurrent.atomic包(后面简称为Atomic包),这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式。
因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(即字段)。Atomic包里的类基本都是使用了Unsafe实现的包装类。那什么是原子操作呢?
定义:原子操作是指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何线程上下文切换。
其中,原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也不可以被切割而只执行其中的一部分,将整个操作视作一个整体是原子类的核心特征。
1. 原子操作实现的原理
接下来我们看看在Intel处理器和Java里是如何实现原子操作的。
先来熟悉一下几个术语:
- Cache line (缓存行)
- Compare and Swap(比较并交换)
- cpu pipeline (cpu流水线)
- memory order violation(内存顺序冲突)
1.1 处理器实现原子操作
当处理器读取内存的一个字节时,其他处理器不能访问这个字节的内存地址,最新的处理器能够自动保证处理器对同一个缓存行里进行16/32/64位的操作是原子的。处理器提供了总线锁定和缓存锁定的机制保证了复杂内存操作的原子性。
1.总线锁保证原子性
使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞,那么该处理器就能独自共享内存。
2.缓存锁保证原子性
“缓存锁定”指内存区域如果被缓存在处理器的缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作写回内存时,处理器不用在总线上声言LOCK#信号,而是修改内部的内存地址,通过缓存一致性保证操作的原子性。(例外:当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行,处理器会调用总线锁定。)
缓存一致性:缓存一致性会阻止同时修改两个以上处理器的内存区域数据,当其他处理器回写被锁定的缓存行数据时,会使其他处理器的缓存行无效。
1.2 Java原子操作实现
在Java中可以通过锁和循环CAS的方式实现原子操作。
CAS: jvm的CAS操作是基于处理器的CMPXCHG指令实现的,其CAS存在三个问题:
- ABA问题
- 循环时间开销大
- 只能保证一个共享变量的原子操作
锁:锁机制保证了只有获得锁的线程才能操作锁定的内存区域,具体实现可以参考java synchronized的内容
2. 原子更新基本类型类
使用原子的方式更新基本类型,Atomic包提供了以下3个类。
- AtomicBoolean: 原子更新布尔类型。
- AtomicInteger: 原子更新整型。
- AtomicLong: 原子更新长整型。
以上3个类提供的方法几乎一模一样。为此,我们主要以AtomicInteger为例进行分析。AtomicInteger常用的方法如下所示:
- int addAndGet(int delta): 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。
- boolean compareAndSet(int expect, int update): 如果输入的数值等于预期值,则以原子方式将该值设置为输入的值。
- int getAndIncrement(): 以原子方式将当前值加1,注意,这里返回的是自增前的值。
- void lazySet(int newValue): 最终会设置成newValue,使用lazySet设置后,可能导致其他线程在之后的一小段时间内还是可以读取到旧的值。
- int getAndSet(int newValue): 以原子方式设置为newValue的值,并返回旧值。
下面,进行AtomicInteger的源码分析:
1. 主要属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 获取Unsafe的实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 标识value字段的偏移量
private static final long valueOffset;
// 静态代码块,通过unsafe获取value的偏移量
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// 存储int类型值的地方,使用volatile修饰
private volatile int value;
- 使用int类型的value存储值,且使用volatile修饰,volatile主要是保证可见性,即一个线程修改对另一个线程立即可见,主要的实现原理是内存屏障。
- 调用Unsafe的objectFieldOffset()方法获取value字段在类中的偏移量,用于后面CAS操作时使用。
2. 主要方法
1.getAndIncrement()
1
2
3
4
5
6
7
8
9
10
11
12
13
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//Unsafe中的方法
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
getAndIncrement()方法底层是调用的Unsafe的getAndAddInt()方法,通过源码可以看出,这个方法主要有三个参数:
- 操作的对象;
- 对象中字段的偏移量;
- 要增加的值。
查看Unsafe的getAndAddInt()方法的源码,可以看到它是先获取当前的值,然后再调用compareAndSwapInt()尝试更新对应偏移量处的值,如果成功了就跳出循环,如果不成功就再重新尝试,直到成功为止,这可不就是(CAS+自旋)的乐观锁机制。
2.compareAndSet()方法
1
2
3
4
5
6
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// Unsafe中的方法
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
调用Unsafe.compareAndSwapInt()方法实现,这个方法有四个参数:
- 操作的对象;
- 对象中字段的偏移量;
- 原来的值,即期望的值;
- 要修改为的值
可以看到,这是一个native方法,底层是使用C/C++写的,主要是调用CPU的CAS指令来实现,它能够保证只有当对应偏移量处的字段值是期望值时才更新,即类似下面这样的两步操作:
1
2
3
if(value == expect){
value = newValue;
}
通过CPU的CAS指令可以保证这两步操作是一个整体,也就不会出现多线程环境中可能比较的时候value值是a,而到真正赋值的时候value值可能已经变成b了的问题。
AtomicInteger中的其它方法几乎都是类似的,最终会调用到Unsafe的compareAndSwapInt()来保证对value值更新的原子性。
3. 原子更新数组
通过原子的方式更新数组里的某个元素,Atomic包提供了以下3个类:
- AtomicIntegerArray: 原子更新整型数组里的元素。
- AtomicLongArray: 原子更新长整形数组里的元素。
- AtomicReferenceArray: 原子更新引用类型数组里的元素。
AtomicIntegerArray类主要是提供原子的方式更新数组里的整型,其常用方法如下:
- int addAndGet(int i, int delta):以原子方式将输入值与数组中索引i的元素相加。
- boolean compareAndSet(int i, int expect, int update): 如果当前值等于预期值,则以原子方式将数组位置i的元素设置成update值。
给出AtomicIntegerArray的使用案例如下:
1
2
3
4
5
6
7
8
9
static int[] value = new int[]{1,2};
static AtomicIntegerArray ai = new AtomicIntegerArray(value);
public static void main(String[] args) {
ai.getAndSet(0,3);
System.out.println(ai.get(0)); //打印3
System.out.println(value[0]); // 打印1
}
数组value通过构造方式传入进去,然后AtomicIntegerArray会将当前数组复制一份,所以当AtomicIntegerArray对内部的数组元素进行修改时,不会影响到传入的数组。
4. 原子更新引用类型
原子更新基本类型的AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子引用类型提供的类。Atomic包提供了以下3个类:
- AtomicReference: 原子更新引用类型。
- AtomicReferenceFieldUpdater: 原子更新引用类型里的字段。
- AtomicMarkableReference: 原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引用类型。构造方法时AtomicMarkableReference(V initialRef, boolean initialMark).
以上几个类提供的方法几乎一样,下面就以AtomicReference为例,给出其使用案例代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AtomicReferenceTest {
public static AtomicReference<User> atomicReference = new AtomicReference<>();
public static void main(String[] args) {
User user = new User("David",24);
atomicReference.set(user);
User updateUser = new User("Bob",25);
atomicReference.compareAndSet(user,updateUser);
System.out.println(atomicReference.get().getName());
System.out.println(atomicReference.get().getOld());
}
static class User{
private String name;
private int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() {
return name;
}
public int getOld() {
return old;
}
}
}
输出结果:
1
2
Bob
25
5.原子更新字段类
如果需要原子地更新某个类里的某个字段时,就需要使用原子更新字段类。Atomic包提供了3个类进行原子字段更新。
- AtomicIntegerFieldUpdater: 原子更新整型的字段的更新器。
- AtomicLongFieldUpdater: 原子更新长整形字段的更新器。
- AtomicReferenceFieldUpdater: 原子更新引用类型的更新器。
要想原子地更新字段类需要两步。第一步,因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。第二步,更新类的字段必须使用使用public volatile修饰符。
6.AtomicStampedReference源码分析
AtomicStampedReference是java并发包下提供的一个原子类,该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题。首先,我们来介绍一下ABA问题。
6.1 ABA问题
ABA问题发生在多线程环境中,当某线程连续读取同一块内存地址两次,两次得到的值一样,它简单地认为“此内存地址的值并没有被修改过”,然而,同时可能存在另一个线程在这两次读取之间把这个内存地址的值从A修改成了B又修改回了A,这时还简单地认为“没有修改过”显然是错误的。
例如,两个线程按下面的顺序执行:
- 线程1读取内存位置X的值为A;
- 线程1阻塞了;
- 线程2读取内存位置X的值为A;
- 线程2修改内存位置X的值为B;
- 线程2又修改内存位置X的值为A;
- 线程1恢复,继续执行,比较发现还是A,把内存位置X的值设置为C

可以看到,针对线程1来说,第一次的A和第二次的A实际上并不是同一个A。下面用代码来表示一下上述过程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) throws Exception{
testABA();
}
private static void testABA(){
AtomicInteger atomicInteger = new AtomicInteger(1);
new Thread(() ->{
int value = atomicInteger.get();
System.out.println("thread 1 read value: " + value);
//阻塞1s
LockSupport.parkNanos(1000000000L);
if(atomicInteger.compareAndSet(value,3)){
System.out.println("thread 1 update from " + value + " to 3");
}else {
System.out.println("thread 1 update fail");
}
}).start();
new Thread(() ->{
int value = atomicInteger.get();
System.out.println("thread 2 read value: " + value);
if(atomicInteger.compareAndSet(value,2)){
System.out.println("thread 2 update from " + value + " to 2");
value = atomicInteger.get();
System.out.println("thread 2 read value: " + value);
if(atomicInteger.compareAndSet(value,1)){
System.out.println("thread 2 update from " + value + " to 1");
}
}
}).start();
}
输出结果为:
1
2
3
4
5
6
thread 1 read value: 1
thread 2 read value: 1
thread 2 update from 1 to 2
thread 2 read value: 2
thread 2 update from 2 to 1
thread 1 update from 1 to 3
6.2 ABA的危害
上面一个例子或许没能让我们感受到ABA问题带来的危害,下面看一个现实的案例。例如,假设我们有一个无锁的栈结构,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ABATest {
static class Stack{
private AtomicReference<Node> top = new AtomicReference<>();
static class Node{
int value;
Node next;
public Node(int value){
this.value = value;
}
}
//出栈操作
public Node pop(){
for (;;){
//获取栈顶节点
Node t = top.get();
if(t == null){
return null;
}
//栈顶下一个节点
Node next = t.next;
//CAS更新top指向next节点
if(top.compareAndSet(t,next)){
// 把栈顶元素弹出,应该把next清空防止外面直接操作栈
t.next = null;
return t;
}
}
}
//入栈操作
public void push(Node node){
for (;;){
Node next = top.get();
//设置栈顶节点为新节点的next节点
node.next = next;
// CAS更新top指向新节点
if(top.compareAndSet(next,node)){
return;
}
}
}
}
}
我们来试想一下下面的情形:
假如,我们初始化栈结构为top->1->2->3,然后由两个线程分别做如下操作:
- 线程1执行pop()出栈操作,但是执行到
if(top.compareAndSet(t,next)){这行之前暂停了,所以此时节点1并未出栈; - 线程2执行pop()出栈操作弹出节点1,此时栈为top->2->3;
- 线程3执行pop()出栈操作弹出节点2,此时栈为top->3;
- 线程2执行push()入栈操作添加节点1,此时栈为top->1->3;
- 线程1恢复执行,比较节点1的引用并没有改变,执行CAS成功,此时栈变为top->2.
top->2 ? 而没有变成top->3. 这是因为线程1在第一步保存的next节点是节点2,所以它执行CAS成功后top节点就指向节点2了。测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static void testStack()throws Exception{
//初始化栈为top->1->2->3
Stack stack = new Stack();
stack.push(new Stack.Node(3));
stack.push(new Stack.Node(2));
stack.push(new Stack.Node(1));
new Thread(() ->{
//线程1出栈一个元素
stack.pop();
},"thread1").start();
Thread.sleep(1000);
new Thread(() -> {
//线程2出栈两个元素
Stack.Node A = stack.pop();
Stack.Node B = stack.pop();
//线程2又把A入栈了
stack.push(A);
},"thread2").start();
}
public static void main(String[] args) throws Exception{
testStack();
}
在Stack的pop()方法的if(top.compareAndSet(t,next)){处打个断点,线程1运行到这里时阻塞它的执行,让线程2执行完,再执行线程1这句,这句执行完可以看到栈的top对象中只有2这个节点了。
6.3 ABA问题的解决办法
其一般有以下几种方式:
- 版本号。比如,前面的栈结构增加一个版本号用于控制,每次CAS操作的同时检查版本号有没有变过。还有一些数据结构喜欢使用高位存储一个邮戳来保证CAS的安全。
- 不重复使用节点的引用。比如,上面的栈结构在线程2执行push()入栈操作的时候新建一个节点传入,而不是复用节点1的引用。
- 直接操作元素而不是节点。比如,上面的栈结构push()方法不应该传入一个节点(Node),而是传入元素值(int的value)。
下面,我们来看看java中的AtomicStampedReference是如何解决ABA问题的。
6.4 源码分析
1. 内部类
1
2
3
4
5
6
7
8
9
10
11
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
将元素值和版本号绑定在一起,存储在Pair的reference和stamp中。
2.属性
1
2
3
4
private volatile Pair<V> pair;
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
声明一个Pair类型的变量,并使用Unsafe获取其偏移量,存储到pairOffset中。
3.构造方法
1
2
3
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
构造方法需要传入初始值和初始版本号。
4. compareAndSet()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 获取到当前的(元素值,版本号)Pari对
Pair<V> current = pair;
return
//引用没变
expectedReference == current.reference &&
// 版本号没变
expectedStamp == current.stamp &&
//新引用等于旧引用
((newReference == current.reference &&
//新版本号等于旧版本号
newStamp == current.stamp) ||
// 构造新的Pair对象并CAS更新
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
//调用Unsafe的compareAndSwapObject()方法CAS更新pair的引用为新引用
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
分析:
- 如果元素值和版本号都没有变化,并且和新的也相同,返回true;
- 如果元素值和版本号都没有变化,并且和新的不完全相同,就构造一个新的Pair对象并执行CAS更新pair。
可以看到,java的AtomicStampedReference具体实现和前面讲的ABA的解决方法是一致的。首先,它使用版本号控制;其次,不重复使用节点(Pair)的引用,每次都都新建一个新的Pair来作为CAS比较的对象,而不是复用旧的;最后,外部传入元素值和版本号,而不是节点(Pair)的引用。
6.5 案例
让我们来使用AtomicStampedReference解决开篇那个AtomicInteger带来的ABA问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static void testABA(){
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
new Thread(() ->{
int[] stampHolder = new int[1];
int value = atomicStampedReference.get(stampHolder);
int stamp = stampHolder[0];
System.out.println("thread 1 read value: " + value + ", stamp" + stamp);
//阻塞1s
LockSupport.parkNanos(1000000000L);
if(atomicStampedReference.compareAndSet(value,3, stamp,stamp+1)){
System.out.println("thread 1 update from " + value + " to 3");
}else {
System.out.println("thread 1 update fail");
}
}).start();
new Thread(() ->{
int[] stampHolder = new int[1];
int value = atomicStampedReference.get(stampHolder);
int stamp = stampHolder[0];
System.out.println("thread 2 read value: " + value + ", stamp" + stamp);
if(atomicStampedReference.compareAndSet(value,2,stamp,stamp+1)){
System.out.println("thread 2 update from " + value + " to 2");
value = atomicStampedReference.get(stampHolder);
stamp = stampHolder[0];
System.out.println("thread 2 read value: " + value);
if(atomicStampedReference.compareAndSet(value,1,stamp,stamp+1)){
System.out.println("thread 2 update from " + value + " to 1");
}
}
}).start();
}
运行结果为:
1
2
3
4
5
6
thread 1 read value: 1, stamp: 1
thread 2 read value: 1, stamp: 1
thread 2 update from 1 to 2
thread 2 read value: 2, stamp: 2
thread 2 update from 2 to 1
thread 1 update fail!
可以看到线程1最后更新1到3时失败了,因为这时版本号也变了,成功解决了ABA的问题。
6.6 总结
- 在多线程环境下使用无锁结构要注意ABA问题;
- ABA的解决一般使用版本号来控制,并保证数据结构使用元素值来传递,且每次添加元素都新建节点承载元素值;
- AtomicStampedReference内部使用Pair来存储元素值及其版本号。
最后,在Java中,AtomicMarkableReference也可以解决ABA问题,不过它不是维护一个版本号,而是维护一个boolean类型的标记,标记值有修改。
7. LongAdder源码分析
LongAdder是java 8 新增的原子类,在多线程环境中,它比AtomicLong性能要高出不少,特别是写多的场景。
7.1 原理
LongAdder的原理是,在最初无竞争的时候,只更新base的值,当有多线程竞争的时候通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的LongAdder存储的值。

低竞争环境下直接更新base,类似AtomicLong;高并发下,会将每个线程的操作hash到不同的cell数组中,从而将AtomicLong中更新一个value的行为优化之后,分散到多个value中,从而降低更新热点,而需要得到当前值的时候,直接将cell中的value与base相加即可。但是其跟AtomicLong(compare and change -> xadd)的CAS不同,incrementAndGet操作及其变种可以返回更新后的值,而LongAdder返回到的是void.
7.2 源码分析
LongAdder继承自Striped64类,Striped64中定义了Cell内部类和各个重要属性。
主要内部类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Striped64 中的内部类,使用@sun.misc.Contended注解,说明里面的值可以消除伪共享
@sun.misc.Contended static final class Cell {
// 存储元素值,使用volatile修饰保证可见性
volatile long value;
Cell(long x) { value = x; }
// CAS更新value的值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
主要属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// cells数组,存储各个段的值
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
// 最初无竞争时使用的,也算是一个特殊的段
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
// 标记当前是否有线程在创建或者扩容cells,或者在创建Cell
// 通过CAS更新该值,相当于是一个锁
transient volatile int cellsBusy;
最初无竞争或有其它线程在创建cells数组时使用base更新值,有过竞争时使用cells更新值。
最初无竞争是指一开始没有线程之间的竞争,但也有可能是多线程在操作,只是这些线程没有同时去更新base的值。而有过竞争是指只要出现过竞争,不管后面有没有竞争都是用cells更新值,规则是不同的线程hash到不同的cell上去更新,减少竞争。
7.3 add()方法分析
add(long x)方法是LongAdder的主要方法,使用它可以使LongAdder中存储的值增加x,x可为正为负。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void add(long x) {
// as是Striped64中的cells属性
// b是Striped64中的base属性
// v是当前线程hash到的Cell中存储的值
// m是cells的长度减1,hash时作为掩码使用
// a是当前线程hash到的Cell
Cell[] as; long b, v; int m; Cell a;
// 条件1: cells不为空,说明出现过竞争,cells已经创建
// 条件2: cas操作base失败,说明其他线程先一步修改了base,正在出现竞争
if ((as = cells) != null || !casBase(b = base, b + x)) {
//true表示竞争不激烈
// false 表示竞争激烈,多个线程hash到同一个Cell上,可能要扩容
boolean uncontended = true;
// 条件1: cells为空,说明正在出现竞争,上面是从条件2过来的
// 条件2: 应该不会出现
// 条件3: 当前线程所在Cell为空,说明当前线程还没有更新过Cell,应初始化一个Cell
// 条件4: 更新当前线程所在的Cell失败,说明现在竞争很激烈,多个线程hash到了同一个Cell,应扩容
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法返回的是线程中的threadLocalRandomProbe字段
// 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的
// 除非刻意修改它
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 调用Striped64中的方法处理
longAccumulate(x, null, uncontended);
}
}
// Striped64中方法,通过Unsafe获取ThreadLocalRanddomProbe的值
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
- 最初无竞争时只更新base;
- 直到更新base失败时,创建cells数组;
- 当多个线程竞争同一个Cell比较激烈时,可能要扩容;
接着来看一下Striped64中的longAccumulate()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 存储线程的probe值
int h;
// 如果getProbe()方法返回空,说明随机数未初始化
if ((h = getProbe()) == 0) {
// 强制初始化
ThreadLocalRandom.current(); // force initialization
// 重新获取probe值
h = getProbe();
// 都未初始化,肯定还不存在竞争激烈的情况
wasUncontended = true;
}
// 是否发生碰撞
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells已经初始化
if ((as = cells) != null && (n = as.length) > 0) {
// 当前线程所在的Cell未初始化
if ((a = as[(n - 1) & h]) == null) {
//当前无其他线程在创建或者扩容cells,也没有线程在创建Cell
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个Cell,值为当前需要增加的值
Cell r = new Cell(x); // Optimistically create
// 再次检测cellsBusy,并尝试更新它为1,相当于给当前线程加锁
if (cellsBusy == 0 && casCellsBusy()) {
// 是否创建成功
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 重新获取cells,并找到当前hash到cells数组中的位置
// 这里一定需要重新获取cells,因为as并不在锁定范围内
// 有可能已经扩容了,这里需要重新获取
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 把上面新建的Cell放在cells的j位置上
rs[j] = r;
//创建成功
created = true;
}
} finally {
//相当于释放锁
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
// 标记当前未出现冲突
collide = false;
}
// 当前线程所在的Cell不为空,且更新失败了
// 这里简单地设为true,相当于简单地自旋了一次
// 通过下面的语句修改线程的probe再重新尝试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 再次尝试CAS更新当前线程所在Cell的值,如果成功了就返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果cells数组的长度达到了CPU核心数,或者cells扩容了
// 设置collide为false并且通过下面的语句修改线程的probe再重新尝试
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 上上个else if都更新失败了,且上个条件不成立,说明出现冲突了
else if (!collide)
collide = true;
// 明确出现冲突了,尝试占有锁并扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 检查是否有其他线程已经扩容过了
if (cells == as) { // Expand table unless stale
// 新数组为原数组的两倍
Cell[] rs = new Cell[n << 1];
// 把旧数组元素拷贝到新数组中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 重新赋值cells为新数组
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
//已解决冲突
collide = false;
// 使用扩容后的新数组重新尝试
continue; // Retry with expanded table
}
// 更新失败或者达到了CPU核心数,重新生成probe,并重试
h = advanceProbe(h);
}
// 未初始化过cells数组,尝试占有锁并初始化cells数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//是否初始化成功
boolean init = false;
try { // Initialize table
// 检测是否有其他线程初始化过
if (cells == as) {
//新建一个大小为2的数组
Cell[] rs = new Cell[2];
// 找到当前线程hash到数组中的位置并创建其对应的Cell
rs[h & 1] = new Cell(x);
// 赋值给cells数组
cells = rs;
// 初始化成功
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 初始化成功直接返回
// 因为增加的值已经同时创建到Cell中了
if (init)
break;
}
// 如果有其他线程在初始化cells数组中,就尝试更新base
// 如果成功了就返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
具体步骤如下:
- 如果cells数组未初始化,当前线程会尝试占有cellsBusy锁并创建cells数组;
- 如果当前线程尝试创建cells数组时,发现其他线程已经在创建了,就尝试更新base,如果成功直接返回;
- 通过线程的probe值找到当前线程应该更新cells数组中的哪个Cell;
- 如果当前线程所在的Cell未初始化,就占有cellsBusy锁并在相应位置创建一个Cell;
- 尝试CAS更新当前线程所在Cell,如果成功就返回,如果失败说明出现冲突;
- 当前线程更新Cell失败后并不是立即扩容,而是尝试更新probe值后再重试一次;
- 如果再重试的时候还是更新失败,就扩容;
- 扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中;
cellsBusy在创建cells数组,创建Cell、扩容cells数组这三个地方用得到。
7.4 sum()方法
sum()方法是获取LongAdder中真正存储的值的大小,通过把base和所有段相加得到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public long sum() {
Cell[] as = cells; Cell a;
// sum初始化等于base
long sum = base;
// 如果cells不为空
if (as != null) {
// 遍历所有的Cell
for (int i = 0; i < as.length; ++i) {
// 如果所在的Cell不为空,就把它的value累加到sum中
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看到sum()方法是把base和所有段的值相加得到,那么,这里有一个问题,如果前面已经累加到sum上的Cell的value有修改,不是就没法计算到了么?
答案是的,所有LongAdder可以说不是强一致性的,它是最终一致性的。
7.5 LongAdder vs AtomicLong
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class LongAdderVSAtomicLongTest {
public static void main(String[] args) {
testAtomicLongVSLongAdder(1, 10000000);
testAtomicLongVSLongAdder(10, 10000000);
testAtomicLongVSLongAdder(20, 10000000);
testAtomicLongVSLongAdder(40, 10000000);
testAtomicLongVSLongAdder(80, 10000000);
}
static void testAtomicLongVSLongAdder(final int threadCount, final int times){
try {
System.out.println("threadCount: " + threadCount + ", times:" + times);
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
System.out.println("LongAdder elapse: " + (System.currentTimeMillis() - start) + "ms");
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount, times);
System.out.println("AtomicLong elapse: " + (System.currentTimeMillis() - start2) + "ms");
}catch (InterruptedException e){
e.printStackTrace();
}
}
static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
AtomicLong atomicLong = new AtomicLong();
List<Thread> list = new ArrayList<>();
for (int i = 0; i < threadCount; i++){
list.add(new Thread(() ->{
for (int j = 0; j <times; j++){
atomicLong.incrementAndGet();
}
}));
}
for (Thread thread : list){
thread.start();
}
for (Thread thread : list){
thread.join();
}
}
static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
LongAdder longAdder = new LongAdder();
List<Thread> list = new ArrayList<>();
for (int i = 0; i < threadCount; i++){
list.add(new Thread(() ->{
for (int j = 0; j <times; j++){
longAdder.add(1);
}
}));
}
for (Thread thread : list){
thread.start();
}
for (Thread thread : list){
thread.join();
}
}
}
运行结果如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
threadCount: 1, times:10000000
LongAdder elapse: 131ms
AtomicLong elapse: 98ms
threadCount: 10, times:10000000
LongAdder elapse: 161ms
AtomicLong elapse: 10510ms
threadCount: 20, times:10000000
LongAdder elapse: 277ms
AtomicLong elapse: 23270ms
threadCount: 40, times:10000000
LongAdder elapse: 538ms
AtomicLong elapse: 43994ms
threadCount: 80, times:10000000
LongAdder elapse: 1005ms
AtomicLong elapse: 94821ms
可以看到当只有一个线程的时候,AtomicLong反而性能更高,随着线程越来越多,AtomicLong的性能急剧下降,而LongAdder的性能影响很小。
7.6 总结
- LongAdder通过base和cells数组来存储值;
- 不同的线程会hash到不同的cell上去更新,减少了竞争;
- LongAdder的性能非常高,最终会达到一种无竞争的状态。
此外,在longAccumulate()方法中有个条件是n>=NCPU就不会走到扩容逻辑了,而n是2的倍数,那是不是代表cells数组最大只能达到大于等于NCPU的最小2次方?
答案是明确的。因为同一个CPU核心同时只会运行一个线程,而更新失败了说明有两个不同的核心更新了同一个Cell,这时会重新设置更新失败的那个线程的probe值,这样下一次它所在的Cell很大概率会发生改变,如果运行的时间足够长,最终会出现同一个核心的所有线程都会hash到同一个Cell(大概率,但不一定全在一个Cell上)上去更新,所以,这里cells数组中长度并不需要太长,达到CPU核心数足够了。
比如,笔者的电脑是8核的,所以这里cells的数组最大只会到8,达到8就不会扩容了。
