您好,欢迎来到纷纭教育。
搜索
您的当前位置:首页原子操作类使用以及源码分析

原子操作类使用以及源码分析

来源:纷纭教育


atomic是什么?

  • ①. atomic是原子类,主要有如下:

        

  • ②. 阿里巴巴Java开发手册中说明:

 

1、 基本类型原子类(AtomicInteger、AtomicBoolean、AtomicLong)

①. 常用API简介

 

 

②. AtomicInteger解决 i++ 多线程下不安全问题

package com.bilibili.juc.atomics;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyNumber
{
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus()
    {
        atomicInteger.getAndIncrement();
    }
}


/**
 * @auther zzyy
 * @create 2022-02-25 21:59
 */
public class AtomicIntegerDemo
{
    public static final int SIZE = 50;

    public static void main(String[] args) throws InterruptedException
    {
        MyNumber myNumber = new MyNumber();
        
        for (int i = 1; i <=SIZE; i++) {
            new Thread(() -> {
               
                    for (int j = 1; j <=1000; j++) {
                        myNumber.addPlusPlus();
                    }
               
            },String.valueOf(i)).start();
        }
        //等待上面50个线程全部计算完成后,再去获得最终值
        //如果不加上下面的停顿2秒的时间,会导致还没有进行i++ 50000次main线程就已经结束了

        //暂停几秒钟线程
      try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
      
        System.out.println(Thread.currentThread().getName()+"\t"+"result: "+myNumber.atomicInteger.get());
    }
}

注意:

使用 TimeUnit.SECONDS.sleep 让主线程进行阻塞 ,在生产环境中及其不规范 使用

CountDownLatch 来解决这个问题

package com.bilibili.juc.atomics;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyNumber
{
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus()
    {
        atomicInteger.getAndIncrement();
    }
}


/**
 * @auther zzyy
 * @create 2022-02-25 21:59
 */
public class AtomicIntegerDemo
{
    public static final int SIZE = 50;

    public static void main(String[] args) throws InterruptedException
    {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);

        for (int i = 1; i <=SIZE; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=1000; j++) {
                        myNumber.addPlusPlus();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }
        
        countDownLatch.await();

        System.out.println(Thread.currentThread().getName()+"\t"+"result: "+myNumber.atomicInteger.get());
    }
}

③. AtomicBoolean可以作为中断标识停止线程的方式

//线程中断机制的实现方法
public class AtomicBooleanDemo {
    public static void main(String[] args) {
        AtomicBoolean atomicBoolean=new AtomicBoolean(false);

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t"+"coming.....");
            while(!atomicBoolean.get()){
                System.out.println("==========");
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"over.....");
        },"A").start();

        new Thread(()->{
            atomicBoolean.set(true);
        },"B").start();
    }
}

④. AtomicLong

其底层是CAS+自旋锁的思想,适用于低并发的全局计算,高并发后性能急剧下降,原因如下:N个线程CAS操作修改线程的值,每次只有一个成功过,其他N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu资源消耗更高了(AtomicLong的自旋会成为瓶颈)
在高并发的情况下,我们使用LoadAdder
 

2、数组类型原子类 (AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray)

数组类型原子类,主要有三个AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
 

public class AtomicIntegerArrayDemo {
    public static void main(String[] args) {
        //(1). 创建一个新的AtomicIntegerArray,其长度与从给定数组复制的所有元素相同。
        int[]arr2={1,2,3,4,5};
        AtomicIntegerArray array=new AtomicIntegerArray(arr2);
        //(2). 创建给定长度的新AtomicIntegerArray,所有元素最初为零。
        //AtomicIntegerArray array=new AtomicIntegerArray(5);

        for (int i = 0; i < arr.length; i++) {
            System.out.print(arr[i]);
        }
        System.out.println();
        System.out.println("=======");
        array.getAndSet(0,1111);
        System.out.println("============");
        System.out.println("将数字中位置为0位置上的元素改为:"+array.get(0));
        System.out.println("数组位置为1位置上的旧值是:"+array.get(1));
        System.out.println("将数组位置为1位置上的数字进行加1的处理");
        array.getAndIncrement(1);
        System.out.println("数组位置为1位置上的新值是:"+array.get(1));
    }
}

3、引用类型原子类 (AtomicReference、AtomicStampedReference、AtomicMarkableReference)


①. 引用类型原子类主要有三个: AtomicReference、AtomicStampedReference、AtomicMark ableReference

②. 使用AtomicReference来实现自旋锁案例

package com.bilibili.juc.cas;
 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
 
/**
 * 题目:实现一个自旋锁,复习CAS思想
 * 自旋锁好处:循环比较获取没有类似wait的阻塞。
 *
 * 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
 * 当前有线程持有锁,所以只能通过自旋等待,直到A释放锁后B随后抢到。
 */
