ConcurrentHashMap類似Hashtable,是HashMap更高效的線程安全版本的實現。不同于Hashtable簡單的將所有方法標記為synchronized,它將內部數組分成多個Segment,每個Segment類似一個Hashtable,從而減少鎖的粒度,并且它內部有一些比較tricky實現,讓get操作很多時候甚至不需要鎖(本文代碼基于JDK 1.7,它在JDK 1.6的基礎上做了進一步的優化,想要看JDK 1.6的實現,可以參考http://www.iteye.com/topic/344876以及http://www.ibm.com/developerworks/cn/java/j-jtp08223/。并且個人感覺這篇文章已經說的很好了,原本我沒必要再寫,繼續決定寫只是為了加深自己的印象。這是我的寫之前的原話,當時當我寫完以后,我發現我還是有必要寫這一篇文章的,因為JDK 1.7版本和1.6版本的ConcurrentHashMap的實現已經有了一些優化,我在中間做了一些比較,通過這些比較,我們可以清楚的看到一些這些代碼的演化過程,也為我們在自己的項目里做一些優化提供參考,雖然如我推薦的第一篇文章中的末尾寫道:ConcurrentHashMap的實現值得學習,不值得模仿。我表示贊同,因為你如果有一點沒考慮好的話,就容易出錯,在性能和正確性上,我們當然選擇正確性。。。。)。當時也要注意,雖然ConcurrentHashMap在性能上比Hashtable提高了很多,但是它也有它自己的限制:
1. 它沒有一個基于整個Map的鎖,因而如果需要基于整個Map做操作,則需要自己額外的在外層套鎖。當然由于它的線程安全特性,你可以不額外加鎖,因為你在遍歷的時候可以繼續添加、刪除,然而有些時候這并不符合你的需求(具體理由參考第2點)。
2. 它實現了一種更加細粒度的happens-before的關系。由于上述提到的當一個線程在遍歷時,可以有其他線程同時在做添加、刪除等操作。而有些情況下,真實需求時當一個遍歷發生在添加、刪除中間時,該遍歷線程應該等到添加、刪除線程完成后再開始,從而遍歷線程能看到添加、刪除后的結果;同樣對有線程正在遍歷時,希望其他的添加、刪除線程能夠等待。雖然這種細微的差別好像對一般程序來手影響不大。
HashEntry定義
類似HashMap,Segment中內部數組的每一項都是一個單項鏈節點,它包含了key、hash、value等信息:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
}
在JDK 1.6中,HashEntry中的next指針也定義為final,并且每次插入將新添加節點作為鏈的頭節點(同HashMap實現),而且每次刪除一個節點時,會將刪除節點之前的所有節點拷貝一份組成一個新的鏈,而將當前節點的上一個節點的next指向當前節點的下一個節點,從而在刪除以后有兩條鏈存在,因而可以保證即使在同一條鏈中,有一個線程在刪除,而另一個線程在遍歷,它們都能工作良好,因為遍歷的線程能繼續使用原有的鏈。因而這種實現是一種更加細粒度的happens-before關系,即如果遍歷線程在刪除線程結束后開始,則它能看到刪除后的變化,如果它發生在刪除線程正在執行中間,則它會使用原有的鏈,而不會等到刪除線程結束后再執行,即看不到刪除線程的影響。如果這不符合你的需求,還是乖乖的用Hashtable或HashMap的synchronized版本,Collections.synchronizedMap()做的包裝。
另一個不同于1.6版本中的實現是它提供setNext()方法,而且這個方法調用了Unsafe類中的putOrderedObject()方法,該方法只對volatile字段有用,關于這個方法的解釋如下:
Sets the value of the object field at the specified offset in the supplied object to the given value. This is an ordered or lazy version of putObjectVolatile(Object,long,Object), which doesn't guarantee the immediate visibility of the change to other threads. It is only really useful where the object field is volatile, and is thus expected to change unexpectedly.
我對這個函數的理解:對volatile字段,按規范,在每次向它寫入值后,它更新后的值立即對其他線程可見(可以簡單的認為對volatile字段,每次讀取它的值時都直接從內存中讀取,而不會讀緩存中的數據,如CPU的緩存;對寫入操作也是直接寫入內存),而這個函數可以提供一種選擇,即使對volatile字段的寫操作,我們也可以使用該方法將它作為一種普通字段來對待。這里setNext()方法的存在是為了在remove時不需要做拷貝額外鏈進行的優化,具體可以參看remove操作。
Segment中的put操作
在之前的JDK版本中,Segment的put操作開始時就會先加鎖,直到put完成才解鎖。在JDK 1.7中采用了自旋的機制,進一步減少了加鎖的可能性。
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)i << TSHIFT) + TBASE);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
先不考慮自旋等待的問題,假如put一開始就拿到鎖,那么它會執行以下邏輯:
- 根據之前計算出來的hash值找到數組相應bucket中的第一個鏈節點。這里需要注意的是:
a. 因為ConcurrentHashMap在計算Segment中數組長度時會保證該值是2的倍數,而且Segment在做rehash時也是每次增長一倍,因而數組索引只做"(tab.length - 1) & hash"計算即可。
b. 因為table字段時一個volatile變量,因而在開始時將該引用賦值給tab變量,可以減少在直接引用table字段時,因為該字段是volatile而不能做優化帶來的損失,因為將table引用賦值給局不變量后就可以把它左右普通變量以實現編譯、運行時的優化。
c. 因為之前已經將volatile的table字段引用賦值給tab局不變量了,為了保證每次讀取的table中的數組項都是最新的值,因而調用entryAt()方法獲取數組項的值而不是通過tab[index]方式直接獲取(在put操作更新節點鏈時,它采用Unsafe.putOrderedObject()操作,此時它對鏈頭的更新只局限與當前線程,為了保證接下來的put操作能夠讀取到上一次的更新結果,需要使用volatile的語法去讀取節點鏈的鏈頭)。
- 遍歷數組項中的節點鏈,如果在節點中能找到key相等的節點,并且當前是put()操作而不是putIfAbsent()操作,紀錄原來的值,更新該節點的值,并退出循環,put()操作完成。
- 如果在節點鏈中沒有找到key相等的節點,創建一個新的節點,并將該節點作為當前鏈頭插入當前鏈,并將count加1。和讀取節點鏈連頭想法,這里使用setEntryAt()操作以實現對鏈頭的延時寫,以提升性能,因為此時并不需要將該更新寫入到內存,而在鎖退出后該更新自然會寫入內存[參考Java的內存模型,注1]。然后當節點數操作閥值(capacity*loadFactor),而數組長度沒有達到最大數組長度,會做rehash。另外,如果scanAndLockForPut()操作返回了一個非空HashEntry,則表示在scanAndLockForPut()遍歷key對應節點鏈時沒有找到相應的節點,此時很多時候需要創建新的節點,因而它預創建HashEntry節點(預創建時因為有些時候它確實不需要再創建),所以不需要再創建,只需要更新它的next指針即可,這里使用setNext()實現延時寫也時為了提升性能,因為當前修改并不需要讓其他線程知道,在鎖退出時修改自然會更新到內存中,如果采用直接賦值給next字段,由于next時volatile字段,會引起更新直接寫入內存而增加開銷。
Segment中的scanAndLockForPut操作
如put源碼所示,當put操作嘗試加鎖沒成功時,它不是直接進入等待狀態,而是調用了scanAndLockForPut()操作,該操作持續查找key對應的節點鏈中是已存在該機節點,如果沒有找到已存在的節點,則預創建一個新節點,并且嘗試n次,直到嘗試次數操作限制,才真正進入等待狀態,計所謂的自旋等待。對最大嘗試次數,目前的實現單核次數為1,多核為64:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
在這段邏輯中,它先獲取key對應的節點鏈的頭,然后持續遍歷該鏈,如果節點鏈中不存在要插入的節點,則預創建一個節點,否則retries值資增,直到操作最大嘗試次數而進入等待狀態。這里需要注意最后一個else if中的邏輯:當在自旋過程中發現節點鏈的鏈頭發生了變化,則更新節點鏈的鏈頭,并重置retries值為-1,重新為嘗試獲取鎖而自旋遍歷。
Segment中的rehash操作
rehash的邏輯比較簡單,它創建一個大原來兩倍容量的數組,然后遍歷原來數組以及數組項中的每條鏈,對每個節點重新計算它的數組索引,然后創建一個新的節點插入到新數組中,這里需要重新創建一個新節點而不是修改原有節點的next指針時為了在做rehash時可以保證其他線程的get遍歷操作可以正常在原有的鏈上正常工作,有點copy-on-write思想。然而Doug Lea繼續優化了這段邏輯,為了減少重新創建新節點的開銷,這里做了兩點優化:1,對只有一個節點的鏈,直接將該節點賦值給新數組對應項即可(之所以能這么做是因為Segment中數組的長度也永遠是2的倍數,而將數組長度擴大成原來的2倍,那么新節點在新數組中的位置只能是相同的索引號或者原來索引號加原來數組的長度,因而可以保證每條鏈在rehash是不會相互干擾);2,對有多個節點的鏈,先遍歷該鏈找到第一個后面所有節點的索引值不變的節點p,然后只重新創建節點p以前的節點即可,此時新節點鏈和舊節點鏈同時存在,在p節點相遇,這樣即使有其他線程在當前鏈做遍歷也能正常工作:
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
Segment中的remove操作
在JDK 1.6版本中,remove操作比較直觀,它先找到key對應的節點鏈的鏈頭(數組中的某個項),然后遍歷該節點鏈,如果在節點鏈中找到key相等的節點,則為該節點之前的所有節點重新創建節點并組成一條新鏈,將該新鏈的鏈尾指向找到節點的下一個節點。這樣如前面rehash提到的,同時有兩條鏈存在,即使有另一個線程正在該鏈上遍歷也不會出問題。然而Doug Lea又挖掘到了新的優化點,為了減少新鏈的創建同時利用CPU緩存的特性,在1.7中,他不再重新創建一條新的鏈,而是只在當起緩存中將鏈中找到的節點移除,而另一個遍歷線程的緩存中繼續存在原來的鏈。當移除的是鏈頭是更新數組項的值,否則更新找到節點的前一個節點的next指針。這也是HashEntry中next指針沒有設置成final的原因。當然remove操作如果第一次嘗試獲得鎖失敗也會如put操作一樣先進入自旋狀態,這里的scanAndLock和scanAndLockForPut類似,只是它不做預創建節點的步驟,不再細說:
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
Segment中的其他操作
ConcurrentHashMap添加了replace接口,它和put的區別是put操作如果原Map中不存在key會將傳入的鍵值對添加到Map中,而replace不會這么做,它只是簡單的返回false。Segment中的replace操作先加鎖或自旋等待,然后遍歷相應的節點鏈,如果找到節點,則替換原有的值,返回true,否則返回false,比較簡單,不細究。
Segment中的clear操作不同于其他操作,它直接請求加鎖而沒有自旋等待的步驟,這可能是因為它需要對整個table做操作,因而需要等到所有在table上的操作的線程退出才能執行,而不象其他操作只是對table中的一條鏈操作,對一條鏈操作的線程執行的比較快,因而自旋可以后獲得鎖的可能性比較大,對table操作的等待相對要比較久,因而自旋等待意義不大。clear操作只是將數組的每個項設置為null,它使用setEntryAt的延遲設置,從而保證其他讀線程的正常工作。
Segment類的實現是ConcurrentHashMap實現的核心,因而理解了它的實現,要看ConcurrentHashMap的其他代碼就感覺很簡單和直觀了。
ConcurrentHashMap中的hash算法
這個貌似沒什么好說的,我也不知道為什么它要這么做,它的實現和HashMap類似,從1.6到1.7的變化也不大。類似HashMap實現,在每次操作都先用該方法計算出hash值,然后根據該值計算出Segment數組中的索引(在Segment中計算出HashEntry的索引),計算Segment數組的索引和計算Segment中HashEntry的索引不太一樣,在計算Segment數組索引時取的時hash值高位的值和(segments.length - 1)的值做'&'操作,而Segmnt中計算HashEntry的索引則使用低位值。
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
初始化segments數組
在ConcurrentHashMap構造函數中,它根據傳入的concurrencyLevel決定segments數組的長度,默認值為16,而將傳入的initialCapacity(保證2的倍數)除以segments數組的長度(最小值2)作為第一個segments數組中第一個Segment的HashEntry數組的長度,而在每次找到一個要插入的segments數組項的值為null時,參考第一個Segment實例的參數創建一個新的Segment實例賦值該對應的segments數組項:
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
ConcurrentHashMap中的get、containsKey、put、putIfAbsent、replace、Remove、clear操作
由于前面提到Segment中對HashEntry數組以及數組項中的節點鏈遍歷操作是線程安全的,因而get、containsKey操作只需要找到相應的Segment實例,通過Segment實例找到節點鏈,然后遍歷節點鏈即可,不細說。
對put、putIfAbsent、replace、remove、clear操作,它們在Segment中都實現,只需要通過hash值找到Segment實例,然后調用相應方法即可。
ConcurrentHashMap中的size、containsValue、contains、isEmpty操作
因為這些操作需要全局掃瞄整個Map,正常情況下需要先獲得所有Segment實例的鎖,然后做相應的查找、計算得到結果,再解鎖,返回值。然而為了竟可能的減少鎖對性能的影響,Doug Lea在這里并沒有直接加鎖,而是先嘗試的遍歷查找、計算2遍,如果兩遍遍歷過程中整個Map沒有發生修改(即兩次所有Segment實例中modCount值的和一致),則可以認為整個查找、計算過程中Map沒有發生改變,我們計算的結果是正確的,否則,在順序的在所有Segment實例加鎖,計算,解鎖,然后返回。以containsValue為例:
public boolean containsValue(Object value) {
// Same idea as size()
if (value == null)
throw new NullPointerException();
final Segment<K,V>[] segments = this.segments;
boolean found = false;
long last = 0;
int retries = -1;
try {
outer: for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
long hashSum = 0L;
int sum = 0;
for (int j = 0; j < segments.length; ++j) {
HashEntry<K,V>[] tab;
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null && (tab = seg.table) != null) {
for (int i = 0 ; i < tab.length; i++) {
HashEntry<K,V> e;
for (e = entryAt(tab, i); e != null; e = e.next) {
V v = e.value;
if (v != null && value.equals(v)) {
found = true;
break outer;
}
}
}
sum += seg.modCount;
}
}
if (retries > 0 && sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return found;
}
其他的關于Collection和Iterator的實現和HashMap的實現類似,不再詳述。需要注意的一點是由于ConcurrentHashMap的線程安全性,因而它沒有如HashMap一樣實現fail-fast原則,即在遍歷時,依然可以對其做修改(put、remove),而HashMap不可以,否則會拋出ConcurrentModificationException。另一點區別時ConcurrentHashMap同樣不支持key或value為null的情況。
注1(http://www.ibm.com/developerworks/cn/java/j-jtp08223/):
JMM 掌管著一個線程對內存的動作 (讀和寫)影響其他線程對內存的動作的方式。由于使用處理器寄存器和預處理 cache 來提高內存訪問速度帶來的性能提升,Java 語言規范(JLS)允許一些內存操作并不對于所有其他線程立即可見。有兩種語言機制可用于保證跨線程內存操作的一致性――synchronized和volatile。
按照 JLS 的說法,“在沒有顯式同步的情況下,一個實現可以自由地更新主存,更新時所采取的順序可能是出人意料的。”其意思是說,如果沒有同步的話,在一個給定線程中某種順序的寫操作對于另外一個不同的線程來說可能呈現出不同的順序, 并且對內存變量的更新從一個線程傳播到另外一個線程的時間是不可預測的。
雖然使用同步最常見的原因是保證對代碼關鍵部分的原子訪問,但實際上同步提供三個獨立的功能――原子性、可見性和順序性。原子性非常簡單――同步實施一個可重入的(reentrant)互斥,防止多于一個的線程同時執行由一個給定的監視器保護的代碼塊。不幸的是,多數文章都只關注原子性方面,而忽略了其他方面。但是同步在 JMM 中也扮演著很重要的角色,會引起 JVM 在獲得和釋放監視器的時候執行內存壁壘(memory barrier)。
一個線程在獲得一個監視器之后,它執行一個讀屏障(read barrier)――使得緩存在線程局部內存(比如說處理器緩存或者處理器寄存器)中的所有變量都失效,這樣就會導致處理器重新從主存中讀取同步代碼塊使用的變量。與此類似,在釋放監視器時,線程會執行一個寫屏障(write barrier)――將所有修改過的變量寫回主存。互斥獨占和內存壁壘結合使用意味著只要您在程序設計的時候遵循正確的同步法則(也就是說,每當寫一個后面可能被其他線程訪問的變量,或者讀取一個可能最后被另一個線程修改的變量時,都要使用同步),每個線程都會得到它所使用的共享變量的正確的值。
如果在訪問共享變量的時候沒有同步的話,就會發生一些奇怪的事情。一些變化可能會通過線程立即反映出來,而其他的則需要一些時間(這由關聯緩存的本質所致)。結果,如果沒有同步您就不能保證內存內容必定一致(相關的變量相互間可能會不一致),或者不能得到當前的內存內容(一些值可能是過時的)。避免這種危險情況的常用方法(也是推薦使用的方法)當然是正確地使用同步。然而在有些情況下,比如說在像ConcurrentHashMap 之類的一些使用非常廣泛的庫類中,在開發過程當中還需要一些額外的專業技能和努力(可能比一般的開發要多出很多倍)來獲得較高的性能。
posted on 2013-10-18 22:24
DLevin 閱讀(10704)
評論(0) 編輯 收藏 所屬分類:
Core Java