前言

请看上篇:Java 对象头那点事

文章中的源码都有不同程度缩减,来源于openjdk8的开源代码(tag:jdk8-b120)


锁粗化过程
image

偏向锁

①:markword中保存的线程ID是自己且epoch等于class的epoch,则说明是偏向锁重入。
②:偏向锁若已禁用,进行撤销偏向锁。
③:偏向锁开启,都进行进行重偏向操作。
④:若进行了锁撤销操作或重偏向操作失败,则需要升级为轻量级锁或者进一步升级为重量级锁。

锁对象在发送锁竞争后会升级为偏向锁,不过当不发生锁竞争时,锁对象依然会升级为偏向锁,这种情况叫匿名偏向。
当jvm启动4s后,会默认给新建的对象加上偏向锁。


上代码:

  1. <dependency>
  2. <groupId>org.openjdk.jol</groupId>
  3. <artifactId>jol-core</artifactId>
  4. <version>0.8</version>
  5. </dependency>

这个包下的工具类的功能有:

  1. // 查看对象内部结构
  2. System.out.println(ClassLayout.parseInstance(bingo).toPrintable());
  3. // 查看对象外部信息
  4. System.out.println(GraphLayout.parseInstance(bingo).toPrintable());
  5. // 查看对象总大小
  6. System.out.println(GraphLayout.parseInstance(bingo).totalSize());

默认JVM是开启指针压缩,可以通过vm参数开启关闭指针压缩:-XX:-UseCompressedOops


当创建锁对象前不进行休眠4s的操作:

  1. @Test
  2. public void mark() throws InterruptedException {
  3. Bingo bingo = new Bingo();
  4. bingo.setP(1);
  5. bingo.setB(false);
  6. // 查看对象内部结构
  7. System.out.println(ClassLayout.parseInstance(bingo).toPrintable());
  8. System.out.println("\n++++++++++++++++++++++++++\n");
  9. synchronized (bingo) {
  10. // 查看对象内部结构
  11. System.out.println(ClassLayout.parseInstance(bingo).toPrintable());
  12. }
  13. }

image

看我标红线的后三位的值,由于启动过快,锁直接从无锁升级成了轻量级锁。


当创建锁对象前进行休眠4s的操作:

  1. @Test
  2. public void mark() throws InterruptedException {
  3. TimeUnit.SECONDS.sleep(4);
  4. Bingo bingo = new Bingo();
  5. bingo.setP(1);
  6. bingo.setB(false);
  7. // 查看对象内部结构
  8. System.out.println(ClassLayout.parseInstance(bingo).toPrintable());
  9. System.out.println("\n++++++++++++++++++++++++++\n");
  10. synchronized (bingo) {
  11. // 查看对象内部结构
  12. System.out.println(ClassLayout.parseInstance(bingo).toPrintable());
  13. }
  14. }

image

当在程序启动4s后创建锁对象,就会默认偏向。

因为偏向锁不会自动释放,因此当锁对象处于偏向锁时,另一个线程进来只能依托VM判断上一个获取偏向锁的线程是否存活、是否退出持有锁来决定是锁升级还是进行重偏向。

①:偏向锁的撤销必须等待VM全局安全点(安全点指所有java线程都停在安全点,只有vm线程运行)。
②:撤销偏向锁恢复到无锁(标志位为 01)或轻量级锁(标志位为 00)的状态。
③:只要发生锁竞争,就会进行锁撤销。

备注:
当开启偏向锁时,若持有偏向锁的线程仍然存活且未退出同步代码块,锁升级为轻量级锁/重量级锁之前会进行偏向锁撤销操作。
如果是升级为轻量级锁,撤销之后需要创建Lock Record 来保存之前的markword信息。


批量偏向/撤销概念:
参考1:https://www.cnblogs.com/LemonFive/p/11248248.html

  • 批量重偏向
    当一个线程同时持有同一个类的多个对象的偏向锁时(这些对象的锁竞争不激烈),执行完同步代码块后,如果另一个线程也要持有这些对象的锁,当对象数量达到一定程度时,会触发批量重偏向机制(进行过批量重偏向的对象不可再进行批量重偏向)。
  • 批量锁撤销
    当触发批量重偏向后,会触发批量撤销机制。

阈值定义在globals.hpp中:

  1. // 批量重偏向阈值
  2. product(intx, BiasedLockingBulkRebiasThreshold, 20)
  3. // 批量锁撤销阈值
  4. product(intx, BiasedLockingBulkRevokeThreshold, 40)

可以在VM启动参数中通过-XX:BiasedLockingBulkRebiasThreshold-XX:BiasedLockingBulkRevokeThreshold 来手动设置阈值。


