其实ConcurrentHashMap我自己已经看过很多遍了,但是今天在面试阿里的时候自己在描述ConcurrentHashMap发现自己根本讲不清楚什么是ConcurrentHashMap,以及里面是怎么实现的,搞的我突然发现自己什么都不懂,所以我想要再次的来分析一下这个源码,完全理解ConcurrentHashMap,而不是以为自己懂了,实际上自己不懂。

首先我们看一下put方法,put方法会调用到putVal方法上面。

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. if (key == null || value == null) throw new NullPointerException();
  3. int hash = spread(key.hashCode());
  4. //如果put进去的是个链表,这个参数表示链表的大小
  5. int binCount = 0;
  6. for (Node<K,V>[] tab = table;;) {
  7. Node<K,V> f; int n, i, fh;
  8. if (tab == null || (n = tab.length) == 0)
  9. //初始化链表
  10. tab = initTable();
  11. //如果这个槽位没有数据
  12. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  13. //使用CAS将这个新的node设置到hash桶里面去
  14. if (casTabAt(tab, i, null,
  15. new Node<K,V>(hash, key, value, null)))
  16. break; // no lock when adding to empty bin
  17. }
  18. //帮助迁移
  19. else if ((fh = f.hash) == MOVED)
  20. tab = helpTransfer(tab, f);
  21. else {
  22. //获取锁
  23. V oldVal = null;
  24. synchronized (f) {
  25. //双重检查锁
  26. if (tabAt(tab, i) == f) {
  27. //如果hash值大于等于0,那么代表这个节点里的数据是链表
  28. if (fh >= 0) {
  29. binCount = 1;
  30. //每次遍历完后binCount加1,表示链表长度
  31. for (Node<K,V> e = f;; ++binCount) {
  32. K ek;
  33. //如果hash值和key值都相同,那么覆盖,break结束循环
  34. if (e.hash == hash &&
  35. ((ek = e.key) == key ||
  36. (ek != null && key.equals(ek)))) {
  37. oldVal = e.val;
  38. if (!onlyIfAbsent)
  39. e.val = value;
  40. break;
  41. }
  42. //下一个节点为null,说明遍历到尾节点了,那么直接在尾节点设值一个新的值
  43. Node<K,V> pred = e;
  44. if ((e = e.next) == null) {
  45. pred.next = new Node<K,V>(hash, key,
  46. value, null);
  47. break;
  48. }
  49. }
  50. }
  51. //如果是红黑树
  52. else if (f instanceof TreeBin) {
  53. Node<K,V> p;
  54. binCount = 2;
  55. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  56. value)) != null) {
  57. oldVal = p.val;
  58. if (!onlyIfAbsent)
  59. p.val = value;
  60. }
  61. }
  62. }
  63. }
  64. if (binCount != 0) {
  65. if (binCount >= TREEIFY_THRESHOLD)
  66. //如果链表个数大于8,那么就调用这个方法
  67. treeifyBin(tab, i);
  68. if (oldVal != null)
  69. return oldVal;
  70. break;
  71. }
  72. }
  73. }
  74. addCount(1L, binCount);
  75. return null;
  76. }

解释一下上面的源码做了什么:

  1. 首先做一下判断,不允许key和value中任意一个为空,否则抛出异常
  2. 计算key的hash值,然后遍历table数组
  3. 如果table数组为null或为空,那么就调用initTable做初始化
  4. 为了保证可见性,会使用tab去table数组里获取数据,如果没有数据,那么用casTabAt通过CAS将新Node设置到table数组里。(注:这里也体现了和hashmap不一样的地方,hashmap直接通过数据拿就好了, 这个获取数据和设值都要保证可见性和线程安全性)
  5. 如果当前槽位所对应的hash值是MOVED,说明当前的table正在扩容迁移节点,那么就调用helpTransfer帮助迁移
  6. 走到这里,说明这个槽位里面的元素不止一个,有很多个,所以给头节点加上锁
  7. 如果当前的hash所对应的的槽位不是空的,并且hash值大于等于0,那么就说明这个槽位里面的对象是一个链表,那么就遍历链表
    1. 如果所遍历的链表里面有元素的hash值并且key和当前要插入的数据的是一样的,那么就覆盖原来的值
    2. 如果遍历到最后的节点都没有元素和要插入的值key是一样的,那么就新建一个Node节点,插入到链表的最后
    3. 每遍历一个节点就把binCount+1
  8. 如果当前的节点是TreeBin,那么说明该槽位里面的数据是红黑树,那么调用相应方法插入数据
  9. 最后如果binCount已经大于或等于8了,那么就调用treeifyBin

