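Multithreading was invented to make full use of machine performance. It brought thread-safety problems with it, so synchronization locks were invented in turn.

  Everyone knows that much, but do you really understand synchronization locks, or do you only know how to call lock and unlock?

  Today, let's take a closer look at how synchronization locks work and how they are implemented.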

 

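I. Responsibilities of a synchronization lock

  A synchronization lock really has one responsibility: restricting access to a resource (thread safety follows from that).

  It usually provides at least two operations: 1. locking a resource; 2. unlocking a resource. In addition, it usually offers a wait/notify facility.

  Typical use cases: several threads operate on one transaction at the same time and the result must stay correct; a resource may be accessed by only one thread at a time; a resource admits at most k concurrent accesses; accesses must happen in a guaranteed order.

  Ways to implement a synchronization lock: scheduling support from the operating system; implementation by the application itself; CAS spinning.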
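  Of the three approaches, CAS spinning is the easiest to show in a few lines. The following is a minimal sketch and is not taken from the JDK: SpinLock and countWithLock are hypothetical names chosen for illustration, and the lock is neither reentrant nor fair. It also hints at one answer to "does waiting for a lock consume CPU?": a spinning waiter stays runnable and keeps using CPU until it wins the CAS.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative, non-reentrant spin lock built on a single CAS (hypothetical class). */
class SpinLock {
    private final AtomicBoolean held = new AtomicBoolean(false);

    void lock() {
        // Spin until we are the thread that flips false -> true.
        while (!held.compareAndSet(false, true)) {
            Thread.yield(); // scheduler hint; Thread.onSpinWait() is an alternative on Java 9+
        }
    }

    void unlock() {
        held.set(false); // volatile write: publishes the changes made while holding the lock
    }

    /** Increments a plain int from several threads, guarded by the spin lock. */
    static int countWithLock(int threads, int incrementsPerThread) {
        SpinLock lock = new SpinLock();
        int[] counter = {0}; // deliberately not atomic; the lock is the only protection
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithLock(4, 10_000));
    }
}
```

  Spinning only pays off when the protected section is very short; for longer waits, the queue-and-park approach examined in the rest of this article avoids burning CPU.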

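  A few questions about synchronization locks:

    Why can a lock guarantee thread safety?

    Does waiting for a lock consume CPU?

    Why does performance drop so sharply once locks are used?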

 

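II. Lock implementation, part one: lock/unlock

  For application code, a lock mostly means lock/unlock, and that is also the core of any lock.

  AQS (AbstractQueuedSynchronizer) is the foundation of many lock implementations in Java: it hides a lot of tedious low-level blocking logic and exposes an easy-to-use interface to the layers above.

  We will use AQS as our springboard and look at the locking process first. To avoid getting lost in the business logic of a particular lock, we start with the simplest one, CountDownLatch.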

    // First, the basic data structure of CountDownLatch. It could hardly be simpler:
    // it extends AQS and overrides just the few methods it needs.
    // java.util.concurrent.CountDownLatch.Sync
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            // Acquisition succeeds in exactly one case: when state == 0
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                // The initial count is fixed at construction; each release decrements it by one
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    // Release "succeeds" in exactly one case: this decrement brought state to 0
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;
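  To see how little a subclass has to supply, here is a minimal sketch of a custom synchronizer built the same way. It is modelled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc; the names OneShotGate and demo are hypothetical, chosen for this illustration. State 0 means "closed" and state 1 means "open"; everything else (queueing, parking, waking) is inherited from AQS.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** A gate that stays closed until open() is called, then stays open (hypothetical class). */
class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Same convention as CountDownLatch: >= 0 means success, < 0 means "queue up".
            return isOpen() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true asks AQS to wake the queued threads
        }
    }

    private final Sync sync = new Sync();

    boolean isOpen() {
        return sync.isOpen();
    }

    void open() {
        sync.releaseShared(1);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Starts the given number of waiting threads, opens the gate, returns how many passed. */
    static int demo(int waiters) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```

  The design choice worth noticing is that the subclass only answers two questions, "may this thread pass right now?" and "did this release change anything?", while AQS owns the wait queue.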

 

Key point 1. The locking process, that is, the call to await().

    public void await() throws InterruptedException {
        // Delegate to AQS, which implements the skeleton of the locking logic
        sync.acquireSharedInterruptibly(1);
    }

    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
    /**
     * Acquires in shared mode, aborting if interrupted. Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success. Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Try to acquire first; on success there is no need to block.
        // As the logic above shows, the attempt itself is very simple,
        // so an uncontended acquire costs very little.
        // If it fails, the thread queues up and retries later; that part is more intricate.
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // First append the current thread to the tail of the wait queue;
        // this is done in a thread-safe way, as addWaiter below shows
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // Look at the predecessor; if it is the head node,
                // the current thread may try to acquire again
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // Check whether this thread should block, then block; blocking is provided by LockSupport.
                // A parked thread does not resume on its own in the normal case: it is woken by the
                // unpark issued on release, or by an interrupt (park may also return spuriously,
                // which is why this sits in a loop).
                // So this is where a thread ultimately waits for the lock;
                // while parked, it is not scheduled onto a CPU.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // How is the current thread appended to the tail in a thread-safe way? CAS guarantees it.
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    // Decide whether the thread should block
    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops. Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            // Only when the predecessor is already in SIGNAL state may this thread park.
            // Otherwise the predecessor's status is set below and the caller retries in its next loop iteration.
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
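    // The blocking itself: park
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // The current AQS instance is passed as the blocker. The blocker is only recorded
        // for monitoring and diagnostic tools (thread dumps, for example), so every thread
        // waiting on this synchronizer reports the same blocker. The thread then blocks
        // until it is unparked or interrupted.
        LockSupport.park(this);
        return Thread.interrupted();
    }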

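  As shown above, the locking process is fairly straightforward: the thread joins a queue and is then parked, so that the operating system takes it off the CPU. (How does the operating system actually deschedule a thread? That is left for interested readers to explore.)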
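  The park/unpark pair used by AQS can be tried in isolation. The sketch below is an illustration only (ParkDemo and parkThenUnpark are hypothetical names): one thread parks in a loop guarded by a flag, the caller observes that the thread has reached the WAITING state, then sets the flag and unparks it. The loop around park matters because park is allowed to return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

/** Shows that a parked thread sits in WAITING until another thread unparks it. */
class ParkDemo {
    /** Returns { state observed while parked, state after the thread finished }. */
    static Thread.State[] parkThenUnpark() {
        AtomicBoolean released = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so always re-check the condition.
            while (!released.get()) {
                LockSupport.park(ParkDemo.class); // the argument is only a diagnostic "blocker"
            }
        });
        waiter.start();

        // Poll until the waiter is actually parked.
        Thread.State whileParked = waiter.getState();
        while (whileParked != Thread.State.WAITING) {
            Thread.yield();
            whileParked = waiter.getState();
        }

        released.set(true);
        LockSupport.unpark(waiter);
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Thread.State[] { whileParked, waiter.getState() };
    }

    public static void main(String[] args) {
        Thread.State[] states = parkThenUnpark();
        System.out.println(states[0] + " -> " + states[1]);
    }
}
```

  A thread in WAITING is not runnable, which is the sense in which a parked lock waiter does not consume CPU while it waits.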

 

Key point 2. The unlocking process, that is, the call to countDown()

    public void countDown() {
        // Again delegate to AQS, which implements the skeleton of the release logic
        sync.releaseShared(1);
    }

    // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
    /**
     * Releases in shared mode. Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument. This value is conveyed to
     * {@link #tryReleaseShared} but is otherwise uninterpreted
     * and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        // Call the subclass's release logic; if it succeeds, perform the low-level release
        // (waking queued threads and so on).
        // In CountDownLatch this succeeds only when state reaches 0, so the low-level release runs only once.
        // This is one reason CountDownLatch appears to let many threads proceed at the same time.
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            // Only release when the queue is not empty
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // From the locking logic above we know that a head whose successor
                // is parked (or about to park) has status Node.SIGNAL
                if (ws == Node.SIGNAL) {
                    // Continue with the wake-up only if the status change succeeds.
                    // The CAS ensures a single releaser performs the wake-up;
                    // on failure, loop and check again
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Wake the successor of the head node, if necessary
                    unparkSuccessor(h);
                }
                // What exactly is being propagated here?
                // And why can the other threads move on when only one thread was woken?
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node. But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // Wake the next node.
        // If the next node has cancelled its wait, scan backwards from the tail
        // for the non-cancelled node closest to this one and wake that instead.
        // Only a single thread is woken here.
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 

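Key point 3. How does the release propagate between threads?

  In the previous section we saw that when the user calls countDown, only one thread is woken: the successor of head, or the nearest node that has not been cancelled. So how do all the threads end up acquiring the lock and running?

  The answer lies in a part of the acquire path that we have not yet looked at closely: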

    // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // After countDown() brings the count to 0, the thread in head's
                    // successor node is woken and resumes here
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Having acquired, make node the new head and propagate the wake-up;
                        // this is where further threads get woken, see below
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // If necessary, wake the next thread.
            // countDown() does not call this path directly; it is an internal,
            // thread-to-thread propagation triggered by each woken waiter
            Node s = node.next;
            if (s == null || s.isShared())
                // Same release logic as above: in short, yet another wake-up is triggered
                doReleaseShared();
        }
    }

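  At this point we understand how a single release lets every waiting thread through. It also settles our earlier guess that all the threads start running at the same instant: they do not. The waiting threads are woken one after another through wake-up propagation, so in absolute time there is always an order. So let's not casually claim that they execute simultaneously.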

 

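III. Handing the lock over: wait/notify

  As seen above, a lock/unlock cycle is quite simple: the operating system does most of the heavy lifting and there is not much code.

  Scenarios with slightly higher demands, however, need conditional operations. For example: a thread holds a lock and is running a piece of code when it discovers that some condition is not satisfied. It must not simply finish; it has to wait until the condition holds. This is the so-called wait operation.

  At first glance wait/notify looks much like lock/unlock, but it is not. The main difference is that lock/unlock guards a whole section of code, whereas wait/notify is about one particular condition: holding the lock does not mean the condition holds, but once the condition holds, acting on it is only safe while holding the lock.

  So is wait/notify just as simple to implement? Java's most basic class, Object, already provides wait/notify, for example.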
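  For reference, this is what the Object-based form looks like. It is a minimal sketch with hypothetical names (ObjectWaitNotifyDemo, awaitReady, markReady): the monitor of a private object plays the role of the lock, and a boolean guarded by that monitor is the condition. Both wait and notifyAll must be called while holding the monitor, otherwise they throw IllegalMonitorStateException.

```java
/** Guarded wait using the built-in monitor methods of Object (hypothetical class). */
class ObjectWaitNotifyDemo {
    private final Object monitor = new Object();
    private boolean ready = false; // guarded by monitor

    void awaitReady() throws InterruptedException {
        synchronized (monitor) {
            // Holding the lock does not mean the condition holds: re-check in a loop.
            while (!ready) {
                monitor.wait(); // releases the monitor while waiting, re-acquires before returning
            }
        }
    }

    void markReady() {
        synchronized (monitor) {
            ready = true;
            monitor.notifyAll();
        }
    }

    /** Returns true if the waiting thread observed the condition and finished. */
    static boolean demo() {
        ObjectWaitNotifyDemo state = new ObjectWaitNotifyDemo();
        boolean[] sawReady = {false};
        Thread waiter = new Thread(() -> {
            try {
                state.awaitReady();
                sawReady[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        state.markReady();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawReady[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

  One visible difference from the java.util.concurrent version discussed next: an Object monitor has a single wait set, whereas one ReentrantLock can hand out several Condition objects.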

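  Since we want to get to the bottom of this, we will again use the implementation in the concurrency package (java.util.concurrent) as our basis; Java is, after all, our home turf.

  This time we use ArrayBlockingQueue#put/take to see how wait/notify is used in such a scenario.

  The contract of ArrayBlockingQueue's put/take is: put blocks while the queue is full and continues only once a slot is free; take likewise blocks while the queue is empty and continues only once there is data. A plain lock alone does not handle this well, because it is a condition check. put/take look like this: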

    // java.util.concurrent.ArrayBlockingQueue#put
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // Keep waiting while the queue is full
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    // java.util.concurrent.ArrayBlockingQueue#take
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // Keep waiting while the queue is empty
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

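  That looks quite simple and matches intuition. But the flow is controlled by two variables, notFull and notEmpty. How are these two related?

  Before answering, we need to complete the example: once notFull.await() or notEmpty.await() has blocked, when does the thread get to run again? As the following code shows, it is enough to signal after each enqueue and each dequeue completes.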

    // Counterpart of put: once an element has been enqueued the queue is no longer empty,
    // so signalling notEmpty is all that is needed
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // An element has been added, so the queue is not empty any more
        notEmpty.signal();
    }

    // Counterpart of take: once an element has been dequeued the queue cannot be full;
    // there is at least one free slot
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // An element has been removed, so the queue is certainly not full; producers may continue
        notFull.signal();
        return x;
    }
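  The same pattern can be reproduced outside the JDK in a few dozen lines. The sketch below is a simplified illustration and not the JDK source: TinyBoundedBuffer and transferSum are hypothetical names, and iterator support, fairness and null checks are left out. It keeps the essential shape: one ReentrantLock, two Conditions created from it, a while loop around each await, and a signal after each state change.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified bounded buffer in the style of ArrayBlockingQueue (hypothetical class). */
class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count; // all guarded by lock
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal(); // a consumer may proceed
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal(); // a producer may proceed
            return e;
        } finally {
            lock.unlock();
        }
    }

    /** Sends 1..n through a buffer of capacity 2 and returns the sum received. */
    static long transferSum(int n) {
        TinyBoundedBuffer<Integer> buffer = new TinyBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(100));
    }
}
```

  With a capacity of 2 and 100 elements, both the producer and the consumer are forced to wait many times, which exercises both conditions.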

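  Easy to follow, right? Indeed. But our goal is not to see how ArrayBlockingQueue is implemented; it is to understand how wait/notify is implemented, because there is more to it than a single lock.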

    // How the three relate: notEmpty and notFull are both Conditions created from
    // the ReentrantLock, so they are subordinate to that lock
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    // What does lock.newCondition() return? The ConditionObject implemented in AQS
    // java.util.concurrent.locks.ReentrantLock#newCondition
    public Condition newCondition() {
        return sync.newCondition();
    }

    // java.util.concurrent.locks.ReentrantLock.Sync#newCondition
    final ConditionObject newCondition() {
        // Defined in AQS
        return new ConditionObject();
    }

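  Next, let's examine this Condition object with a few questions in mind:

    1. How are its wait/notify operations implemented?
    2. How are the conditions tied to each other and to the lock?
    3. Why must wait/notify run only after the enclosing lock has been acquired?
    4. What does it have in common with Object's wait/notify, and how does it differ?

  If you can answer these questions, you understand most of its principle and implementation.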

 

Key point 1. How is wait/notify implemented?

  As seen above, it is done by calling await()/signal(). Let's look at what they actually do.

    // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
    /**
     * Implements interruptible condition wait.
     * <ol>
     * <li> If current thread is interrupted, throw InterruptedException.
     * <li> Save lock state returned by {@link #getState}.
     * <li> Invoke {@link #release} with saved state as argument,
     *      throwing IllegalMonitorStateException if it fails.
     * <li> Block until signalled or interrupted.
     * <li> Reacquire by invoking specialized version of
     *      {@link #acquire} with saved state as argument.
     * <li> If interrupted while blocked in step 4, throw InterruptedException.
     * </ol>
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Add the current thread to the condition (wait) queue, maintained by firstWaiter/lastWaiter
        Node node = addConditionWaiter();
        // Release the lock currently held; details below
        int savedState = fullyRelease(node);
        // From here on, mutual exclusion is no longer guaranteed: the lock has been
        // released and other threads will compete for it
        int interruptMode = 0;
        // Loop: as long as this node is not on the sync queue, keep parking.
        // Checking whether the node is on the sync queue is therefore essential
        while (!isOnSyncQueue(node)) {
            // Not on the sync queue yet: block
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Once signalled (moved to the sync queue), re-compete for the lock; details below.
        // After winning the lock, return to the point where await was called
        // and continue with the business logic
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Below: cleanup of cancelled waiters and interrupt reporting; can be skipped on a first reading
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // The release is expected to succeed; failure means the current thread
            // does not hold the lock, which raises an exception
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    /**
     * Releases in exclusive mode. Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument. This value is conveyed to
     * {@link #tryRelease} but is otherwise uninterpreted and
     * can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        // tryRelease is implemented by the concrete synchronizer
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    // How do we decide whether the node has already been moved to the sync queue?
    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        // A node still in CONDITION status, or without a prev link,
        // is still on the condition queue and not yet on the sync queue
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // If the node already has a successor, it must already have been moved to the sync queue
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        // As a last resort, search the sync queue directly; if found, the node is on the sync queue
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
    // Once signalled, the thread must re-compete for the lock to preserve the semantics
    // of the outer lock, because it released the lock voluntarily earlier.
    // This acquisition is exactly the same as in lock/unlock, so there is nothing new to explain
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

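  To summarize the logic of wait:

    1. Precondition: the thread already holds the outer lock;
    2. Add the current thread to the condition queue (the wait queue);
    3. Release the lock it holds;
    4. Park repeatedly, checking each time, until the node has been moved to the sync queue;
    5. Once signalled and woken, re-compete for the outer lock; return on success, otherwise keep blocking (it is the same outer lock, which is why the Condition has to be bound to that lock);
    6. The thread holds the lock before wait and holds it again after wait, so seen from outside nothing about the lock has changed;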
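  Steps 3 and 6 can be observed from the outside. The following sketch is an illustration under stated assumptions (AwaitReleasesLockDemo and run are hypothetical names): a waiter thread awaits on a condition, and the calling thread then manages to take the very same lock while the waiter is still inside await, which is only possible if await released it. After being signalled, the waiter checks that it holds the lock again.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Observes that await() gives up the lock while waiting and holds it again on return. */
class AwaitReleasesLockDemo {
    /** Returns { lock was obtainable during await, waiter held the lock after await }. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};          // the condition itself, guarded by lock
        boolean[] result = new boolean[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0]) {
                    ready.awaitUninterruptibly();
                }
                result[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        for (;;) {
            lock.lock();
            try {
                // hasWaiters must be called with the lock held.
                if (lock.hasWaiters(ready)) {
                    // We own the lock while the waiter is parked inside await().
                    result[0] = lock.isHeldByCurrentThread();
                    flag[0] = true;
                    ready.signal();
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield(); // the waiter has not reached await() yet; try again
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);
    }
}
```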

 

Key point 2. With wait sorted out, let's look at how signal(), the notification side, is implemented:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
    /**
     * Moves the longest-waiting thread, if one exists, from the
     * wait queue for this condition to the wait queue for the
     * owning lock.
     *
     * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
     * returns {@code false}
     */
    public final void signal() {
        // Only the thread holding the lock may signal; otherwise thread safety could not be guaranteed
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        // Notify firstWaiter
        if (first != null)
            doSignal(first);
    }

    /**
     * Removes and transfers nodes until hit non-cancelled one or
     * null. Split out from signal in part to encourage compilers
     * to inline the case of no waiters.
     * @param first (non-null) the first node on condition queue
     */
    private void doSignal(Node first) {
        // Transfers at most one node (cancelled nodes are skipped)
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    // Move a node from the condition queue to the sync queue, so that it can take part
    // in the next round of competition for the lock.
    // Returns true only if the node was actually transferred.
    // Note: the current thread is the one holding the lock
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        // The sync queue is maintained by the head/tail pointers
        Node p = enq(node);
        int ws = p.waitStatus;
        // Note: in the normal case this does not wake the waiting thread; it only moves the node
        // from one queue to the other.
        // The current thread has not finished its lock-protected region yet; the waiter is woken
        // when the lock is released.
        // Otherwise another thread could jump in before the current thread has finished its work.
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

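  To summarize, notify works as follows:

    1. Precondition: the thread already holds the outer lock;
    2. Move the first node of the condition queue to the sync queue;
    3. If that node has been cancelled, move on to the next one until the queue is empty; at most one node is transferred;
    4. In the normal case no thread is woken at this point;

  So the key to implementing wait/notify is maintaining two queues, the condition queue and the sync queue, and both operations are required to run under the protection of the outer lock.

  We can now also answer the question: why must wait/notify run while holding the lock?

  wait waits for a condition to become true, so there is necessarily contention that must be guarded against; it also has to release the lock itself so that other threads can make the condition true, and it has to restore the lock afterwards. After notify there may be further work that must stay protected. A Condition is only one part of its lock.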
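  The "must hold the lock" rule is enforced at run time and is easy to check. In this small sketch (SignalNeedsLockDemo and its method names are hypothetical), calling signal() or await() on a Condition of a ReentrantLock without owning the lock ends in IllegalMonitorStateException, which matches the isHeldExclusively() check in signal() and the failure branch of fullyRelease() shown above.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Checks that Condition operations are rejected when the lock is not held. */
class SignalNeedsLockDemo {
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        try {
            ready.signal(); // lock is not held by this thread
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    static boolean awaitWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        try {
            ready.await(); // nothing to release, so the wait is refused
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        lock.lock();
        try {
            ready.signal(); // no waiters: a harmless no-op
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows());
        System.out.println(awaitWithoutLockThrows());
        System.out.println(signalWithLockSucceeds());
    }
}
```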

 

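IV. Notifying all threads: notifyAll

  Sometimes, once a condition holds, all waiting threads may proceed; that is what notifyAll is for. But how is "notify everyone" achieved? Are they all notified at once, or ...?

  Here is the implementation in AQS: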

    // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    /**
     * Removes and transfers all nodes.
     * @param first (non-null) the first node on condition queue
     */
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

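  As we can see, it walks through all the nodes and moves them one by one from the condition queue to the sync queue (that is the notification). Nobody can do several things at exactly the same time!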
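  A small experiment makes the "one by one" part visible from user code. This is a sketch with hypothetical names (SignalAllDemo, wakeAll): several threads wait on one condition, the caller waits until all of them are on the condition queue, then calls signalAll() once. Each waiter increments a plain int after await returns; the count is only reliable because every waiter has to re-acquire the lock in turn before it continues.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** One signalAll() releases every waiter, but each resumes only after re-taking the lock. */
class SignalAllDemo {
    static int wakeAll(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Condition opened = lock.newCondition();
        boolean[] open = {false}; // guarded by lock
        int[] resumed = {0};      // guarded by lock, deliberately not atomic

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!open[0]) {
                        opened.awaitUninterruptibly();
                    }
                    resumed[0]++; // safe: await returned, so this thread holds the lock again
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        for (;;) {
            lock.lock();
            try {
                // getWaitQueueLength must be called with the lock held.
                if (lock.getWaitQueueLength(opened) == waiters) {
                    open[0] = true;
                    opened.signalAll(); // moves every node to the sync queue
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield(); // not everyone is waiting yet; try again
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return resumed[0];
    }

    public static void main(String[] args) {
        System.out.println(wakeAll(4));
    }
}
```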

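  This article analysed the principles and implementation of synchronization locks from the perspective of the Java implementation, but the ideas are not limited to Java. The principles carry over; the difference is that a heavyweight like the operating system can do the job more directly, for example by simply not scheduling a thread onto the CPU at all.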

 

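Copyright notice: this is an original article by yougewe, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Original link: https://www.cnblogs.com/yougewe/p/11922194.html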