LinkedBlockingQueue 源码分析
LinkedBlockingQueue 源码分析
LinkedBlockingQueue
LinkedBlockingQueue 是基于链表实现的,可以选择有界或无界的阻塞队列。
队列的元素按照 FIFO 的顺序访问,新增元素添加到队列尾部,移除元素从队列头部开始。
队列头部通过 takeLock 进行并发控制,队列尾部通过 putLock 进行并发控制,
该队列最多可以有两个线程同时操作,其吞吐量要高于 ArrayBlockQueue。
创建实例
/**
* 单向链表节点
*/
static class Node<E> {
/**
* 存储的元素
*/
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞队列的容量,如果不指定,则为 Integer.MAX_VALUE */
private final int capacity;
/** 当前的元素总数 */
private final AtomicInteger count = new AtomicInteger();
/**
* 链表头结点
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* 链表尾节点
* Invariant: last.next == null
*/
private transient Node<E> last;
/** 执行读取操作时必须持有的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 队列为空时执行的 take 操作,当前线程将在此条件阻塞等待 */
private final Condition notEmpty = takeLock.newCondition();
/** 执行写入操作时必须持有的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 队列已满时执行的 put 操作,当前线程将在此条件阻塞等待 */
private final Condition notFull = putLock.newCondition();
/**
* 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 实例
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 创建一个容量为 capacity 的 LinkedBlockingQueue 实例
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
this.capacity = capacity;
last = head = new Node<>(null);
}
写入元素
- 在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入
/**
* 在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入。
*/
@Override
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 创建新节点
final Node<E> node = new Node<>(e);
// 读取写锁
final ReentrantLock putLock = this.putLock;
// 读取元素计数值
final AtomicInteger count = this.count;
// 可中断地锁定
putLock.lockInterruptibly();
try {
// 如果当前队列已满
while (count.get() == capacity) {
// 则在非满条件上阻塞等待,线程被唤醒后进行重试
notFull.await();
}
// 加入队列尾部
enqueue(node);
// 递增元素总数
c = count.getAndIncrement();
// 如果添加元素之后还有可用空间
if (c + 1 < capacity) {
// 唤醒在非满条件上阻塞等待的线程
notFull.signal();
}
} finally {
// 释放写锁
putLock.unlock();
}
// 如果是添加的第一个元素
if (c == 0) {
// 唤醒在非空条件上阻塞等待的线程来读取元素
signalNotEmpty();
}
}
/**
* 将节点插入队列尾部
*/
private void enqueue(Node<E> node) {
last = last.next = node;
}
/**
* Signals a waiting take. Called only from put/offer
* 唤醒一个等待读取元素的线程
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
- 如果队列有可用空间,则将元素添加到队列尾部,并返回 true;如果队列已满,则立即返回 false,元素被丢弃。
/**
* 如果队列有可用空间,则将元素添加到队列尾部,并返回 true;
* 如果队列已满,则立即返回 false,元素被丢弃。
*/
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
final AtomicInteger count = this.count;
// 队列已满
if (count.get() == capacity) {
// 返回 false
return false;
}
int c = -1;
// 新建节点
final Node<E> node = new Node<>(e);
// 读取写锁
final ReentrantLock putLock = this.putLock;
// 获取锁
putLock.lock();
try {
// 进行二次判断
if (count.get() < capacity) {
// 将节点加入到队列尾部
enqueue(node);
// 递增元素总个数
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
return c >= 0;
}
- 在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
/**
* 在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
*/
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
// 转换为纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队列已满
while (count.get() == capacity) {
// 已经超时则直接返回 false
if (nanos <= 0L) {
return false;
}
// 最多阻塞等待指定的超时时间后再次尝试添加元素
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<>(e));
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
return true;
}
读取元素
- 如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
/**
* 如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
* created by ZXD at 6 Dec 2018 T 20:20:15
* @return
* @throws InterruptedException
*/
@Override
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空
while (count.get() == 0) {
// 在非空条件上阻塞等待,线程被唤醒后再次尝试读取
notEmpty.await();
}
// 移除队列头部元素
x = dequeue();
// 递减总数
c = count.getAndDecrement();
// 如果还有元素可用
if (c > 1) {
// 唤醒在非空条件上阻塞等待的线程
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 如果有可用空间
if (c == capacity) {
// 唤醒在非满条件上阻塞等待的线程来插入元素
signalNotFull();
}
return x;
}
/**
* 移除并返回头部节点元素
*/
private E dequeue() {
// 读取头节点
final Node<E> h = head;
// 读取后继节点
final Node<E> first = h.next;
// 清除旧头结点
h.next = h; // help GC
// 写入新头节点
head = first;
// 读取元素值
final E x = first.item;
// 清除头结点的元素值,它只作为一个标记节点
first.item = null;
// 返回元素值
return x;
}
- 如果队列为空格,则直接返回 null,否则尝试移除并返回头部元素
/**
* 如果队列为空格,则直接返回 null,否则尝试移除并返回头部元素
* created by ZXD at 6 Dec 2018 T 20:23:51
* @return
*/
@Override
public E poll() {
final AtomicInteger count = this.count;
// 队列为空时直接返回 null
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 有元素可用
if (count.get() > 0) {
// 移除并返回头部元素
x = dequeue();
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}
- 尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
/**
* 尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
* created by ZXD at 6 Dec 2018 T 20:27:27
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空
while (count.get() == 0) {
// 已经超时则直接返回 null
if (nanos <= 0L) {
return null;
}
// 在非空条件上阻塞等待指定的纳秒数,被唤醒后再次进行读取
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}
版权声明:本文为zhuxudong原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。