偏向锁的撤销和重偏向的代码(过于复杂)在biasedLocking.cpp中:

  1. void BiasedLocking::revoke_at_safepoint(Handle h_obj) {
  2. assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint");
  3. oop obj = h_obj();
  4. HeuristicsResult heuristics = update_heuristics(obj, false);
  5. if (heuristics == HR_SINGLE_REVOKE) {
  6. // 重偏向
  7. revoke_bias(obj, false, false, NULL);
  8. } else if ((heuristics == HR_BULK_REBIAS) ||
  9. (heuristics == HR_BULK_REVOKE)) {
  10. // 批量撤销或重偏向
  11. bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL);
  12. }
  13. clean_up_cached_monitor_info();
  14. }

参考2:

对于存在明显多线程竞争的场景下使用偏向锁是不合适的,比如生产者-消费者队列。生产者线程获得了偏向锁,消费者线程再去获得锁的时候,就涉及到这个偏向锁的撤销(revoke)操作,而这个撤销是比较昂贵的。那么怎么判断这些对象是否适合偏向锁呢?jvm采用以类为单位的做法,其内部为每个类维护一个偏向锁计数器,对其对象进行偏向锁的撤销操作进行计数。当这个值达到指定阈值的时候,jvm就认为这个类的偏向锁有问题,需要进行重偏向(rebias)。对所有属于这个类的对象进行重偏向的操作叫批量重偏向(bulk rebias),之前的做法是对heap进行遍历,后来引入epoch。当需要bulk rebias时,对这个类的epoch值加1,以后分配这个类的对象的时候mark字段里就是这个epoch值了,同时还要对当前已经获得偏向锁的对象的epoch值加1,这些锁数据记录在方法栈里。这样判断这个对象是否获得偏向锁的条件就是:mark字段后3位是101,thread字段跟当前线程相同,epoch字段跟所属类的epoch值相同。如果epoch值不一样,即使thread字段指向当前线程,也是无效的,相当于进行过了rebias,只是没有对对象的mark字段进行更新。如果这个类的revoke计数器继续增加到一个阈值,那个jvm就认为这个类不适合偏向锁了,就要进行bulk revoke。于是多了一个判断条件,要查看所属类的字段,看看是否允许对这个类使用偏向锁。

轻量级锁

轻量级体现在线程会尝试在自己的堆栈中创建Lock Record存储锁对象的相关信息,不需要在内核态和用户态之间进行切换,不需要操作系统进行调度。

拿到轻量级锁线程堆栈:
image

Lock Record主要分为两部分:

  • obj
    指向锁对象本身。重入时也如此。
  • displaced header(缩写为hdr)
    第一次拿到锁时hdr存放的是encode加密后的markword,重入时存放null。

思考:为什么锁重入时hdr存放的是null,而不是用计数器来实现呢?
假设一个场景,当一个线程同时拿到A、B、C…N 多个锁的时候,那么线程的堆栈中,肯定有多个锁对象的Lock Record,
如:

  1. synchronized(a){
  2. synchronized(b){
  3. synchronized(c){
  4. // do something
  5. synchronized(a){
  6. // do something
  7. }
  8. }
  9. }
  10. }

当锁a重入时,如果用计数器,还得遍历当前线程堆栈拿到第一次的Lock Record,解锁时也要遍历,效率必然低下。作为jdk底层代码必然讲究效率。
以上纯属个人看法(欢迎交流)。

①:使用遍历方式将当前线程堆栈中属于该锁对象的Lock Record 指向Null。
②:CAS还原markword为无锁状态。
③:第②步失败需要升级为重量级锁。
image

  • 优点
    在线程接替/交替执行的情况下,锁竞争比较小,可以避免成为重量级锁而引起的性能问题。

  • 缺点
    当锁竞争比较激烈、多线程同事竞争锁的时候,需要从轻量级升级为重量级,产生了额外的开销。

