面试官:好了,聊完了ArrayBlockingQueue,我们接着说说LinkedBlockingQueue

Hydra:还真是不给人喘口气的机会,LinkedBlockingQueue是一个基于链表的阻塞队列,内部是由节点Node构成,每个被加入队列的元素都会被封装成下面的Node节点,并且节点中有指向下一个元素的指针:

  1. static class Node<E> {
  2. E item;
  3. Node<E> next;
  4. Node(E x) { item = x; }
  5. }

LinkedBlockingQueue中的关键属性有下面这些:

  1. private final int capacity;//队列容量
  2. private final AtomicInteger count = new AtomicInteger();//队列中元素数量
  3. transient Node<E> head;//头节点
  4. private transient Node<E> last;//尾节点
  5. //出队锁
  6. private final ReentrantLock takeLock = new ReentrantLock();
  7. //出队的等待条件对象
  8. private final Condition notEmpty = takeLock.newCondition();
  9. //入队锁
  10. private final ReentrantLock putLock = new ReentrantLock();
  11. //入队的等待条件对象
  12. private final Condition notFull = putLock.newCondition();

构造函数分为指定队列长度和不指定队列长度两种,不指定时队列最大长度是int的最大值。当然了,你要是真存这么多的元素,很有可能会引起内存溢出:

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }
  4. public LinkedBlockingQueue(int capacity) {
  5. if (capacity <= 0) throw new IllegalArgumentException();
  6. this.capacity = capacity;
  7. last = head = new Node<E>(null);
  8. }

还有另一种在初始化时就可以将集合作为参数传入的构造方法,实现非常好理解,只是循环调用了后面会讲到的enqueue入队方法,这里暂且略过。

LinkedBlockingQueue中,队列的头节点head是不存元素的,它的itemnullnext指向的元素才是真正的第一个元素,它也有两个用于阻塞等待的Condition条件对象。与之前的ArrayBlockingQueue不同,这里出队和入队使用了不同的锁takeLockputLock。队列的结构是这样的:

面试官:为什么要使用两把锁,之前ArrayBlockingQueue使用一把锁,不是也可以保证线程的安全么?

Hydra:使用两把锁,可以保证元素的插入和删除并不互斥,从而能够同时进行,达到提高吞吐量的的效果

面试官:嗯,那还是老规矩,先说插入方法是怎么实现的吧

Hydra:这次就不提父类AbstractQueueadd方法了,反正它调用的也是子类的插入方法offer,我们就直接来看offer方法的源码:

  1. public boolean offer(E e) {
  2. if (e == null) throw new NullPointerException();
  3. final AtomicInteger count = this.count;//队列中元素个数
  4. if (count.get() == capacity)//已满
  5. return false;
  6. int c = -1;
  7. Node<E> node = new Node<E>(e);
  8. final ReentrantLock putLock = this.putLock;
  9. putLock.lock();
  10. try {
  11. //并发情况,再次判断队列是否已满
  12. if (count.get() < capacity) {
  13. enqueue(node);
  14. //注意这里获取的是未添加元素前的对列长度
  15. c = count.getAndIncrement();
  16. if (c + 1 < capacity)//未满
  17. notFull.signal();
  18. }
  19. } finally {
  20. putLock.unlock();
  21. }
  22. if (c == 0)
  23. signalNotEmpty();
  24. return c >= 0;
  25. }

offer方法中,首先判断队列是否已满,未满情况下将元素封装成Node对象,尝试获取插入锁,在获取锁后会再进行一次队列已满判断,如果已满则直接释放锁。在持有锁且队列未满的情况下,调用enqueue入队方法。

enqueue方法的实现也非常的简单,将当前尾节点的next指针指向新节点,再把last指向新节点:

  1. private void enqueue(Node<E> node) {
  2. last = last.next = node;
  3. }

画一张图,方便你理解:

在完成入队后,判断队列是否已满,如果未满则调用notFull.signal(),唤醒等待将元素插入队列的线程。

面试官:我记得在ArrayBlockingQueue里插入元素后,是调用的notEmpty.signal(),怎么这里还不一样了?

Hydra:说到这,就不得不再提一下使用两把锁来分别控制插入和获取元素的好处了。在ArrayBlockingQueue中,使用了同一把锁对入队和出队进行控制,那么如果在插入元素后再唤醒插入线程,那么很有可能等待获取元素的线程就一直得不到唤醒,造成等待时间过长。

而在LinkedBlockingQueue中,分别使用了入队锁putLock和出队锁takeLock,插入线程和获取线程是不会互斥的。所以插入线程可以在这里不断的唤醒其他的插入线程,而无需担心是否会使获取线程等待时间过长,从而在一定程度上提高了吞吐量。当然了,因为offer方法是非阻塞的,并不会挂起阻塞线程,所以这里唤醒的是阻塞插入的put方法的线程。

面试官:那接着往下看,为什么要在c等于0的情况下才去唤醒notEmpty中的等待获取元素的线程?

