轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

    • 核心

    • 并发

      • Java并发-线程基础与synchronized关键字
      • Java并发-重入锁ReentrantLock详解与实践
      • Java并发-信号量Semaphore
      • Java并发-读写锁ReadWriteLock
      • Java并发-倒计时器CountDownLatch
      • Java并发-栅栏CyclicBarrier
      • Java并发-LockSupport线程阻塞工具类
      • Java并发-线程池ThreadPoolExecutor
      • Java并发-阻塞队列BlockingQueue
      • Java并发-以空间换时间之ThreadLocal
      • Java并发-无锁策略CAS与atomic包
        • 一、CAS原理详解
          • 1、什么是CAS
          • 2、无锁的优势
          • 3、CAS的底层实现
          • 4、CAS的问题与限制
        • 二、atomic包详解
          • 1、原子类分类
          • 2、AtomicInteger使用示例
          • 3、源码分析
        • 三、深入理解ABA问题
          • 1、ABA问题示例
          • 2、使用AtomicStampedReference解决ABA问题
        • 四、高性能原子累加器:LongAdder
          • 1、为什么需要LongAdder
          • 2、LongAdder的设计思想
          • 3、LongAdder内部结构
          • 4、LongAdder工作原理
          • 5、性能对比示例
          • 6、伪共享问题与解决
          • 7、LongAccumulator:通用累加器
          • 8、使用场景建议
        • 五、实战案例:构建无锁计数器
        • 六、最佳实践与性能优化建议
          • 1、选择合适的原子类
          • 2、减少竞争的技巧
          • 3、合理使用自旋
          • 4、批量操作优化
        • 七、总结
      • Java并发-JDK并发容器
      • Java并发-异步调用结果之Future和CompletableFuture
      • Java并发-Fork Join框架
      • Java并发-调试与诊断
    • 经验

    • JVM

    • 企业应用

  • Spring

  • 其他语言

  • 工具

  • 后端
  • Java
  • 并发
轩辕李
2021-11-08
目录

Java并发-无锁策略CAS与atomic包

在高并发场景下,传统的锁机制虽然能保证线程安全,但可能带来性能瓶颈。今天要介绍一种无锁并发策略——Compare And Set(比较并交换),简称CAS。它通过一种乐观的比较交换技术来鉴别线程冲突,一旦检测到冲突发生,就重试当前操作直到成功为止。

# 一、CAS原理详解

# 1、什么是CAS

CAS是一种无锁算法,它包含三个操作数:

  • V(内存位置):需要读写的内存位置
  • E(预期值):期望内存中的值
  • N(新值):要写入的新值

CAS操作的逻辑是:当且仅当V的值等于E时,才将V的值设置为N,否则什么都不做。整个操作是原子的,不会被中断。

# 2、无锁的优势

相比传统锁机制,无锁策略具有以下特点:

  • 高性能:避免了线程阻塞和上下文切换的开销
  • 无死锁:不存在锁的获取和释放,天然免疫死锁问题
  • 响应时间可预测:不会因为等待锁而产生不确定的延迟
  • 更好的伸缩性:在高并发场景下性能优势更明显

# 3、CAS的底层实现

CAS底层依赖于CPU的原子指令,如x86架构的CMPXCHG指令。现代处理器都已支持这类原子指令。

在Java中,主要通过Unsafe类中的compareAndSwap系列方法来调用CAS指令:

// var1: 操作的对象
// var2: 对象中的字段偏移量
// var4: 期望值
// var5: 新值
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

# 4、CAS的问题与限制

虽然CAS很强大,但也存在一些问题:

  1. ABA问题:如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但实际上却变化了。
  2. 循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
  3. 只能保证一个共享变量的原子操作:对多个共享变量操作时,CAS无法保证操作的原子性。

# 二、atomic包详解

Java 1.5中引入了java.util.concurrent.atomic包,提供了一系列原子操作类,它们都是基于CAS实现的无锁并发工具。

# 1、原子类分类

atomic包中的类可以分为以下几类:

  1. 基本类型原子类

    • AtomicBoolean:原子布尔类型
    • AtomicInteger:原子整型
    • AtomicLong:原子长整型
  2. 数组类型原子类

    • AtomicIntegerArray:原子整型数组
    • AtomicLongArray:原子长整型数组
    • AtomicReferenceArray:原子引用数组
  3. 引用类型原子类

    • AtomicReference:原子引用
    • AtomicStampedReference:带版本号的原子引用(解决ABA问题)
    • AtomicMarkableReference:带标记的原子引用
  4. 字段更新器

    • AtomicIntegerFieldUpdater:原子更新整型字段
    • AtomicLongFieldUpdater:原子更新长整型字段
    • AtomicReferenceFieldUpdater:原子更新引用字段

# 2、AtomicInteger使用示例

