- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章JAVA核心知识之ConcurrentHashMap源码分析由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
ConcurrentHashMap是基于Hash表的Map接口实现,键与值均不允许为NULL,他是一个线程安全的Map。同时他也是一个无序的Map,不同时间进行遍历可能会得到不同的顺序。在JDK1.8之前,ConcurrentHashMap使用分段锁以在保证线程安全的同时获得更大的效率。JDK1.8开始舍弃了分段锁,使用自旋+CAS+sync关键字来实现同步。本文所述便是基于JDK1.8。 ConcurrentHashMap与HashMap有共同之处,一些HashMap的基本概念与实现,本文不再赘述.
1
2
|
public
class
ConcurrentHashMap<K,V>
extends
AbstractMap<K,V>
implements
ConcurrentMap<K,V>, Serializable
|
可以看到ConcurrentHashMap继承了AbstractMap及ConcurrentMap抽象类,并实现了Serializable接口,这说明ConcurrentHashMap是一个线程安全的标准Map,且允许序列化。与HashMap不同是ConcurrentHashMap不允许Clone.
ConcurrentHashMap同样是采用懒初始化的方式,有实际元素时才进行容器的初始化。因此其构造方法与HashMap相差无几.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// 无参构造
public
ConcurrentHashMap() {
}
// 带有初始容量的构造
public
ConcurrentHashMap(
int
initialCapacity) {
if
(initialCapacity <
0
)
throw
new
IllegalArgumentException();
int
cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>>
1
)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>>
1
) +
1
));
//求最小二次幂
this
.sizeCtl = cap;
// sizeCtl 暂存容量
}
// 带有初始集合的构造
public
ConcurrentHashMap(Map<?
extends
K, ?
extends
V> m) {
this
.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 指定初始容量和装载因子的构造
public
ConcurrentHashMap(
int
initialCapacity,
float
loadFactor) {
this
(initialCapacity, loadFactor,
1
);
}
|
以上构造并没有什么特别的,逻辑也相对简单,不再详细解析,感兴趣的话可以到前言提到的HashMap篇了解。 除此之外,ConcurrentHashMap拥有另外一个值得注意的构造方法: 指定初始容量,装载因子以及并发级别的构造方法: 源码:
1
2
3
4
5
6
7
8
9
10
11
|
public
ConcurrentHashMap(
int
initialCapacity,
float
loadFactor,
int
concurrencyLevel) {
if
(!(loadFactor >
0
.0f) || initialCapacity <
0
|| concurrencyLevel <=
0
)
// 基础校验
throw
new
IllegalArgumentException();
if
(initialCapacity < concurrencyLevel)
// 取初始容量和并发级别的最大值
initialCapacity = concurrencyLevel;
long
size = (
long
)(
1.0
+ (
long
)initialCapacity / loadFactor);
// 指定了装载因子,因此就用初始容量和装载因子计算出实际要的初始容量
int
cap = (size >= (
long
)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((
int
)size);
// 确定计算出初始容量,
this
.sizeCtl = cap;
}
|
解析: 除了初始容量与装载因子外,此构造方法还有一个并发级别concurrencyLevel的参数。在jdk1.7时,并发级别作为分段锁的标准进行分段。但是jdk1.8开始舍弃了分段锁,为了版本兼容,此构造方法依然存在,但是concurrencyLevel也不再具有其分段依据的意义,而是作为初始容量的定义依据.
ConcurrentHashMap采用懒初始化的方式,在第一次putAvl时如果容器为空,则会调用initTable()进行容器的初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
private
final
Node<K,V>[] initTable() {
Node<K,V>[] tab;
int
sc;
while
((tab = table) ==
null
|| tab.length ==
0
) {
// tab为空时就一直循环
if
((sc = sizeCtl) <
0
)
// sc小于0说明正在初始化或者正在复制,放弃CPU等下一次
Thread.yield();
// lost initialization race; just spin
else
if
(U.compareAndSwapInt(
this
, SIZECTL, sc, -
1
)) {
// 否则的话就用CAS把sizectl设置成-1,标识正在初始化,且成功的话
try
{
// CAS之后再判断一下,这是为了避免并发,比如上面判断了tab为空,然后另一个线程做了初始化操作,结束时sc被设置为扩容阈值,然后继续(sc = sizeCtl) < 0,这时cs还是大于0,所以还是能走进来的,所以这里再判断一下
if
((tab = table) ==
null
|| tab.length ==
0
) {
int
n = (sc >
0
) ? sc : DEFAULT_CAPACITY;
// sc最初始是存在的初始容量的
@SuppressWarnings
(
"unchecked"
)
Node<K,V>[] nt = (Node<K,V>[])
new
Node<?,?>[n];
// 建立一个容器
table = tab = nt;
// 然后复制到table属性上
sc = n - (n >>>
2
);
// sc等于 n>>>2就是n/4就是0.25n sc = 0.75n,相当于设置扩容阈值
}
}
finally
{
sizeCtl = sc;
// sizeCtl赋值成扩容阈值
}
break
;
// 只要走到这里面,那么说明一定已经初始化结束了,无论初始化是自己做的还是其他线程做的,然后就跳出
}
}
return
tab;
// 返回新的tab
}
|
解析: 容器初始化完成之前不断的进行循环。这是因为ConcurrentHashMap是一个支持并发的Map,可能同时会有多个线程进入initTable方法,但是只有一个线程执行初始化操作,那么剩下的线程就需要等待初始化完成再跳出initTable方法,以满足接下来的putVal操作。 为了保证只有一个线程执行初始化操作,使用sizeCtl来作为标识,sizeCtl为-1时即说明当前已有线程正在初始化,则放弃CPU继续循环。sizeCtl不为负数时,则使用CAS(通过Unsafe+偏移量的方式实现)将sizeCtl置为-1,如果当前线程成功的话则进入实际的扩容操作。 通过CAS锁定成功后再次判断容器是否为空,这是为了避免并发,比如上面判断了tab为空,然后另一个线程做了初始化操作,结束时sc被设置为扩容阈值,然后继续(sc = sizeCtl) < 0,这时cs还是大于0,所以还是能走进来的,所以这里再判断一下。 如果sc(sizeCtl原值)大于0,则以sc做为初始容量。这个在构造方法篇提到过,对于有初始容量要求的构造,会以sizeCtl暂存初始容量。否则的话就取默认的初始容量DEFAULT_CAPACITY(16)。 接下来就建立目标容量的容器并赋值给容器属性table,计算该容量下的扩容阈值赋值给sizeCtl 。sizeCtl的赋值放到finally里面的原因是因为无论容器创建成功还是失败,都需要放开以sizeCtl为负值作为判断条件的锁,以保证在本线程创建失败的情况其它线程能继续竞争锁继续进行容器创建的工作。 至此,容器的初始化便完成了.
ConcurrentHashMap新数据载入主要通过putVal实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
final
V putVal(K key, V value,
boolean
onlyIfAbsent) {
if
(key ==
null
|| value ==
null
)
throw
new
NullPointerException();
int
hash = spread(key.hashCode());
int
binCount =
0
;
for
(Node<K,V>[] tab = table;;) {
// 会一直循环,直到break
Node<K,V> f;
int
n, i, fh;
if
(tab ==
null
|| (n = tab.length) ==
0
)
tab = initTable();
// 初始化table
else
if
((f = tabAt(tab, i = (n -
1
) & hash)) ==
null
) {
// unsafe获取i处的数据,如果为null
if
(casTabAt(tab, i,
null
,
new
Node<K,V>(hash, key, value,
null
)))
// cas操作,如果i处为null就构造新元素,成功就break 失败就继续循环
break
;
// no lock when adding to empty bin
}
else
if
((fh = f.hash) == MOVED)
// f处的元素hash是-1 -1意味着这是一个迁移节点
tab = helpTransfer(tab, f);
// 辅助迁移开始
else
{
V oldVal =
null
;
synchronized
(f) {
// 用f进行同步
if
(tabAt(tab, i) == f) {
// 这里要保证i处还是f,而不是已经被其它元素并发占据了
if
(fh >=
0
) {
// f处的hash要大于等于0,这是为什么呢?除了MOVED还会有其它负值吗
binCount =
1
;
for
(Node<K,V> e = f;; ++binCount) {
// 从e往后找,并且记录元素数量
K ek;
if
(e.hash == hash &&
((ek = e.key) == key ||
(ek !=
null
&& key.equals(ek)))) {
// 一系列判断Key是否相等,如果相等
oldVal = e.val;
// 记录旧值
if
(!onlyIfAbsent)
// 不是仅允许新增的话
e.val = value;
// 记录新值
break
;
// 成功的话就跳出大循环
}
Node<K,V> pred = e;
if
((e = e.next) ==
null
) {
// e向后移动, 如果移动之后是nul,就是在末尾的话
pred.next =
new
Node<K,V>(hash, key,
value,
null
);
// 建立一个新节点
break
;
// 成功的话就跳出大循环
}
}
}
else
if
(f
instanceof
TreeBin) {
// 如果是一个树化节点
Node<K,V> p;
binCount =
2
;
// 树化的节点的话固定元素数量为2,这是一个相对特殊的值,即会扩容又不会重复树化
if
((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) !=
null
) {
// 往树里面新增元素,如果存在就返回那个节点,不存在就是新增节点
oldVal = p.val;
// 记录旧值
if
(!onlyIfAbsent)
// 不是仅允许新增的话
p.val = value;
// 记录新值
}
}
}
}
if
(binCount !=
0
) {
// 如果元素数不为0的话
if
(binCount >= TREEIFY_THRESHOLD)
// 判断是否到了树化阈值
treeifyBin(tab, i);
// 到了的话进行树化
if
(oldVal !=
null
)
// 覆盖操作直接返回旧值不会执行addCount
return
oldVal;
break
;
// binCount != 0才跳出大循环,为0是tabAt(tab, i) != f导致的,即i处已经被其他线程覆盖了
}
}
}
addCount(1L, binCount);
//
return
null
;
}
|
解析: 完成为空校验后,通过spread方法来计算出hash以确定下标位置,spread的计算方式为(h ^ (h >>> 16)) & HASH_BITS,HASH_BITS的值为0x7fffffff,描述出来就是将hashCode的高16位和低16位做异或操作,并保证最高位符号位为0(结果是一个正数),注解表示这样做的原因是为了使数据更加分散,尽可能的避免hash冲突。 如果容器tab为空或者长度为0说明容器未初始化,那么就调用initTable进行容器初始化。 否则的话对hash与现容量进行与运算得出数据应处的下标,判断此下标所在处节点是否为空,为空说明没有元素,直接创建一个新节点作为头元素放进去,这一步通过CAS操作实现,避免并发情况下另外的线程先一步完成头节点创建操作。 如果下标所在处节点不为空,说明该处已经有元素了,此时判断头节点的hash是否为MOVED,MOVED的值为-1。上面提到过,正常元素通过spread方法计算出来的hash值都会使正数。此处-1为一个特殊值,意味着此节点正在进行扩容迁移工作,那么此时就调用8 辅助扩容-helpTransfer方法进行辅助迁移工作。 如果节点hash不为MOVED,意味着这是一个正常节点,就执行元素载入工作,使用synchronized关键字实现同步, 同步块内开始还要使用tabAt(tab, i) == f判断进入同步后i处的节点还是原节点,接下来就是元素的载入工作了,整体和HashMap的流程是相差无几的,感兴趣的话可以去文首提到的解析HashMap的文章了解一下。synchronized块将元素载入节点后,接着会对binCount进行判断,binCount在节点还处于链表模式下的情况下记录节点在新元素载入后的元素总数量,此处判断binCount达到TREEIFY_THRESHOLD(8) 树化阈值后,就会对该节点进行树化操作。树化操作依然是通过synchronized来完成的,这里不做过多延伸。 以上整个新元素载入操作都是一个自旋的过程,一直到新元素载入成功后通过break跳出自旋过程。 新元素载入节点后,需要对count实时维护,count通过 6 维护与启动扩容-addCount 方法完成同步维护工作.
ConcurrentHashMap通过addCount方法实时维护内部元素数量,并在达到扩容阈值的情况下启动扩容操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
private
final
void
addCount(
long
x,
int
check) {
CounterCell[] as;
long
b, s;
if
((as = counterCells) !=
null
||
// counterCells不为空,如果没经历过并发肯定是空的,这个判断意思是维护count的过程中已经经历过并发,已经处于不使用baseCount维护的阶段了
!U.compareAndSwapLong(
this
, BASECOUNT, b = baseCount, s = b + x)) {
//这个比较唯一会出现false的情况就是b = baseCount执行之后,还没进行cas时,baseCount又被改了。意思是如果没有并发就用baseCount维护,并发导致baseCount维护失败就进入counterCells维护
CounterCell a;
long
v;
int
m;
boolean
uncontended =
true
;
// 记录是否遭遇了并发
if
(as ==
null
|| (m = as.length -
1
) <
0
||
// as等于null意味着baseCount维护遭遇并发了,且是初次(非初次counterCells不会为null) (m = as.length - 1) < 0说明counterCells长度为0,即没有实际内容
(a = as[ThreadLocalRandom.getProbe() & m]) ==
null
||
// counterCells是一个分段计算的概念,这里就是随机找一个槽,如果这个槽位空的话
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 槽不为空的话就用cas把槽里面的值赋值给v+a.val 如果失败的话
fullAddCount(x, uncontended);
// 用fullAddCount 就是LongAdder的模式将x放进去,初始化counterCells
return
;
}
if
(check <=
1
)
// check小于等于1(1表示是当前节点的第一个元素,-1标识是replace方法或者clear过来的,这些都不需要扩容)
return
;
s = sumCount();
// 计算一下现在的总量
}
if
(check >=
0
) {
//baseCount成功的情况且 check大于0,即不是replace或者clear方法进来的
Node<K,V>[] tab, nt;
int
n, sc;
/**
1.s >= (long)(sc = sizeCtl) s是总数量 s大于等于扩容阈值sizeCtl时,或者sizeCtl为负数,正在扩容时
2.(tab = table) != null 并且容器tab不为空,意思就是表已经初始化完毕了,是执行扩容而不是执行初始化
3. (n = tab.length) < MAXIMUM_CAPACITY) 并且容量还没有达到最大容量时
*/
while
(s >= (
long
)(sc = sizeCtl) && (tab = table) !=
null
&&
(n = tab.length) < MAXIMUM_CAPACITY) {
int
rs = resizeStamp(n);
// 获得容量特征值:容量n的左0数量并将高位置1
if
(sc <
0
) {
// 如果sc小于0 意味这个正在进行并发扩容
/**
sc >>> RESIZE_STAMP_SHIFT != rs是判断现在的容量与sc中记录的长度是不是不一样长,不一样长的话说明不是同一长度的扩容,就要break-保证是同等长度的扩容
sc == rs + 1 这个地方在看来是一个bug,如果已经处于扩容中,sc目前处于高16位是容量特征值,低16位标识扩容线程量特征值的状态,而此时的rs是容量特征值的状态,也就是目前sc的高16位,这里原意应该是判断目前是不是只剩下一个非初始扩容线程
,为什么这么说呢,因为初始扩容线程会把sc设置称高16位是容量特征值即rs,低16位标识扩容线程量特征值(初始扩容线程+2,非初始扩容线程+1),这一步判断的就是如过只剩下一个非初始扩容线程了,说明初始扩容线程已经结束,现在已经进入扩容的最后阶段了,就不需要再继续进行辅助扩容了
,正确的方式应该是sc == (rs << RESIZE_STAMP_SHIFT) + 1
sc == rs + MAX_RESIZERS 和上面+1一样,不过这个是判断参与resize的线程是否已经达到所允许的最大值 ,看起来也是一个bug,正确的应是sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
(nt = nextTable) == null nextTable为空说明已经扩容完毕了,不再需要辅助扩容了
transferIndex <= 0 // 判断下边界是不是已经到0了,扩容是根据CPU算出步长,一个扩容线程扩容一段,从高到低分配,transferIndex记录的就行目前分配到哪个下标了,如果这个下标小于等于0说明整个容器已经分配完了,就不需要在辅助扩容了
*/
if
((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs +
1
||
sc == rs + MAX_RESIZERS || (nt = nextTable) ==
null
||
transferIndex <=
0
)
break
;
if
(U.compareAndSwapInt(
this
, SIZECTL, sc, sc +
1
))
// 如果是辅助扩容的sc已经处于高16位是容量特征值,低16位标识扩容线程量特征值的状态了+1让地位的扩容线程量特征值再+1
transfer(tab, nt);
// 进行扩容
}
else
if
(U.compareAndSwapInt(
this
, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) +
2
))
// 初始扩容现在是+2 执行完这一步sc的值高16位是容量特征值,低16位标识扩容线程量特征值,因为resizeStamp时将高位置为1是的其又是一个负数,表示正在扩容
transfer(tab,
null
);
s = sumCount();
// 计算总数
}
// 一直循环扩容
}
}
|
解析: count的维护有两种方式:未产生并发场景时通过baseCount维护,经历过并发场景后转变为通过counterCells维护。baseCount模式为直接使用int进行加减运算,cas保证同步,counterCells模式类似于1.8之前的ConcurrentHashMap,采用一个分段的概念,运算时随机找一个槽,通过cas保证一个槽的值加算同步,count即为所以槽的加和(使用LongAdder实现)。 addCount方法初始即通过counterCells是否为空(为空说明未经历初始化,使用的baseCount模式)来判断当前是否为counterCells模式,如果处于counterCells模式则进入counterCells模式计算逻辑,否则的话使用baseCount模式来维护count并记录为s,如果遭遇并发导致维护失败,则转换为counterCells模式进入counterCells模式计算逻辑。 进入counterCells模式,首先看counterCells是否已完成初始化,若为初始化进入fullAddCount,已初始化的话则随机找到counterCells中的一个槽位,如果这个槽位未初始化则进入fullAddCount,如果已初始化则对这个操作进行CAS的加操作来维护count,CAS成功则维护完成,并发导致CAS维护失败则进入fullAddCount。fullAddCount有点类似于CMS的full GC退化机制,会完成counterCells的初始化以及并发冲突场景下同步完成count维护。counterCells最后则判断check是否小于等于1(1表示是当前节点的第一个元素,-1标识是replace方法或者clear过来的,这些都不需要扩容)则直接结束,不进行扩容判定,否则的话通过sumCount获得当前count并记录为s以准备后续的扩容判定。 check大于等于0(即不是replace或者clear方法引起的count改变)时进行扩容判定,执行扩容需要满足三个条件:1.count大于扩容阈值或者当前正在执行扩容操作2.容器已初始化完毕3.容量未达到最大容量。满足以上条件则需判定当前是否正在扩容,因为扩容时会将SIZECTL设置为一个特征值,这个特征值为负值,因此通过sc是否小于0来判定当前是否处于扩容状态。接下来使用resizeStamp获得容量特征值(一个表示容量n的左0数量并将高位置1的值),如果当前未处于扩容过程,则通过容量特征值获得扩容特征值(高16位为容量特征值,低16位为扩容线程量特征值)并通过CAS修改SIZECTL为扩容特征值,成功则当前线程作为第一个扩容线程启动扩容,失败则重新计算count并进行扩容判定。 如果已处于扩容过程,则判断是否还要写辅助扩容线程,满足以下几个条件则:1.此次扩容的容量与当前正在进行的容量一致2.扩容线程未达到最大额定值3.不是处于扩容收尾阶段(只剩下一个辅助扩容线程,nextTable已经为空,扩容下标已为0无法继续分配)则使用CAS修改扩容特征值成功后当前线程作为辅助扩容线程加入扩容任务,不满足条件则重新计算count并自旋进行扩容判定.
ConcurrentHashMap的实际扩容操作通过transfer来完成:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
private
final
void
transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int
n = tab.length, stride;
// CPU大于1的话 步长就是0.25n/CPU数 否则就是n 算出来的步长小于最小步长的话 就按最小步长来
if
((stride = (NCPU >
1
) ? (n >>>
3
) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// subdivide range
if
(nextTab ==
null
) {
// initiating // nextTab == null 说明是第一个来扩容的线程
try
{
@SuppressWarnings
(
"unchecked"
)
Node<K,V>[] nt = (Node<K,V>[])
new
Node<?,?>[n <<
1
];
// 建立一个二倍长度的数组,然后赋值给nextTab
nextTab = nt;
}
catch
(Throwable ex) {
// try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
// 异常的话就是OOM? 唯一的异常也就是n过大?
return
;
}
nextTable = nextTab;
// 这个赋值为什么不做同步呢?不怕并发问题吗,哦 不会 因为调用这个方法的地方已经确定了,只会有一个初始线程过来
transferIndex = n;
// n就是新的长度
}
int
nextn = nextTab.length;
// 新容量
ForwardingNode<K,V> fwd =
new
ForwardingNode<K,V>(nextTab);
// 创建一个中继节点
boolean
advance =
true
;
//一个终止的标记
boolean
finishing =
false
;
// to ensure sweep before committing nextTab // 是否已经处于收尾扫描提交阶段
for
(
int
i =
0
, bound =
0
;;) {
// 一直循环着按步长扩容
Node<K,V> f;
int
fh;
while
(advance) {
// 计算出bound和i 赋值TRANSFERINDEX
int
nextIndex, nextBound;
if
(--i >= bound || finishing)
// i大于bound,说明什么呢,说明i已经在区间内了
advance =
false
;
else
if
((nextIndex = transferIndex) <=
0
) { 如果下一次的上边界已经到
0
了 也停止
i = -
1
;
advance =
false
;
}
else
if
(U.compareAndSwapInt
(
this
, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
//nextBound作为新的上边界
nextIndex - stride :
0
))) {
bound = nextBound;
//nextBound作为这次的下边界
i = nextIndex -
1
;
// 从nextIndex(上边界) - 1开始依次递减到bound区域
advance =
false
;
}
}
if
(i <
0
|| i >= n || i + n >= nextn) {
// 如果i不在0-n范围内,已经越界
int
sc;
if
(finishing) {
// 如果已经在收尾阶段
nextTable =
null
;
// 重置nextTable
table = nextTab;
// 容器确认
sizeCtl = (n <<
1
) - (n >>>
1
);
// 2n-0.5n = 0.75*2n 就是新的扩容阈值
return
;
}
if
(U.compareAndSwapInt(
this
, SIZECTL, sc = sizeCtl, sc -
1
)) {
// sc减1 意味当前扩容线程结束
if
((sc -
2
) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 如果sc-2和容量特征值一样时,就说明现在是最后的一个扩容线程了
return
;
// 直接返回
finishing = advance =
true
;
// 不是最后的话 设置为收尾阶段?,确实,进入这里面已经越界了...
i = n;
// recheck before commit // 提交前的重复检查?
}
}
else
if
((f = tabAt(tab, i)) ==
null
)
// 如果i处还没有使用
advance = casTabAt(tab, i,
null
, fwd);
// cas设置i处的新fwdnode,成功还是失败会记录在advance上面
else
if
((fh = f.hash) == MOVED)
// 如果这节点已经是一个中继节点 说明什么呢?说明已经有人处理过了上一步
advance =
true
;
// already processed
else
{
// 如果有值,并且不是中继节点的话
synchronized
(f) {
// 同步处理f,所以只会有一个线程作为参与者
if
(tabAt(tab, i) == f) {
// 同步之后还要再判断一下 万一f被并发处理掉了呢
Node<K,V> ln, hn;
if
(fh >=
0
) {
// fh>=0说明是一个还没树化的链表?
int
runBit = fh & n;
Node<K,V> lastRun = f;
for
(Node<K,V> p = f.next; p !=
null
; p = p.next) {
int
b = p.hash & n;
if
(b != runBit) {
// 这是找最后一段的开头节点
runBit = b;
lastRun = p;
}
}
if
(runBit ==
0
) {
// 最后一段的开头节点是低位的话
ln = lastRun;
// 开头节点赋值到ln 也就是说从lastRun开始,之后的都没有变化了,都是在同一位
hn =
null
;
}
else
{
hn = lastRun;
ln =
null
;
}
for
(Node<K,V> p = f; p != lastRun; p = p.next) {
// 只需遍历到lastRun即可
int
ph = p.hash; K pk = p.key; V pv = p.val;
if
((ph & n) ==
0
)
ln =
new
Node<K,V>(ph, pk, pv, ln);
// 往前补
else
hn =
new
Node<K,V>(ph, pk, pv, hn);
// 往前补
}
setTabAt(nextTab, i, ln);
// 赋值新容器
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
// 旧容器节点变为中继节点,但是如果在这一步之前就有新元素来了怎么办,不会来,因为新增也是sync(f)的
advance =
true
;
}
else
if
(f
instanceof
TreeBin) {
// 如果是一个树化节点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo =
null
, loTail =
null
;
TreeNode<K,V> hi =
null
, hiTail =
null
;
int
lc =
0
, hc =
0
;
for
(Node<K,V> e = t.first; e !=
null
; e = e.next) {
int
h = e.hash;
TreeNode<K,V> p =
new
TreeNode<K,V>
(h, e.key, e.val,
null
,
null
);
if
((h & n) ==
0
) {
if
((p.prev = loTail) ==
null
)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else
{
if
((p.prev = hiTail) ==
null
)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
// 退化或者重新树化
(hc !=
0
) ?
new
TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc !=
0
) ?
new
TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance =
true
;
}
}
}
}
}
}
|
解析: transfer采用分段扩容的方式,从n-1->0,每个线程每次会占用一个步长的区间,然后针对这个区间进行扩容,扩容完毕再去占用下一个区间,直到无法占用新的区间则结束扩容流程。 步长的计算与应用的CPU数有关,如果当前应用CPU数为1则步长为n,即单线程库扩容。如果当前应用CPU数不为1,则为0.25n/CPU,但是如果算出的步长小于最小MIN_TRANSFER_STRIDE(16),则以MIN_TRANSFER_STRIDE为步长。 接下来通过nextTable 是否为空判断是否为初始扩容线程,如果是的话就创建一个长度为2n的新容器并记录到nextTable属性,所有扩容线程共同使用nextTable作为新容器,同时记录当前扩容进度下标transferIndex为n(扩容占用是从n到0的,因此初始未占用时就是n)。这里虽然没进行加锁但是依然不会有并发问题,因为nextTable为空的只有初始线程,而初始线程的创建则是线程安全的。 步长确认完毕,新容器创建完毕,扩容进度下标初始化完成,接下来就开始进行区间占用了:使用自旋+CAS来进行线程安全的区间占用,advance作为自旋结束的条件:如果i大于bound意味着本次占用的区间还没处理完则结束自旋。finishing意味着扩容已经进入尾声了,也无需再加入扩容,结束自旋。扩容进度下标transferIndex小于等于0说明所有的区间都已经被占用了也无需再加入扩容,结束自旋。如果以上条件不成立,则使用CAS修改扩容进度下标transferIndex尝试占用一个步长的区间,如果失败则自旋继续判定区间占用,修改成功则意味着区间占用成功,赋值本次区间的扩容游标i以及下标边界bound。 区间占用成功后开始从i到bound的扩容阶段,首先需要判断i的合法性(需要在i和n之间,即判断是不是已扩容完毕,如–i的循环会使得i为负数表示扩容完毕),如果没占据到区间则判断是不是处于于finishing阶段,处于的话说明自己以及是最后一个扩容线程了,将新容器nextTable赋值为table,并设置新的扩容阈值。如果不是处于finishing阶段则因为当前线程没占据到区间需要退出扩容,那么响应的扩容特征值也要相应的-1来完成扩容线程量特征值的维护,如果扩容特征值维护失败则需要继续自旋尝试退出,如果维护成功则判断自己退出之后是不是只能先一个扩容线程,因为起始扩容线程的线程扩容量特征值为+2,所以通过-2来判断,如果自己退出之后仅剩一个线程在扩容,那么就把finishing设置为true,以使其在扩容工作结束后进行新容器的赋值以及新扩容阈值的维护。 如果i处于合法范围内,说明处于正常扩容中,那么获取i处的节点,节点为空则利用一个CAS为其赋值一个ForwardingNode节点,这个节点是一个中继节点,意味着这个节点正处于扩容状态,其hash值为特殊值MOVED(-1),在5 新数据载入-putVal 的过程中如果发现目标节点hash值为MOVED,那么putVal线程就会暂停put操作,作为辅助扩容线程先行扩容容器,扩容完毕后再进行put操作。 如果i处节点不会空但是hash值已经是MOVED,那么说明节点已经处于迁移状态则跳过。 如果以上均不符合,说明i合法,且i处拥有实际的数据节点,那么使用i处的节点f作为锁通过synchronized来达成线程安全的扩容,因为putVal过程中也是用i处的节点f作为锁进行synchronized的,意味着对i处的操作扩容和赋值只有一个过程能操作,以此来保证putVal和transfer的并发安全性。synchronized内部的扩容过程因为已经保证了线程同步,因此和HashMap的扩容过程区别并不大,这里不再描述,感兴趣的话可以查看9: 从源码看HashMap:一文看懂HashMap了解.
在5 新数据载入-putVal 中以及7 进行扩容-transfer中都提到过如果putVal过程中发现目标节点是一个中继节点ForwardingNode(hash值为特殊值MOVED)那么putVal会暂停put操作,通过helpTransfer方法进行辅助扩容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
final
Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab;
int
sc;
if
(tab !=
null
&& (f
instanceof
ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) !=
null
) {
int
rs = resizeStamp(tab.length);
while
(nextTab == nextTable && table == tab &&
(sc = sizeCtl) <
0
) {
if
((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs +
1
||
sc == rs + MAX_RESIZERS || transferIndex <=
0
)
break
;
if
(U.compareAndSwapInt(
this
, SIZECTL, sc, sc +
1
)) {
transfer(tab, nextTab);
break
;
}
}
return
nextTab;
}
return
table;
}
|
解析: 这个方法是不是看起来很熟悉,是的,除了ForwardingNode判断,其主要逻辑和6 维护与启动扩容-addCount判定扩容时的逻辑大致相当。因此也不在额外表述。这里贴出来的目的仅仅是为了完整的体现putVal遭遇扩容并发的处理过程.
以上就是JAVA核心知识之从源码看ConcurrentHashMap的详细内容,更多关于JAVA核心知识之从源码看ConcurrentHashMap的资料请关注我其它相关文章! 。
原文链接:https://blog.csdn.net/yue_hu/article/details/113754357 。
最后此篇关于JAVA核心知识之ConcurrentHashMap源码分析的文章就讲到这里了,如果你想了解更多关于JAVA核心知识之ConcurrentHashMap源码分析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
积累和总结,是长期持续的过程 01 最近,很多朋友微信私聊关于「 butte-java-note 」仓库的话题; 这个「 Git仓库 」每年都会
我即将参加挑战测试,所以我不必参加数据库处理类(class)。尽管在过去的 5 年里我一直在使用数据库,但我还是忍不住对测试感到紧张。这是 50 个问题,有 2 部分:真/假部分和我实际编写 SQL
我的 groovy 代码将 Rabbit Native Plugin 用于 grails: def handleMessage(def body, MessageContext context) {
我想看看是否有人可以就我在 .NET 环境中的进一步知识提供任何建议... 这里有一点背景。我上了一所大学并获得了计算机科学学士学位(主要从事 C、Java 和 C++ 方面的工作)。大学毕业后在一家
我在 postgres 数据库中有一个表,该表用于测试环境,我们需要一次添加和删除多个列。问题是 postgres 最多有 1600 列,并且这个计数包括丢弃的列。我的表永远不会有 1600 个“未丢
作为业余程序员 3 年(主要是 Python 和 C)并且从未编写过超过 500 行代码的应用程序,我发现自己面临两个选择: (1) 学习数据结构和算法设计的基本知识,使我成为一名 l33t 计算机科
有人能告诉我为什么 Android 工作需要 Linux 知识吗?许多 Android 工作都以 Linux 作为先决条件。我可以很好地从 Windows 机器开发 Android 应用程序吗? 最佳
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 10 年前。 Improve thi
是否可以在 Drools 中保持知识 session ?如果是这样,如何? 我将事实存储在数据库中,并且每次添加新事实时,我都希望避免在新 session 中重新加载所有事实。 目前,当有新事实时,该
我对 C++ 有很好的了解,但从未深入研究 STL。我必须学习 STL 的哪一部分才能提高工作效率并减少工作中的缺陷? 谢谢。 最佳答案 I have good knowledge of C++ 恕我
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 9 年前。 Improve
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 7 年前。 Improve
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
在我从 SO answers here 和许多 BackBoneJs 示例中选择的示例之一中,我看到初始化函数知道模型将使用哪个 View 进行渲染。我不知道我现在有点偏见,这是一个好的做法还是取决于
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 12 年前。 Improve thi
我在我的网站上使用 C# 和 ASP.NET MVC 3 实现 OpenID 和 OAuth。我基于 DotNetOpenAuth用于后端和openid-selector对于前端。 我喜欢 openi
很长一段时间以来,我都在思考和研究C语言编译器以汇编形式的输出,以及CPU架构。我知道这对你来说可能很愚蠢,但在我看来有些东西是非常无效的。如果我错了,请不要生气,我不明白所有这些原则是有原因的。如果
我有一些 Python 知识,但我从来不认为自己对这门语言特别流利。我正在开发一个潜在的机器视觉项目,该项目将从 SimpleCV 中受益匪浅,但从时间的角度来看,我宁愿不必非常流利地使用 pytho
我是一名优秀的程序员,十分优秀!