之前在项目中使用到了并发队列,场景为多写多读,查阅资料推荐使用ConcurretLinkedQueue,但不知道为什么。这里对并发队列ConcurrentLinkedQueue与LinkedBlockingQueue的源码做一个简单分析,比较一下两者差别,并测试在不同并发请求下读写的性能差异。使用的JDK版本为1.8。

使用方法很简单,该类实现了Queue接口,提供了offer()、poll()等入队和出队的操作接口。
多线程环境下的使用如下:

  1. // 无界并发队列
  2. ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
  3. // 模拟n个线程竞争环境
  4. int n = 100;
  5. CountDownLatch countDownLatch = new CountDownLatch(n);
  6. for (int i = 0; i < n; i++) {
  7. int finalI = i;
  8. new Thread(()->{
  9. // 进行10000次的写操作
  10. for (int j = 0; j < 10000; j++) {
  11. queue.add(j);
  12. }
  13. // 进行10000次的读操作
  14. for (int j = 0; j < 10000; j++) {
  15. queue.poll();
  16. }
  17. // 该线程结束读写请求
  18. System.out.println("Thread-"+ finalI +"结束");
  19. countDownLatch.countDown();
  20. }).start();
  21. }
  22. // 直到所有线程结束读写
  23. countDownLatch.await();
  24. // 验证并发队列中元素是否清空
  25. System.out.println("队列已清空:"+queue.isEmpty());

输出结果如下:

  1. Thread-0结束
  2. ...........
  3. Thread-55结束
  4. 队列已清空:true

该类使用了Node类来表示队列中的节点,包含一个volatile修饰的类型为传入泛型的item成员(节点存储的值)和volatile修饰的next指针。同时引入了Unsafe组件,使用了其CAS方法来替换item和next。其中lazySetNext()方法保证了volatile的语义,该次修改对下次读是可见的。

  1. private static class Node<E> {
  2. volatile E item;
  3. volatile Node<E> next;
  4. Node(E item) {
  5. UNSAFE.putObject(this, itemOffset, item);
  6. }
  7. // CAS替换节点的值,返回是否成功
  8. boolean casItem(E cmp, E val) {
  9. return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  10. }
  11. // 给next引用赋值,这个方法保证了volatile的语义,即该修改对next读取是可见的
  12. void lazySetNext(Node<E> val) {
  13. UNSAFE.putOrderedObject(this, nextOffset, val);
  14. }
  15. // CAS替换next引用,返回是否成功
  16. boolean casNext(Node<E> cmp, Node<E> val) {
  17. return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  18. }
  19. // unsafe类引入和相关静态代码
  20. ...
  21. }

默认初始化方法如下:

  1. public ConcurrentLinkedQueue() {
  2. // 创建空的头尾节点
  3. head = tail = new Node<E>(null);
  4. }

还有一个基于已有集合的初始化方法,大致流程为:依次取出集合元素;检查是否为null;构建新节点;采用尾插法插入到链表尾部。

  1. public ConcurrentLinkedQueue(Collection<? extends E> c) {
  2. Node<E> h = null, t = null;
  3. for (E e : c) {
  4. // 检查元素是否为null
  5. checkNotNull(e);
  6. // 基于集合中的元素构建新节点
  7. Node<E> newNode = new Node<E>(e);
  8. // 第一个元素设置为头尾结点
  9. if (h == null)
  10. h = t = newNode;
  11. else { // 其余元素采用尾插法插入
  12. t.lazySetNext(newNode);
  13. t = newNode;
  14. }
  15. }
  16. // 集合为空集合时,新建值为nul的头尾节点
  17. if (h == null)
  18. h = t = new Node<E>(null);
  19. head = h;
  20. tail = t;
  21. }
  1. public boolean offer(E e) {
  2. // 确保元素非null,为null时抛出NullPointer异常
  3. checkNotNull(e);
  4. // 基于传入值构造新节点
  5. final Node<E> newNode = new Node<E>(e);
  6. // 自旋,直到入队成功
  7. for (Node<E> t = tail, p = t;;) {
  8. Node<E> q = p.next;
  9. // case1:此时p为队尾节点,q=null
  10. if (q == null) {
  11. // 通过cas的方式设置新节点为p的后继节点
  12. // 如果失败,说明此时p已不再是队尾结点,继续进行自旋
  13. // 如果成功,尝试修改tail后返回true
  14. if (p.casNext(null, newNode)) {
  15. // p != t代表此时p和第一次循环时相比已经向后移动了,此时就通过CAS的方式将tail节点修改为newNode
  16. // 失败了也没关系,代表有其他线程已经修改了tail
  17. if (p != t) // hop two nodes at a time
  18. casTail(t, newNode);
  19. return true;
  20. }
  21. }
  22. // case2:p=q,表示是删除的节点
  23. else if (p == q)
  24. // t != (t = tail) 说明t!=tail,tail节点已经更新过,此时就使用tail赋值给p,然后继续自旋
  25. // 否则说明tail没有更新过,指向出队的节点。这时就使用head赋值给p,然后继续自旋
  26. p = (t != (t = tail)) ? t : head;
  27. // case3:p不是队尾节点,也没有出队。就更新p,然后继续自旋
  28. else
  29. // case3.1:p!=t且t!=tail时,说明tail节点更新过,让p重新指向tail节点
  30. // case3.2:否则,p往后移动一位,指向q
  31. p = (p != t && t != (t = tail)) ? t : q;
  32. }
  33. }

