简单看看LockSupport和AQS
这次我们可以看看并发中锁的原理,大概会说到AQS,ReentrantLock,ReentrantReadWriteLock以及JDK8中新增的StampedLock,这些都是在java并发中很重要的东西,慢慢看吧!
一.LockSupport工具类
LockSupport工具类是jdk中rt.jar里面的,主要作用是挂起和唤醒线程,该类是创建锁和创建其他同步类的基础。还有我们要知道,LockSupport这个类是以Unsafe这个类为基础,讲过前面简单的看了看Unsafe,是不是觉得还是比较熟悉的吧!
我们先看看LockSupport的park(英文翻译:停下,坐下)和unpark(英文翻译:唤醒,启动)方法,注意,这两个方法和wait和notify功能很像,但是在这里我更喜欢叫做授权!
简单的看一个例子:
package com.example.demo.study; import java.util.concurrent.locks.LockSupport; public class Study0130 { public static void main(String[] args) { System.out.println("main begin"); LockSupport.park(); System.out.println("main end"); } }
我们可以看到我们直接调用park方法的话,当前的线程就阻塞了,不能到后面去了,这里我们可以说当前线程没有被LockSupport类授权,没有许可证,所以到这里碰到park()这个路口就只能挂了;那么怎么样才能使得当前线程被授权呢?我们就需要unpark()方法进行授权
package com.example.demo.study; import java.util.concurrent.locks.LockSupport; public class Study0130 { public static void main(String[] args) { //这里就是给当前线程授权了,当前线程可以随便跑,碰到park都不会挂 LockSupport.unpark(Thread.currentThread()); System.out.println("main begin"); LockSupport.park(); System.out.println("main end"); } }
还记得以前的wait和notify的用法么?一个线程A中调用了wait方法,那么线程A就挂起了,如果在线程B中调用notify方法,那么A线程就会被唤醒;这里的park和unpark方法也可以实现这种,看以下代码:
package com.example.demo.study; import java.util.concurrent.locks.LockSupport; public class Study0130 { public static void main(String[] args) { Thread thread1 = new Thread(new Runnable() { @Override public void run() { System.out.println("thread1 start"); //线程1会阻塞 LockSupport.park(); System.out.println("thread1 end"); } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { System.out.println("thread2 start"); //给线程1授权 LockSupport.unpark(thread1); System.out.println("thread2 end"); } }); thread1.start(); thread2.start(); } }
我们打开LockSupport的park和unpark方法可以发现,就是调用的Unsafe实现的,可惜看不到源码…
假如我们调用park方法使得线程阻塞太久了也不是我们想看到的,我们还可以使用parkNanos设置阻塞时间,当时间到了,就会自动返回:
最后说一下,还可以调用park方法的时候传进去一个对象,比如LockSupport.park(this);这样使用可以使用jstack pid命令查看堆栈信息的时候,可以看到是那个类被阻塞了!
到此为止,应该就是LockSupport的常用方法了!
二.认识AQS
AQS全称是AbstractQueuedSynchronizer,叫做抽象同步队列,用于实现各种同步组件,比如并发包中的锁就是用这个实现的,把这个弄清楚了,那些锁的机制就差不多懂了!
那么所谓的AQS到底是什么呢?其实就是一个有顺序的双向链表(或者叫做FIFO双向队列,一样的意思),在这个双向链表中,每一个节点中都可以存放一个线程,节点的所有属性如下图所示,我们随便说几个;
prev表示指向前一个节点,next指向后一个节点,thread表示当前节点存储的一个线程,SHARED表示当前节点存储的线程是由于获取共享资源是被阻塞了才被丢到链表中的;EXCLUSIVE表示当前节点存储的线程是由于获取独占资源阻塞才被丢到链表中来的;
waitStatus表示当前节点存储的线程的状态,可能的状态有以下几种:(1)CANCELLED = 1; 表示线程被取消了 (2)SIGNAL = -1; 表示线程需要唤醒 (3)CONDITION = -2;表示线程在链表中等待 (4)PROPAGATE = -3;表示线程释放共享资源时需要通知其他节点;
注意,这里其实还有一个状态,就是waitStatus为0,表示当前节点是初始状态,所以可以知道当waitStatus大于0的时候是无效状态,小于零才是有效状态
这个Node类是AQS的一个内部类,那么怎么通过AQS来访问这个链表呢?下面我们再来看看AQS有哪些属性可以帮助我们访问这个双向链表;
//字段 //指向链表的头节点 private transient volatile Node head; //指向链表的尾节点 private transient volatile Node tail; //状态信息,这个字段在每个实现类中表达的意思都不一样,比如在ReentrantLock中表示可重入的次数, //在Semaphore中表示可用信号的个数等等用法 private volatile int state; //获取Unsafe对象,前面用过的,还记得说过为什么可以使用getUnsafe的方式获取对象,而我们自己的类中却不能用这种方式 private static final Unsafe unsafe = Unsafe.getUnsafe(); //下面的这几个属性就是获取AQS类中的字段的偏移量,在前几篇的博客已经说过了这偏移量有什么用 private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; //方法 //这几个方法都是尝试获取锁 public final void acquire(int arg) {}//独占方式 protected boolean tryAcquire(int arg) {} public final void acquireShared(int arg) {}//共享方式 public final void acquireInterruptibly(int arg){}//独占方式
public final void acquireSharedInterruptibly(int arg){}//共享方式 //这几个方法都是试图释放锁 public final boolean release(int arg) {}//独占方式 public final boolean releaseShared(int arg) {}//共享方式 protected boolean tryRelease(int arg) {} protected boolean tryReleaseShared(int arg) {}
在AQS中对线程的同步主要的是操作state,对state的操作方式分为两种,独占方式和共享方式,至于两种方式各自的获取锁和释放锁的方法在上面已经标识出来了!
这里稍微提一下什么叫做锁啊?在java多线程中可以把一个对象当做一个锁,为什么呢?我们可以简单看看一个普通的java对象(不是数组)在java堆中有哪些组成部分:
一个java对象是由对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)三部分组成,实例数据和对齐填充可以看做是一类,因为对齐填充就是起到填充空白的作用,因为java对象的字节数必须是8的倍数(对象头肯定是8的倍数,这里其实就是填充实例数据成8的倍数即可),所以对齐填充可能有也可能没有;
对象头一般有两部分组成(数组的话还有一个部分,即数组长度),如下所示:
第一部分:用于存储对象自身的运行时数据,如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳等,这部分数据的长度在32位和64位的虚拟机(未开启压缩指针)中分别为32bit和64bit,官方称它为“MarkWord”。
第二部分:对象头的另外一部分是klass类型指针,即对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。
我们可以看作一个对象就是一个锁,如果一个线程获取了某个锁,那么在这个锁对象的对象头的markword中存了某个线程的编号,也就表示该线程持有了该锁!
上面说了这么多,我们大概就知道了所谓的AQS就是如下图所示这样,维护了一个链表,每次只有头部的这个节点中的线程是运行的,当头部的线程由于某些原因阻塞了或中断了,下一个线程才会尝试获取资源,重复如此
然后我们再来说说一个线程以独占方式获取资源或者是共享方式获取资源;
三.独占方式
当一个线程要以独占方式获取该资源,说得直白一点就是实现一个独占锁,类似synchorized代码块一样,对共享资源的操作都在这个代码块中,一个线程只有先获取这个锁才能进入到代码块中操作共享资源,其他线程尝试获取锁的时候,和这个锁中对象头的线程编号比较如果不一样,那就只能将这个线程放到链表中存起来,然后该线程挂起来,等条件满足之后再唤醒,就是使用LockSupport的park和unpark方法实现的。
就以ReentrantLock为例,一个线程获取到了ReentrantLock的锁之后,在AQS中就会首先使用CAS将state从0变为1,然后设置当前锁为本线程所持有;如果当前线程继续尝试获取锁,那么只会将state从1变为2,其他的没啥变化,这也叫做可重入次数;当如果其他线程去尝试获取锁的时候,那么发现锁对象的对象头中不是自己线程编号,于是就丢进了阻塞队列中挂起;
1.当线程通过acquire(int arg)获取独占资源时:
public final void acquire(int arg) { //1.tryAcquire方法没有实现,这个方法主要是留给具体子类去实现,通过具体场景去用CAS修改state的值,修改成功返回true,否则false //2.如果修改state的值失败,就会到第二个条件这里,这里会将当前线程封装成一个Node.EXCLUSIVE类型的节点,然后存到链表尾端,最后在acquireQueued方法内部会调用
LockSupport.park(this);方法阻塞线程 //3.调用selfInterrupt方法中断当前的线程,为什么要这样呢?因为一个线程在阻塞队列中等待,这时通过某种方式把它中断了,不会立即看到效果的,
//只会在这个线程获取资源后再调用selfInterrupt方法将中断补上
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //中断当前线程 static void selfInterrupt() { Thread.currentThread().interrupt(); }
2.当线程通过release(int arg)释放独占资源时:
public final boolean release(int arg) { //tryRelease方法没有实现,子类根据具体场景是实现,其实就是修改state的值 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //这个方法在下面,里面会调用LockSupport.unpark(s.thread)方法激活阻塞队列中的一个节点的线程,而这个激活的线程会通过tryAcquire尝试当前的state是否满足自己的需要
//满足条件的话就运行,不满足的话还是会挂起 unparkSuccessor(h); return true; } return false; }
通过简单的看了这获取资源和释放资源我们可以看到底层还是使用的Unsafe的park和unpark方法,还有就是tryAcquire()方法和tryRelease()方法需要在具体的子类自己实现,在其中就是对AQS中state的修改,子类还需要定义state这个状态值的增减是什么含义;
例如ReentrantLock继承自AQS的实现中,state为0表示锁空闲,为1表示锁被占用,在重写tryAcquire()方法的时候,需要用CAS将state的值从0改为1,并且设置当前锁的持有者就是当前线程;而重写tryRelease()方法的时候,就需要用CAS将state的值从1改为0,然后设置当前锁的持有者为null
四.共享方式
知道了独占方式之后,共享方式就简单了,什么叫做共享?同一时间可以有多个线程获取资源,这就叫做共享!!!
一个线程尝试去获取资源成功后,此时另外一个线程也可以直接用CAS去尝试获取资源,成功的话就修改,失败的话就丢进链表中存起来;例如Semaphore信号量,当一个线程通过acquire()方法获取信号量的时候,信号量满足条件就通过CAS去获取,不满足就将线程丢到链表里面;
共享方式和前面的独占方式其实很像,我们也来简单的看一看:
1.当线程通过acquireShared(int arg)获取共享资源时:
public final void acquireShared(int arg) { //tryAcquireShared方法也是没有实现,留给具体子类会根据实际情况实现,会设置state的值,设置成功就直接返回 //设置失败的话就进入到doAcquireShared方法中,这个方法里会将当前线程封装为Node.SHARED类型的节点,然后放到阻塞队列的最后面
//使用LockSupport.park(this)方法挂起自己 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
2.当线程通过releaseShared(int arg)释放共享资源时:
public final boolean releaseShared(int arg) { //tryReleaseShared方法由子类实现,修改state的值,尝试释放资源 //释放资源成功的话,然后使用LockSupport.unpark(thread)去唤醒阻塞队列中的一个线程 //激活的线程会使用tryReleaseShared查看当前state的值是否符合自己的需要,满足则激活,向下运行,否则还是被放在AQS阻塞队列中挂起 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
例如读写锁ReentrantReadWriteLock就是继承自AQS的实现,由于state是int类型的,32位,高16位表示获取读锁的次数,所以读锁的tryAcquireShared方法实现中,首先检查写锁是否被其他线程持有,是则返回false,否则就用CAS将state的高16位+1;在读锁的tryReleaseShared的实现中,内部使用CAS将state的高16位减一,成功的话就返回true,失败的话返回false