Java并发基础-并发工具类(二)
并发工具类
本系列文章主要讲解Java
并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下:
系列文章列表:
本文主要以实例讲解Semaphore
、阻塞队列等内容。
Semaphore
基本概念和用途
Semaphore
常称信号量,其维护了一个许可集,可以用来控制线程并发数。线程调用acquire()
方法去或者许可证,然后执行相关任务,任务完成后,调用release()
方法释放该许可证,让其他阻塞的线程可以运行。Semaphore
可以用于流量控制,尤其是一些公共资源有限的场景,比如数据库连接。假设我们上面的账户余额管理中的账户修改操作涉及到去更改mysql
数据库,为了避免数据库并发太大,我们进行相关限制。
常用方法Semaphore(int permits)
:构造方法,初始化许可证数量void acquire()
:获取许可证void release()
:释放许可证int availablePermits()
:返回此信号量中当前可用的许可证数。int getQueueLength()
:返回正在等待获取许可证的线程数。boolean hasQueuedThreads()
:是否有线程正在等待获取许可证。void reducePermits(int reduction)
:减少reduction个许可证。是个protected方法。Collection getQueuedThreads()
:返回所有等待获取许可证的线程集合。是个protected方法。
运行示例
虽然在代码中设置了20
个线程去运行,但同时设置了许可证的数量为5
,因而实际的最大并发数还是5
。
package com.aidodoo.java.concurrent;
import java.util.concurrent.*;
/**
* Created by zhangkh on 2018/9/9.
*/
public class SemaphoreDemo {
public static void main(String[] args){
Semaphore semaphore=new Semaphore(5);
ExecutorService executorService = Executors.newFixedThreadPool(20);
Account account=new Account();
for(int i=0;i<20;i++){
SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore);
executorService.submit(spender);
}
executorService.shutdown();
}
}
class SpenderWithSemaphore implements Runnable {
private final Account account;
private final Semaphore semaphore;
public SpenderWithSemaphore(Account account, Semaphore semaphore) {
this.account = account;
this.semaphore = semaphore;
}
@Override
public void run() {
try{
semaphore.acquire();
System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000));
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
// System.out.println(String.format("%s release a premit",Thread.currentThread().getName()));
semaphore.release();
}
}
}
获取许可证后,模拟操作mysql
,我们让线程睡眠2
秒,程序输出如下:
pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql
可以看到前面5
个线程同一时间1536480858
获得许可证,然后执行操作,并不是20
个线程一起操作,这样能降低对mysql
数据库的影响。
如果把上面Semaphore
的构造方法中的许可证数量改为20
,大家可以看到20
个线程的运行时间基本一致。
源码实现
Semaphore
实现直接基于AQS
,有公平和非公平两种模式。公平模式即按照调用acquire()
的顺序依次获得许可证,遵循FIFO
(先进先出),非公平模式是抢占式的,谁先抢到先使用。
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
获取许可证acquire()
方法最终调用父类AQS
中的acquireSharedInterruptibly
方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //(1)
doAcquireSharedInterruptibly(arg); //(2)
}
(1):调用tryAcquireShared
,尝试去获取许可证
(2):如果获取失败,则调用doAcquireSharedInterruptibly
,将线程加入到等待队列中tryAcquireShared
方法由Semaphore
的内部类,同时也是AQS
的子类去实现,即NonfairSync
和FairSync
,下面我们以NonfairSync
为例说明其实现。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
而nonfairTryAcquireShared
方法如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); //(1)
int remaining = available - acquires; //(2)
if (remaining < 0 ||
compareAndSetState(available, remaining)) (3)
return remaining;
}
}
(1):获取state
的值,也就是总许可证数量
(2):计算本次申请后,剩余的许可证数量
(3):如果剩余的许可证数量大于0
且通过CAS
将state
的值修改成功后,返回剩余的许可证数量,否则继续循环阻塞。
释放许可证release()
方法的调用最终会调用父类AQS
的releaseShared()
方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //(1)
doReleaseShared(); //(2)
return true;
}
return false;
}
(1):尝试释放许可证
(2):如果释放许可证成功,则通知阻塞的线程,让其执行tryReleaseShared
方法很简单,基本上是nonfairTryAcquireShared
的逆过程,即增加许可证的数量,并通过CAS
修改state
的值。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
BlockingQueue
基本概念
阻塞队列主要是解决如何高效安全传输数据的问题,此外能降低程序耦合度,让代码逻辑更加清晰。
其继承了Queue
,并在其基础上支持了两个附加的操作:
- 当队列为空时,获取元素的线程会阻塞,等待队列变为非空
- 当队列满时,添加元素的线程会阻塞,等待队列可用
比较典型的使用场景是生产者和消费者。BlockingQueue
根据对于不能立即满足但可能在将来某一时刻可以满足的操作,提供了不同的处理方法,进而导致众多的api
操作:
Throws exception | Special value | Blocks | Times out | |
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek()} | not applicable | not applicable |
`Throws exception`:指当阻塞队列满时候,再往队列里插入元素,会抛出`IllegalStateException`异常。当队列为空时,从队列里获取元素时会抛出`NoSuchElementException`异常
`Special value`:插入方法会返回是否成功,成功则返回`true`。移除方法,则是从队列里拿出一个元素,如果没有则返回`null`
`Blocks`:当阻塞队列满时,如果生产者线程往队列里`put`元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里`take`元素,队列也会阻塞消费者线程,直到队列可用。
`Time out`:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
整体架构和类图
Java
并发包根据不同的结构和功能提供了不同的阻塞队列,整体类图如下:
其中BlockingQueue
有如下子类:
-
ArrayBlockingQueue
:一个由数组结构组成的有界阻塞队列。 -
DelayQueue
:一个使用优先级队列实现的无界阻塞队列。 -
PriorityBlockingQueue
:一个支持优先级排序的无界阻塞队列。 -
SynchronousQueue
:一个不存储元素的阻塞队列。 -
LinkedBlockingQueue
:一个由链表结构组成的有界阻塞队列。
其中BlockingDeque
有一个子类:
-
LinkedBlockingDeque
:一个由链表结构组成的双向阻塞队列。BlockingDeque
作为双端队列,针对头部元素,还提供了如下方法:
First Element (Head) | ||||
Throws exception | Special value | Blocks | Times out | |
Insert | addFirst(e) | offerFirst(e) | putFirst(e) | offerFirst(e, time, unit) |
Remove | removeFirst() | pollFirst() | takeFirst() | pollFirst(time, unit) |
Examine | getFirst() | peekFirst() | not applicable | not applicable |
针对尾部元素
Last Element (Tail) | ||||
Throws exception | Special value | Blocks | Times out | |
Insert | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, time, unit) |
Remove | removeLast() | pollLast() | takeLast() | pollLast(time, unit) |
Examine | getLast() | peekLast() | not applicable | not applicable |
使用示例
一个典型的生产者和消费者实例如下,一个BlockingQueue
可以安全地与多个生产者和消费者一起使用,Producer
线程调用NumerGenerator
.getNextNumber()
生成自增整数,不断地写入数字,然后Consumer
循环消费。
package com.aidodoo.java.concurrent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by zhangkh on 2018/7/17.
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(1024,true);
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 5; i++) {
executorService.submit(new Producer(queue));
}
for (int i = 0; i < 3; i++) {
executorService.submit(new Consumer(queue));
}
Thread.sleep(30 * 1000L);
executorService.shutdown();
}
}
class Producer implements Runnable {
Logger logger = LoggerFactory.getLogger(Producer.class.getName());
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for(int i=0;i<3;i++){
int num = NumerGenerator.getNextNumber();
queue.put(num);
Thread.sleep(1000);
logger.info("{} producer put {}", Thread.currentThread().getName(), num);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
Logger logger = LoggerFactory.getLogger(Consumer.class.getName());
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
int ele = (int) queue.take();
logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class NumerGenerator{
private static AtomicInteger count = new AtomicInteger();
public static Integer getNextNumber(){
return count.incrementAndGet();
}
}
程序输出如下:
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15
其他BlockingQueue
子类的使用可参考对应的Java
Api
。
源码分析
由于BlockingQueue
相关的子类众多,我们仅以ArrayBlockingQueue
从源码角度分析相关实现。
构造方法ArrayBlockingQueue
中定义的成员变量如下:
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null
各变量的解释如下,以便了解后续的代码:
-
items
用于存储具体的元素 -
takeIndex
元素索引,用于记录下次获取元素的位置 -
putIndex
元素索引,用于记录下次插入元素的位置 -
count
用于记录当前队列中元素的个数 -
notEmpty
条件变量,此处为获取元素的条件,即队列不能为空,否则线程阻塞 -
notFull
条件变量,此处为插入元素的条件,即队列不能已满,否则线程阻塞 -
itrs
用于维护迭代器相关内容
内部结构如下:
构造方法如下:
public ArrayBlockingQueue(int capacity) {
this(capacity, false); //(1)
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //(2)
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition(); //(3)
notFull = lock.newCondition(); //(4)
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) { //(5)
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
(1):默认情况下,非公平模式,即抢占式
(2):数组初始化
(3)/(4):条件变量初始化
(5):如果构造方法中,含有初始化集合的话,则将对应元素添加到内部数组,并更改count
和putIndex
的值。
插入数据
插入数据,我们主要看put()
方法的实现,重点看生产者和消费者插入和获取数据时,线程何时阻塞,同时又何时唤醒。
public void put(E e) throws InterruptedException {
checkNotNull(e); //(1)
final ReentrantLock lock = this.lock; //(2)
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //(3)
enqueue(e);
} finally {
lock.unlock(); //(4)
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; //(5)
if (++putIndex == items.length) //(6)
putIndex = 0;
count++; //(7)
notEmpty.signal(); //(8)
}
(1
):非空检查,插入的元素不能为null
,否则抛出NullPointerException
(2
):获取互斥锁
(3
):如果当前队列的元素个数等于队列总长度,即队列已满,则通过条件变量,释放和notFull
相关的锁,当前线程阻塞。当前线程唤醒的条件如下:
- 其他某个线程调用此
Condition
的signal()
方法,并且碰巧将当前线程选为被唤醒的线程; - 或者其他某个线程调用此
Condition
的signalAll()
方法; - 或者其他某个线程中断当前线程,且支持中断线程的挂起;
- 或者发生“虚假唤醒”
(5):如果队列未满,则将元素添加的putIndex
索引的位置
(6):putIndex
增加1
后和队列长度相等,即已到达队列尾部,则putIndex
置0
(7):队列已有元素数量加1
(8):通知notEmpty
条件变量,唤醒等待获取元素的线程
(4):释放互斥锁
可以看到ArrayBlockingQueue
每次插入元素后,都会去唤醒等待获取元素的线程。
获取数据take()
方法源码如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //(1)
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //(2)
return dequeue();
} finally {
lock.unlock(); //(9)
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //(3)
items[takeIndex] = null; //(4)
if (++takeIndex == items.length)
takeIndex = 0; //(5)
count--; //(6)
if (itrs != null)
itrs.elementDequeued(); //(7)
notFull.signal(); //(8)
return x;
}
(1):获取互斥锁
(2):如果count
为0
,即队列为空,则释放互斥锁,然后挂起当前线程
(3):根据takeIndex
索引到数组中获取具体的值,并赋值给x
(4):赋值完成后,takeIndex
索引位置数据置null
,便于回收
(5):takeIndex
加1
,然后和队列长度比较,如果相等,即已经读取到队列尾部,takeIndex
置0
(6):获取后,将队列元素个数count
减1
(7):维护和queue
相关的迭代器
(8):唤醒等待插入元素的线程
(9):释放互斥锁
可以看到ArrayBlockingQueue
每次获取元素后,都会唤醒等待插入元素的线程。
迭代器
在分析源码前,我们先看在一个迭代器的示例
package com.aidodoo.java.concurrent;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created by zhangkh on 2018/9/10.
*/
public class ArrayBlockingQueueIterDemo {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<String> queue=new ArrayBlockingQueue(5);
queue.put("hadoop");
queue.put("spark");
queue.put("storm");
queue.put("flink");
Iterator<String> iterator1 = queue.iterator();
System.out.println( queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println();
while(iterator1.hasNext()) {
System.out.println(iterator1.next());
}
System.out.println();
Iterator<String> iterator2 = queue.iterator();
while(iterator2.hasNext()) {
System.out.println(iterator2.next());
}
}
}
程序输出如下:
hadoop
spark
storm
hadoop
flink
flink
我们结合这个示例来具体分析数据插入和获取时,内部成员变量的值
当分别插入hadoop
、spark
、storm
、flink
四个元素后,内部变量的值如下:
此时,ArrayBlockingQueue
的成员变量的值itrs
为null
。
调用iterator()
方法后,源码如下:
public Iterator<E> iterator() {
return new Itr(); //(1)
}
Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock(); //(2)
try {
if (count == 0) { //(3)
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex); //(4)
cursor = incCursor(takeIndex); //(5)
if (itrs == null) {
itrs = new Itrs(this); //(6)
} else {
itrs.register(this); //(7)
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock(); //(8)
}
}
(1):调用内部类Itr
的构造方法
(2):获取外部类即ArrayBlockingQueue
的锁
(3):没有没有元素,初始化变量值。内部类Itr
的成员变量如下:
/** Index to look for new nextItem; NONE at end */
private int cursor;
/** Element to be returned by next call to next(); null if none */
private E nextItem;
/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;
/** Last element returned; null if none or not detached. */
private E lastItem;
/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;
/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;
/** Previous value of iters.cycles */
private int prevCycles;
(4):将外部类的takeIndex
赋值给内部类nextIndex
,并获取数组具体的值赋值给nextItem
(5):计算游标cursor
的下个值,其中incCursor
方法如下:
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
(6):注册,主要是维护链表
(7):清理itrs
(8):释放外部类的互斥锁
在上面的示例中,调用iterator()
方法后,Itr
的内部变量值如下:
由于后面三次调用了queue
.take()
,依次输出hadoop
、spark
、storm
后,相关成员变量的值见图片标识,重点关注takeIndex
=3
。
当调用next()
方法时,代码如下:
public E next() {
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached()) //(1)
incorporateDequeues();
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
其中(1)处的isDetached
方法如下
boolean isDetached() {
// assert lock.getHoldCount() == 1;
return prevTakeIndex < 0;
}
由于我们示例中初始化Itr
的时候的prevTakeIndex
为0
,故isDetached
返回为false
,程序将调用incorporateDequeues
方法,根据注释我们也知道,该方法主要是调整和迭代器相关的内部索引。
/**
* Adjusts indices to incorporate all dequeues since the last
* operation on this iterator. Call only from iterating thread.
*/
private void incorporateDequeues() {
final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// how far takeIndex has advanced since the previous
// operation of this iterator
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
// Check indices for invalidation
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}
注意cursor
= takeIndex
这句代码,将外部内的takeIndex
赋值给cursor
,这样子将队列和迭代器数据读取进行了同步。
对于iterator1
,第一次调用next()
方法时,cursor
被赋值为3
首先将nextItem
的值保持在x
变量中,即hadoop
字符串。
然后设置nextItem
和cursor
的值
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
设置完成后,nextItem
为flink
,cursor
为-1
。
最后返回保存在x
变量中的值,即返回hadoop
字符串。
第二次调用next()
方法时,输出的值即上次保存的nextItem
值,即flink
字符串。
迭代器运行过程中,相关变量内容如下:
至于iterator2
迭代器,各位可以自己去分析,不再赘述。
本文主要以实例讲解Semaphore
、阻塞队列,并分析了相关核心源码实现。
本文参考
关于作者
爱编程、爱钻研、爱分享、爱生活
关注分布式、高并发、数据挖掘
如需捐赠,请扫码