加锁
加锁、解锁流程的代码在InterpreterRuntime.cpp中。
这是我从github拉下来的源码:

  1. /**
  2. * (轻量级锁)加锁流程
  3. * */
  4. CASE(_monitorenter): {
  5. // (锁对象本身)
  6. oop lockee = STACK_OBJECT(-1);
  7. // derefing's lockee ought to provoke implicit null check
  8. CHECK_NULL(lockee);
  9. // find a free monitor or one already allocated for this object
  10. // if we find a matching object then we need a new monitor
  11. // since this is recursive enter
  12. BasicObjectLock* limit = istate->monitor_base();
  13. BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  14. // (这个entry就是大家常说的Lock Record吧)
  15. BasicObjectLock* entry = NULL;
  16. while (most_recent != limit ) {
  17. if (most_recent->obj() == NULL) entry = most_recent;
  18. else if (most_recent->obj() == lockee) break;
  19. most_recent++;
  20. }
  21. if (entry != NULL) {
  22. entry->set_obj(lockee);
  23. // (构建一个无锁状态的mark word)
  24. markOop displaced = lockee->mark()->set_unlocked();
  25. // (放到lock record 中)
  26. entry->lock()->set_displaced_header(displaced);
  27. // 锁对象的markword是否为这个无锁的displaced markword
  28. // (CAS替换失败说明锁对象的markword 不是无所状态)
  29. if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
  30. // Is it simple recursive case?
  31. // (判断是否是锁重入)
  32. if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
  33. // (如果是重入场景,那么新的Lock Record 设置为Null)
  34. entry->lock()->set_displaced_header(NULL);
  35. } else {
  36. // (不是锁重入,且抢锁失败,说明锁竞争激烈,升级为重量级。进入重量级锁抢锁流程)
  37. CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
  38. }
  39. }
  40. UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  41. } else {
  42. istate->set_msg(more_monitors);
  43. UPDATE_PC_AND_RETURN(0); // Re-execute
  44. }
  45. }

可以看得出来,这部分代码并没有体现出偏向锁的逻辑,有大佬给出原因,可以参考这篇博客:https://www.jianshu.com/p/4758852cbff4


其他大佬解析后的代码:

点击查看代码
  1. CASE(_monitorenter): {
  2. // lockee 就是锁对象
  3. oop lockee = STACK_OBJECT(-1);
  4. // derefing's lockee ought to provoke implicit null check
  5. CHECK_NULL(lockee);
  6. // code 1:找到一个空闲的Lock Record
  7. BasicObjectLock* limit = istate->monitor_base();
  8. BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  9. BasicObjectLock* entry = NULL;
  10. while (most_recent != limit ) {
  11. if (most_recent->obj() == NULL) entry = most_recent;
  12. else if (most_recent->obj() == lockee) break;
  13. most_recent++;
  14. }
  15. //entry不为null,代表还有空闲的Lock Record
  16. if (entry != NULL) {
  17. // code 2:将Lock Record的obj指针指向锁对象
  18. entry->set_obj(lockee);
  19. int success = false;
  20. uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
  21. // markoop即对象头的mark word
  22. markOop mark = lockee->mark();
  23. intptr_t hash = (intptr_t) markOopDesc::no_hash;
  24. // code 3:如果锁对象的mark word的状态是偏向模式
  25. if (mark->has_bias_pattern()) {
  26. uintptr_t thread_ident;
  27. uintptr_t anticipated_bias_locking_value;
  28. thread_ident = (uintptr_t)istate->thread();
  29. // code 4:这里有几步操作,下文分析
  30. anticipated_bias_locking_value =
  31. (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
  32. ~((uintptr_t) markOopDesc::age_mask_in_place);
  33. // code 5:如果偏向的线程是自己且epoch等于class的epoch
  34. if (anticipated_bias_locking_value == 0) {
  35. // already biased towards this thread, nothing to do
  36. if (PrintBiasedLockingStatistics) {
  37. (* BiasedLocking::biased_lock_entry_count_addr())++;
  38. }
  39. success = true;
  40. }
  41. // code 6:如果偏向模式关闭,则尝试撤销偏向锁
  42. else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
  43. markOop header = lockee->klass()->prototype_header();
  44. if (hash != markOopDesc::no_hash) {
  45. header = header->copy_set_hash(hash);
  46. }
  47. // 利用CAS操作将mark word替换为class中的mark word
  48. if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
  49. if (PrintBiasedLockingStatistics)
  50. (*BiasedLocking::revoked_lock_entry_count_addr())++;
  51. }
  52. }
  53. // code 7:如果epoch不等于class中的epoch,则尝试重偏向
  54. else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
  55. // 构造一个偏向当前线程的mark word
  56. markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
  57. if (hash != markOopDesc::no_hash) {
  58. new_header = new_header->copy_set_hash(hash);
  59. }
  60. // CAS替换对象头的mark word
  61. if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
  62. if (PrintBiasedLockingStatistics)
  63. (* BiasedLocking::rebiased_lock_entry_count_addr())++;
  64. }
  65. else {
  66. // 重偏向失败,代表存在多线程竞争,则调用monitorenter方法进行锁升级
  67. CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
  68. }
  69. success = true;
  70. }
  71. else {
  72. // 走到这里说明当前要么偏向别的线程,要么是匿名偏向(即没有偏向任何线程)
  73. // code 8:下面构建一个匿名偏向的mark word,尝试用CAS指令替换掉锁对象的mark word
  74. markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
  75. if (hash != markOopDesc::no_hash) {
  76. header = header->copy_set_hash(hash);
  77. }
  78. markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
  79. // debugging hint
  80. DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
  81. if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
  82. // CAS修改成功
  83. if (PrintBiasedLockingStatistics)
  84. (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
  85. }
  86. else {
  87. // 如果修改失败说明存在多线程竞争,所以进入monitorenter方法
  88. CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
  89. }
  90. success = true;
  91. }
  92. }
  93. // 如果偏向线程不是当前线程或没有开启偏向模式等原因都会导致success==false
  94. if (!success) {
  95. // 轻量级锁的逻辑
  96. //code 9: 构造一个无锁状态的Displaced Mark Word,并将Lock Record的lock指向它
  97. markOop displaced = lockee->mark()->set_unlocked();
  98. entry->lock()->set_displaced_header(displaced);
  99. //如果指定了-XX:+UseHeavyMonitors,则call_vm=true,代表禁用偏向锁和轻量级锁
  100. bool call_vm = UseHeavyMonitors;
  101. // 利用CAS将对象头的mark word替换为指向Lock Record的指针
  102. if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
  103. // 判断是不是锁重入
  104. if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
  105. //code 10: 如果是锁重入,则直接将Displaced Mark Word设置为null
  106. entry->lock()->set_displaced_header(NULL);
  107. } else {
  108. CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
  109. }
  110. }
  111. }
  112. UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  113. } else {
  114. // lock record不够,重新执行
  115. istate->set_msg(more_monitors);
  116. UPDATE_PC_AND_RETURN(0); // Re-execute
  117. }
  118. }

