java并发包之AtomicStampedReference源码分析(三)

devtool · · 331 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。
--- ## 问题 (1)什么是ABA? (2)ABA的危害? (3)ABA的解决方法? (4)AtomicStampedReference是什么? (5)AtomicStampedReference是怎么解决ABA的? ## 简介 AtomicStampedReference是java并发包下提供的一个原子类,它能解决其它原子类无法解决的ABA问题。 ## ABA ABA问题发生在多线程环境中,当某线程连续读取同一块内存地址两次,两次得到的值一样,它简单地认为“此内存地址的值并没有被修改过”,然而,同时可能存在另一个线程在这两次读取之间把这个内存地址的值从A修改成了B又修改回了A,这时还简单地认为“没有修改过”显然是错误的。 比如,两个线程按下面的顺序执行: (1)线程1读取内存位置X的值为A; (2)线程1阻塞了; (3)线程2读取内存位置X的值为A; (4)线程2修改内存位置X的值为B; (5)线程2修改又内存位置X的值为A; (6)线程1恢复,继续执行,比较发现还是A把内存位置X的值设置为C; ![ABA](https://gitee.com/alan-tang-tt/yuan/raw/master/死磕%20java并发包/resource/ABA1.png) 可以看到,针对线程1来说,第一次的A和第二次的A实际上并不是同一个A。 ABA问题通常发生在无锁结构中,用代码来表示上面的过程大概就是这样: ```java public class ABATest { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(1); new Thread(()->{ int value = atomicInteger.get(); System.out.println("thread 1 read value: " + value); // 阻塞1s LockSupport.parkNanos(1000000000L); if (atomicInteger.compareAndSet(value, 3)) { System.out.println("thread 1 update from " + value + " to 3"); } else { System.out.println("thread 1 update fail!"); } }).start(); new Thread(()->{ int value = atomicInteger.get(); System.out.println("thread 2 read value: " + value); if (atomicInteger.compareAndSet(value, 2)) { System.out.println("thread 2 update from " + value + " to 2"); // do sth value = atomicInteger.get(); System.out.println("thread 2 read value: " + value); if (atomicInteger.compareAndSet(value, 1)) { System.out.println("thread 2 update from " + value + " to 1"); } } }).start(); } } ``` 打印结果为: ```java thread 1 read value: 1 thread 2 read value: 1 thread 2 update from 1 to 2 thread 2 read value: 2 thread 2 update from 2 to 1 thread 1 update from 1 to 3 ``` ## ABA的危害 为了更好地理解ABA的危害,我们还是来看一个现实点的例子。 假设我们有一个无锁的栈结构,如下: ```java public class ABATest { static class Stack { // 将top放在原子类中 private AtomicReference<Node> top = new AtomicReference<>(); // 栈中节点信息 static class Node { int value; Node next; public Node(int value) { this.value = value; } } // 出栈操作 public Node pop() { for (;;) { // 获取栈顶节点 Node t = top.get(); if (t == null) { return null; } // 栈顶下一个节点 Node next = t.next; // CAS更新top指向其next节点 if (top.compareAndSet(t, next)) { // 把栈顶元素弹出,应该把next清空防止外面直接操作栈 t.next = null; return t; } } } // 入栈操作 public void push(Node node) { for (;;) { // 获取栈顶节点 Node next = top.get(); // 设置栈顶节点为新节点的next节点 node.next = next; // CAS更新top指向新节点 if (top.compareAndSet(next, node)) { return; } } } } } ``` 咋一看,这段程序似乎没有什么问题,然而试想以下情形。 假如,我们初始化栈结构为 top->1->2->3,然后有两个线程分别做如下操作: (1)线程1执行pop()出栈操作,但是执行到`if (top.compareAndSet(t, next)) {`这行之前暂停了,所以此时节点1并未出栈; (2)线程2执行pop()出栈操作弹出节点1,此时栈变为 top->2->3; (3)线程2执行pop()出栈操作弹出节点2,此时栈变为 top->3; (4)线程2执行push()入栈操作添加节点1,此时栈变为 top->1->3; (5)线程1恢复执行,比较节点1的引用并没有改变,执行CAS成功,此时栈变为 top->2; What?点解变成 top->2 了?不是应该变成 top->3 吗? 那是因为线程1在第一步保存的next是节点2,所以它执行CAS成功后top节点就指向了节点2了。 测试代码如下: ```java private static void testStack() { // 初始化栈为 top->1->2->3 Stack stack = new Stack(); stack.push(new Stack.Node(3)); stack.push(new Stack.Node(2)); stack.push(new Stack.Node(1)); new Thread(()->{ // 线程1出栈一个元素 stack.pop(); }).start(); new Thread(()->{ // 线程2出栈两个元素 Stack.Node A = stack.pop(); Stack.Node B = stack.pop(); // 线程2又把A入栈了 stack.push(A); }).start(); } public static void main(String[] args) { testStack(); } ``` 在Stack的pop()方法的`if (top.compareAndSet(t, next)) {`处打个断点,线程1运行到这里时阻塞它的执行,让线程2执行完,再执行线程1这句,这句执行完可以看到栈的top对象中只有2这个节点了。 _记得打断点的时候一定要打Thread断点,在IDEA中是右击选择Suspend为Thread。_ 通过这个例子,笔者认为你肯定很清楚ABA的危害了。 ## ABA的解决方法 ABA的危害我们清楚了,那么怎么解决ABA呢? 笔者总结了一下,大概有以下几种方式: (1)版本号 比如,上面的栈结构增加一个版本号用于控制,每次CAS的同时检查版本号有没有变过。 还有一些数据结构喜欢使用高位存储一个邮戳来保证CAS的安全。 (2)不重复使用节点的引用 比如,上面的栈结构在线程2执行push()入栈操作的时候新建一个节点传入,而不是复用节点1的引用; (3)直接操作元素而不是节点 比如,上面的栈结构push()方法不应该传入一个节点(Node),而是传入元素值(int的value)。 好了,扯了这么多,让我们来看看java中的AtomicStampedReference是怎么解决ABA的吧^^ ## 源码分析 ### 内部类 ```java private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } ``` 将元素值和版本号绑定在一起,存储在Pair的reference和stamp(邮票、戳的意思)中。 ### 属性 ```java private volatile Pair<V> pair; private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class); ``` 声明一个Pair类型的变量并使用Unsfae获取其偏移量,存储到pairOffset中。 ### 构造方法 ```java public AtomicStampedReference(V initialRef, int initialStamp) { pair = Pair.of(initialRef, initialStamp); } ``` 构造方法需要传入初始值及初始版本号。 ### compareAndSet()方法 ```java public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { // 获取当前的(元素值,版本号)对 Pair<V> current = pair; return // 引用没变 expectedReference == current.reference && // 版本号没变 expectedStamp == current.stamp && // 新引用等于旧引用 ((newReference == current.reference && // 新版本号等于旧版本号 newStamp == current.stamp) || // 构造新的Pair对象并CAS更新 casPair(current, Pair.of(newReference, newStamp))); } private boolean casPair(Pair<V> cmp, Pair<V> val) { // 调用Unsafe的compareAndSwapObject()方法CAS更新pair的引用为新引用 return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); } ``` (1)如果元素值和版本号都没有变化,并且和新的也相同,返回true; (2)如果元素值和版本号都没有变化,并且和新的不完全相同,就构造一个新的Pair对象并执行CAS更新pair。 可以看到,java中的实现跟我们上面讲的ABA的解决方法是一致的。 首先,使用版本号控制; 其次,不重复使用节点(Pair)的引用,每次都新建一个新的Pair来作为CAS比较的对象,而不是复用旧的; 最后,外部传入元素值及版本号,而不是节点(Pair)的引用。 ## 案例 让我们来使用AtomicStampedReference解决开篇那个AtomicInteger带来的ABA问题。 ```java public class ABATest { public static void main(String[] args) { testStamp(); } private static void testStamp() { AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1); new Thread(()->{ int[] stampHolder = new int[1]; int value = atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; System.out.println("thread 1 read value: " + value + ", stamp: " + stamp); // 阻塞1s LockSupport.parkNanos(1000000000L); if (atomicStampedReference.compareAndSet(value, 3, stamp, stamp + 1)) { System.out.println("thread 1 update from " + value + " to 3"); } else { System.out.println("thread 1 update fail!"); } }).start(); new Thread(()->{ int[] stampHolder = new int[1]; int value = atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; System.out.println("thread 2 read value: " + value + ", stamp: " + stamp); if (atomicStampedReference.compareAndSet(value, 2, stamp, stamp + 1)) { System.out.println("thread 2 update from " + value + " to 2"); // do sth value = atomicStampedReference.get(stampHolder); stamp = stampHolder[0]; System.out.println("thread 2 read value: " + value + ", stamp: " + stamp); if (atomicStampedReference.compareAndSet(value, 1, stamp, stamp + 1)) { System.out.println("thread 2 update from " + value + " to 1"); } } }).start(); } } ``` 运行结果为: ```java thread 1 read value: 1, stamp: 1 thread 2 read value: 1, stamp: 1 thread 2 update from 1 to 2 thread 2 read value: 2, stamp: 2 thread 2 update from 2 to 1 thread 1 update fail! ``` 可以看到线程1最后更新1到3时失败了,因为这时版本号也变了,成功解决了ABA的问题。 ## 总结 (1)在多线程环境下使用无锁结构要注意ABA问题; (2)ABA的解决一般使用版本号来控制,并保证数据结构使用元素值来传递,且每次添加元素都新建节点承载元素值; (3)AtomicStampedReference内部使用Pair来存储元素值及其版本号; ## 彩蛋 (1)java中还有哪些类可以解决ABA的问题? AtomicMarkableReference,它不是维护一个版本号,而是维护一个boolean类型的标记,标记值有修改,了解一下。 (2)实际工作中遇到过ABA问题吗? 笔者还真遇到过,以前做棋牌游戏的时候,ABCD四个玩家,A玩家出了一张牌,然后他这个请求迟迟没到服务器,也就是超时了,服务器就帮他自动出了一张牌。 然后,转了一圈,又轮到A玩家出牌了,说巧不巧,正好这时之前那个请求到了服务器,服务器检测到现在正好是A出牌,而且请求的也是出牌,就把这张牌打出去了。 然后呢,A玩家的牌就不对了。 最后,我们是通过给每个请求增加一个序列号来处理的,检测到过期的序列号请求直接抛弃掉。 你有没有遇到过ABA问题呢? ---
331 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传