public class SpinLockDemo
{
    AtomicReference<Thread> atomicReference = new AtomicReference<>();
 
    public void lock()
    {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
        while (!atomicReference.compareAndSet(null, thread)) {
 
        }
    }
 
    public void unLock()
    {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread,null);
        System.out.println(Thread.currentThread().getName()+"\t"+"----task over,unLock...");
    }
 
    public static void main(String[] args)
    {
        SpinLockDemo spinLockDemo = new SpinLockDemo();
 
        new Thread(() -> {
            spinLockDemo.lock();
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
            spinLockDemo.unLock();
        },"A").start();
 
        //暂停500毫秒,线程A先于B启动
        try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
 
        new Thread(() -> {
            spinLockDemo.lock();
 
            spinLockDemo.unLock();
        },"B").start();
 
 
    }
}


 

③. AtomicStampedReference 解决ABA问题

什么是ABA

一个线程先读取共享内存数据值A,随后因某种原因,线程暂时挂起,同时另一个线程临时将共享内存数据值先改为B,随后又改回为A。随后挂起线程恢复,并通过CAS比较,最终比较结果将会无变化。这样会通过检查,这就是ABA问题。 在CAS比较前会读取原始数据,随后进行原子CAS操作。这个间隙之间由于并发操作,最终可能会带来问题。
 

/**
 * Description: ABA问题的解决
 *
 * @author TANGZHI
 * @date 2021-03-26 21:30
 **/
public class ABADemo {
    private static AtomicReference<Integer> atomicReference=new AtomicReference<>(100);
    private static AtomicStampedReference<Integer> stampedReference=new AtomicStampedReference<>(100,1);
    public static void main(String[] args) {
        System.out.println("===以下是ABA问题的产生===");
                /****
                **/
        new Thread(()->{
            atomicReference.compareAndSet(100,101);
            atomicReference.compareAndSet(101,100);
        },"t1").start();

        new Thread(()->{
            //先暂停1秒 保证完成ABA
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(atomicReference.compareAndSet(100, 2019)+"\t"+atomicReference.get());
        },"t2").start();
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }


        System.out.println("===以下是ABA问题的解决===");
          /****
                **/

        new Thread(()->{
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t 第1次版本号"+stamp+"\t值是"+stampedReference.getReference());
            //暂停1秒钟t3线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

            stampedReference.compareAndSet(100,101,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t 第2次版本号"+stampedReference.getStamp()+"\t值是"+stampedReference.getReference());
            stampedReference.compareAndSet(101,100,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t 第3次版本号"+stampedReference.getStamp()+"\t值是"+stampedReference.getReference());
        },"t3").start();

        new Thread(()->{
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t 第1次版本号"+stamp+"\t值是"+stampedReference.getReference());
            //保证线程3完成1次ABA
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean result = stampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName()+"\t 修改成功否"+result+"\t最新版本号"+stampedReference.getStamp());
            System.out.println("最新的值\t"+stampedReference.getReference());
        },"t4").start();
    }

④. AtomicMarkableReference 不建议用它解决ABA问题

  1. 原子更新带有标志位的引用类型对象
  2. 解决是否修改(它的定义就是将状态戳简化位true|false),类似一次性筷子
  3. 状态戳(true/false)原子引用
  4. 不建议用它解决ABA问题

package com.bilibili.juc.atomics;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;

/**
 * @auther zzyy
 * @create 2022-02-26 10:57
 */
public class AtomicMarkableReferenceDemo
{
    static AtomicMarkableReference markableReference = new AtomicMarkableReference(100,false);

    public static void main(String[] args)
    {
        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName()+"\t"+"默认标识:"+marked);
            //暂停1秒钟线程,等待后面的T2线程和我拿到一样的模式flag标识,都是false
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            markableReference.compareAndSet(100,1000,marked,!marked);
        },"t1").start();

        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName()+"\t"+"默认标识:"+marked);

            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean b = markableReference.compareAndSet(100, 2000, marked, !marked);
            System.out.println(Thread.currentThread().getName()+"\t"+"t2线程CASresult: "+b);
            System.out.println(Thread.currentThread().getName()+"\t"+markableReference.isMarked());
            System.out.println(Thread.currentThread().getName()+"\t"+markableReference.getReference());
        },"t2").start();
    }
}

/**
 *  CAS----Unsafe----do while+ABA---AtomicStampedReference,AtomicMarkableReference
 *
 *  AtomicStampedReference,version号,+1;
 *
 *  AtomicMarkableReference,一次,false,true
 *
 */

 AtomicStampedReference和AtomicMarkableReference区别

  1. stamped – version number 版本号,修改一次+1
  2. Markable – true、false 是否修改过