解锁

  1. /**
  2. * (轻量级锁)解锁流程
  3. * */
  4. CASE(_monitorexit): {
  5. oop lockee = STACK_OBJECT(-1);
  6. CHECK_NULL(lockee);
  7. BasicObjectLock* limit = istate->monitor_base();
  8. BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  9. // (挨个遍历当前线程栈中的Lock Record)
  10. while (most_recent != limit ) {
  11. // (Lock Record 的obj是否是需解锁的锁对象)
  12. if ((most_recent)->obj() == lockee) {
  13. BasicLock* lock = most_recent->lock();
  14. markOop header = lock->displaced_header();
  15. // (将obj设置为null(作删除处理))
  16. most_recent->set_obj(NULL);
  17. // If it isn't recursive we either must swap old header or call the runtime
  18. if (header != NULL) {
  19. // (非重入,CAS替换对象头的markword 为Lock Rocord中的displaced markword)
  20. if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
  21. // restore object for the slow case
  22. // (替换失败,表示锁已膨胀为重量级锁,此时markword指向ObjectMonitor的地址)
  23. most_recent->set_obj(lockee);
  24. // (走重量级锁的锁退出流程)
  25. CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
  26. }
  27. }
  28. UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  29. }
  30. most_recent++;
  31. }
  32. // Need to throw illegal monitor state exception
  33. CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
  34. ShouldNotReachHere();
  35. }

重量级锁

重量级锁是基于monitor模型进行实现的。

重量级锁是如何体现重量级的?
①:需要创建monitor,包含阻塞队列、竞争队列、继承者、锁拥有者等大量数据,会占用大量内存。
②:需要调用操作系统对线程进行park、unpark操作,会涉及到cpu在用户态和内核态之间切换,开销大。
③:monitor所运行的VM线程(内核线程)需要操作系统将那些调度,耗费时间。

image

①:monitor并不是一下子初始化完成的。
②:monitor在初始化的过程中,如果有线程进来获取锁,则会进行自旋。
③:线程进入monitor后会被封装成一个ObjectWaiter(双向链表结构),然后park住当前线程。当有线程退出锁后会进行unpark操作(唤醒操作涉及到操作系统,会产生额外的开销)。

ObjectWaiter的结构:

  1. class ObjectWaiter : public StackObj {
  2. // ...
  3. ObjectWaiter * volatile _next;
  4. ObjectWaiter * volatile _prev;
  5. Thread* _thread;
  6. // ...
  7. };

image

  1. volatile markOop _header; // displaced object header word - mark
  2. void* volatile _object; // backward object pointer - strong root
  3. void * volatile _owner; // pointer to owning thread OR BasicLock
  4. volatile jlong _previous_owner_tid; // thread id of the previous owner of the monitor
  5. volatile intptr_t _recursions; // recursion count, 0 for first entry
  6. int OwnerIsThread ; // _owner is (Thread *) vs SP/BasicLock
  7. ObjectWaiter * volatile _cxq ; // LL of recently-arrived threads blocked on entry.
  8. ObjectWaiter * volatile _EntryList ; // Threads blocked on entry or reentry.
  9. Thread * volatile _succ ; // Heir presumptive thread - used for futile wakeup throttling
  10. volatile intptr_t _count;
  11. volatile intptr_t _waiters; // number of waiting threads
  12. ObjectWaiter * volatile _WaitSet; // LL of threads wait()ing on the monitor

