一.semaphore信号量,底层也是基于AQS
/**
* 可以理解为控制某个资源最多有多少个线程同时执行,(比如洗手间,并行与排队)
* 如果满了只能等待直到其它资源释放(可以理解为并发量控制)
* @author Binglong
* @date 2018-11-12
*/
public class SemaphoreUtils {
public static void main(String[] args) {
final int SH_SIZE = 10;
Semaphore semaphore = new Semaphore(SH_SIZE);
final int TH_NUM = 20;
for (int i = 0; i < TH_NUM; i++) {
ThreadPoolUtils.getSingle().threadPoolDo(new TaskSemaphore(semaphore));
}
}
}
class TaskSemaphore implements Runnable {
private Semaphore semaphore;
TaskSemaphore(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void run() {
String threadName = Thread.currentThread().getName();
try {
this.semaphore.acquire();
System.out.println(threadName + “:occupy…”);
Thread.sleep(new Random().nextInt(10000));
System.out.println(threadName + “:over…”);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
二、CyclicBarrier
1.构造方法
//等待parties个线程后,先完成barrierAction的run方法,其它线程继续执行
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//只是做等待parties个线程
public CyclicBarrier(int parties) {
this(parties, null);
}
2.重要方法
a.wait()方法,当调用wait()方法的线程数量,达到CyclicBarrier构造方法的N时,(CyclicBarrier在构造方法的Runnable barrierAction,方法完成后,当前线程继续执行)
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public static void main(String[] args) {
final int N = 5;
final CyclicBarrier cyclic = new CyclicBarrier(N, new Runnable() {
public void run() {
try {
System.out.println(“汇总计算开始”);
Thread.sleep(Math.abs(10));
System.out.println(“汇总计算完成”);
} catch (Exception e) {
e.printStackTrace();
}
}
});
for (int i = 0; i < N; i++) {
final int t = i;
new Thread(new Runnable() {
public void run() {
try {
System.out.println(t + “中心数据已计算开始”);
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
System.out.println(t + “中心数据已计算结束”);
cyclic.await();
System.out.println(t + “中心数据退出”);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
0中心数据已计算开始
3中心数据已计算开始
4中心数据已计算开始
2中心数据已计算开始
4中心数据已计算结束
1中心数据已计算开始
1中心数据已计算结束
3中心数据已计算结束
2中心数据已计算结束
0中心数据已计算结束
汇总计算开始
汇总计算完成
0中心数据退出
1中心数据退出
4中心数据退出
2中心数据退出
3中心数据退出
b.getParties()获取CyclicBarrier等待的线程数,也就是CyclicBarrier构造方法参数parties的值
c.getNumberWaiting() how many thread wait now
d.rest()
如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。
三、CountDownLatch
1.使用
三个方法
CountDownLatch(int count):构造器中的计数值(count)。
void await() :会一直阻塞当前线程,直到计时器的值为0
void countDown():计数减一
/**
* countDownLatch.countDown()调用一次减一,到0时,其它await方法继续往下执行
* 可以做并发开关(把SIZE设置为1,通过主线程来countDown(),其它线程都调用await()方法)
* @author Binglong
* @date 2018-11-12
*/
public class CountDownLatchUtils {
public static void main(String[] args) throws InterruptedException {
final int SIZE = 20;
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for (int i = 0; i < SIZE; i++) {
ThreadPoolUtils.getSingle().threadPoolDo(new TaskCountDownLatch(countDownLatch));
}
System.out.println(“waiting…..”);
// Thread.sleep(10000);
// countDownLatch.countDown();
}
}
class TaskCountDownLatch implements Runnable {
private CountDownLatch countDownLatch;
TaskCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public void run() {
String name = Thread.currentThread().getName();
try {
System.out.println(name + “:waiting..”+countDownLatch.getCount());
//等待一定数量任务继续执行
Thread.sleep(new Random().nextInt(10000));
countDownLatch.countDown();
System.out.println(name + “:over…”);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.原理
CountDownLatch源代码是有内部类Sync实现,而Sync是继承AQS(抽象队列同步器)
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
//构造器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException(“count < 0”);
this.sync = new Sync(count);
}
//countDown方法
public void countDown() {
//releaseShared方法是抽象队列同步器的方法
sync.releaseShared(1);
}
//await方法
public void await() throws InterruptedException {
//acquireSharedInterruptibly方法是抽象队列同步器的方法
sync.acquireSharedInterruptibly(1);
}