4. 对象的属性修改原子类 (AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater)

①. 使用目的:以一种线程安全的方式操作非线程安全对象内的某些字段
(是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,已达到精确加锁+节约内存的目的)

②. 使用要求

更新的对象属性必须使用public volatile修饰符

因为对象的属性修改,类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

AtomicIntegerFieldUpdater:原子更新对象中int类型字段的值

按照以前操作资源类 一般添加synchronized

代码如下、

package com.bilibili.juc.atomics;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class BankAccount//资源类
{
    String bankName = "CCB";

    //更新的对象属性必须使用 public volatile 修饰符。
    public  int money = 0;//钱

    public synchronized void add()
    {
        money++;
    }



}

/**
 * @auther zzyy
 * 以一种线程安全的方式操作非线程安全对象的某些字段。
 *
 * 需求:
 * 10个线程,
 * 每个线程转账1000,
 * 不使用synchronized,尝试使用AtomicIntegerFieldUpdater来实现。
 */
public class AtomicIntegerFieldUpdaterDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        BankAccount bankAccount = new BankAccount();
    //设置栅栏是为了防止循环还没结束就执行main线程输出自增的变量,导致误以为线程不安全
        CountDownLatch countDownLatch = new CountDownLatch(10);

        for (int i = 1; i <=10; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=1000; j++) {
                        bankAccount.add();
                      
                    }
                } finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }

        countDownLatch.await();

        System.out.println(Thread.currentThread().getName()+"\t"+"result: "+bankAccount.money);
    }
}

不在在使用synchronized实现线程安全 而使用 AtomicIntegerFieldUpdater 来实现线程安全

按照官方文档 和规范 修改如下

package com.bilibili.juc.atomics;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class BankAccount//资源类
{
    String bankName = "CCB";

    //更新的对象属性必须使用 public volatile 修饰符。
    public volatile int money = 0;//钱数



    //因为对象的属性修改类型,原子类都是抽象类,所以每次使用都必须
    // 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
    AtomicIntegerFieldUpdater<BankAccount> fieldUpdater =
            AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");

    //不加synchronized,保证高性能原子性,局部微创小手术
    public void transMoney(BankAccount bankAccount)
    {
        fieldUpdater.getAndIncrement(bankAccount);
    }


}

/**
 * @auther zzyy
 * 以一种线程安全的方式操作非线程安全对象的某些字段。
 *
 * 需求:
 * 10个线程,
 * 每个线程转账1000,
 * 不使用synchronized,尝试使用AtomicIntegerFieldUpdater来实现。
 */
public class AtomicIntegerFieldUpdaterDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        BankAccount bankAccount = new BankAccount();
 //设置栅栏是为了防止循环还没结束就执行main线程输出自增的变量,导致误以为线程不安全
        CountDownLatch countDownLatch = new CountDownLatch(10);

        for (int i = 1; i <=10; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=1000; j++) {

                        bankAccount.transMoney(bankAccount);
                    }
                } finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }

        countDownLatch.await();

        System.out.println(Thread.currentThread().getName()+"\t"+"result: "+bankAccount.money);
    }
}

 AtomicIntegerFieldUpdater与AtomicInteger使用引发的思考

代码如下

思考 

通过码我们不难得知使用AtomicIntegerFieldUpdater与AtomicInteger其实效果是一致的,那既然已经存在了AtomicInteger为什么又要有一个AtomicIntegerFieldUpdater呢?

package com.bilibili.juc;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class Candidate {
    int id;

    volatile int score = 0;

    AtomicInteger score2 = new AtomicInteger();


    public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
            AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    public static AtomicInteger realScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        final Candidate candidate = new Candidate();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 1; i <=10; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=1000; j++) {
                        candidate.score2.incrementAndGet();
                        scoreUpdater.incrementAndGet(candidate);
                        realScore.incrementAndGet();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
        System.out.println("AtomicInteger Score=" + candidate.score2.get());
        System.out.println("realScore=" + realScore.get());

    }
}


总结

1、从代码中我们不难发现,通过AtomicIntegerFieldUpdater更新score我们获取最后的int值时相较于AtomicInteger来说不需要调用get()方法


2、对于代码中AtomicIntegerFieldUpdater是static final类型也就是说即使创建了100个对象AtomicIntegerFieldUpdater也只存在一个不会占用对象的内存,但是AtomicInteger会创建多个AtomicInteger对象,占用的内存比AtomicIntegerFieldUpdater大,