image

阻塞队列中的线程进入_cxq、_EntryList队列的过程有着不同的策略:

  • policy == 0,头插_EntryList
  • policy == 1,尾插_EntryList
  • policy == 2,头插_cxq
  • policy == 3,尾插_cxq

加锁第一阶段
这部分代码并没有创建monitor。
大部分工作是对锁状态做判断、安全点的检查,考虑无锁、轻量级锁的重入情况,因为锁升级为重量级锁就直接进内核态了,消耗资源太多。


InterpreterRuntime.cpp#monitorenter源码:

  1. IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  2. #ifdef ASSERT
  3. thread->last_frame().interpreter_frame_verify_monitor(elem);
  4. #endif
  5. if (PrintBiasedLockingStatistics) {
  6. Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
  7. }
  8. Handle h_obj(thread, elem->obj());
  9. assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
  10. "must be NULL or an object");
  11. // 开启偏向锁
  12. if (UseBiasedLocking) {
  13. // Retry fast entry if bias is revoked to avoid unnecessary inflation
  14. ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  15. } else {
  16. ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  17. }
  18. assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
  19. "must be NULL or an object");
  20. #ifdef ASSERT
  21. thread->last_frame().interpreter_frame_verify_monitor(elem);
  22. #endif
  23. IRT_END

主要还是看ObjectSynchronizer::fast_enter、ObjectSynchronizer::slow_enter,这部分源码在synchronizer.cpp中。

  1. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
  2. // 开启偏向锁
  3. if (UseBiasedLocking) {
  4. if (!SafepointSynchronize::is_at_safepoint()) {
  5. // 不在安全点(安全点指所有java线程都停在安全点,只有vm线程运行),需要撤销并重偏向
  6. BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
  7. if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
  8. return;
  9. }
  10. } else {
  11. assert(!attempt_rebias, "can not rebias toward VM thread");
  12. // 在安全点进行偏向锁的撤销
  13. BiasedLocking::revoke_at_safepoint(obj);
  14. }
  15. assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
  16. }
  17. // 上述操作是要保证在进入重量级锁之前锁状态应该处于轻量级锁
  18. slow_enter (obj, lock, THREAD) ;
  19. }
  20. /**
  21. * slow enter
  22. * 主要对锁状态做判断,考虑无锁、轻量级锁的重入情况,因为锁升级为重量级锁就直接进内核态了,消耗资源太多。
  23. * */
  24. void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  25. markOop mark = obj->mark();
  26. assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  27. // (mark word是无锁状态)
  28. if (mark->is_neutral()) {
  29. lock->set_displaced_header(mark);
  30. if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
  31. TEVENT (slow_enter: release stacklock) ;
  32. return ;
  33. }
  34. } else
  35. // (如果是锁重入)
  36. if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
  37. assert(lock != mark->locker(), "must not re-lock the same lock");
  38. assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
  39. lock->set_displaced_header(NULL);
  40. return;
  41. }
  42. // markword的值设置为值为marked_value的markword(不能看起来无锁,也不能看起来像持有偏向锁、轻量级锁的情况)
  43. lock->set_displaced_header(markOopDesc::unused_mark());
  44. // 膨胀为重量级锁,enter方法后面进入重量级锁的抢占流程
  45. ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
  46. }

如果是进入fast_enter(),那么就会再进行一次偏向锁开启的判断,再进入slow_enter()的逻辑中去,那么为什么不开始就直接进行slow_enter呢?就为了判断下锁偏向和撤销吗?这部分逻辑也完全可以写到slow_enter中去。这么写的原因未知。


加锁第二阶段
形成monitor,用来调度竞争锁的线程。

