更新時(shí)間:2022-08-26 來源:黑馬程序員 瀏覽量:
一、文章導(dǎo)讀:
這部分內(nèi)容讓大家讀懂ConcurrentHashMap源碼的底層實(shí)現(xiàn)從而在工作中合理去使用他并且在面試中能做到游刃有余,主要內(nèi)容如下:
* 核心構(gòu)造方法
* 核心成員變量
* put方法過程數(shù)據(jù)的完整過程
* get方法的無鎖實(shí)現(xiàn)
二、文章講解內(nèi)容
JDK8中ConcurrentHashMap的結(jié)構(gòu)是:數(shù)組+鏈表+紅黑樹。
因?yàn)樵趆ash沖突嚴(yán)重的情況下,鏈表的查詢效率是O(n),所以jdk8中改成了單個(gè)鏈表的個(gè)數(shù)大于8時(shí),數(shù)組長(zhǎng)度小于64就擴(kuò)容,數(shù)組長(zhǎng)度大于等于64,則鏈表會(huì)轉(zhuǎn)換為紅黑樹,這樣以空間換時(shí)間,查詢效率會(huì)變O(nlogn)。
紅黑樹在Node數(shù)組內(nèi)部存儲(chǔ)的不是一個(gè)TreeNode對(duì)象,而是一個(gè)TreeBin對(duì)象,TreeBin內(nèi)部維持著一個(gè)紅黑樹。
在JDK8中ConcurrentHashMap最經(jīng)點(diǎn)的實(shí)現(xiàn)是使用CAS+synchronized+volatile 來保證并發(fā)安全。
一、JDK7中ConcurrentHashMap的實(shí)現(xiàn)
在JDK1.7中ConcurrentHashMap是通過定義多個(gè)Segment來實(shí)現(xiàn)的分段加鎖,一個(gè)Segment對(duì)應(yīng)一個(gè)數(shù)組,如果多線程對(duì)同一個(gè)數(shù)組中的元素進(jìn)行添加那么多個(gè)線程會(huì)去競(jìng)爭(zhēng)同一把鎖,他鎖的是一個(gè)數(shù)組,有多少個(gè)segment那么就有多少把鎖,這個(gè)力度還是比較粗的。