所以对于熟悉dubbo源码的人都知道,dubbo有个实现轮询负载均衡策略的类AtomicPositiveInteger用的就是AtomicIntegerFieldUpdate,在netty底层大量使用了这个类

AtomicReferenceFieldUpdater:原子更新引用类型字段的值

package com.bilibili.juc.atomics;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class MyVar //资源类
{
    public volatile Boolean isInit = Boolean.FALSE;

    AtomicReferenceFieldUpdater<MyVar,Boolean> referenceFieldUpdater =
            AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit");

    public void init(MyVar myVar)
    {
        if (referenceFieldUpdater.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE))
        {
            System.out.println(Thread.currentThread().getName()+"\t"+"----- start init,need 2 seconds");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+"----- over init");
        }else{
            System.out.println(Thread.currentThread().getName()+"\t"+"----- 已经有线程在进行初始化工作。。。。。");
        }
    }
}


/**
 * @auther zzyy
 * 需求:
 * 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,
 * 要求只能被初始化一次,只有一个线程操作成功
 */
public class AtomicReferenceFieldUpdaterDemo
{
    public static void main(String[] args)
    {
        MyVar myVar = new MyVar();

        for (int i = 1; i <=5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            },String.valueOf(i)).start();
        }
    }
}

5、原子操作增强类(DoubleAccumulator 、DoubleAdder 、LongAccumulator 、LongAdder)

 

 常用API

入门讲解(LongAdder|LongAccumulator区别)

  1. LongAdder只能用来计算加法、减法,且从零开始计算
  2. LongAccumulator提供了自定义的函数操作

public class LongAdderDemo {
    public static void main(String[] args) {
        // LongAdder只能做加减法,不能做乘除法
        LongAdder longAdder=new LongAdder();
        longAdder.increment();
        longAdder.increment();
        longAdder.increment();
        longAdder.decrement();

        System.out.println(longAdder.longValue());
        System.out.println("========");
        //LongAccumulator​(LongBinaryOperator accumulatorFunction, long identity)
        //LongAccumulator longAccumulator=new LongAccumulator((x,y)->x+y,0);
        LongAccumulator longAccumulator=new LongAccumulator(new LongBinaryOperator() {
            @Override
            public long applyAsLong(long left, long right) {
                return left+right;
            }
        },0);
        longAccumulator.accumulate(1);
        longAccumulator.accumulate(3);
        System.out.println(longAccumulator.longValue());
    }
}

使用各种技术来实现i++  高并发下的 效率对比

package com.bilibili.juc.atomics;


import javax.lang.model.element.VariableElement;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class ClickNumber //资源类
{
    int number = 0;
    public synchronized void clickBySynchronized()
    {
        number++;
    }

    AtomicLong atomicLong = new AtomicLong(0);
    public void clickByAtomicLong()
    {
        atomicLong.getAndIncrement();
    }

    LongAdder longAdder = new LongAdder();
    public void clickByLongAdder()
    {
        longAdder.increment();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x + y,0);
    public void clickByLongAccumulator()
    {
        longAccumulator.accumulate(1);
    }

}

/**
 * @auther zzyy
 * 需求: 50个线程,每个线程100W次,总点赞数出来
 */
public class AccumulatorCompareDemo
{
    public static final int _1W = 10000;
    public static final int threadNumber = 50;

    public static void main(String[] args) throws InterruptedException
    {
        ClickNumber clickNumber = new ClickNumber();
        long startTime;
        long endTime;

        CountDownLatch countDownLatch1 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch2 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch3 = new CountDownLatch(threadNumber);
        CountDownLatch countDownLatch4 = new CountDownLatch(threadNumber);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickBySynchronized();
                    }
                } finally {
                    countDownLatch1.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickBySynchronized: "+clickNumber.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickByAtomicLong();
                    }
                } finally {
                    countDownLatch2.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByAtomicLong: "+clickNumber.atomicLong.get());


        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickByLongAdder();
                    }
                } finally {
                    countDownLatch3.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAdder: "+clickNumber.longAdder.sum());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.clickByLongAccumulator();
                    }
                } finally {
                    countDownLatch4.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t clickByLongAccumulator: "+clickNumber.longAccumulator.get());

    }
}



 AtomicLong和LongAdder的分别

LongAdder源码分析

LongAdder是Striped的子类、架构图

LongAdder的引入、原理、能否代替AtomicLong

 Cell 底层也是CAS 思想

1、  我们知道,AtomicLong是利用底层的CAS操作来提供并发性的,比如addAndGet方法:

2、下面方法调用了Unsafe类的getAndAddLong方法,该方法是一个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。(也即乐观锁的实现模式)
并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发情况下,N个线程同时进行自旋操作,N-1个线程失败,导致CPU打满场景,此时AtomicLong的自旋会成为瓶颈