入队的逻辑看起来比较复杂,其核心思想就是自旋+cas的方式将新节点插入到队尾节点的后面。
这里就按第一次入队和第二次入队两种情况分析一下:

  • 第一次入队

首先检查非空,然后构造新节点。
t和p都指向tail节点,q为null。此时进入case1:尝试CAS设置p.next为newNode。
成功的话,说明节点入队成功了。然后直接返回true
失败的话,说明p.next!=null,p不是队尾节点了,这时就自旋,q=p.next,然后会进入case3.2的逻辑,更新p。再次自旋,q=p.next,然后会进入case1的逻辑,然后重复上面一样的操作,直到CAS设置成功。

  • 第二次入队

首先检查非空,然后构造新节点。
tail节点指向倒数第二个节点,t和p指向tail,q指向最后一个节点。此时进入case3:,执行case3.2的逻辑,p = q。
然后自旋后,q=p.next,进入case1,然后CAS设置p.next为newNode。成功了的话,会发现p!=t,执行重置tail节点的操作,该操作失败了说明有其他线程重置了,所以也ok。之后返回true。

  1. // 将原head(h指向head节点)更新为p
  2. // 并将原head节点next指向自己,表示当前节点已经出队
  3. final void updateHead(Node<E> h, Node<E> p) {
  4. if (h != p && casHead(h, p)) // 将head通过CAS的方式更新为p
  5. h.lazySetNext(h); // 将h节点的next指向自己,表示出队
  6. }
  7. public E poll() {
  8. restartFromHead:
  9. // 大循环
  10. for (;;) {
  11. // 自旋
  12. for (Node<E> h = head, p = h, q;;) {
  13. E item = p.item;
  14. // case1:p指向节点为第一个有元素节点(实质上要出队的节点)
  15. // cas的方式设置item,失败了的话说明有其他线程将该接节点出队了,会再次自旋
  16. if (item != null && p.casItem(item, null)) {
  17. // Successful CAS is the linearization point
  18. // for item to be removed from this queue.
  19. // p!=h,表示p已经向后移动了。此时
  20. if (p != h) // hop two nodes at a time
  21. updateHead(h, ((q = p.next) != null) ? q : p);
  22. return item;
  23. }
  24. // case2:如果p的后继节点为null,表示p已经是最后一个节点,无节点可出队了
  25. else if ((q = p.next) == null) {
  26. // 更新头节点为p,然后返回null
  27. updateHead(h, p);
  28. return null;
  29. }
  30. // case3:p=q,表示p和q指向的节点已经出队,通过p和q已无法找到头节点,这时需要重新去获取head节点
  31. else if (p == q)
  32. // 回到大循环中重新开始小循环自旋
  33. continue restartFromHead;
  34. // case4:将p指向q,实质上是q往后移动一位
  35. else
  36. p = q;
  37. }
  38. }
  39. }

出队的核心思想就是找到头节点,CAS将其item设置为null。如果成功的话,就可以出队了,如果失败了,就自旋再次寻找头结点。
这里也分析一下出队执行步骤:

  • 出队