先看锁的膨胀过程:

  1. ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
  2. // 自旋
  3. for (;;) {
  4. const markOop mark = object->mark() ;
  5. assert (!mark->has_bias_pattern(), "invariant") ;
  6. // The mark can be in one of the following states:
  7. // * Inflated - just return(膨胀完成,直接返回)
  8. // * Stack-locked - coerce it to inflated(轻量级加锁状态)
  9. // * INFLATING - busy wait for conversion to complete(膨胀中)
  10. // * Neutral - aggressively inflate the object.(无锁状态)
  11. // * BIASED - Illegal. We should never see this()(偏向锁,非法,这里不能出现)
  12. // CASE: inflated
  13. if (mark->has_monitor()) {
  14. ObjectMonitor * inf = mark->monitor() ;
  15. assert (inf->header()->is_neutral(), "invariant");
  16. assert (inf->object() == object, "invariant") ;
  17. assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
  18. return inf ;
  19. }
  20. // 膨胀中,进行下一轮自旋
  21. if (mark == markOopDesc::INFLATING()) {
  22. TEVENT (Inflate: spin while INFLATING) ;
  23. ReadStableMark(object) ;
  24. continue ;
  25. }
  26. // 轻量级锁状态
  27. if (mark->has_locker()) {
  28. // 为当前线程分配一个monitor
  29. ObjectMonitor * m = omAlloc (Self) ;
  30. m->Recycle();
  31. m->_Responsible = NULL ;
  32. m->OwnerIsThread = 0 ;
  33. m->_recursions = 0 ;
  34. m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
  35. // CAS操作:尝试将markword设置为INFLATING状态,失败进行下一轮自旋
  36. markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
  37. if (cmp != mark) {
  38. omRelease (Self, m, true) ;
  39. continue ; // Interference -- just retry
  40. }
  41. markOop dmw = mark->displaced_mark_helper() ;
  42. assert (dmw->is_neutral(), "invariant") ;
  43. m->set_header(dmw) ;
  44. m->set_owner(mark->locker());
  45. m->set_object(object);
  46. guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
  47. object->release_set_mark(markOopDesc::encode(m));
  48. if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
  49. TEVENT(Inflate: overwrite stacklock) ;
  50. if (TraceMonitorInflation) {
  51. if (object->is_instance()) {
  52. ResourceMark rm;
  53. tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
  54. (void *) object, (intptr_t) object->mark(),
  55. object->klass()->external_name());
  56. }
  57. }
  58. return m ;
  59. }
  60. /**
  61. * 走到这里说明1:monitor 未膨胀完成 2:monitor不在膨胀过程中 3:锁状态也不是轻量级状态
  62. * 能走到这里说明锁状态已经变为无锁状态了
  63. */
  64. assert (mark->is_neutral(), "invariant");
  65. ObjectMonitor * m = omAlloc (Self) ;
  66. m->Recycle();
  67. m->set_header(mark);
  68. m->set_owner(NULL);
  69. m->set_object(object);
  70. m->OwnerIsThread = 1 ;
  71. m->_recursions = 0 ;
  72. m->_Responsible = NULL ;
  73. m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
  74. // (省略部分代码)
  75. return m ;
  76. }
  77. }

ObjectSynchronizer::omAlloc的作用:

尝试从线程的本地omFreeList 分配。线程将首先尝试从其本地列表中分配,然后从全局列表中,只有在那些尝试失败后,线程才会尝试实例化新的监视器。线程本地空闲列表占用 加热 ListLock 并改善分配延迟,并减少共享全局列表上的一致性流量。

总之我也没看懂,大概就是分配一个monitor给该线程用…


加锁第三阶段
当monitor形成之后,线程是阻塞还是拿到锁执行同步块代码,就看线程自己的运气了。

线程进入monitor:

  1. void ATTR ObjectMonitor::EnterI (TRAPS) {
  2. // 省略部分代码...
  3. // 尝试获取锁
  4. if (TryLock (Self) > 0) {
  5. return ;
  6. }
  7. DeferredInitialize () ;
  8. // 不死心,再来一次
  9. if (TrySpin (Self) > 0) {
  10. return ;
  11. }
  12. ObjectWaiter node(Self) ;
  13. Self->_ParkEvent->reset() ;
  14. node._prev = (ObjectWaiter *) 0xBAD ;
  15. node.TState = ObjectWaiter::TS_CXQ ;
  16. ObjectWaiter * nxt ;
  17. for (;;) {
  18. // 头插_cxq
  19. node._next = nxt = _cxq ;
  20. if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
  21. // 还来?
  22. if (TryLock (Self) > 0) {
  23. return ;
  24. }
  25. }
  26. // 省略部分代码...
  27. for (;;) {
  28. if (TryLock (Self) > 0) break ;
  29. assert (_owner != Self, "invariant") ;
  30. if ((SyncFlags & 2) && _Responsible == NULL) {
  31. Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
  32. }
  33. // park self
  34. if (_Responsible == Self || (SyncFlags & 1)) {
  35. TEVENT (Inflated enter - park TIMED) ;
  36. Self->_ParkEvent->park ((jlong) RecheckInterval) ;
  37. // Increase the RecheckInterval, but clamp the value.
  38. RecheckInterval *= 8 ;
  39. if (RecheckInterval > 1000) RecheckInterval = 1000 ;
  40. } else {
  41. TEVENT (Inflated enter - park UNTIMED) ;
  42. Self->_ParkEvent->park() ;
  43. }
  44. // 唤醒后又可以进行抢锁啦~
  45. if (TryLock(Self) > 0) break ;
  46. // 省略部分代码...
  47. }
  48. return ;
  49. }

