ConcurrentHashMap如何保证线程安全
HashMap的put,get,size等方法都不是线程安全的,而HashTable虽然保证了线程安全,但却是用了效率极低的方法,在put,get,size等方法上加上了synchronized,这就导致所有的并发进程都要竞争同一把锁,一个线程在进行同步操作时,其他线程都需要等待。
为了保证集合的线程安全性,Jdk提供了同步包装器,比如说Collections.synchronizedMap,不过它们都是利用粗粒度的同步方式,高并发情况下性能较差。
可以看看Collections.synchronizedMap的实现。
与并发安全有关的代码都使用了synchronized关键字进行了修饰,使用的锁mutex可以在构造方法中看到就是this。
虽然说不在是用synchronized来修饰方法,但是还是使用了this做mutex锁,治标不治本。
更加普遍的选择是利用并发包提供的线程安全容器类,它提供了各种并发容器,比如ConcurrentHashMap、CopyOnWriteArrayList。
各种线程安全队列,如ArrayBlockingQueue、SynchronousQueue。
ConcurrentHashMap分析
ConcurrentHashMap和HashMap一样,在Java8时结构上发生了很大变化。
之前的ConcurrentHashMap的实现是基于分离锁
,也就是内部进行分段,每一个段中都拥有HashEntry数组,hashcode相同的key也是按照链表的方式存储的。
HashEntry内部使用volatile修饰的value字段来保证可见性,也利用了不可变对象的机制以改进利用Unsafe提供的底层能力,比如volatile access,去直接完成部分操作,以最优化性能,毕竟Unsafe中的很多操作都是JVM intrinsic优化过的。
Segment的初始大小由DEFAULT_CONCURRENCY_LEVEL来确定,默认是16,在构造方法中可以自定义大小,不过必须是2的幂,如果输入7,会自动创建8。
对于get方法来说,只需要保证可见性,无需其他同步操作。
关键是put方法,首先是通过二次哈希避免哈希冲突,然后以Unsafe调用方式,直接获取相应的Segment,然后进行线程安全的put操作。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 二次哈希,以保证数据的分散性,避免哈希冲突
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
put的核心实现如下。
fnal V put(K key, int hash, V value, boolean onlyIfAbsent) {
// scanAndLockForPut会去查找是否有key相同Node
// 无论如何,确保获取锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> frs = entryAt(tab, index);
for (HashEntry<K,V> e = frs;;) {
if (e != null) {
K k;
// 更新已有value...
} else {
// 放置HashEntry到特定位置,如果超过阈值,进行rehash
// ...
}
}
} fnally {
unlock();
}
return oldValue;
}
首先也是通过 Key 的 Hash 定位到具体的 Segment,在 put 之前会进行一次扩容校验。这里比 HashMap 要好的一点是:HashMap 是插入元素之后再看是否需要扩容,有可能扩容之后后续就没有插入就浪费了本次扩容(扩容非常消耗性能)。而 ConcurrentHashMap 不一样,它是在将数据插入之前检查是否需要扩容,之后再做插入操作。
ConcurrentHashMap会获取再入锁,以保证数据一致性,Segment本身就是基于ReentrantLock的扩展实现,所以,在并发修改期间,相应Segment是被锁定的。
在最初阶段,进行重复性的扫描,以确定相应key值是否已经在数组里面,进而决定是更新还是放置操作,你可以在代码里看到相应的注释。重复扫描、检测冲突
是ConcurrentHashMap的常见技巧。
另外一个Map的size方法同样需要关注,它的实现涉及分离锁的一个副作用。
试想,如果不进行同步,简单的计算所有Segment的总值,可能会因为并发put,导致结果不准确,但是直接锁定所有Segment进行计算,就会变得非常昂贵。
所以,ConcurrentHashMap的实现是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数2),来试图获得可靠值。如果没有监控到发生变化(通过对
比Segment.modCount),就直接返回,否则获取锁进行操作。
每个Segment都有一个volatile修饰的全局变量count,求整个 ConcurrentHashMap的size时很明显就是将所有的count累加即可。但是volatile修饰的变量却不能保证多线程的原子性,所有直接累加很容易出现并发问题。
通过尝试两次将count累加,如果容器的count发生了变化再加锁来统计size,可以有效的避免加锁的问题。
以上是JDK1.7及之前的ConcurrentHashMap的结构,在1.8之后,ConcurrentHashMap结构发生了很大变化,由之前的分段锁变为了CAS+synchronized。
我在网上找到了一幅结构图。
总体结构上,它的内部存储和HashMap结构非常相似,同样是大的桶(bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要更细致一些。
其内部仍然有Segment定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。
因为不再使用Segment,初始化操作大大简化,修改为lazy-load形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。
将1.7中存放数据的HashEntry改为Node,但作用都是相同的。其中的val next 都用了volatile修饰,保证了可见性。
satic class Node<K,V> implements Map.Entry<K,V> {
fnal int hash;
fnal K key;
volatile V val;
volatile Node<K,V> next;
// …
}
在Node中,key被定义为了final,因为照常理来说,key是不会改变了,而是value会发生改变,因此在val中添加了volatile来修饰。
来看看它的put操作。
fnal V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 利用CAS去进行无锁线程安全操作,如果bin是空的
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // 不加锁,进行检查
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {
// 细粒度的同步修改操作...
}
}
// Bin超过阈值,进行树化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
具体流程如下:
- 根据key计算出hashcode 。
- 判断是否需要进行初始化initTable。
- f即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
- 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
- 如果都不满足,则利用 synchronized 锁写入数据。
- 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。
可以发现1.8以后的锁的颗粒度,是加在链表头上的,这是一个很重要的突破。
1.8中不依赖与segment加锁,segment数量与桶数量一致。
首先判断容器是否为空,为空则进行初始化利用volatile的sizeCtl作为互斥手段,如果发现竞争性的初始化,就暂停在那里,等待条件恢复,否则利用CAS设置排他标志(U.compareAndSwapInt(this, SIZECTL, sc, -1)),否则重试
对key hash计算得到该key存放的桶位置,判断该桶是否为空,为空则利用CAS设置新节点,否则使用synchronize加锁,遍历桶中数据,替换或新增加点到桶中。最后判断是否需要转为红黑树,转换之前判断是否需要扩容。
最后来看看1.8 ConcurrentHashMap的size方法实现。
在sumCount方法中使用到了CounterCell,对于CounterCell的操作,是基于java.util.concurrent.atomic.LongAdder进行的,是一种JVM利用空间换取更高效率的方法,利用了Striped64内部的复杂逻辑。
// 一种用于分配计数的填充单元。改编自LongAdder和Striped64。请查看他们的内部文档进行解释。
@sun.misc.Contended
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}