3、这就是LongAdder引入的初衷------解决高并发环境下AtomictLong的自旋瓶颈问题

LongAdder底层实现原理 (分散热点

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果,这就是 分散热点思想

 LongAdder能否替代AtomicLong

1、从下面的图中可以看到,LongAdder的API和AtomicLong的API还是有比较大的差异,而且AtomicLong提供的功能更丰富,尤其是addAndGet、decrementAndGet、compareAndSet这些方法。addAndGet、decrementAndGet除了单纯的做自增自减外,还可以立即获取增减后的值,而LongAdder则需要做同步控制才能精确获取增减后的值。如果业务需求需要精确的控制计数,则使用AtomicLong比较合适;


2、低并发、一般的业务尝尽下AtomicLong(数据准确)是足够了,如果并发量很多,存在大量写多读少的情况,那LongAdder(数据最终一致性,不保证强一致性)可能更合适
 

AtomicLong api

jdk 1.9

jdk1.8 

LongAdder  api

总结 :分散热点

 

Striped类

Striped有几个比较重要的成员函数

	//CPU数量,即Cells数组的最大长度
	static final int NCPU = Runtime.getRuntime().availableProcessors();
	//存放Cell的hash表,大小为2的幂
	//这里的Cell是Striped的内部类
	transient volatile Cell[] cells;
	/*
	1.在开始没有竞争的情况下,将累加值累加到base;
	2.在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base
	*/
	transient volatile long base;
	/*
	cellsBusy,它有两个值0或1,它的作用是当要修改cells数组时加锁,
	防止多线程同时修改cells数组(也称cells表),0为无锁,1位加锁,加锁的状况有三种:
	(1). cells数组初始化的时候;
    (2). cells数组扩容的时候;
    (3).如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;

	*/
	transient volatile int cellsBusy;	

Striped中一些变量或者方法的定义

Cell:是java.util.concurrent.atomic下Striped下的一个内部类

 源码解析 longAdder.increment( )

先看总结 分散热点思想

①. add(1L)

  • ①. 最初无竞争时,直接通过casBase进行更新base的处理

  • ②. 如果更新base失败后,首次新建一个Cell[ ]数组(默认长度是2)

  • ③. 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容

    LongAdder.java
	public void add(long x) {
		//as是striped中的cells数组属性
		//b是striped中的base属性
		//v是当前线程hash到的cell中存储的值
		//m是cells的长度减1,hash时作为掩码使用
		//a时当前线程hash到的cell
        Cell[] as; long b, v; int m; Cell a;
		/**
		首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值,
		如果成功则下面代码不执行 ,但只是适用于低并发,但如果处于高并时,则cas可能更新失败时,    才会走到if中
		条件1:cells不为空,说明出现过竞争,cell[]已创建
		条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争
		*/
        if ((as = cells) != null || !casBase(b = base, b + x)) {
			//true无竞争 fasle表示竞争激烈,多个线程hash到同一个cell,可能要扩容
            boolean uncontended = true;
			/*
			条件1:cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true
				  会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2
			条件2:默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现,
			条件3:当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。
				  a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理
			条件4:更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容
				  (如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true)
			**/
            if (as == null || (m = as.length - 1) < 0 ||
			    //getProbe( )方法返回的时线程中的threadLocalRandomProbe字段
				//它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
				//调用Striped中的方法处理
                longAccumulate(x, null, uncontended);
        }

逐条解析add()方法源码

((as = cells) != null || !casBase(b = base, b + x)

    首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值,
     如果成功则下面代码不执行 ,但只是适用于低并发,但如果处于高并时,则cas可能               更新失败时,    才会走到if中

        条件1:cells不为空,说明出现过竞争,cell[]已创建
        条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争

boolean uncontended = true;   

true无竞争 fasle表示竞争激烈,多个线程hash到同一个cell,可能要扩容

条件1:

as == null

cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true

会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2

条件2:

(m = as.length - 1) < 0

默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现,

条件3:

a = as[getProbe() & m]) == null

当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。
  a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理

getProbe()

getProbe( )方法返回的时线程中的threadLocalRandomProbe字段

它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)

条件4

!(uncontended = a.cas(v = a.value, v + x))

更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容(处于高并发时)

(如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true,此时是处于低并发时候)

 精简版即总结 add()源码分析

②. longAccumulate(x, null, uncontended)

 uncontended 关键这个 标识

前提知识

 

源代码

 final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

逐条解析longAccumulate()源代码

  • ①. 线程hash值:probe

	//存储线程的probe值
	int h;
	//如果getProbe()方法返回0,说明随机数未初始化
	if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
		//使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
		ThreadLocalRandom.current(); // force initialization
		//重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
		h = getProbe();
		//重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈
		//wasUncontended竞争状态为true
		wasUncontended = true;
	}

    int h;


   存储线程的probe值

    (h = getProbe()) == 0

   如果getProbe()方法返回0,说明随机数未初始化
 

 

ThreadLocalRandom.current(); // force initialization

使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化

     h = getProbe();

重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
  

 wasUncontended = true;       

 重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈
    wasUncontended竞争状
态为true

2、 longAccumulate()方法里最重要的 for 循环里 3大判断

   boolean collide = false;                // True if last slot nonempty
        for (; ; ) {
            Cell[] as;
            Cell a;
            int n;
            long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                //.....
            } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                //.....
            } else if (casBase(v = base,
                    ((fn == null) ?
                            Double.doubleToRawLongBits
                                    (Double.longBitsToDouble(v) + x) :
                            Double.doubleToRawLongBits
                                    (fn.applyAsDouble
                                            (Double.longBitsToDouble(v), x))))) {
                //... 
            }
   
        }

 boolean collide = false;  

