多线程十二之ConcurrentHashMap1.8实现分析
简介
本文是基于JDK8分析ConcurrentHashMap的实现,在前一篇博文多线程十一之ConcurrentHashMap1.7源码分析中分析了JDK7中是如何实现满足并发且线程安全的ConcurrentHashMap:ConcurrentHashMap根据初始的并发度concurrencyLevel构建片段Segment数组,最多可以实现有concurrencyLevel个线程进行并发读写,相对于HashTable利用synchronized
关键字对每个方法做同步大大提高了效率。但是JDK7中ConcurrentHashMap的实现仍然有不足:Segment数组无法扩容,并发度受到Segment数组大小限制,只能支持N个线程的并发(N为Segment数组长度且N不可变)
JDK8中ConcurrentHashMap做了如下改进:并发的最小单位不再是JDK7中的Segment片段,而是存储键值对的节点Node,线程操作ConcurrentHashMap时锁住的是装载Node的桶,锁的粒度变得更小了,意味着并发能力进一步增强,另外引入了红黑树来解决多个节点哈希冲突造成查询效率下降的问题。
数据结构
JDK8中不再使用Segment片段锁,而是使用CAS+synchronized来保证并发时的线程安全。
源码分析
ConcurrentHashMap定义了很多常量,如下:
常量及成员变量
//Node桶数组的最大长度:2^30
private static final int MAXIMUM_CAPACITY = 1 << 30;
//Node数组默认长度,必须是2的幂次方
private static final int DEFAULT_CAPACITY = 16;
//转换为数组的最大长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//为了兼容性,Java8中没用到这个属性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//默认的HashMap加载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阈值,同一个桶中因为hash冲突的链表节点超过8个就转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;
//树化最小Node数组长度
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容时单个线程推进最小步长,在扩容那里详细解释用途
private static final int MIN_TRANSFER_STRIDE = 16;
//用于对Node数组tab生成一个stamp
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,用于帮助扩容的最大线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//特定Node的hash值,程序中根据这些hash值判定Node类型
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
//运行环境CPU数量,transfer扩容时会用到这个参数
static final int NCPU = Runtime.getRuntime().availableProcessors();
sizeCtl:控制标识符,用来控制table初始化和扩容操作的,当ConcurrentHashMap处于不同的状态阶段时sizeCtl
的值也不同,代表的含义也不同
- sizeCtl = -1,表示有线程正在进行真正的初始化操作
- sizeCtl = -(1 + nThreads),表示有nThreads个线程正在进行扩容操作
- sizeCtl > 0,分两种情况:如果table数组还没有初始化,这就是初始化数组的长度;如果已经初始化过了,sizeCtl是table数组长度的0.75倍,代表扩容阈值。
- sizeCtl = 0,默认值,此时在真正的初始化操作中使用默认容量
//存放键值对Node数组
transient volatile Node<K,V>[] table;
//扩容时指向新生成的数组,不扩容为null
private transient volatile Node<K,V>[] nextTable;
//用于统计哈希表中键值对个数
private transient volatile long baseCount;
//控制标识符,当ConcurrentHashMap处于初始化/扩容等不同状态,sizeCtl的值也不同,表示的意义也不同
private transient volatile int sizeCtl;
//扩容时线程把数据从老数组向转移的起点
private transient volatile int transferIndex;
构造方法
可以看到在调用构造方法创建ConcurrentHashMap对象时,只是根据传入参数计算桶数组初始长度赋值给sizeCtl
,并没有初始化table数组,延迟加载。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
//初始化时根据这个值作为桶数组table的长度
this.sizeCtl = cap;
}
内部类
Node
用于存储key-value键值对,当哈希冲突映射到同一个桶中会形成链表,Node是ConcurrentHashMap中最基础的数据结构。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
//get方法会调用find方法,子类也会重写find方法
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
ForwardingNode
继承父类Node,不存储键值对,是一个标记节点,ForwardingNode的hash值固定为MOVED
(-1),当ConcurrentHashMap处于扩容阶段时会把数据从老的table数组赋值到新的扩容后的table数组,每当完成一个桶,就会在老数组的桶的位置放上ForwardingNode表示这个位置已经复制完成,如果要这个这个地方的Node键值对,就去扩容后的nextTable中去寻找。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
//寻找key为k且hash值为h的键值对节点
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
TreeBin
当链表转换为红黑树时,table数组存储的并不是红黑树的根节点,而是TreeBin节点,用来标识这里存放的是一个红黑树,不存储键值对数据,而是指向红黑树的根节点root。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
/**
* Creates bin with initial set of nodes headed by b.
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
}
TreeNode
用于构建红黑树的基础数据结构。
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
put(K key, V value)
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//校验参数合法性
if (key == null || value == null) throw new NullPointerException();
//计算键的hash值
int hash = spread(key.hashCode());
//用来桶中节点个数,判断是否需要转换为红黑树
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table桶数组还没有初始化,那么调用initTable初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//key的hash值映射到的桶为空,也就是没有存放过键值对,直接把键值对存放到这个桶里
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//桶里的节点是ForwardingNode,ConcurrentHashMap处于扩容阶段,让当前线程帮助扩容
else if ((fh = f.hash) == MOVED)
//helpTransfer在扩容里详细分析
tab = helpTransfer(tab, f);
//如果不是以上情况,说明桶中已经有元素,可能是一个链表,可能是红黑树
else {
V oldVal = null;
//把桶中元素锁住
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh是桶中节点hash值,大于零说明不是TreeBin,是链表
if (fh >= 0) {
binCount = 1;
//循环遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
//链表中有键值对的键与要插入的键相同,直接替换value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//把键值对插入链表末尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//fh小于0,桶中存放的是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//检查是否需要把链表转为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//更新ConcurrentHashMap中存放键值对个数
addCount(1L, binCount);
return null;
}
initTable()
通过构造器的源码我们可以知道构造器新建ConcurrentHashMap对象时只设定了table数组的长度,没有初始化,通过initTable来实现延迟加载的。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl < 0 表示有其他线程在初始化,挂起当前线程
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//sizeCtl设为-1,由当前线程负责桶数组的初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//根据参数设置桶数组长度
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
//初始化数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sizeCtl为0.75n,达到0.75n的阈值就开始扩容
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
get(Object key)
get方法比较简单,根据key
获取保存的键值对的value,主要有以下几步:
- 计算key的hash值
- 根据hash值映射到桶数组对应的桶,映射:
(n - 1) & h)
- 根据桶中元素是否是链表寻找对应的key
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算key的hash值
int h = spread(key.hashCode());
//判断桶数组不为空并找到key对应的桶
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//桶中的第一个元素就是要找的键
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//桶中元素是ForWardingNode或TreeBin
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//继续在桶中的单链表寻找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
size()
size方法用来统计ConcurrentHashMap中存储的键值对个数
public int size() {
//调用sumCount方法
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
sumCount()
可以看到sumCount就是统计baseCount
和CountCell
数组的和,为什么要这样统计呢?在putVal方法的末尾会调用addCount方法更新键值对的个数,去看看addCount方法。
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
addCount(long x, int check)更新baseCount部分
addCount方法分为两部分,一部分是更新键值对数量,另一部分是检查是否需要扩容。更新键值对数量先通过CAS更新baseCount,如果CAS更新失败,说明线程并发竞争激烈,就通过CAS加死循环把要更新的值加到CounterCell数组中,所以键值对的总数是baseCount以及遍历CounterCell的和。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//CAS更新baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//CAS更新CountCell
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//检查是否需要扩容
......
}
mappingCount()
我们在回头看一下size方法:因为n是long类型的,而size方法的返回值是int类型,所以会比较n是否超出了int范围:
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
如果超出了范围就返回Integer.MAX_VALUE
。为什么键值对的数量会超出int范围呢?虽然ConcurrentHashMap中定义了常量MAXIMUM_CAPACITY
限定了table数组的最大长度(2^30),但是由于hash冲突的原因一个桶中可能存储多个键值对,数据量大的情境下是有可能超过int范围,所以JDK建议使用mappingCount方法,实现与size方法一致返回值为long类型,不受int范围限制。
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
扩容
addCount(long x, int check)检查扩容部分
ConcurrentHashMap的扩容就是通过transfer,transfer方法可以说将线程并发利用到极致。在详细分析transfer实现之前我们得知道是什么方法触发了ConcurrentHashMap的扩容动作呢?答案就是之前分析了一半的addCount方法,主要有以下几个步骤:
- 检查当前当前ConcurrentHashMap中存储的键值对个数是否超过阈值sizeCtl:
s>=(long)(sc=sizeCtl)
- 检查是否有别的线程已经开始给ConcurrentHashMap扩容,如果已经开始:
- 检查是否完毕,扩容完毕,跳出检查
- 还在扩容中,当前线程加入扩容,sizeCtl加一。
- 没有别的线程开始扩容,当前线程是开启扩容的第一个线程,CAS设置sizeCtl小于0,表示扩容已经开始
从上面的分析中可以看到在扩容的流程中sizeCtl是一个很重要的状态量,sizeCtl可以表示扩容是否开始,以及参与扩容线程的个数,下面分析sizeCtl在扩容中的变换:
private final void addCount(long x, int check) {
//更新baseCount
......
//检查是否需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//键值对个数超过阈值sizeCtl
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//根据当前当前桶数组长度生成扩容戳
int rs = resizeStamp(n);
//sc<0,扩容已经开始
if (sc < 0) {
//扩容完毕
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//当前线程假如扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是开始扩容的第一个线程
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
sizeCtl
sizeCtl与扩容在扩容流程中有重要作用,如果线程是第一个开始扩容的线程的话,会把sizeCtl以CAS的方式设置为(rs << RESIZE_STAMP_SHIFT) + 2)
,这个rs是由resizeStamp方法生成的:
resizeStamp是根据传入的桶数组长度生成一个扩容戳(resizeStamp),我们知道桶数组长度,也就是传入的n是2的幂次方,Integer.numberOfLeadingZeros(n)
返回n的最高非零位前面的0的个数,再与1左移15位的结果异或得到生成的戳,如果我们传入默认桶数组长度16,返回的结果为32795,计算过程如下:
可以看到当桶数组长度为16的ConcurrentHashMap开始扩容时,resizeStamp的返回值是0000 0000 0000 0000 1000 0000 0001 1011
,也就是32795。
当第一个线程开始扩容时,会设置sizeCtl的值为(rs << RESIZE_STAMP_SHIFT) + 2)
,如下:
此时sizeCtl
的值二进制形式为1000 0000 0001 1011 0000 0000 0000 0010
,sizeCtl
分为两部分,它的高16位由risizeCtl(n)的结果组成,如果有n个线程加入扩容,低16位的值为1+n
。由于此时sizeCtl的符号位为1,所以处于扩容状态sizeCtl
的值总是小于0的负数。
static final int resizeStamp(int n) {
//numberOfLeadingZeros:该方法的作用是返回无符号整型i的最高非零位前面的0的个数,包括符号位在内
//1 << (RESIZE_STAMP_BITS - 1):把1左移(RESIZE_STAMP_BITS - 1)位,也就是15位
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
transfer(Node<K,V>[ ] tab, Node<K,V>[ ] nextTab)
transfer是扩容的核心方法,也可以说是ConcurrentHashMap中设计最精巧的方法了。传入两个参数,tab
指向扩容前的数组,nextTab
指向数据要转移过去的新数组,第一个开始扩容的线程调用transfer方法传入的nextTab
参数为空,后续扩容线程传入的是成员变量nextTable
。
transfer方法开始把键值对从扩容前的数组转移到新数组,这个方法并不是同步的,不是说开始扩容的线程负责把转移所有的数据,并发大师Doug Lea每个参与扩容的线程负责转移老数组的一部分数据,转移完了之后如果扩容还没有结束再取一段数据,转移数据的过程是并发的,最多可以有MAX_RESIZERS
(2^16)个线程同时参与,大致过程如下:
从上图可以看出,线程转移数据是从后往前开始的,当转移过程发现index<0
说明扩容结束。
具体ConcurrentHashMap是怎么迁移数据,下面以桶中的链表为例说明。如下图所示8号桶中有6个链表节点,扩容线程会把这些节点分为两部分:节点hash值与数组长度做与运算结果为0的节点放到低位桶,结果为1的放到低位桶,然后把低位桶放到新数组索引为8的桶中,把高位桶放到索引为24(i+n)的桶中。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//确定每个线程一次扩容负责处理多少个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//扩容的新桶数组nextTable若为空,构建数组
if (nextTab == null) { // initiating
try {
//新数组长度是现在数组长度两倍,N=2n
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//更新成员变量
nextTable = nextTab;
//初始化transferIndex,从旧数组的末尾开始
transferIndex = n;
}
int nextn = nextTab.length;
//当线程完成一个桶的数据转移,就在旧数组桶的位置放上ForwardingNode标记节点,指向转移后的新节点
//用来标记这个桶数据已经迁移完成
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,
//那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
boolean advance = true;
//扩容是否完成标志位
boolean finishing = false; // to ensure sweep before committing nextTab
//在这个死循环内线程负责转移期间内桶数组数据
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//advance为true,线程向前推进,使得i--,i就是要处理的桶的索引,bound表示区间的下界
//transferIndex是区间的上界,i<bound说明线程处理完这个区间数据
while (advance) {
int nextIndex, nextBound;
//i必然大于bound
if (--i >= bound || finishing)
advance = false;
//transferIndex<=0 扩容完毕
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//检查扩容是否完成或当前线程是否完成任务
//i<0 扩容完成
//i >= tab.length或i + tab.length >= nextTable.length 都不可能发生
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//finishing为true,说明扩容已经完成
if (finishing) {
//把nextTable置为空
nextTable = null;
//table指向扩容后的桶数组
table = nextTab;
//设置下次扩容阈值sizeCtl,这里的n是扩容前旧数组的长度
//sizeCtl=2n-0.5n=1.5n 新数组长度N=2n。加载因子:1.5n/2n=0.75
//从这里可以看出即使构造方法传入的加载因子不是0.75也不影响
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//finishng不为true,尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。
//也就是说,扩容结束了。不相等说明没有结束,但是当前线程的扩容任务完成了
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//相等,说明扩容已经完成
finishing = advance = true;
i = n; // recheck before commit
}
}
//正式开始迁移数据到新的桶数组
//如果i对应的桶为null,直接把标志节点放到桶中
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//根据hash值判断i对应的桶中节点为ForwardingNode,说明这个桶已经被处理了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//桶中节点可能是链表节点Node,也可能是红黑树节点TreeBin
else {
//把桶中节点锁住,防止插入或删除
synchronized (f) {
if (tabAt(tab, i) == f) {
//ln:低位桶 hn:高位桶
Node<K,V> ln, hn;
//桶中节点hash值大于0,是链表类型节点Node
if (fh >= 0) {
//runBit由节点hash值与旧数组的长度做与运算,由于数组长度是2的幂次方,
//所以runBit要么为1,要么为0
int runBit = fh & n;
//从链表尾部到laseRun节点,runBit值相同
Node<K,V> lastRun = f;
//循环遍历找lastRun节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//runBit为0,从lastRun到链表尾节点都放到ln低位桶
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//否则放到高位桶
else {
hn = lastRun;
ln = null;
}
//从链表头部遍历到lastRun节点,节点的runBit为0放到地位桶,为1放到高位桶
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
//头插法
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//插入到新桶数组中
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//在旧数组的桶中放入标记节点
setTabAt(tab, i, fwd);
advance = true;
}
//桶中节点为红黑树,处理逻辑与上面相似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
helpTransfer(Node<K,V>[ ] tab, Node<K,V> f)
除了addCount
会在检查是否需要扩容的时候触发transfer
方法,当putVal
方法添加的键值对映射到对应的桶中节点类型为ForwardingNode
时,也会触发扩容,也就是如下代码:
会调用helpTransfer
方法帮助扩容,下面重点分析程序中的这一段判断扩容是否完成的if条件:
-
sc >>>RESIZE_STAMP_SHIFT)!=rs
在我们分析sizeCtl
时知道:当第一个开始给ConcurrentHashMap扩容时,会设置sizeCtl
的值为rs << RESIZE_STAMP_SHIFT) + 2
,从transfer
方法可以知道,当扩容结束时会设置sizeCtl = (n << 1) - (n >>> 1)
重新变为正数。所以当sc >>>RESIZE_STAMP_SHIFT)!=rs
说明此时sizeCtl
的值为(n << 1) - (n >>> 1)
,扩容已经结束了。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//检查扩容是否结束
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//扩容完毕
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//当前线程加入扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
总结
写到这里关于JDK8中的ConcurrentHashMap已经学习完毕,花了差不多5天的时间。ConcurrentHashMap作为多线程环境下的HashMap,向我们展示了如何在多线程环境下保持线程安全、提高并发效率的技巧:即减小锁的粒度、利用CAS无锁算法,尤其是其中的transfer
方法,是值得我们再三学习和挖掘的。