Hydra:其实获取元素的方法和上面插入元素的方法是一个模式的,只要有一个获取线程在执行方法,那么就会不断的通过notEmpty.signal()唤醒其他的获取线程。只有当c等于0时,才证明之前队列中已经没有元素,这时候获取线程才可能会被阻塞,在这个时候才需要被唤醒。上面的这些可以用一张图来说明:

由于我们之前说过,队列中的head节点可以认为是不存储数据的标志性节点,所以可以简单的认为出队时直接取出第二个节点,当然这个过程不是非常的严谨,我会在后面讲解出队的过程中再进行补充说明。

面试官:那么阻塞方法put和它有什么区别?

Hydra:putoffer方法整体思路一致,不同的是加锁是使用的是可被中断的方式,并且当队列中元素已满时,将线程加入notFull等待队列中进行等待,代码中体现在:

  1. while (count.get() == capacity) {
  2. notFull.await();
  3. }

这个过程体现在上面那张图的notFull等待队列中的元素上,就不重复说明了。另外,和put方法比较类似的,还有一个携带等待时间参数的offer方法,可以进行有限时间内的阻塞添加,当超时后放弃插入元素,我们只看和offer方法不同部分的代码:

  1. public boolean offer(E e, long timeout, TimeUnit unit){
  2. ...
  3. long nanos = unit.toNanos(timeout);//转换为纳秒
  4. ...
  5. while (count.get() == capacity) {
  6. if (nanos <= 0)
  7. return false;
  8. nanos = notFull.awaitNanos(nanos);
  9. }
  10. enqueue(new Node<E>(e));
  11. ...
  12. }

awaitNanos方法在await方法的基础上,增加了超时跳出的机制,会在循环中计算是否到达预设的超时时间。如果在到达超时时间前被唤醒,那么会返回超时时间减去已经消耗的时间。无论是被其他线程唤醒返回,还是到达指定的超时时间返回,只要方法返回值小于等于0,那么就认为它已经超时,最终直接返回false结束。

面试官:费这么大顿功夫才把插入讲明白,我先喝口水,你接着说获取元素方法

Hydra:……那先看非阻塞的poll方法

  1. public E poll() {
  2. final AtomicInteger count = this.count;
  3. if (count.get() == 0)//队列为空
  4. return null;
  5. E x = null;
  6. int c = -1;
  7. final ReentrantLock takeLock = this.takeLock;
  8. takeLock.lock();
  9. try {
  10. if (count.get() > 0) {//队列非空
  11. x = dequeue();
  12. //出队前队列长队
  13. c = count.getAndDecrement();
  14. if (c > 1)
  15. notEmpty.signal();
  16. }
  17. } finally {
  18. takeLock.unlock();
  19. }
  20. if (c == capacity)
  21. signalNotFull();
  22. return x;
  23. }

出队的逻辑和入队的非常相似,当队列非空时就执行dequeue进行出队操作,完成出队后如果队列仍然非空,那么唤醒等待队列中挂起的获取元素的线程。并且当出队前的元素数量等于队列长度时,在出队后唤醒等待队列上的添加线程。

出队方法dequeue的源码如下:

  1. private E dequeue() {
  2. Node<E> h = head;
  3. Node<E> first = h.next;
  4. h.next = h; // help GC
  5. head = first;
  6. E x = first.item;
  7. first.item = null;
  8. return x;
  9. }

之前提到过,头节点head并不存储数据,它的下一个节点才是真正意义上的第一个节点。在出队操作中,先得到头节点的下一个节点first节点,将当前头节点的next指针指向自己,代码中有一个简单的注释是help gc,个人理解这里是为了降低gc中的引用计数,方便它更早被回收。之后再将新的头节点指向first,并返回清空为null前的内容。使用图来表示是这样的:

面试官:(看看手表)take方法的整体逻辑也差不多,能简单概括一下吗

Hydra:阻塞方法take方法和poll的思路基本一致,是一个可以被中断的阻塞获取方法,在队列为空时,会挂起当前线程,将它添加到条件对象notEmpty的等待队列中,等待其他线程唤醒。

面试官:再给你一句话的时间,总结一下它和ArrayBlockingQueue的异同,我要下班回家了

Hydra:好吧,我总结一下,有下面几点:

  • 队列长度不同,ArrayBlockingQueue创建时需指定长度并且不可修改,而LinkedBlockingQueue可以指定也可以不指定长度
  • 存储方式不同,ArrayBlockingQueue使用数组,而LinkedBlockingQueue使用Node节点的链表
  • ArrayBlockingQueue使用一把锁来控制元素的插入和移除,而LinkedBlockingQueue将入队锁和出队锁分离,提高了并发性能
  • ArrayBlockingQueue采用数组存储元素,因此在插入和移除过程中不需要生成额外对象,LinkedBlockingQueue会生成新的Node节点,对gc会有影响

面试官:明天上午9点,老地方,我们再聊聊别的

Hydra:……

如果文章对您有所帮助,欢迎关注公众号 码农参上

版权声明:本文为trunks2008原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/trunks2008/p/14803016.html