如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向

判断1:

(as = cells) != null && (n = as.length) > 0

 CASE1:cells已经初始化了

判断2:

cellsBusy == 0 && cells == as && casCellsBusy()

CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组

cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁


cells == as    这个也就是 首次创建时  cells == as  ==null  是成立的


     casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true,第一次进来没人抢占cell单元格,肯定返回true
 

判断3:

casBase(v = base,
                    ((fn == null) ?
                            Double.doubleToRawLongBits
                                    (Double.longBitsToDouble(v) + x) :
                            Double.doubleToRawLongBits
                                    (fn.applyAsDouble
                                            (Double.longBitsToDouble(v), x))))

CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作

这种情况是cell中都CAS失败了,有一个兜底的方法

该分支实现直接操作base基数,将值累加到base上,也即其他线程正在初始化,多个线程正在更新base的值

总结一下 这个for循环里的3个大条件判断

 

3、 longAccumulate()方法里最重要的 for 循环里 的第二个判断  

我们看的顺序 就 从 判断2 看起 也就是 初始化 Cell 数组 然后判断3 最后 判断1

源代码

 else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }

cellsBusy == 0 && cells == as && casCellsBusy()

CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组

cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁


 cells == as: 这个也就是 首次创建时  cells == as  ==null  是成立的


 casCellsBusy(): 通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true,第一次进来没人抢占cell单元格,肯定返回true

boolean init = false;

是否初始化的标记

if (cells == as) {}

前面else if中进行了判断,这里再次判断,采用双端检索的机制   如果不进行双端检索就会在 new cell 数组,将上个线程对应数组里的值篡改

Cell[] rs = new Cell[2];

如果上面条件都执行成功就会执行数组的初始化及赋值操作创建一个大小为2的Cell 数组

rs[h & 1] = new Cell(x);
  cells = rs;

rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1 也就是 LongAdder.add( long x) 传过来的值

      h & 1: 类似于我们之前hashmap常用到的计算散列桶index的算法,

                通常都是hash&(table.len-1),同hashmap一个意思
                看这次的value是落在0还是1  也就是

                与0相与,全为0;与1相与,相同为1,不同为0   

                 其目的: 这样可以增加哈希碰撞的几率,从而提高查询效率

 4、 longAccumulate()方法里最重要的 for 循环里 的第三个判断 即兜底(多个线程尝试CAS修改失败的线程会走这个分支)  

 else if (casBase(v = base, ((fn == null) ? v + x :
                                fn.applyAsLong(v, x))))
        break;     

 CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作


  这种情况是cell中都CAS失败了,有一个兜底的方法
  
 该分支实现直接操作base基数,将值累加到base上,
    也即其他线程正在初始化,多个线程正在更新base的值

总结  longAccumulate()方法里 第二个和第三个判断

CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组

 CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作

5、 longAccumulate()方法里最重要的 for 循环里 的第一个判断(Cell数组不再为空且可能存在Cell数组扩容 即 Cell已被初始化

源代码

            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }

逐条解析源代码

我们先来看看 这个大判断 里第一个小判断 

            if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }

(a = as[(n - 1) & h]) == null

当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用

cellsBusy == 0

Cell[]数组没有正在扩容

Cell r = new Cell(x);

先创建一个Cell

cellsBusy == 0 && casCellsBusy()

采用双端检测 判断 并尝试加锁,加锁后cellsBusy=1

boolean created = false;