接下来我们先看initTable 方法,再看treeifyBin和helpTransfer

  1. private final Node<K,V>[] initTable() {
  2. Node<K,V>[] tab; int sc;
  3. while ((tab = table) == null || tab.length == 0) {
  4. //一开始的时候sizeCtl为0
  5. if ((sc = sizeCtl) < 0)
  6. Thread.yield(); // lost initialization race; just spin
  7. //将sizeCtl用CAS设置成-1
  8. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  9. try {
  10. if ((tab = table) == null || tab.length == 0) {
  11. //因为sc一开始为0,所以n取DEFAULT_CAPACITY为16
  12. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  13. @SuppressWarnings("unchecked")
  14. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  15. //将table赋值为大小为16的Node数组
  16. table = tab = nt;
  17. //将sc的设置为总容量的75%,如果 n 为 16 的话,那么这里 sc = 12
  18. sc = n - (n >>> 2);
  19. }
  20. } finally {
  21. //最后将sizeCtl设置为sc的值
  22. sizeCtl = sc;
  23. }
  24. break;
  25. }
  26. }
  27. return tab;
  28. }

这个方法里面初始化了一个很重要的变量sizeCtl,初始值为总容量的75%,table初始化为一个容量为16的数组

下面我们在看看treeifyBin方法

  1. private final void treeifyBin(Node<K,V>[] tab, int index) {
  2. Node<K,V> b; int n, sc;
  3. if (tab != null) {
  4. //如果数据的长度小于64,那么调用tryPresize进行扩容
  5. if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
  6. tryPresize(n << 1);
  7. //如果这个槽位里面的元素是链表
  8. else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
  9. //给链表头加上锁
  10. synchronized (b) {
  11. if (tabAt(tab, index) == b) {
  12. TreeNode<K,V> hd = null, tl = null;
  13. //遍历链表,然后初始化红黑树对象
  14. for (Node<K,V> e = b; e != null; e = e.next) {
  15. TreeNode<K,V> p =
  16. new TreeNode<K,V>(e.hash, e.key, e.val,
  17. null, null);
  18. if ((p.prev = tl) == null)
  19. hd = p;
  20. else
  21. tl.next = p;
  22. tl = p;
  23. }
  24. //给tab槽位为index的元素设置新的对象
  25. setTabAt(tab, index, new TreeBin<K,V>(hd));
  26. }
  27. }
  28. }
  29. }
  30. }

treeifyBin这个方法里面并不是只是将链表转化为红黑树,而是当tab的长度大于64的时候才会将链表转成红黑树,否则的话,会调用tryPresize方法。

然后我们进入到tryPresize方法里面看看,tryPresize传入的参数是当前tab数组长度的两倍。

  1. private final void tryPresize(int size) {
  2. //原本传进来的size已经是两倍了,这里会再往上取最近的 2 的 n 次方
  3. int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
  4. tableSizeFor(size + (size >>> 1) + 1);
  5. int sc;
  6. while ((sc = sizeCtl) >= 0) {
  7. Node<K,V>[] tab = table; int n;
  8. // 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
  9. if (tab == null || (n = tab.length) == 0) {
  10. n = (sc > c) ? sc : c;
  11. if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  12. try {
  13. if (table == tab) {
  14. @SuppressWarnings("unchecked")
  15. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  16. table = nt;
  17. sc = n - (n >>> 2);
  18. }
  19. } finally {
  20. sizeCtl = sc;
  21. }
  22. }
  23. }
  24. else if (c <= sc || n >= MAXIMUM_CAPACITY)
  25. break;
  26. else if (tab == table) {
  27. int rs = resizeStamp(n);
  28. //一开始进来的时候sc是大于0的
  29. if (sc < 0) {
  30. Node<K,V>[] nt;
  31. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  32. sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  33. transferIndex <= 0)
  34. break;
  35. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  36. transfer(tab, nt);
  37. }
  38. //将SIZECTL设置为一个很大的复数
  39. else if (U.compareAndSwapInt(this, SIZECTL, sc,
  40. (rs << RESIZE_STAMP_SHIFT) + 2))
  41. transfer(tab, null);
  42. }
  43. }
  44. }