最开始的时候,head节点的item应该是null(queue初始化方法创建的节点)。第一次循环,h和p指向head节点。
如果此时队列中没有元素,会进入case2,直接更新head节点后返回null。
如果队列中有元素,会进入case4,将q向后移动,然后再次自旋,进行case1的判断。如果case1中item!=null且cas设置成功,则表示出队成功,返回出队元素。如果cas设置失败,则继续自旋寻找头结点出队。直至出队成功,同时如果p!=h,会更新下头结点。在自旋的过程中,如果当前节点已经被出队了,会进入case3,然后回到大循环重新寻找head节点。

  1. // 返回p的后继节点,如果p已经出队(next指向自身),则返回head节点
  2. final Node<E> succ(Node<E> p) {
  3. Node<E> next = p.next;
  4. // 当一个节点从队列删除后,其next指针会指向自己。此时就返回head节点
  5. return (p == next) ? head : next;
  6. }
  7. // 获取队首节点
  8. Node<E> first() {
  9. restartFromHead:
  10. for (;;) {
  11. for (Node<E> h = head, p = h, q;;) {
  12. boolean hasItem = (p.item != null);
  13. // p节点有元素,或者p节点为最后一个节点
  14. if (hasItem || (q = p.next) == null) {
  15. // 更新头结点
  16. updateHead(h, p);
  17. // p节点有元素返回p,无元素代表p是最后一个节点,返回null
  18. return hasItem ? p : null;
  19. }
  20. // 如果p已经出队,重新回到大循环
  21. else if (p == q)
  22. continue restartFromHead;
  23. // p向后移动一位
  24. else
  25. p = q;
  26. }
  27. }
  28. }
  29. public int size() {
  30. int count = 0;
  31. // 获取首元素后,遍历后继节点的数量
  32. for (Node<E> p = first(); p != null; p = succ(p))
  33. if (p.item != null)
  34. // Collection.size() spec says to max out
  35. if (++count == Integer.MAX_VALUE)
  36. break;
  37. return count;
  38. }

可以看到计算的大小不是非常准确的,从获取到首节点开始后,一直遍历到尾结点。期间增加的节点都能被统计进入,出队的节点则不计入数量。所以计算的数量>=计算完成时刻的实际数量。

LinkedBlockingQueue实现了Queue接口,也提供了offer和poll等方法。同时也提供了put和带时间参数的offer和pool方法。简单示例如下:

  1. // 无界并发队列
  2. LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
  3. // 插入一个元素,容量满时会失败
  4. queue.offer(1);
  5. // 插入一个元素,容量满时最多等待2s
  6. queue.offer(2, 2, TimeUnit.SECONDS);
  7. // 插入一个元素,容量满时会一直等待,直到能够入队
  8. queue.put(3);
  9. // 取出一个元素,无元素时返回null
  10. queue.poll();
  11. // 取出一个元素,无元素时最多等待2s
  12. queue.poll(2, TimeUnit.SECONDS);

使用了Node节点存储元素,不过没有UNSAFE组件,没有CAS操作。后面也可以看到,使用了可重入锁(独占锁),所以不需要考虑多线程同时修改属性的情况。

  1. static class Node<E> {
  2. E item;
  3. Node<E> next;
  4. Node(E x) { item = x; }
  5. }

使用了head和last表示队列的头部和尾部节点,使用了入队锁和出队锁两个锁来实现同一时刻只有一个元素入队,同一时刻只有一个元素出队。使用了AotomicInteger类来表示队列中的元素个数。

  1. transient Node<E> head;
  2. private transient Node<E> last;
  3. private final int capacity;
  4. private final ReentrantLock takeLock = new ReentrantLock();
  5. /** Wait queue for waiting takes */
  6. private final Condition notEmpty = takeLock.newCondition();
  7. /** Lock held by put, offer, etc */
  8. private final ReentrantLock putLock = new ReentrantLock();
  9. /** Wait queue for waiting puts */
  10. private final Condition notFull = putLock.newCondition();

默认初始化方法,设置容量为Integer.MAX_VALUE

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }
  4. public LinkedBlockingQueue(int capacity) {
  5. if (capacity <= 0) throw new IllegalArgumentException();
  6. this.capacity = capacity;
  7. last = head = new Node<E>(null);
  8. }

还有一个基于已有集合的初始化方法,大致思路为:

1.加上putLock入队锁;
2.遍历集合的所有元素,然后依次添加到队列中。
3.解锁。

由于使用了ReentrantLock,同一时刻只有单个线程入队,所以不用考虑并发问题。新增一个节点,然后将该节点添加到last节点后,最后更新last节点即可。
offer方法源码解析如下:需要注意,当入队时容量达到最大容量,会入队失败。

  1. public boolean offer(E e) {
  2. if (e == null) throw new NullPointerException();
  3. final AtomicInteger count = this.count;
  4. // 当前容量已满时,直接返回false
  5. if (count.get() == capacity)
  6. return false;
  7. int c = -1;
  8. // 构建新节点
  9. Node<E> node = new Node<E>(e);
  10. final ReentrantLock putLock = this.putLock;
  11. // 入队锁加锁,已经被其它线程加锁时,当前线程会park挂起
  12. putLock.lock();
  13. try {
  14. // 只有当前元素个数<capacity,才能入队
  15. if (count.get() < capacity) {
  16. // 执行入队操作
  17. enqueue(node);
  18. // count数量+1
  19. c = count.getAndIncrement();
  20. // 如果当前元素个数<capacity,表示还可以继续入队
  21. if (c + 1 < capacity)
  22. // 唤醒一个在notFull的条件等待队列中的线程
  23. notFull.signal();
  24. }
  25. } finally {
  26. // 入队锁解锁
  27. putLock.unlock();
  28. }
  29. // 如果此时元素数量为1,表示可以出队
  30. if (c == 0)
  31. // 唤醒一个在notEmpty的条件等待队列中的线程
  32. signalNotEmpty();
  33. // c>=表示入队成功,返回true,反之入队失败,返回false
  34. return c >= 0;
  35. }
  36. // 节点入队,加到队尾节点,然后更新last
  37. private void enqueue(Node<E> node) {
  38. // assert putLock.isHeldByCurrentThread();
  39. // assert last.next == null;
  40. last = last.next = node;
  41. }

put方法相对于offer方法,多了一个等待逻辑,当元素数量达到最大容量时,会一直等待,直到能够入队。

  1. putLock.lockInterruptibly();
  2. try {
  3. // 多了一个等待的过程
  4. // 如果容量已满,当前线程park并进入notFull的条件等待队列
  5. while (count.get() == capacity) {
  6. notFull.await();
  7. }
  8. enqueue(node);
  9. c = count.getAndIncrement();
  10. if (c + 1 < capacity)
  11. notFull.signal();
  12. } finally {
  13. putLock.unlock();
  14. }

同一时刻只有单个线程出队,所以不用考虑并发问题。
offer方法源码解析如下:需要注意,当入队时容量达到最大容量,会入队失败。

  1. public E poll() {
  2. final AtomicInteger count = this.count;
  3. if (count.get() == 0)
  4. return null;
  5. E x = null;
  6. int c = -1;
  7. final ReentrantLock takeLock = this.takeLock;
  8. // 出队锁加锁
  9. takeLock.lock();
  10. try {
  11. // 只有数量>0时才能出队
  12. if (count.get() > 0) {
  13. // 执行出队操作
  14. x = dequeue();
  15. // 容器数量-1
  16. c = count.getAndDecrement();
  17. // 当容器数量>=1时,唤醒notEmpty条件队列中等待的一个线程
  18. if (c > 1)
  19. notEmpty.signal();
  20. }
  21. } finally {
  22. // 出队锁释放
  23. takeLock.unlock();
  24. }
  25. // 表示当前数量<capacity时,容器未满,唤醒notFull条件队列中等待的一个线程
  26. if (c == capacity)
  27. signalNotFull();
  28. return x;
  29. }
  30. // 节点出队操作
  31. private E dequeue() {
  32. // 获取队首节点以及下一个节点(队首节点值都是null,下一个节点才是真正有元素的节点)
  33. Node<E> h = head;
  34. Node<E> first = h.next;
  35. // h节点next指向自身,表示出队
  36. h.next = h;
  37. // 更新head节点
  38. head = first;
  39. // 返回第一个实际节点的值并重置为null(head节点的item都是null)
  40. E x = first.item;
  41. first.item = null;
  42. return x;
  43. }