创建是否结束 即 退出循环标识

                Cell[] rs; int m, j;
                        if ((rs = cells) != null &&
                            (m = rs.length) > 0 &&
                            rs[j = (m - 1) & h] == null) {
                            rs[j] = r;
                            created = true;
                        }

  判断Cell 数组中该下标元素是否null, 并将创建的cell赋值到Cell[]数组上 并修改 退出循环标识
                       

finally { cellsBusy = 0; }

释放锁

if (created)
    break;

退出循环

continue

表名在 Cell 数组中该下标存在元素 即 插槽现在非空


collide = false;

我在在来看看 这个大判断 里第二个小判断  

  else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true; // Continue after rehash

wasUncontended表示cells初始化后,当前线程竞争修改失败

wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true, 紧接着执行advanceProbe(h)重置当前线程的hash,重新循环

CAS 已知道会失败

重新哈希后继续 

我在在来看看 这个大判断 里第三个小判断  

else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;

说明当前线程对应的数组中有了数据,也重置过hash值

这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环

我在在来看看 这个大判断 里第四个小判断  

               else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试

扩容标识 collide设置为false,标识永远不会再扩容

我在在来看看 这个大判断 里第五个小判断

  

 else if (!collide)
                    collide = true;

如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环

我在在来看看 这个大判断 里第六个小判断

 else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }

(cellsBusy == 0 && casCellsBusy()

锁状态为0并且将锁状态修改为1(持有锁)

if (cells == as) {   扩展表,除非陈旧   // Expand table unless stale
按位左移1位来操作,扩容大小为之前容量的两倍
    Cell[] rs = new Cell[n << 1];
    for (int i = 0; i < n; ++i)
扩容后将之前数组的元素拷贝到新数组中
        rs[i] = as[i];
    cells = rs;
}

finally { cellsBusy = 0; }

释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行

collide = false;

continue; //使用扩展表重试

这个大判断里最后调用的方法

advanceProbe()

总结longAccumulate()方法源码总结

3、LongAdder 里sum()源码分析

源码

 public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

​​​​​​​​​​​​​​ 

  • ①. sum( )会将所有Cell数组中的value和base累加作为返回值

  • ②. 核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点

为啥高并发下LongAdder 里sum()的值不精确

1、sum执行时,并没有对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性,它是最终一致性的


2、首先,最终返回的sum局部变量,初始被赋值为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致


3、其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果
 

Striped源码分析 即l)方法分析

Striped.java
/**
	1.LongAdder继承了Striped类,来实现累加功能,它是实现高并发累加的工具类
	2.Striped的设计核心思路就是通过内部的分散计算来避免竞争
	3.Striped内部包含一个base和一个Cell[] cells数组,又叫hash表
	4.没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,
	会将要累加的数累加到Cells数组中的某个cell元素里面
*/
abstract class Striped extends Number {
	//CPU数量,即Cells数组的最大长度
	static final int NCPU = Runtime.getRuntime().availableProcessors();
	//存放Cell的hash表,大小为2的幂
	transient volatile Cell[] cells;
	/*
	1.在开始没有竞争的情况下,将累加值累加到base;
	2.在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base
	*/
	transient volatile long base;
	/*
	cellsBusy,它有两个值0或1,它的作用是当要修改cells数组时加锁,
	防止多线程同时修改cells数组(也称cells表),0为无锁,1位加锁,加锁的状况有三种:
	(1). cells数组初始化的时候;
    (2). cells数组扩容的时候;
    (3).如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;

	*/
	transient volatile int cellsBusy;
	
	//低并发状态,还没有新建cell数组且写入进入base,刚好够用
	//base罩得住,不用上cell数组
	final boolean casBase(long cmp, long val) {
		//当前对象,在base位置上,将base(类似于AtomicLong中全局的value值),将base=0(cmp)改为1(value)
		return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
	}
	