JDK8的實(shí)現(xiàn)是下文要重點(diǎn)探討的內(nèi)容,同時(shí)下文全部都是JDK8的實(shí)現(xiàn)。
二、核心構(gòu)造方法
/**
* Creates a new, empty map with the default initial table size (16).
無參構(gòu)造函數(shù),new一個(gè)默認(rèn)table容量為16的ConcurrentHashMap
*/
public ConcurrentHashMap() {
}/**
* Creates a new, empty map with an initial table size
* accommodating the specified number of elements without the need
* to dynamically resize.
*
* @param initialCapacity The implementation performs internal
* sizing to accommodate this many elements.
* @throws IllegalArgumentException if the initial capacity of
* elements is negative
做了3件事:
1.如果入?yún)?lt;0,拋出異常。
2.如果入?yún)⒋笥谧畲笕萘浚瑒t使用最大容量(是2的30次方)
3.tableSizeFor方法得到一個(gè)大于負(fù)載因子入?yún)⑶易罱咏?的N次方的數(shù)作為容量
4.設(shè)置sizeCtl的值等于初始化容量。未對(duì)table進(jìn)行初始化,table的初始化要在第一次put的時(shí)候進(jìn)行。
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
特別注意:未對(duì)table進(jìn)行初始化,table的初始化要在第一次put的時(shí)候進(jìn)行。
三、核心成員變量
// 該字段控制table(也被稱作hash桶數(shù)組)的初始化和擴(kuò)容 private transient volatile int sizeCtl; // table最大容量是2的30次方 private static final int MAXIMUM_CAPACITY = 1 << 30; // table的默認(rèn)初始化容量-擴(kuò)容總是2的n次方 private static final int DEFAULT_CAPACITY = 16; // table的負(fù)載因子。當(dāng)前已使用容量 >= 負(fù)載因子*總?cè)萘康臅r(shí)候,進(jìn)行resize擴(kuò)容 private static final float LOAD_FACTOR = 0.75f; // 存儲(chǔ)數(shù)據(jù)的數(shù)組-注意添加了volatile transient volatile Node<K,V>[] table; // 當(dāng)桶內(nèi)鏈表長(zhǎng)度>=8時(shí),會(huì)將鏈表轉(zhuǎn)成紅黑樹-注意需要和MIN_TREEIFY_CAPACITY結(jié)合起來用 static final int TREEIFY_THRESHOLD = 8; // table的總?cè)萘?,要大?4,桶內(nèi)鏈表才轉(zhuǎn)換為樹形結(jié)構(gòu),否則當(dāng)桶內(nèi)鏈表長(zhǎng)度>=8時(shí)會(huì)擴(kuò)容 static final int MIN_TREEIFY_CAPACITY = 64; // 當(dāng)桶內(nèi)node小于6時(shí),紅黑樹會(huì)轉(zhuǎn)成鏈表。 static final int UNTREEIFY_THRESHOLD = 6;
四、put存儲(chǔ)數(shù)據(jù)
先從JDK的源碼中復(fù)制一段上頭的代碼,如下代碼就完成了數(shù)據(jù)的添加,在添加的時(shí)候還完成了數(shù)組的初始化、擴(kuò)容、CAS修改、分段鎖的實(shí)現(xiàn),大家大體的對(duì)該代碼有一個(gè)認(rèn)識(shí)即可我們后面會(huì)詳細(xì)的畫圖分析該過程。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 如果key或者value為空,拋出異常
if (key == null || value == null) throw new NullPointerException();
// 得到hash值
int hash = spread(key.hashCode());
// 用來記錄所在table數(shù)組中的桶的中鏈表的個(gè)數(shù),后面會(huì)用于判斷是否鏈表過長(zhǎng)需要轉(zhuǎn)紅黑樹
int binCount = 0;
// for循環(huán),直到put成功插入數(shù)據(jù)才會(huì)跳出
for (Node<K,V>[] tab = table;;) {
// f=桶頭節(jié)點(diǎn) n=table的長(zhǎng)度 i=在數(shù)組中的哪個(gè)下標(biāo) fh=頭節(jié)點(diǎn)的hash值
Node<K,V> f; int n, i, fh;
// 如果table沒有初始化
if (tab == null || (n = tab.length) == 0)
// 初始化table
tab = initTable();
// 根據(jù)數(shù)組長(zhǎng)度減1再對(duì)hash值取余得到在node數(shù)組中位于哪個(gè)下標(biāo)
// 用tabAt獲取數(shù)組中該下標(biāo)的元素
// 如果該元素為空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 直接將put的值包裝成Node用tabAt方法放入數(shù)組內(nèi)這個(gè)下標(biāo)的位置中
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果頭結(jié)點(diǎn)hash值為-1,則為ForwardingNode結(jié)點(diǎn),說明正再擴(kuò)容,
else if ((fh = f.hash) == MOVED)
// 調(diào)用hlepTransfer幫助擴(kuò)容
tab = helpTransfer(tab, f);
// 否則鎖住槽的頭節(jié)點(diǎn)
else {
V oldVal = null;
// 鎖桶的頭節(jié)點(diǎn)
synchronized (f) {
// 雙重鎖檢測(cè),看在加鎖之前,該桶的頭節(jié)點(diǎn)是不是被改過了
if (tabAt(tab, i) == f) {
// 如果桶的頭節(jié)點(diǎn)的hash值大于0
if (fh >= 0) {
binCount = 1;
// 遍歷鏈表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果遇到節(jié)點(diǎn)hash值相同,key相同,看是否需要更新value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果到鏈表尾部都沒有遇到相同的
if ((e = e.next) == null) {
// 就生成Node掛在鏈表尾部,該Node成為一個(gè)新的鏈尾。
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果桶的頭節(jié)點(diǎn)是個(gè)TreeBin
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 用紅黑樹的形式添加節(jié)點(diǎn)或者更新相同hash、key的值。
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果鏈表長(zhǎng)度>需要樹化的閾值
if (binCount >= TREEIFY_THRESHOLD)
//調(diào)用treeifyBin方法將鏈表轉(zhuǎn)換為紅黑樹,而這個(gè)方法中會(huì)判斷數(shù)組值是否大于64,如果沒有大于64則只擴(kuò)容
treeifyBin(tab, i);
if (oldVal != null)
// 如果是修改,不是新增,則返回被修改的原值
return oldVal;
break;
}
}
}
// 計(jì)數(shù)器加1,完成新增后,table擴(kuò)容,就是這里面觸發(fā)
addCount(1L, binCount);
// 新增后,返回Null
return null;
}4.1 高效的hash算法
int hash = spread(key.hashCode());
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
} 該算法其實(shí)在HashMap中我們就已經(jīng)重點(diǎn)說過了,他既保存了key值得hash的高16位也保存了低16位,從而讓key值在不同的數(shù)組中盡可能的散列,從而避免hash沖突。
4.2 數(shù)組的初始化
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();注意是在循環(huán)中判斷的table是否為空如果為空則會(huì)調(diào)用initTable完成數(shù)組的默認(rèn)初始化。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab; 以上代碼做了如下事情:
1.如果table為空,進(jìn)入while準(zhǔn)備開始初始化。
2.將sizeCtl賦值給sc。如果sizeCtl<0,線程等待,小于零時(shí)表示有其他線程在執(zhí)行初始化。
3.如果能將sizeCtl設(shè)置為-1,則開始進(jìn)行初始化操作
4.用戶有指定初始化容量,就用用戶指定的,否則用默認(rèn)的16.
5.生成一個(gè)長(zhǎng)度為16的Node數(shù)組,把引用給table。
6.重新設(shè)置sizeCtl=數(shù)組長(zhǎng)度 - (數(shù)組長(zhǎng)度 >>>2) 這是忽略符號(hào)的移位運(yùn)算符,可以理解成 n - (n / 2)。

4.3 不沖突的數(shù)據(jù)添加過程
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
} 高效的尋址算法: (n - 1) & hash這個(gè)在我們的HashMap中也是講過的了就不在贅述。
通過高效的尋址算法得到一個(gè)索引并獲取該索引的數(shù)據(jù)如果不為空則調(diào)用casTabAt方法進(jìn)行CAS的原子修改,CAS在Java層面是無鎖的所以速度會(huì)很快,但是他在硬件層面是有鎖的,他會(huì)在硬件的拉鏈散列表中加鎖。
如果有多個(gè)線程同時(shí)hash到了該索引我們的CAS也能保證只有一個(gè)線程能添加成功其他的線程其實(shí)是走分段加鎖的過程。

4.4 分段加鎖策略
需要大家特別注意的是synchronized (f)鎖了一個(gè)f對(duì)象,那么這個(gè)f對(duì)象是什么呢?其實(shí)就是我們高效尋址算法的到的一個(gè)下標(biāo)中存儲(chǔ)的對(duì)象,注意他鎖的是我們尋址到的這個(gè)對(duì)象并沒有鎖這個(gè)數(shù)組,所以我們當(dāng)前的鎖一共有多少把呢?就看你的size有多少了默認(rèn)是16那么就會(huì)有16把鎖可以來并發(fā)的修改,這個(gè)其實(shí)就是JDK1.8的分段鎖拉,他比1.7鎖的粒度更細(xì)那么并發(fā)的效果就會(huì)更好。

五、無鎖獲取
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
} 以上代碼中的內(nèi)容幾乎我們都講過了,所以就不在贅述,但是值得一說的是獲取的過程中并沒有對(duì)數(shù)組或者某一個(gè)Node元素添加鎖,獲取是無鎖的所有性能高。
還有一個(gè)問題需要注意的是獲取是無鎖的那么他如果出現(xiàn)多線程修改或者寫入的時(shí)候他就有可能會(huì)出現(xiàn)可見性的問題,因?yàn)槊恳粋€(gè)線程都有自己的工作內(nèi)存,那么ConcurrentHashMap是如何解決可見性的問題呢?
transient volatile Node<K,V>[] table;
從變量的申明中我們可以看到存儲(chǔ)數(shù)據(jù)的table其實(shí)是添加了volatile關(guān)鍵字的,所以其他線程修改了以后我們要求其他的線程把數(shù)據(jù)刷新主內(nèi)存從而保證數(shù)據(jù)的可見性。
五、總結(jié):
1. JDK1.7 使用分段鎖實(shí)現(xiàn)
2. JDK1.8 使用CAS+synchronized+volatile 的具體實(shí)現(xiàn)
3. put方法的復(fù)雜實(shí)現(xiàn)過程
4. get方法的無鎖實(shí)現(xiàn)尤其是volatile關(guān)鍵字的使用