take方法相比于poll,多了一个等待逻辑,当元素数量=0时,会一直等待,直到能够入队。

  1. takeLock.lockInterruptibly();
  2. try {
  3. // 多了一个等待的过程
  4. // 如果数量=0,当前线程park并进入notEmpty的条件等待队列
  5. while (count.get() == 0) {
  6. notEmpty.await();
  7. }
  8. x = dequeue();
  9. c = count.getAndDecrement();
  10. if (c > 1)
  11. notEmpty.signal();
  12. } finally {
  13. takeLock.unlock();
  14. }

直接获取原子变量capacity的值即可。由于入队和出队对数量大小的修改都是原子的,所以获取的数量大小是十分准确的,为当前时刻容器元素数量。

  1. public int size() {
  2. return count.get();
  3. }

通过之前的介绍,可以发现

  1. ConcurrentLinkedQueue是一个无界队列,最大长度为Integer.MAX_VALUE;LinkedBlockingQueue是一个有界队列(不设置长度时为Integer.MAX_VALUE),在达到最大容量后添加元素有可能会失败(使用offer方法入队会失败,put方法入队会一直等待)。
  2. ConcurrentLinkedQueue全程是没有线程阻塞的,通过自旋+CAS的方式入队和出队(不达目的不罢休);而LinkedBlockingQueue同一时刻只能有一个线程执行入队操作或出队操作,通过入队锁和出队锁实现(ReentrantLock+Condition)。

ConcurrentLinkedQueue全程是无锁的,而LinkedBlockingQueue多线程出入队时会有挂起和唤醒线程的操作,会进行线程的上下文切换,相对来说更耗时。
这里设置了几组不同的线程数量和并发读取次数,来测试各自的完成时间,每组数据测试5次,取平均数据。使用了同一台机器(4核CPU)进行测试。
代码设计如下:

  1. // 无界并发队列
  2. ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
  3. long startTime = System.currentTimeMillis();
  4. // 模拟n个线程竞争环境,各自完成m次插入和查找操作,计算最终完成时间
  5. int n = 10;
  6. // 读写次数
  7. int m = 10000;
  8. // 线程执行完成的计数器
  9. CountDownLatch countDownLatch = new CountDownLatch(n);
  10. // 控制所有线程同时运行
  11. CyclicBarrier cyclicBarrier = new CyclicBarrier(n);
  12. for (int i = 0; i < n; i++) {
  13. int finalI = i;
  14. new Thread(()->{
  15. // 等待信号量的改变
  16. try {
  17. cyclicBarrier.await();
  18. } catch (InterruptedException | BrokenBarrierException e) {
  19. e.printStackTrace();
  20. }
  21. // 进行100000次的写操作
  22. for (int j = 0; j < m; j++) {
  23. queue.add(j);
  24. }
  25. // 进行1000000次的读操作
  26. for (int j = 0; j < m; j++) {
  27. queue.poll();
  28. }
  29. // 该线程结束读写请求
  30. System.out.println("Thread-"+ finalI +"结束");
  31. countDownLatch.countDown();
  32. }).start();
  33. }
  34. // 直到所有线程结束读写,计算时间
  35. countDownLatch.await();
  36. long endTime = System.currentTimeMillis();
  37. long costTime = endTime - startTime;
  38. System.out.println("所用时间:" + costTime + "ms");
  39. // 验证并发队列中元素是否清空
  40. System.out.println("队列已清空:"+queue.isEmpty());
  41. 该次运行结果:
  42. Thread-9结束
  43. ...
  44. Thread-8结束
  45. 所用时间:78ms
  46. 队列已清空:true

最终测试得到结果:
LinkedBlockingQueue测试结果(ms):

线程数量\读取次数 10000 50000 100000
10 94 125 187
50 167 800 3109
100 266 1332 6168
200 503 5374 11365

ConcurrentLinkedQueue测试结果(ms):

线程数量\读取次数 10000 50000 100000
10 78 156 249
50 172 594 1375
100 250 828 3343
200 437 1656 6300

可以发现,在线程数量较少时,两者的消耗时长差不多。当线程数量比较多,并且短时间内的读写请求数量较大时,ConcurrentLinkedQueue消耗时间明显更少。

https://zhuanlan.zhihu.com/p/224964810

版权声明:本文为ningbing原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/ningbing/p/15086007.html