	final void longAccumulate(long x, LongBinaryOperator fn,
							  boolean wasUncontended) {
		//存储线程的probe值
		int h;
		//如果getProbe()方法返回0,说明随机数未初始化
		if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
			//使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
			ThreadLocalRandom.current(); // force initialization
			//重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
			h = getProbe();
			//重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈,wasUncontended竞争状态为true
			wasUncontended = true;
		}
		//如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向
		boolean collide = false;                // True if last slot nonempty
		for (;;) {
			Cell[] as; Cell a; int n; long v;
			if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
			    // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
				if ((a = as[(n - 1) & h]) == null) {
					//Cell[]数组没有正在扩容
					if (cellsBusy == 0) {       // Try to attach new Cell
						//先创建一个Cell
						Cell r = new Cell(x);   // Optimistically create
						//尝试加锁,加锁后cellsBusy=1
						if (cellsBusy == 0 && casCellsBusy()) { 
							boolean created = false;
							try {               // Recheck under lock
								Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
								//在有锁的情况下再检测一遍之前的判断 
								if ((rs = cells) != null &&
									(m = rs.length) > 0 &&
									rs[j = (m - 1) & h] == null) {
									rs[j] = r;
									created = true;
								}
							} finally {
								cellsBusy = 0;//释放锁
							}
							if (created)
								break;
							continue;           // Slot is now non-empty
						}
					}
					collide = false;
				}
				/**
				wasUncontended表示cells初始化后,当前线程竞争修改失败
				wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
				紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
				*/
				else if (!wasUncontended)       // CAS already known to fail
					wasUncontended = true;      // Continue after rehash
				//说明当前线程对应的数组中有了数据,也重置过hash值
				//这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
				else if (a.cas(v = a.value, ((fn == null) ? v + x :
											 fn.applyAsLong(v, x))))
					break;
				//如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
				else if (n >= NCPU || cells != as)
					collide = false;    //扩容标识设置为false,标识永远不会再扩容
				//如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
				else if (!collide) 
					collide = true;
				//锁状态为0并且将锁状态修改为1(持有锁) 
				else if (cellsBusy == 0 && casCellsBusy()) { 
					try {
						if (cells == as) {      // Expand table unless stale
							//按位左移1位来操作,扩容大小为之前容量的两倍
							Cell[] rs = new Cell[n << 1];
							for (int i = 0; i < n; ++i)
								//扩容后将之前数组的元素拷贝到新数组中
								rs[i] = as[i];
							cells = rs; 
						}
					} finally {
						//释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
						cellsBusy = 0;
					}
					collide = false;
					continue;                   // Retry with expanded table
				}
				h = advanceProbe(h);
			}
			//CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
			/*
			cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
			cells == as == null  是成立的
			casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true,第一次进来没人抢占cell单元格,肯定返回true
			**/
			else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 
			    //是否初始化的标记
				boolean init = false;
				try {                           // Initialize table(新建cells)
					// 前面else if中进行了判断,这里再次判断,采用双端检索的机制
					if (cells == as) {
						//如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
						Cell[] rs = new Cell[2];
						//rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
						//h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,通常都是hash&(table.len-1),同hashmap一个意思
						rs[h & 1] = new Cell(x);
						cells = rs;
						init = true;
					}
				} finally {
					cellsBusy = 0;
				}
				if (init)
					break;
			}
			//CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
			//这种情况是cell中都CAS失败了,有一个兜底的方法
			//该分支实现直接操作base基数,将值累加到base上,也即其他线程正在初始化,多个线程正在更新base的值
			else if (casBase(v = base, ((fn == null) ? v + x :
										fn.applyAsLong(v, x))))
				break;                          // Fall back on using base
		}
	}
	
	static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
}

LongAdder 里add()源码分析

    LongAdder.java
	(1).baseOK,直接通过casBase进行处理
	(2).base不够用了,开始新建一个cell数组,初始值为2
    (3).当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容
	public void add(long x) {
		//as是striped中的cells数组属性
		//b是striped中的base属性
		//v是当前线程hash到的cell中存储的值
		//m是cells的长度减1,hash时作为掩码使用
		//a时当前线程hash到的cell
        Cell[] as; long b, v; int m; Cell a;
		/**
		首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值,
		且只有当cas失败时,才会走到if中
		条件1:cells不为空,说明出现过竞争,cell[]已创建
		条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争
		*/
        if ((as = cells) != null || !casBase(b = base, b + x)) {
			//true无竞争 fasle表示竞争激烈,多个线程hash到同一个cell,可能要扩容
            boolean uncontended = true;
			/*
			条件1:cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true
				  会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2
			条件2:默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现,
			条件3:当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。
				  a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理
			条件4:更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容
				  (如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true)
			**/
            if (as == null || (m = as.length - 1) < 0 ||
			    //getProbe( )方法返回的时线程中的threadLocalRandomProbe字段
				//它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
				//调用Striped中的方法处理
                longAccumulate(x, null, uncontended);
        }
    }
	
	Striped.java
	abstract class Striped extends Number {
		static final int NCPU = Runtime.getRuntime().availableProcessors();
		transient volatile Cell[] cells;
		transient volatile long base;
		transient volatile int cellsBusy;
		//低并发状态,还没有新建cell数组且写入进入base,刚好够用
		//base罩得住,不用上cell数组
		final boolean casBase(long cmp, long val) {
			//当前对象,在base位置上,将base(类似于AtomicLong中全局的value值),将base=0(cmp)改为1(value)
			return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
		}
	}

总结AtomicLong和LongAdder区别

精简版

参考博客

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- fenyunshixun.cn 版权所有 湘ICP备2023022495号-9

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务