并发编程的基石——AQS类
本博客系列是学习并发编程过程中的记录总结。由于文章比较多,写的时间也比较散,所以我整理了个目录贴(传送门),方便查阅。
本文参考了[Java多线程进阶(六)—— J.U.C之locks框架:AQS综述(1)]和Java技术之AQS详解两篇文章。
AQS 简介
AbstractQueuedSynchronizer
(简称AQS)类是整个 JUC包的核心类。JUC 中的ReentrantLock
、ReentrantReadWriteLock
、CountDownLatch
、Semaphore
和LimitLatch
等同步工具都是基于AQS实现的。
AQS 分离出了构建同步器时的通用关注点,这些关注点主要包括如下:
- 资源是可以被同时访问?还是在同一时间只能被一个线程访问?(共享/独占功能)
- 访问资源的线程如何进行并发管理?(等待队列)
- 如果线程等不及资源了,如何从等待队列退出?(超时/中断)
这些关注点都是围绕着资源——同步状态(synchronization state)来展开的,AQS将这些通用的关注点封装成了一个个模板方法,让子类可以直接使用。
AQS 留给用户的只有两个问题:
- 什么是资源
- 什么情况下资源是可以被访问的
这样一来,定义同步器的难度就大大降低了。用户只要解决好上面两个问题,就能构建出一个性能优秀的同步器。
下面是几个常见的同步器对资源的定义:
同步器 | 资源的定义 |
---|---|
ReentrantLock | 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数 |
ReentrantReadWriteLock | 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。 |
CountDownLatch | 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。 |
Semaphore | 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。 |
AQS 原理
上面一节中介绍到 AQS 抽象出了三个关注点,下面就具体看下 AQS 是如果解决这三个问题的。
同步状态的管理
同步状态,其实就是资源。AQS使用单个int(32位)来保存同步状态,并暴露出getState、setState以及compareAndSetState操作来读取和更新这个状态。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
线程的阻塞和唤醒
在JDK1.5之前,除了内置的监视器机制外,没有其它方法可以安全且便捷得阻塞和唤醒当前线程。
JDK1.5以后,java.util.concurrent.locks包提供了LockSupport类来作为线程阻塞和唤醒的工具。
等待队列
等待队列,是AQS框架的核心,整个框架的关键其实就是如何在并发状态下管理被阻塞的线程。
等待队列是严格的FIFO队列,是Craig,Landin和Hagersten锁(CLH锁)的一种变种,采用双向循环链表实现,因此也叫CLH队列。
1. 节点定义
CLH队列中的结点是对线程的包装,结点一共有两种类型:独占(EXCLUSIVE)和共享(SHARED)。
每种类型的结点都有一些状态,其中独占结点使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享结点使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。
结点状态 | 值 | 描述 |
---|---|---|
CANCELLED | 1 | 取消。表示后驱结点被中断或超时,需要移出队列 |
SIGNAL | -1 | 发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。) |
CONDITION | -2 | Condition专用。表示当前结点在Condition队列中,因为等待某个条件而被阻塞了 |
PROPAGATE | -3 | 传播。适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。) |
INITIAL | 0 | 默认。新结点会处于这种状态 |
AQS使用CLH队列实现线程的结构管理,而CLH结构正是用前一结点某一属性表示当前结点的状态,之所以这种做是因为在双向链表的结构下,这样更容易实现取消和超时功能。
next指针:用于维护队列顺序,当临界区的资源被释放时,头结点通过next指针找到队首结点。
prev指针:用于在结点(线程)被取消时,让当前结点的前驱直接指向当前结点的后驱完成出队动作。
static final class Node {
// 共享模式结点
static final Node SHARED = new Node();
// 独占模式结点
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* INITAL: 0 - 默认,新结点会处于这种状态。
* CANCELLED: 1 - 取消,表示后续结点被中断或超时,需要移出队列;
* SIGNAL: -1- 发信号,表示后续结点被阻塞了;(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。)
* CONDITION: -2- Condition专用,表示当前结点在Condition队列中,因为等待某个条件而被阻塞了;
* PROPAGATE: -3- 传播,适用于共享模式。(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。)
*
* waitStatus表示的是后续结点状态,这是因为AQS中使用CLH队列实现线程的结构管理,而CLH结构正是用前一结点某一属性表示当前结点的状态,这样更容易实现取消和超时功能。
*/
volatile int waitStatus;
// 前驱指针
volatile Node prev;
// 后驱指针
volatile Node next;
// 结点所包装的线程
volatile Thread thread;
// Condition队列使用,存储condition队列中的后继节点
Node nextWaiter;
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}
2. 队列定义
对于CLH队列,当线程请求资源时,如果请求不到,会将线程包装成结点,将其挂载在队列尾部。
CLH队列的示意图如下:
初始状态,队列head和tail都指向空
首个线程入队,先创建一个空的头结点,然后以自旋的方式不断尝试插入一个包含当前线程的新结点
再次加入新节点,新加入的节点会被放置到队列的尾部。(PS:看下了代码,AQS 的线程管理队列好像是一个双向循环队列,这边这个图是不是有点问题???)
AQS 的方法介绍
用户需要自己重写的方法
上面介绍到 AQS 已经帮用户解决了同步器定义过程中的大部分问题,只将下面两个问题丢给用户解决:
- 什么是资源
- 什么情况下资源是可以被访问的
具体的,AQS 是通过暴露以下 API 来让用户解决上面的问题的。
钩子方法 | 描述 |
---|---|
tryAcquire | 独占方式。尝试获取资源,成功则返回true,失败则返回false。 |
tryRelease | 独占方式。尝试释放资源,成功则返回true,失败则返回false。 |
tryAcquireShared | 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
tryReleaseShared | 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。 |
isHeldExclusively | 该线程是否正在独占资源。只有用到condition才需要去实现它。 |
如果你需要实现一个自己的同步器,一般情况下只要继承 AQS ,并重写 AQS 中的这个几个方法就行了。至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。要不怎么说Doug Lea贴心呢。
需要注意的是:如果你没在子类中重写这几个方法就直接调用了,会直接抛出异常。所以,在你调用这些方法之前必须重写他们。不使用的话可以不重写。
AQS 提供的一系列模板方法
查看 AQS 的源码我们就可以发现这个类提供了很多方法,看起来让人“眼花缭乱”的。但是最主要的两类方法就是获取资源的方法和释放资源的方法。因此我们抓住主要矛盾就行了:
- public final void acquire(int arg) // 独占模式的获取资源
- public final boolean release(int arg) // 独占模式的释放资源
- public final void acquireShared(int arg) // 共享模式的获取资源
- public final boolean releaseShared(int arg) // 共享模式的释放资源
acquire(int)方法
该方法以独占方式获取资源,如果获取到资源,线程继续往下执行,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。该方法是独占模式下线程获取共享资源的顶层入口。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面分析下这个acquire
方法的具体执行流程:
step1:首先这个方法调用了用户自己实现的方法
tryAcquire
方法尝试获取资源,如果这个方法返回true,也就是表示获取资源成功,那么整个acquire
方法就执行结束了,线程继续往下执行;step2:如果
tryAcquir
方法返回false,也就表示尝试获取资源失败。这时acquire
方法会先调用addWaiter
方法将当前线程封装成Node类并加入一个FIFO的双向队列的尾部。step3:再看
acquireQueued
这个关键方法。首先要注意的是这个方法中哪个无条件的for循环,这个for循环说明acquireQueued
方法一直在自旋尝试获取资源。进入for循环后,首先判断了当前节点的前继节点是不是头节点,如果是的话就再次尝试获取资源,获取资源成功的话就直接返回false(表示未被中断过)假如还是没有获取资源成功,判断是否需要让当前节点进入
waiting
状态,经过shouldParkAfterFailedAcquire
这个方法判断,如果需要让线程进入waiting
状态的话,就调用LockSupport的park方法让线程进入waiting
状态。进入waiting
状态后,这线程等待被interupt
或者unpark
(在release操作中会进行这样的操作,可以参见后面的代码)。这个线程被唤醒后继续执行for循环来尝试获取资源。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//首先判断了当前节点的前继节点是不是头节点,如果是的话就再次尝试获取资源,
//获取资源成功的话就直接返回false(表示未被中断过)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断是否需要让当前节点进入waiting状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果在整个等待过程中被中断过,则返回true,否则返回false。
// 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上就是acquire
方法的简单分析。
单独看这个方法的话可能会不太清晰,结合ReentrantLock
、ReentrantReadWriteLock
、CountDownLatch
、Semaphore
和LimitLatch
等同步工具看这个代码的话就会好理解很多。
release(int)方法
release(int)
方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//上面已经讲过了,需要用户自定义实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
与acquire()方法中的tryAcquire()类似,tryRelease()方法也是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。
但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
unparkSuccessor(Node)
方法用于唤醒等待队列中下一个线程。这里要注意的是,下一个线程并不一定是当前节点的next节点,而是下一个可以用来唤醒的线程,如果这个节点存在,调用unpark()
方法唤醒。
总之,release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
acquireShared(int)方法
acquireShared(int)
方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。
public final void acquireShared(int arg) {
//tryAcquireShared需要用户自定义实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
可以发现,这个方法的关键实现其实是获取资源失败后,怎么管理线程。也就是doAcquireShared
的逻辑。
//不响应中断
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看出,doAcquireShared
的逻辑和acquireQueued
的逻辑差不多。将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。
简单总结下acquireShared
的流程:
step1:tryAcquireShared()尝试获取资源,成功则直接返回;
step2:失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
releaseShared(int)方法
releaseShared(int)
方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。