以AtomicInteger为例,它提供了线程安全的整数操作:

public class AtomicIntegerExample {
    // 使用AtomicInteger保证线程安全
    private static AtomicInteger counter = new AtomicInteger(0);
    
    public static void main(String[] args) throws InterruptedException {
        // 创建多个线程并发操作
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.incrementAndGet(); // 原子递增
            }
        });
        
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.incrementAndGet();
            }
        });
        
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        
        System.out.println("Final counter value: " + counter.get()); // 结果总是2000
    }
}

AtomicInteger提供了丰富的原子操作方法:

// 常用方法示例
AtomicInteger ai = new AtomicInteger(10);

// 获取并增加
int oldValue1 = ai.getAndIncrement(); // 返回10,当前值变为11
// 增加并获取
int newValue1 = ai.incrementAndGet(); // 返回12,当前值变为12

// 获取并设置
int oldValue2 = ai.getAndSet(20); // 返回12,当前值变为20
// 比较并设置
boolean success = ai.compareAndSet(20, 30); // 期望值为20,更新为30,返回true

// 获取并累加
int oldValue3 = ai.getAndAdd(5); // 返回30,当前值变为35
// 累加并获取
int newValue2 = ai.addAndGet(5); // 返回40,当前值变为40

// Lambda表达式更新
int result = ai.updateAndGet(x -> x * 2); // 当前值变为80,返回80

# 3、源码分析

查看incrementAndGet方法的源码,可以发现它调用的是Unsafe#getAndAddInt方法:

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);  // 获取当前值
    } while (!weakCompareAndSetInt(o, offset, v, v + delta)); // CAS更新
    return v;
}

这里使用了自旋CAS:如果CAS失败,就不断重试,直到成功为止。

# 三、深入理解ABA问题

# 1、ABA问题示例

AtomicReference可以保证你在修改对象引用时的线程安全性,但它存在ABA问题。来看一个具体的例子:

public class ABADemo {
    static class Node {
        public final String value;
        public Node next;
        
        public Node(String value) {
            this.value = value;
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Node nodeA = new Node("A");
        Node nodeB = new Node("B");
        AtomicReference<Node> atomicRef = new AtomicReference<>(nodeA);
        
        // 线程1:读取nodeA,准备修改为nodeC
        Thread thread1 = new Thread(() -> {
            Node oldNode = atomicRef.get();
            System.out.println("Thread1: 读取到节点 " + oldNode.value);
            
            try {
                Thread.sleep(1000); // 模拟处理时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            // 尝试CAS更新
            Node nodeC = new Node("C");
            boolean success = atomicRef.compareAndSet(oldNode, nodeC);
            System.out.println("Thread1: CAS结果 = " + success);
        });
        
        // 线程2:将nodeA改为nodeB,再改回nodeA
        Thread thread2 = new Thread(() -> {
            atomicRef.compareAndSet(nodeA, nodeB);
            System.out.println("Thread2: A -> B");
            
            Node newNodeA = new Node("A"); // 新的nodeA对象
            atomicRef.compareAndSet(nodeB, newNodeA);
            System.out.println("Thread2: B -> A (新对象)");
        });
        
        thread1.start();
        Thread.sleep(100); // 确保线程1先执行
        thread2.start();
        
        thread1.join();
        thread2.join();
    }
}

在这个例子中,线程1的CAS操作会成功,但实际上引用已经被修改过了(虽然值看起来一样)。

# 2、使用AtomicStampedReference解决ABA问题

AtomicStampedReference通过引入版本号(stamp)来解决ABA问题:

public class AtomicStampedReferenceDemo {
    public static void main(String[] args) throws InterruptedException {
        String initialRef = "A";
        int initialStamp = 0;
        
        AtomicStampedReference<String> atomicStampedRef = 
            new AtomicStampedReference<>(initialRef, initialStamp);
        
        // 线程1:尝试将A改为C
        Thread thread1 = new Thread(() -> {
            int stamp = atomicStampedRef.getStamp();
            String reference = atomicStampedRef.getReference();
            System.out.println("Thread1: 初始值=" + reference + ", stamp=" + stamp);
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            // 使用期望的stamp进行CAS
            boolean success = atomicStampedRef.compareAndSet(
                reference, "C", stamp, stamp + 1);
            System.out.println("Thread1: CAS结果=" + success);
        });
        
        // 线程2:A->B->A,但stamp会改变
        Thread thread2 = new Thread(() -> {
            int stamp = atomicStampedRef.getStamp();
            atomicStampedRef.compareAndSet("A", "B", stamp, stamp + 1);
            System.out.println("Thread2: A->B, 新stamp=" + (stamp + 1));
            
            stamp = atomicStampedRef.getStamp();
            atomicStampedRef.compareAndSet("B", "A", stamp, stamp + 1);
            System.out.println("Thread2: B->A, 新stamp=" + (stamp + 1));
        });
        
        thread1.start();
        Thread.sleep(100);
        thread2.start();
        
        thread1.join();
        thread2.join();
        
        System.out.println("最终值: " + atomicStampedRef.getReference() + 
                         ", stamp=" + atomicStampedRef.getStamp());
    }
}

在这个例子中,即使值从A变成B又变回A,由于stamp(版本号)发生了变化,线程1的CAS操作会失败,从而避免了ABA问题。

# 四、高性能原子累加器:LongAdder

# 1、为什么需要LongAdder

AtomicLong在高并发场景下存在性能问题。当多个线程同时更新同一个原子变量时,只有一个线程能成功,其他线程都会自旋重试,导致大量的CPU资源浪费在自旋上。

# 2、LongAdder的设计思想

LongAdder采用了分段累加的思想(类似于ConcurrentHashMap的分段锁):