果然synchronized不是公平锁,不过这也太不公平了。


解锁第一阶段
owner在退出持有锁的时候,会根据monitor的QMode策略,决定继承者的选取方式,选定继承者之前owner仍然会持有锁,以保证并行性。

  1. void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
  2. // 省略部分代码...
  3. // 重入次数递减至0
  4. if (_recursions != 0) {
  5. _recursions--; // this is simple recursive enter
  6. TEVENT (Inflated exit - recursive) ;
  7. return ;
  8. }
  9. if ((SyncFlags & 4) == 0) {
  10. _Responsible = NULL ;
  11. }
  12. // 自旋
  13. for (;;) {
  14. // (...) 省略部分代码
  15. ObjectWaiter * w = NULL ;
  16. int QMode = Knob_QMode ;
  17. // 绕过EntryList,直接从_cxq中唤醒线程作为下一个继承者用于竞争锁
  18. if (QMode == 2 && _cxq != NULL) {
  19. w = _cxq ;
  20. assert (w != NULL, "invariant") ;
  21. assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
  22. ExitEpilog (Self, w) ;
  23. return ;
  24. }
  25. // 将_cxq队列中的线程移到_EntryList尾部
  26. if (QMode == 3 && _cxq != NULL) {
  27. w = _cxq ;
  28. for (;;) {
  29. assert (w != NULL, "Invariant") ;
  30. ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
  31. if (u == w) break ;
  32. w = u ;
  33. }
  34. assert (w != NULL , "invariant") ;
  35. ObjectWaiter * q = NULL ;
  36. ObjectWaiter * p ;
  37. for (p = w ; p != NULL ; p = p->_next) {
  38. guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
  39. p->TState = ObjectWaiter::TS_ENTER ;
  40. p->_prev = q ;
  41. q = p ;
  42. }
  43. ObjectWaiter * Tail ;
  44. for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
  45. if (Tail == NULL) {
  46. _EntryList = w ;
  47. } else {
  48. // _EntryList 的tail的next执行_cxq的头部
  49. Tail->_next = w ;
  50. w->_prev = Tail ;
  51. }
  52. }
  53. // 将_cxq队列中的线程移到_EntryList头部
  54. if (QMode == 4 && _cxq != NULL) {
  55. // 如此可以保证最近竞争锁线程处于_EntryList的头部
  56. w = _cxq ;
  57. for (;;) {
  58. assert (w != NULL, "Invariant") ;
  59. ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
  60. if (u == w) break ;
  61. w = u ;
  62. }
  63. assert (w != NULL , "invariant") ;
  64. ObjectWaiter * q = NULL ;
  65. ObjectWaiter * p ;
  66. for (p = w ; p != NULL ; p = p->_next) {
  67. guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
  68. p->TState = ObjectWaiter::TS_ENTER ;
  69. p->_prev = q ;
  70. q = p ;
  71. }
  72. // 此时q为_cxq对了的tail线程
  73. if (_EntryList != NULL) {
  74. q->_next = _EntryList ;
  75. _EntryList->_prev = q ;
  76. }
  77. _EntryList = w ;
  78. }
  79. // 若_EntryList不为空,QMode = 3 || QMode = 4 会唤醒_EntryList头部线程作为下一位继承者,并进行unpark操作
  80. w = _EntryList ;
  81. if (w != NULL) {
  82. assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
  83. ExitEpilog (Self, w) ;
  84. return ;
  85. }
  86. w = _cxq ;
  87. if (w == NULL) continue ;
  88. /*
  89. * 能走到这里说明在这步采用线程进入_cxq队列,前面的操作中_cxq和_EntryList都是空队列
  90. */
  91. for (;;) {
  92. assert (w != NULL, "Invariant") ;
  93. ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
  94. if (u == w) break ;
  95. w = u ;
  96. }
  97. TEVENT (Inflated exit - drain cxq into EntryList) ;
  98. assert (w != NULL , "invariant") ;
  99. assert (_EntryList == NULL , "invariant") ;
  100. if (QMode == 1) {
  101. ObjectWaiter * s = NULL ;
  102. ObjectWaiter * t = w ;
  103. ObjectWaiter * u = NULL ;
  104. // 将_cxq队列反转,s为反转之后的_cxq
  105. while (t != NULL) {
  106. guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
  107. t->TState = ObjectWaiter::TS_ENTER ;
  108. u = t->_next ;
  109. t->_prev = u ;
  110. t->_next = s ;
  111. s = t;
  112. t = u ;
  113. }
  114. // 将反转倒序之后的_cxq放进_EntryList中
  115. _EntryList = s ;
  116. assert (s != NULL, "invariant") ;
  117. } else {
  118. // QMode == 0 or QMode == 2
  119. _EntryList = w ;
  120. ObjectWaiter * q = NULL ;
  121. ObjectWaiter * p ;
  122. // 将_cxq由单向链表转为双向链表
  123. for (p = w ; p != NULL ; p = p->_next) {
  124. guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
  125. p->TState = ObjectWaiter::TS_ENTER ;
  126. p->_prev = q ;
  127. q = p ;
  128. }
  129. }
  130. if (_succ != NULL) continue;
  131. w = _EntryList ;
  132. if (w != NULL) {
  133. guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
  134. ExitEpilog (Self, w) ;
  135. return ;
  136. }
  137. }
  138. }

