从源码来看ReentrantLock和ReentrantReadWriteLock
从源码来看ReentrantLock和ReentrantReadWriteLock
上一篇花了点时间将同步器看了一下,心中对锁的概念更加明确了一点,知道我们所使用到的锁是怎么样获取同步状态的,我们也写了一个自定义同步组件Mutex,讲到了它其实就是一个简版的ReentrantLock,本篇文章我们就来看看ReentrantLock底层是怎么样的!
目录结构:
- ReentrantLock
- 公平锁与非公平锁
- ReentrantReadWriteLock
ReentrantLock
ReentrantLock我们叫做可重入锁,也就是我们可以重复进入的意思,也就是表示,一个线程可以对指定资源进行重复加锁,并且还能够选择公平锁和非公平锁。
公平锁:先请求获取加锁的线程先被满足
非公平锁:XXXX
可重入锁可以对一个资源重复加锁,同时,在释放锁时也要进行多次release,所以不难想到,ReentrantLock只要维持一个值,用来控制这个资源加锁的次数就Ok了,初始化为零,当加锁时对这个值+1,release时-1。
public class ReentrantLock implements Lock, java.io.Serializable{
.
.
.
}
这个是ReentrantLock的定义,上一节我就无耻的把它贴出来了→.→
接下来我们来看看ReentrantLock是如何对资源进行加锁的!
首先肯定要定义一个继承自AbstractQueuedSynchronizer的内部类:
abstract static class Sync extends AbstractQueuedSynchronizer {
.
.
.
}
构造函数的话,因为它可以选择公平和非公平锁,所以是否公平就是由构造函数决定的:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
FairSync()和NonfairSync()都是构造函数,当然都是内部类啦,他们可以分别创建公平锁和非公平锁。
我自己粗略的看了一下源码,将ReentrantLock的大概结构画了一下:
刚才我们也说了,根据参数ReentrantLock会根据我们的需要创建对应的锁,当没有参数的时候我们来看看它默认的是什么?
/**
* Creates an instance of {@code ReentrantLock}.
* 创建一个实例
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();//sync是Reentrant内的Sync类型的成员变量
}
从代码中我们可以知道,当我们使用无参构造的时候ReentrantLock会为我们创建一个非公平锁(事实上,大大多数情况下非公平锁的效率会更好);
那么我们先来看一下非公平锁是怎么获得同步状态的:
//非公平锁是继承Sync是毋庸置疑的
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//这个是初次加锁的时候,判断同步状态是否被占用
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
//当被占用后开始调用这个方法了
//(当然后面的方法还会对这占用同步状态的线程是否是当前线程,
//因为这个锁是独占的,仅仅允许一个线程多次加锁)
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
(一)
上面的compareAndSetState我们已经不陌生了,上一篇文章我们在写叫Mutex的自定义同步组件的时候看到过,它的功能是:判断当前的state是否为0,如果为0那么获取锁,如果不为0,那么将state设置为1,:
//这个方法是同步器提供的方法,参数是expect和update,它继续调用了unsafe内的方法(内部是通过CAS来实现原子操作),
//将this(当前状态)和expect比较,如果相同返回true,如果不同则将state设置为update
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
(二)
接着便是setExclusiveOwnerThread(Thread thread),这个方法比较陌生,我们继续跟源码,
//这个方法是在AbstractOwnableSynchronizer内,从名字估计大家应该知道是干嘛的了,
//我们看看这个类的注释
/**
* A synchronizer that may be exclusively owned by a thread. This
* class provides a basis for creating locks and related synchronizers
* that may entail a notion of ownership. The
* {@code AbstractOwnableSynchronizer} class itself does not manage or
* use this information. However, subclasses and tools may use
* appropriately maintained values to help control and monitor access
* and provide diagnostics.
*博主英语不太好,勉强知道意思是:这个类是一个线程独占的同步器,
*这个类提供创建锁和相关同步器的基础(伴随着所有权的概念),
*这个类本身不控制和使用信息,子类和和工具可以适当使用维持的值去帮助控制和监视访问,提 * 供诊断(额,好几个单词不会0.0)
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
所以我们可以看出这个是干嘛的了:
//同步器继承自AbstractOwnableSynchronizer,Sync继承自同步器,
//NonfairSync 继承自Sync,所以这下该该明白了!
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
}
NonfairSync 间接继承AbstractOwnableSynchronized,所以他可以调用它使用的方法,将exclusiveOwnerThread 变量设置为当前线程
//这个变量是AbstractOwnableSynchronized的成员变量
private transient Thread exclusiveOwnerThread;
所以为我们知道这个设置是用来帮助控制与监视用的!
(三)
接下来我们继续向下看,如果无法第一次无法获取同步状态,调用acquire方法,之前我们也见过这个方法,这个方法尝试获取同步状态,否则将当前线程组装成一个节点放入同步队列中,但是在这里,ReentrantLock对它进行了重写:
public final void acquire(int arg) {
//其他的都没变,我们继续跟tryAcquire
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里的tryAcquire不再是 throw new UnsupportedOperationException();,
而是:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
所以最终调用nonfairTryAcquire,至此我们真正的主角才上场!(鼓掌!!)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//再次判断当前状态
if (c == 0) {
//下面这行代码我就不用说了吧(参考上面的lock方法)
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前的线程为我们之前存的线程(就是前面已经获取同步状态的线程)
else if (current == getExclusiveOwnerThread()) {
//c为当前的状态,acquires为参数为1
//当前状态最小值为0,表示当前无线程获取同步状态
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//将当前状态设置为nextc,也就是在原来c的基础上加1了
setState(nextc);
return true;
}
return false;
}
以上就是非公平锁的加锁方法,如果彻底明白了加锁的方法,那么release方法也不成问题了,接着看释放方法:
我们都知道ReentrantLock是同过unlock来释放锁的:
public void unlock() {
sync.release(1);
}
继续往下面跟release方法(记得参数是1):
//tryRelease方法就是不同的地方下面我们说
public final boolean release(int arg) {
//这个tryRelease方法被我们的ReentrantLock重写过,
//不再是抛出UnsupportedOperationException错误了
//下面我们详解tryRelease
if (tryRelease(arg)) {
//获取要释放的节点
Node h = head;
//如果节点不为空,且waitStatus不为0(0为初始状态)
if (h != null && h.waitStatus != 0)
//这个方法使用LockSupport来唤醒下一个节点
unparkSuccessor(h);
return true;
}
return false;
}
如果tryRelease释放成功(即不同步状态未0),那么获取要释放的节点,并判断当前节点是否存在,这个等待状态不能为0,只有这样才能进行下一步的唤醒操作!
记得在上一篇文章中,我就提到了release方法,但是没有详细说明,只是说unparkSuccessor方法是用来唤醒下一个节点的,这次来看看unparkSuccessor方法:
private void unparkSuccessor(Node node) {
//获取当前节点的waitStatus
int ws = node.waitStatus;
//如果ws小于零即表示当前节点满足可以通知下一个节点
if (ws < 0)
//CAS操作,将waitStatus设置为0(node的waitStatus一定是相等的)
compareAndSetWaitStatus(node, ws, 0);
//获取释放节点的下一个节点
Node s = node.next;
//如果s节点为null或者waitStatus > 0(即不是初始状态)
//那么s是空的
if (s == null || s.waitStatus > 0) {
s = null;
//以下为同步队列的变动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果s不为空,那么使用LockSupport来唤醒s
//LockSupport是用来唤醒阻塞中断的线程的,后面一章我们详细来讲解
if (s != null)
LockSupport.unpark(s.thread);
}
以上就完成了锁的释放和唤醒下一个节点,貌似我们少说了什么,对就是tryRelease
//boolean类型,如果释放成功则返回true,否则返回false
protected final boolean tryRelease(int releases) {
//我们之前说过了这个参数为1
//获取当前同步状态并减一
int c = getState() - releases;
//判断当前线程是否为独占线程,如果不是抛出错误
//(错误释放错误,都不是你加锁的,你来凑什么热闹)
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果c为0,即getState为1,那么free为true,同步状态未空闲
if (c == 0) {
free = true;
//将独占线程成员变量设置为空
setExclusiveOwnerThread(null);
}
//将同步状态设置为0
setState(c);
这个时候返回free
return free;
}
以上便是ReentrantLock的解锁代码,因为是可重入的,所以当同步状态部位0的时候(大于零),我们可以多次调用unlock方法来调用释放同步状态!
以上就是非公平锁的的基本操作,接下来看看非公平锁是怎么样的:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
//获得当前线程
final Thread current = Thread.currentThread();
//获得同步状态
int c = getState();
//如果同步状态为0(同步状态空闲)
if (c == 0) {
//如果空闲,那么判断当前线程是否有前驱(意思是让当前线程不能插队)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平锁里面同样头lock方法并重写了tryAcquire方法,lock方法里面调用acquire方法,acquire方法和之前的一样:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
所以唯一不同的还是这个tryAcquire方法:这个方法和非公平锁不同在哪里?就是这个方法里多了一个判断hasQueuedPredecessors,这个方法判断同步队列中是否有前驱节点,如果这个方法返回true,表示有前驱节点,有线程比当前线程更早的获取锁,所以要前驱线程获取锁后释放才能继续获取锁,其他的代码都和上面的相同,我们不必纠结!
ReentrantReadWriteLock
在前面我写了一篇文章是关于读写锁的应用,主要的内容是:读读共享,读写互斥,写写互斥,读写锁维护了两把锁,一把读锁和一把写锁,通过分离读锁和写锁来提高并发性,因为在大多数并发情况下都是读数据,所以这样可以提升并发处理的效率。
我们先看下简单的结构图:
//ReentrantReadWriteLock 实现ReadWriteLock接口
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
.
.
.
}
ReadWriteLock的接口其实很简单,只是规范了两个方法:
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
*/
Lock readLock();
}
看看ReentrantReadWriteLock有哪些字段:
//这个是读锁ReadLock,这个类是ReentrantReadWriteLock的内部类
private final ReentrantReadWriteLock.ReadLock readerLock;
这个是读锁WriteLock,这个类是ReentrantReadWriteLock的内部类
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
构造方法:
//这个构造函数通过this调用下面的这个构造函数
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
//读写锁同样有公平锁和非公平锁
//通过无参构造和当前的构造方法我们可以看出默认的是new一个非公平锁
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
实现接口的方法:
//分别返回读锁和写锁的方法
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
}
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
}
我们上面看到了final类型的 sync,当然毋庸置疑,ReentrantReadWriteLock里面也有继承同步器的内部类:
//这里的内部类是抽象类型的哟
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//下面定义了四个常亮
//这个常亮表示共享位移为16
static final int SHARED_SHIFT = 16;
//共享单元
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//最大数目
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//独占掩码
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//共享数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//独占数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
//为当前类的内部类,继承ThreadLocal,以调用线程为key,
//HoldCounter为value的对象
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
//一系列的字段
//当前线程读锁的持有量
private transient ThreadLocalHoldCounter readHolds;
//最后一个线程成功获取读锁的持有量
private transient HoldCounter cachedHoldCounter;
//第一个获取读锁的线程
private transient Thread firstReader = null;
//第一个获取读锁线程的锁持有量
private transient int firstReaderHoldCount;
//无参构造
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
//这个和ReentrantLock中的相同(因为继承了同步器,所以这些方法方法要实现)
//以下如果我们看见和之前和同步器内的方法名相同,那么你不用怀疑,它就是一样的
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
//这个是读写锁特有的方法:释放共享锁(继承同步器后自定义的方法)
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
//这个是读写锁特有的方法:获得共享锁(继承同步器后自定义的方法)
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Methods relayed to outer class
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
final int getReadLockCount() {
return sharedCount(getState());
}
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
final int getCount() { return getState(); }
}
上面我代码做了简单的注释,也pass了一些方法,为了代码的完整性所以将Sync所有的的代码都贴了出来,整体上看着有点乱,我们理一下,首先ReentrantReadWriteLock和ReentrantLock结构基本相同,都有公平锁和非公平锁,实现方式是一样的,不同的地方是读写锁内部维护了两把锁,读锁和写锁,ReentrantLock同步状态表示被同一个线程获取的次数,ReentrantReadWriteLock同样需要有一个同步状态值来表示当前锁(读写锁),被同一个线程获取的次数,在读写锁中如果在一个整型变量中维护多种状态,就需要“按位切割使用”这个变量,它的高16位用来记录读状态,低16位用来记录写状态:
我们来看看具体是如何操作的:
写锁:
当重进入时候仅需加1,当释放的时候减1,获取当前状态的时候进行与(&)操作(0000000000000000 1111111111111111)将高的16位置为0即可获得当前的写状态。
读锁:
当重进入的时候仅需加1<<16,释放的时候减去1>>>16(无符号补零右移16位),获取当前状态时整体右移16位(左边补零)。
写所得获取与释放,在读写锁中,同样是重写了同步器的tryAcquire方法,和ReentrantLock不同的是在获取之前需要判定一下读锁是否存在,如果读锁存在那么获取失败!写锁的释放操作和ReentrantLock基本一致,无其他特别。
读锁的获取与释放,在获取读锁的时候判断当前读锁容量是否充足(因为存储16位,所以这个读锁会有一个最大值),如果充足还要判断当前状态是否大于零,如果大于零,那么无非三种情况,
①读锁状态位为0,写锁状态位不为→当前有写锁占用读锁进入等待(01)
②读锁状态位不为0,写锁状态位0→当前读锁占用读锁可以获得锁(10)
③读状态位和写状态位都为0→读锁可以获取(00)
以上的线程安全靠CAS进行保证!
读锁的每次释放是线程安全的,每次状态位减1<<16.
读写锁还有一个特性就是锁降级,指的是将写锁降级为读锁,是指当前线程获取的是写锁,先获取读锁然后释放写锁,保留读锁。
本章到此结束!还有很多分析不到的地方,望指正,不胜感激!
2018 3.29 10:34
posted on 2018-03-29 10:37 MindMrWang 阅读(…) 评论(…) 编辑 收藏