  • 热点分离:将一个变量分解为多个变量,让不同的线程去更新不同的变量
  • 最终一致性:读取时将所有变量求和,得到最终结果
  • 动态扩容:根据竞争情况动态调整内部数组大小

# 3、LongAdder内部结构

// LongAdder的核心组成
class LongAdder extends Striped64 {
    // 继承自Striped64的主要字段:
    // base: 基础值,在没有竞争时使用
    // cells[]: Cell数组,存在竞争时使用
    // cellsBusy: 自旋锁,用于cells数组的初始化和扩容
}

# 4、LongAdder工作原理

  1. 无竞争时:直接更新base值
  2. 低竞争时:如果CAS更新base失败,创建cells数组
  3. 高竞争时:不同线程更新cells数组的不同元素,减少冲突

# 5、性能对比示例

public class LongAdderVsAtomicLong {
    private static final int THREAD_COUNT = 50;
    private static final int INCREMENT_COUNT = 1000000;
    
    public static void main(String[] args) throws InterruptedException {
        // 测试AtomicLong
        AtomicLong atomicLong = new AtomicLong(0);
        long startTime = System.currentTimeMillis();
        
        Thread[] threads1 = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads1[i] = new Thread(() -> {
                for (int j = 0; j < INCREMENT_COUNT; j++) {
                    atomicLong.incrementAndGet();
                }
            });
            threads1[i].start();
        }
        
        for (Thread t : threads1) {
            t.join();
        }
        long atomicTime = System.currentTimeMillis() - startTime;
        System.out.println("AtomicLong result: " + atomicLong.get());
        System.out.println("AtomicLong time: " + atomicTime + "ms");
        
        // 测试LongAdder
        LongAdder longAdder = new LongAdder();
        startTime = System.currentTimeMillis();
        
        Thread[] threads2 = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads2[i] = new Thread(() -> {
                for (int j = 0; j < INCREMENT_COUNT; j++) {
                    longAdder.increment();
                }
            });
            threads2[i].start();
        }
        
        for (Thread t : threads2) {
            t.join();
        }
        long adderTime = System.currentTimeMillis() - startTime;
        System.out.println("LongAdder result: " + longAdder.sum());
        System.out.println("LongAdder time: " + adderTime + "ms");
        
        System.out.println("性能提升: " + (atomicTime * 100.0 / adderTime - 100) + "%");
    }
}

# 6、伪共享问题与解决

LongAdder通过@Contended注解解决了伪共享问题:

// Cell类使用@Contended注解防止伪共享
@sun.misc.Contended 
static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
}

要启用@Contended注解,需要添加JVM参数:-XX:-RestrictContended

# 7、LongAccumulator:通用累加器

LongAccumulator是LongAdder的通用版本,支持自定义的累加函数:

public class LongAccumulatorExample {
    public static void main(String[] args) throws InterruptedException {
        // 使用max函数作为累加器
        LongAccumulator maxAccumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
        
        // 使用自定义函数:计算所有值的乘积
        LongAccumulator productAccumulator = new LongAccumulator((x, y) -> x * y, 1);
        
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    maxAccumulator.accumulate(threadId * 100 + j);
                    productAccumulator.accumulate(2);
                }
            });
            threads[i].start();
        }
        
        for (Thread t : threads) {
            t.join();
        }
        
        System.out.println("Max value: " + maxAccumulator.get()); // 输出最大值
        System.out.println("Product: " + productAccumulator.get()); // 输出2^1000
    }
}

# 8、使用场景建议

  • LongAdder适用场景:

    • 高并发计数场景
    • 统计信息收集
    • 需要高吞吐量的累加操作
  • AtomicLong适用场景:

    • 需要精确的实时值
    • 低并发场景
    • 需要其他原子操作(如compareAndSet)

# 五、实战案例:构建无锁计数器

让我们通过一个实际案例来综合运用所学知识,构建一个高性能的分布式计数器:

public class DistributedCounter {
    // 使用LongAdder进行高性能计数
    private final LongAdder totalCount = new LongAdder();
    // 使用AtomicReference管理配置
    private final AtomicReference<CounterConfig> config = new AtomicReference<>();
    // 使用ConcurrentHashMap存储分类计数
    private final ConcurrentHashMap<String, LongAdder> categoryCounters = new ConcurrentHashMap<>();
    
    static class CounterConfig {
        final boolean enableCategory;
        final int threshold;
        
        CounterConfig(boolean enableCategory, int threshold) {
            this.enableCategory = enableCategory;
            this.threshold = threshold;
        }
    }
    
    public DistributedCounter() {
        config.set(new CounterConfig(true, 1000));
    }
    
    public void increment(String category) {
        // 总计数始终增加
        totalCount.increment();
        
        // 根据配置决定是否记录分类
        CounterConfig currentConfig = config.get();
        if (currentConfig.enableCategory) {
            categoryCounters.computeIfAbsent(category, k -> new LongAdder())
                          .increment();
        }
    }
    
    public long getTotal() {
        return totalCount.sum();
    }
    
    public Map<String, Long> getCategoryStats() {
        Map<String, Long> stats = new HashMap<>();
        categoryCounters.forEach((k, v) -> stats.put(k, v.sum()));
        return stats;
    }
    
    // 无锁更新配置
    public boolean updateConfig(boolean enableCategory, int threshold) {
        CounterConfig oldConfig = config.get();
        CounterConfig newConfig = new CounterConfig(enableCategory, threshold);
        return config.compareAndSet(oldConfig, newConfig);
    }
    
    public static void main(String[] args) throws InterruptedException {
        DistributedCounter counter = new DistributedCounter();
        int threadCount = 100;
        int operationsPerThread = 10000;
        
        Thread[] threads = new Thread[threadCount];
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                String category = "category-" + (threadId % 10);
                for (int j = 0; j < operationsPerThread; j++) {
                    counter.increment(category);
                }
            });
            threads[i].start();
        }
        
        for (Thread t : threads) {
            t.join();
        }
        
        long endTime = System.currentTimeMillis();
        
        System.out.println("总计数: " + counter.getTotal());
        System.out.println("分类统计: " + counter.getCategoryStats());
        System.out.println("耗时: " + (endTime - startTime) + "ms");
        System.out.println("TPS: " + (threadCount * operationsPerThread * 1000L / (endTime - startTime)));
    }
}

# 六、最佳实践与性能优化建议

# 1、选择合适的原子类

  • 简单计数:使用LongAdder而不是AtomicLong
  • 对象引用:优先使用AtomicReference
  • 避免ABA:使用AtomicStampedReference或AtomicMarkableReference

# 2、减少竞争的技巧

// 不好的做法:所有线程竞争同一个原子变量
private AtomicLong counter = new AtomicLong();

// 好的做法:使用ThreadLocal减少竞争
private ThreadLocal<AtomicLong> localCounter = 
    ThreadLocal.withInitial(AtomicLong::new);

# 3、合理使用自旋

// 限制自旋次数,避免CPU空转
public boolean updateWithRetry(AtomicInteger atomic, int expect, int update) {
    int retries = 100;
    while (retries-- > 0) {
        if (atomic.compareAndSet(expect, update)) {
            return true;
        }
        // 可以加入退避策略
        Thread.yield();
    }
    return false;
}

# 4、批量操作优化

// 批量更新时,先收集再一次性更新
public void batchUpdate(LongAdder adder, List<Long> values) {
    long sum = values.stream().mapToLong(Long::longValue).sum();
    adder.add(sum); // 一次更新,减少竞争
}

# 七、总结

本文深入探讨了Java中的无锁并发策略:

  1. CAS原理:理解了比较并交换的原子操作原理及其优缺点
  2. atomic包:掌握了各种原子类的使用场景和方法
  3. ABA问题:学习了ABA问题的产生原因和解决方案
  4. LongAdder:了解了高性能累加器的设计思想和实现原理
  5. 实战应用:通过实际案例掌握了无锁编程的最佳实践

无锁编程是高性能并发编程的重要技术,从传统的悲观锁到乐观的CAS,再到分段思想的LongAdder,体现了并发编程技术的不断演进。掌握这些技术,能够帮助我们在高并发场景下构建更高效、更可靠的系统。

祝你变得更强!

编辑 (opens new window)
#CAS#AtomicInteger
上次更新: 2025/08/15
Java并发-以空间换时间之ThreadLocal
Java并发-JDK并发容器

← Java并发-以空间换时间之ThreadLocal Java并发-JDK并发容器→

最近更新
01
AI时代的编程心得
09-11
02
Claude Code与Codex的协同工作
09-01
03
Claude Code实战之供应商切换工具
08-18
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式