这个方法里面,会对tab数据进行校验,如果没有初始化的话会重新进行初始化大小,如果是第一次进来的话会将SIZECTL设置成一个很大的复数,然后调用transfer方法,传如当前的tab数据和null。

接着我们来看transfer方法,这个方法比较长,主要的扩容和转移节点都在这个方法里面实现,我们将这个长方法分成代码块,一步步分析:

  1. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  2. //如果当前tab数组长度为16
  3. int n = tab.length, stride;
  4. //那么(n >>> 3) / NCPU = 0 小于MIN_TRANSFER_STRIDE
  5. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  6. //将stride设置为 16
  7. stride = MIN_TRANSFER_STRIDE; // subdivide range
  8. if (nextTab == null) { // initiating
  9. try {
  10. @SuppressWarnings("unchecked")
  11. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  12. //如果n是16,那么nextTab就是一个容量为32的空数组
  13. nextTab = nt;
  14. } catch (Throwable ex) { // try to cope with OOME
  15. sizeCtl = Integer.MAX_VALUE;
  16. return;
  17. }
  18. nextTable = nextTab;
  19. //将transferIndex赋值为16
  20. transferIndex = n;
  21. }
  22. ...
  23. }

这个代码块主要是做nextTable、transferIndex 、stride的赋值操作。

  1. ...
  2. //初始化nextn为32
  3. int nextn = nextTab.length;
  4. //新建一个ForwardingNode对象,里面放入长度为32的nextTab数组
  5. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  6. boolean advance = true;
  7. boolean finishing = false;
  8. //初始化bound为0
  9. for (int i = 0, bound = 0;;) {
  10. ...
  11. }

下面的代码会全部包裹在这个for循环里面,所以我们来分析一下这个for循环里面的代码

  1. for (int i = 0, bound = 0;;) {
  2. Node<K,V> f; int fh;
  3. while (advance) {
  4. int nextIndex, nextBound;
  5. if (--i >= bound || finishing)
  6. advance = false;
  7. //将nextIndex设置为transferIndex,一开始16
  8. else if ((nextIndex = transferIndex) <= 0) {
  9. i = -1;
  10. advance = false;
  11. }
  12. //一开始的时候nextIndex是和stride相同,那么nextBound为0,TRANSFERINDEX也为0
  13. else if (U.compareAndSwapInt
  14. (this, TRANSFERINDEX, nextIndex,
  15. nextBound = (nextIndex > stride ?
  16. nextIndex - stride : 0))) {
  17. //这里bound也直接为0
  18. bound = nextBound;
  19. //i = 15
  20. i = nextIndex - 1;
  21. advance = false;
  22. }
  23. }
  24. ...
  25. }

这个方法是为了设置transferIndex这个属性,transferIndex一开始是原tab数组的长度,每次会向前移动stride大小的值,如果transferIndex减到了0或小于0,那么就设置I等于-1,i在下面的代码会说到。

  1. for (int i = 0, bound = 0;;) {
  2. ...
  3. //在上面一段代码块中,如果transferIndex已经小于等于0了,就会把i设置为-1
  4. if (i < 0 || i >= n || i + n >= nextn) {
  5. int sc;
  6. //表示迁移已经完成
  7. if (finishing) {
  8. //将nextTable置空,表示不需要迁移了
  9. nextTable = null;
  10. //将table设置为新的数组
  11. table = nextTab;
  12. //sizeCtl设置为n的 1.5倍
  13. sizeCtl = (n << 1) - (n >>> 1);
  14. return;
  15. }
  16. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  17. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  18. return;
  19. // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
  20. // 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
  21. finishing = advance = true;
  22. i = n; // recheck before commit
  23. }
  24. }
  25. ...
  26. }