解锁第二阶段
唤醒继承者,让它去尝试获取锁。

  1. // 选取继承者、唤醒继承者队列的头部线程(代码就不看了):
  2. void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
  3. // Exit protocol:
  4. // 1. ST _succ = wakee
  5. // 2. membar #loadstore|#storestore;
  6. // 2. ST _owner = NULL
  7. // 3. unpark(wakee)
  8. }

总结

1:无论偏向锁、轻量级锁、重量级锁,都是可重入的。所以熟悉JAVA并发包的ReentrantLock重入锁机制是有必要的。
2:只有重量级锁需要操作系统去进行调度竞争锁的线程。
3:偏向锁的撤销不是为了使锁降级为无锁状态,而是需要先降级再转变为轻量级锁状态。
4:偏向锁的撤销需要等待全局安全点,且锁撤销有一定的开销。所以在多线程竞争激烈的情况下,可以实现关闭偏向锁来进行性能调优。

想看源码的看这些文件。
image


其他优化
JDK1.6 对锁的实现引入了大量的优化,如偏向锁、轻量级锁、自旋锁、适应性自旋锁、锁消除、锁粗化等技术来减少锁操作的开销。

①:适应性自旋
升级为重量级锁之前,会尝试自旋一定次数(默认10次,可通过参数-XX : PreBlockSpin来更改)来延缓进入重量级锁的过程。
优点:若真的成功则可以避免锁升级,减少线程进入monitor从而带来的一系列开销。同时当前线程不会经历挂起-唤醒的过程,可以更快响应。
缺点:会一直占用cpu,若自旋失败则是额外的浪费。

②:锁粗化
将连在一起的加锁、解锁操作扩大范围,只进行一次性加锁、解锁操作。
如:

  1. Object lock = new Object();
  2. List<String> list = new ArrayList();
  3. synchronized(lock){
  4. list.add("a");
  5. }
  6. synchronized(lock){
  7. list.add("b");
  8. }
  9. synchronized(lock){
  10. list.add("c");
  11. }

优化为:

  1. Object lock = new Object();
  2. List<String> list = new ArrayList();
  3. synchronized(lock){
  4. list.add("a");
  5. list.add("b");
  6. list.add("c");
  7. }

③:锁消除
若当前线程创建的对象分配在堆,但不会被其他线程使用,那么这段代码就可以不加锁。
或者根据逃逸分析,当前线程new的对象不会被其他线程使用,那么也不需要加锁。


其他问题
①:当所状态为偏向锁时,如何存储hashcode信息?
若hashCode方法的调用是在对象已经处于偏向锁状态时调用,它的偏向状态会被立即撤销,并且锁会升级为重量级锁。

②:什么线程复用?
两个线程间隔5s启动,markword中thread信息一摸一样这个现象实际上就是JVM线程复用。


本文参考文章:
①: 小米信息部技术团队-synchronized 实现原理
②:synchronized的jvm源码加锁流程分析聊锁的意义
③:Java对象的内存布局
④:盘一盘 synchronized (二)—— 偏向锁批量重偏向与批量撤销
⑤:https://www.bbsmax.com/A/xl56qY9rJr/
⑥:Java并发编程:Synchronized底层优化(偏向锁、轻量级锁)

感触:上网搜很难看到自己想要的内容,甚至有的文章还会起误导性作用。果然还是要好好学习,厉害的大佬比比皆是。在性能调优上哪有什么最优解,只有合适与不合适,重在选择与取舍。

版权声明:本文为竹根七原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/zgq7/p/16257327.html