AQS解析
什么是AQS?
AQS是JUC内存的基石,它本质上是一个抽象类,定义了多线程下资源争夺与释放的规则和过程,许多实现类都是继承于AQS,使用AQS的骨架。
AQS的原理
AQS总体上来看是由一个FIFO(先进先出)队列和一个state属性配合组成资源分配机制的。FIFO用来存储线程结点的,state属性用来表示资源的状态,如果为0表示空闲,如果资源被某个线程获取到,那么这个state就会+1,释放-1。当其他线程试图争夺资源时会检查state值,如果发现不为0就会放弃争夺。
当然这只是总体上的原理,如果想要了解其中的细节,还需要阅读相应的源码才能彻底弄清楚其中的细节。
源码剖析
结构
要想完整知道AQS的原理,需要从它的源码出发,查看它的内部结构。这里只针对几个重要内部类和属性说明。
从左图可以看出在AQS内部含有一个内部类Node,这个Node就是上面提到的队列中存储的线程结点对象对应的类,可以看到它包含prev,next属性,所以可以看出这是一个双向链表结构形成的队列。waitStatus表示当前结点对应线程的状态,它的值也在属性中设置了,就是1,-1,-2,-3那几个常量属性。1表示线程的请求已经被取消了,-1表示线程正在资源释放,-2表示,
右图是AQS的属性,head表示队列的头结点,tail表示队列的尾结点,state表示资源的状态。
过程
这里从ReentrantLock为例,查看它的lock、unlock方法的源码过程。
首先需要知道ReentrantLock的继承关系。
sync、FairSync、NonfairSync都是ReentrantLock的内部类,其中Sync是直接继承AQS的,而ReentrantLock在定义时可以声明为公平锁或者是非公平锁,所以内部设置了两个内部类,一个 FairSync 表示公平锁,一个 NonfairSync 表示非公平锁,这两个类又是继承Sync,实际执行的方法会根据锁性质的不同而选择执行这两个类中对应的实现方法。
lock()
public void lock() { sync.lock(); } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); .... }
View Code
可以看到这里直接跳转到一个sync的抽象方法,上面也说了,这里会根据锁的性质来选择不同的实现执行。
1 static final class FairSync extends Sync { 2 private static final long serialVersionUID = -3000897897090466540L; 3 4 final void lock() { 5 acquire(1); 6 } 7 .... 8 } 9 10 11 12 13 static final class NonfairSync extends Sync { 14 private static final long serialVersionUID = 7316153563782823691L; 15 16 /** 17 * Performs lock. Try immediate barge, backing up to normal 18 * acquire on failure. 19 */ 20 final void lock() { 21 if (compareAndSetState(0, 1)) 22 setExclusiveOwnerThread(Thread.currentThread()); 23 else 24 acquire(1); 25 } 26 .... 27 }
可以看出公平锁的实现是比较简单的,因为公平锁是需要遵守队列秩序,按顺序执行就可以了,而非公平锁则没有那么 “老实” ,它会先尝试获取锁,如果之前获取资源的线程正好执行完了或者调用wait等方法释放锁了,那么就会 “插队” 直接夺取资源执行。这里就看一下更复杂的非公平锁是如何执行的。
1、compareAndSetState 方法
根据NonfairSync对lock方法的实现可以看到,第一步会执行 compareAndSetState 方法。
1 protected final boolean compareAndSetState(int expect, int update) { 2 // See below for intrinsics setup to support this 3 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 4 } 5 6 7 8 9 10 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
发现这里最终调用的是一个本地方法,其实这个方法就是一个CAS乐观锁方法,compareAndSwapInt 方法的四个参数分别是对象,对象属性名,期望值,更新值。当修改时检查该对象属性值等于期望值就更新成功,否则就失败。而这里的 stateOffset 又是哪个属性呢?
1 private static final Unsafe unsafe = Unsafe.getUnsafe(); 2 private static final long stateOffset; 3 private static final long headOffset; 4 private static final long tailOffset; 5 private static final long waitStatusOffset; 6 private static final long nextOffset; 7 8 static { 9 try { 10 stateOffset = unsafe.objectFieldOffset 11 (AbstractQueuedSynchronizer.class.getDeclaredField("state")); 12 headOffset = unsafe.objectFieldOffset 13 (AbstractQueuedSynchronizer.class.getDeclaredField("head")); 14 tailOffset = unsafe.objectFieldOffset 15 (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); 16 waitStatusOffset = unsafe.objectFieldOffset 17 (Node.class.getDeclaredField("waitStatus")); 18 nextOffset = unsafe.objectFieldOffset 19 (Node.class.getDeclaredField("next")); 20 21 } catch (Exception ex) { throw new Error(ex); } 22 }
可以看出这个 stateOffset 属性就是 AQS 的 state 属性。所以在lock方法里首先是尝试将state改成1,如果成功就继续执行条件代码块中的代码。也就是 setExclusiveOwnerThread 方法,这个方法实现是这样的。
1 protected final void setExclusiveOwnerThread(Thread thread) { 2 exclusiveOwnerThread = thread; 3 }
关于这个方法和这个属性可以看到是属于 AbstractOwnableSynchronizer 这个类的,而这个类又是AQS的父类,所以也是从 AbstractOwnableSynchronizer 继承而来的,这个属性就是表示当前占用资源的线程。所以第一步是直接使用CAS尝试抢占锁,如果成功就修改相关属性,然后结束。如果失败就执行 acquire 方法。
2、acquire 方法
这个方法是AQS中的方法。在这个方法里面又包含许多小的方法。首先先看一下源码。
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
2.1、tryAcquire 方法:尝试获取锁资源以及判断是否是当前线程已获取到锁资源并重复加锁
这个是一个抽象方法。下面是 NonFairSync 实现的相关代码。
1 protected final boolean tryAcquire(int acquires) { 2 return nonfairTryAcquire(acquires); 3 } 4 5 6 7 8 final boolean nonfairTryAcquire(int acquires) { 9 final Thread current = Thread.currentThread(); 10 int c = getState(); 11 if (c == 0) { 12 if (compareAndSetState(0, acquires)) { 13 setExclusiveOwnerThread(current); 14 return true; 15 } 16 } 17 else if (current == getExclusiveOwnerThread()) { 18 int nextc = c + acquires; 19 if (nextc < 0) // overflow 20 throw new Error("Maximum lock count exceeded"); 21 setState(nextc); 22 return true; 23 } 24 return false; 25 }
这个方法首先是判断当前资源是否空闲(state=0),如果空闲就将相关属性进行修改(还是上面说得 exclusiveOwnerThread 属性),然后结束,返回 true(这是针对之前占用资源的线程刚好释放锁的情况);否则检查当前线程是否和占用资源的线程属性一致,如果一致就将state+传参值(一般情况下是+1),然后结束,返回 true(这是针对当前线程在已占用资源的情况下再次加锁(可重入锁));负责返回 false (获取锁失败)。
2.2、addWriter 方法:执行队列初始化以及Node结点插入操作并返回这个结点
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 // Try the fast path of enq; backup to full enq on failure 4 Node pred = tail; // 获取尾结点 5 if (pred != null) { 6 node.prev = pred; 7 if (compareAndSetTail(pred, node)) { // 判读尾结点是否为空,如果不为空就直接将当前结点新增至尾结点之后作为尾结点 8 pred.next = node; 9 return node; 10 } 11 } 12 enq(node); // 队列初始化以及执行插入操作 13 return node; 14 }
enp方法:初始化方法
private AbstractQueuedSynchronizer.Node enq(final AbstractQueuedSynchronizer.Node node) { for (;;) { Node t = tail; if (t == null) { // 队列初始化,如果尾结点为空就新建一个空结点作为头结点,并且因为是for循环所以在初始化队列后还会继续执行插入操作 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { // 执行插入操作,然后return 返回 t.next = node; return t; } } } }
从addWriter方法的源码可以知道,这个方法就是执行队列初始化以及Node结点插入操作的,并且在队列的头结点会是一个空结点(哨兵结点)。
2.3、acquireQueued 方法:控制线程的阻塞
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 获取当前结点的前一个结点 if (p == head && tryAcquire(arg)) { // 如果是头结点且尝试获取锁资源成功,就将当前结点设为头结点(哨兵结点), // 然后将之前的头结点引用全部消除,让它顺利回收,再返回中断状态false 。
// 这里的代码是当前线程获取到锁后执行的(是非公平锁,在还没加入队列正好碰上占用线程释放了锁资源或者是正常在队列中收到阻塞唤醒,也就是其他线程执行了unlock方法) setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 先判断修改前一个线程结点的waitStatus(防止中途跳出取消等待),
// 如果符合再进行线程的阻塞,通过后将中断状态设为true(因为获取到资源),执行后面的出队操作 interrupted = true; } } finally { if (failed) cancelAcquire(node); // 如果线程发生异常,避免没有执行线程出队的代码所以这里使用finally强制执行,将线程从队列中移除 } }
shouldParkAfterFailedAcquire:检查前面一个线程的waitStatus状态,如果不是1(取消执行),那么就将当前线程正式加入阻塞队列。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 表示线程准备就绪,直接返回true /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { // 表示线程请求取消了,将跳过该线程往后找直到<=0 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 找到后将其设为-1(就绪) } return false; // 虽然这里返回false,但是上一个方法是for循环,所以下一个循环还是会返回true来继续执行后面的判断代码 }
parkAndCheckInterrupt:进行线程的阻塞。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 将当前线程阻塞,效果与wait()、notify()一样,不过更灵活, // 不需要在同步代码块中也不需要按照先加锁后解锁的方式,它是通过“通行证”完成的 return Thread.interrupted(); }
到这里如果线程没有释放资源的话,那么当前线程就会因为LockSupport的park方法进入阻塞,正式进入阻塞队列等待资源释放。而让它解除阻塞就是靠unlock()方法
unlock方法
这里调用的是内部sync的release方法。
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { // 判断资源是否空闲 Node h = head; if (h != null && h.waitStatus != 0) // 如果头结点不为空且状态不是初始值就唤醒队列中第一个有效线程 unparkSuccessor(h); return true; } return false; }
1、tryRelease 方法:更新state等属性并返回资源空闲状态
ReentrantLock的实现方法:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 让state减去参数值 if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程不是占用资源的线程,就抛出异常 throw new IllegalMonitorStateException(); boolean free = false; // 当前资源是否空闲 if (c == 0) { free = true; // 如果state变成0,就设为true,再将当前占用线程属性设为null setExclusiveOwnerThread(null); } setState(c); // 更新state return free; }
2、unparkSuccessor:将头结点的waitStatus设为初始值,并唤醒队列第一个有效结点对应的线程(如果头结点下一个结点不符合条件就从队尾开始找到第一个合适的线程)
private void unparkSuccessor(Node node) { // 因为传来的头结点,所以这里的node就是头结点 /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 将头结点的waitStatus设为0 /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { // 下一个线程结点不满足条件执行(结点为空或者取消请求了) s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 从队列尾部循环遍历,找到前一个有效结点 if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 将其释放 }
在执行完这段代码后队列中第一个有效线程就会被唤醒,随后成为新的哨兵结点,而前一个线程的引用也会被断开。需要注意的是当头结点后面一个结点不符合条件,是从队尾开始遍历直到找到第一个合适的线程唤醒的,而不是从头部开始遍历。至于原因可以参考 https://www.zhihu.com/question/50724462 ,其中有一个解释比较有道理,那就是在上面 addWrite 方法中的 enq 方法中,是先执行 “node.prev=t ” 以及 “compareAndSetTail(t, node)” 的,然后才执行后一句 “t.next = node;” 在此之间可能就会发生锁的释放,如果是从head开始向后遍历,那么因为 “t.next=node” 还没有执行,所以新加的结点就遍历不到,这就会影响总体的性能。
而在 unlock()唤醒合适的线程之后,上面lock中的代码就会继续往后执行。
下面是总体上大概的流程图:
总结
AQS是JUC并发编程的基石,它定义了线程执行的过程。总体上来看其原理主要通过state和一个FIFO队列组成。state展示资源的占用状态,队列用来存储排队的线程(头结点是哨兵结点)。每个线程结点包含一个等待状态属性waitStatus,用来表示对应线程的等待状态。需要注意的是,1.队列并一定是先进先出的,当顺数第一个线程中断了等待且没有其他线程抢夺资源时,就会从队列尾部遍历找到第一个没有中断的线程唤醒执行。2.队列头结点并不是下一个会检查执行的线程结点,而是一个哨兵结点,下一个会检查第二个。