这个方法是用来表示已经迁移完毕了,可以退出。

  1. for (int i = 0, bound = 0;;) {
  2. ...
  3. //如果该槽位没有元素,那么直接把tab的i槽位设置为fwd
  4. else if ((f = tabAt(tab, i)) == null)
  5. advance = casTabAt(tab, i, null, fwd);
  6. //说明这个槽位已经有其他线程迁移过了
  7. else if ((fh = f.hash) == MOVED)
  8. advance = true; // already processed
  9. //走到这里,说明tab的这个槽位里面有数据,那么我们需要获得槽位的头节点的监视器锁
  10. else {
  11. synchronized (f) {
  12. if (tabAt(tab, i) == f) {
  13. ...
  14. }
  15. }
  16. }
  17. ...
  18. }

在这个代码块中,i会从最后一个元素一个个往前移动,然后根据i这个index来判断tab里面槽位的情况。

下面的代码我们来分析监视器锁里面的内容:

  1. synchronized (f) {
  2. if (tabAt(tab, i) == f) {
  3. //fh是当前节点的hash值
  4. if (fh >= 0) {
  5. int runBit = fh & n;
  6. //lastRun设置为头节点
  7. Node<K,V> lastRun = f;
  8. // 需要将链表一分为二,
  9. // 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
  10. // lastRun 之前的节点需要进行克隆,然后分到两个链表中
  11. for (Node<K,V> p = f.next; p != null; p = p.next) {
  12. int b = p.hash & n;
  13. if (b != runBit) {
  14. runBit = b;
  15. lastRun = p;
  16. }
  17. }
  18. if (runBit == 0) {
  19. ln = lastRun;
  20. hn = null;
  21. }
  22. else {
  23. hn = lastRun;
  24. ln = null;
  25. }
  26. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  27. int ph = p.hash; K pk = p.key; V pv = p.val;
  28. if ((ph & n) == 0)
  29. ln = new Node<K,V>(ph, pk, pv, ln);
  30. else
  31. hn = new Node<K,V>(ph, pk, pv, hn);
  32. }
  33. //其中的一个链表放在新数组的位置 i
  34. setTabAt(nextTab, i, ln);
  35. //另一个链表放在新数组的位置 i+n
  36. setTabAt(nextTab, i + n, hn);
  37. //将原数组该位置处设置为 fwd,代表该位置已经处理完毕
  38. //其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
  39. setTabAt(tab, i, fwd);
  40. //advance 设置为 true,代表该位置已经迁移完毕
  41. advance = true;
  42. }
  43. //下面红黑树的迁移和上面差不多
  44. else if (f instanceof TreeBin) {
  45. ....
  46. }
  47. }
  48. }

这个方法主要是将头节点里面的链表拆分成两个链表,然后设置到新的数组中去,再给老的数组设置为fwd,表示这个节点已经迁移过了。

到这里transfer方法已经分析完毕了。
这里我再举个例子,让大家根据透彻的明白多线程之间是怎么进行迁移工作的。

  1. 我们假设stride还是默认的16,第一次进来nextTabnull,但是tab的长度为32
  2. 一开始的赋值:
  3. 1. n会设置成32,并且n只会赋值一次,代表被迁移的数组长度
  4. 2. nextTab会被设置成一个大小为64的数组,并塞入到新的ForwardingNode对象中去。
  5. 3. transferIndex会被赋值为32
  6. 进入循环:
  7. 初始化i0bound0
  8. 第一次循环:
  9. 1. 由于advance初始化为true,所以会进入到while循环中,循环出来后,transferIndex会被设置成16bound被设置成16i设置成31。这里你可能会问
  10. 2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
  11. 第二次循环:
  12. 1. --i,变为30,--i >= bound成立,并将advance设置成false
  13. 2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
  14. 。。。
  15. 第十六次循环:
  16. 1. --i,变为15,将transferIndex设置为0bound也设置为0i设置为15
  17. 2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
  18. 第三十二次循环:
  19. 1. 这个时候--i等于-1,并且(nextIndex = transferIndex) <= 0成立,那么会将i设置为-1advance设置为false
  20. 2. 会把SIZECTLCAS设置为原来的值加1,然后设置finishingtrue
  21. 第三十三次循环:
  22. 1. 由于finishingtrue,那么nextTable设置为nulltable设置为新的数组值,sizeCtl设置为旧tab的长度的1.5

版权声明:本文为luozhiyun原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/luozhiyun/p/11406557.html