記得很久以前有一次面試被問到如何編寫無鎖程序,我當時覺得那個面試官腦子進水了,我們確實可以在某些情況下減少鎖的使用,但是怎么可能避免呢?當然我現在還是持這種觀點,在Java中,你可以有很多方法減少鎖的使用(至少在你自己的代碼中看起來):
1. 比如常見的可以使用volatile關鍵字來保證某個字段在一個線程中的更新對其他線程的可見性;
2. 可以使用concurrent.atomic包中的各種Atomic類來實現某些基本類型操作的,它主要采用忙等機制(CAS,compare and swap方法)以及內部的volatile變量來實現不加鎖情況下對某個字段特定更新的線程安全;
3. 使用ThreadLocal,為每個線程保存保存自己的對象,保證對它的操作永遠只有一個線程;
4. 使用concurrent包下已經實現的線程安全集合,如ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList、CopyOnWriteArraySet等,在這些集合的實現中也會看到volatile和CAS的身影,但是有些時候它們自身也不得不使用鎖;
5. 使用BlockingQueue實現生產者消費者模式,這個BlockingQueue有點類似EventBus,很多時候可以簡化多線程編程環境下的復雜性,在特定情況下還可以采用單個生產者或單個消費者的方式來避免多線程環境,但是在對一些臨界資源和類操作時,還是會需要使用鎖,而且在很多BlockingQueue的內部實現中也使用了鎖;
6. 使用Guava提供的StripLock方式來將一些特定的操作、數據映射到固定的線程中,從而保證對它們的操作的線程安全。
然而很多時候Java提供的那些所謂的線程安全類只是一些小范圍的操作,比如AtomicInteger中的incrementAndGet()/decrementAndGet(),ConcurrentHashMap中的get和put,當需要將一些操作組合在一起的時候,我們還是不得不使用鎖,比如更具一個key取出Map中的值,對值進行一定操作,然后將該值寫入Map中。以前我一直覺得這種操作不用鎖是不太可能解決的,但是最近做的一個項目需要從REUTERS中source數據,REUTERS中的很多數據tick都非常快,為了提高項目的處理能力,我們必須要減少鎖的使用,因而我開始嘗試著在不用鎖的情況下實現某些操作的組合而依然能保持線程安全,然后發現在組合使用CAS和ConcurrentMap接口中提供的putIfAbsent、replace、remove等根據傳入的值再來更新狀態的方式還真的能實現不用鎖組合某些操作而依然保持線程安全(至少在自己的代碼中無鎖)。對于這種嘗試出廠了一個ReferenceCountSet。
ReferenceCountSet實現
從這個Set的名字中應該已經能夠知道它的用途了,在我的這個項目中,我們需要自己維護對某些實例的引用計數,所以最基本的,必須有一個集合存儲一個實例到它的引用計數的映射關系,而一個實例在這個集合中必須是唯一存在,因而我自然想到了使用Map來保存;另外在對引用計數增加時,需要分以下幾個步驟實現:
1.
判斷一個新添加的實例是否已經在這個Map中存在
2.
如果不存在,則將該實例添加到這個Map中,并設置其引用計數為1
3.
如果已經存在,則需要取出已經存在的引用計數,對其加1,然后將新值寫入這個Map中。
對remove方法來說也是類似的,即需要取出Map中已經存在的計數值以后,對其引用減1,然后判斷是否為0來決定是否需要將當前實例真正從這個Map中移除。
既然需要那么多的組合操作,顯然沒有一個已經存在的Map可以實現不加鎖情況下的線程安全。因而我起初的選擇是使用HashMap<E, Integer>加鎖實現(以add為例):
synchronized(map) {
Integer refCount = map.get(element);
if (refCount == null) {
refCount = 1;
} else {
refCount += 1;
}
map.put(element, refCount);
}
但是如何不用鎖實現這些組合操作呢?秘訣就在于靈活的使用AtomicInteger和ConcurrentMap接口。首先需要將這個Map改成ConcurrentMap,如ConcurrentHashMap,然后將這個Map的值改成AtomicInteger,然后采用如下實現即可:
public boolean add(E element) {
AtomicInteger refCount = data.get(element);
if (refCount == null) {
// Avoid to put AtomicInteger(0) as during remove we need this value to compare
AtomicInteger newRefCount = new AtomicInteger(1);
refCount = data.putIfAbsent(element, newRefCount);
if (refCount == null) {
return true;
}
}
refCount.incrementAndGet();
return true;
}
在這個add方法實現中,我們首先直接使用傳入的element獲取內部存在AtomicInteger值,如果該值為null,表示當前還沒有對它有引用計數,因而我們初始化一個AtomicInteger(1)對象,但是這時我們不能直接將這個1作為該對象的引用計數,因為另一個線程可能在這中間已經添加相同對象的引用計數了,這個時候如果我們還直接寫入會覆蓋在這個中間添加的引用計數。所以我們需要使用ConcurrentMap中的putIfAbsent方法,即如果其他線程在這個還是沒有對這個對象有引用計數更新的情況下,我們才會真正寫入現在的引用計數值,從而不會覆蓋在這中間寫入的值,該方法返回原來已經在這個Map中的值,因而如果返回null,表示在這個過程中沒有其他線程對這個對象的計數值有改變,所以直接返回;如果返回不為null,表示在這個中間其他線程有對這個對象有做引用計數的改變,并且返回更新的AtomicInteger值,此時只需要像已經存在引用計數實例的情況下對這個返回的AtomicInteger只自增即可,由于AtomicInteger是線程安全的,因而這個操作也是安全的。并且由于對每個線程都使用同一個AtomicInteger引用實例,因而每個線程的自增都會正確的反映在Map的值中,因而這個操作也是正確的。
這里,我其實是將這個ConcurrentMap封裝在一個Set中,因而我們還需要實現一些其他方法,如size、contains、iterator、remove等,由于其他方法的在ConcurrentHashMap中已經是線程安全的了,因而我們只需要實現remove方法即可。這里的remove方法也包含了多個操作的組合:先取出以存在的計數,對其減1,如果發現它的計數已經減到0,將它從這個Map中移除。這里需要使用ConcurrentMap中提供的條件remove方法來實現:
public boolean remove(Object obj) {
AtomicInteger refCount = data.get(obj);
if (refCount == null) {
return false;
}
if (refCount.decrementAndGet() <= 0) {
return data.remove(obj, refCount);
}
return false;
}
這里需要解釋的就是這個條件remove方法,即當前在對這個Object對象的引用計數已經減到0的情況下,我們不能直接將其移除,因為在這個中間可能有另一個線程又增加了對它的引用計數,所以我們需要使用條件remove,即只有當前Map中對這個Object對象的值和傳入的值相等時才將它移除,這樣就保證了當前線程的操作不會覆蓋中間其他線程的結果。
在這個remove的實現中,有人可能會說在data.get(data)這句話執行完成后,加入它返回null,此時其他線程可能已經會添加了這個Object對象的引用,此時這個方法的執行結果就不對了,但是在這種情況下,即使加鎖,也無法解決這個問題,而且很多情況下的線程安全只能保證happen-before原則。關于這個類的實現也有一些其他細節的東西,具體可以查看這里:
https://github.com/dinglevin/levin-tools/blob/master/src/main/java/org/levin/tools/corejava/sets/ReferenceCountSet.java
posted on 2014-12-06 00:29
DLevin 閱讀(6554)
評論(3) 編輯 收藏 所屬分類:
Core Java 、
CodeTools 、
學習積累