當前JDK對并發編程的支持
Sun在Java5中引入了concurrent包,它對Java的并發編程提供了強大的支持。首先,它提供了Lock接口,可用了更細粒度的控制鎖的區域,它的實現類有ReentrantLock,ReadLock,WriteLock,其中ReadLock和WriteLock共同用于實現ReetrantReadWriteLock(它繼承自ReadWriteLock,但是沒有實現Lock接口,ReadWriteLock接口也沒有繼承Lock接口)。而且,它還提供了一些常用并發場景下的類工具:Semaphore、CountDownLatch和CyclicBarrier。它們個字的應用場景:
- Semaphore(信號量)
有n個非線程安全的資源(資源池),這些資源使用一個Semaphore(計數信號量)保護,每個線程在使用這些資源時需要首先獲得一個信號量(acquire)表示當前資源池還有可用資源,然后線程從該資源池中獲取并移除一個資源,在使用完后,將該資源交回給資源池,并釋放已經獲得信號量(release)(這里的“移除”、“交回”并不一定需要顯示操作,只是一種形象的描述,之所以這么描述是應為這里的各個資源是一樣的,因而對一個線程它每次拿到的資源不一定是同一個資源,用于區分Stripe的使用場景),其中Pool是一種典型的應用。
- CountDownLatch(閉鎖)
有n個Task,它們執行完成后需要執行另外一個收尾的Task(Aggregated Task),比如在做Report計算中,有n個Report要計算,而在所有Report計算完成后需要生成一個基于所有Report結果的一個總的Report,而這個總的Report需要等到所有Report計算出結果后才能開始,此時就可以定義一個CountDownLatch,其初始值是n,在總的Report計算前調用CountDownLatch的await方法等待其他Report執行完成,而其他Report在完成后都會調用CountDownLatch中的countDown方法。
- CyclicBarrier(關卡)
每個線程執行完成后需要等待,直到n個線程都執行完成后,才能繼續執行,在n個線程執行完成之后,而下一次執行開始之前可以添加自定義邏輯(通過構建CyclicBarrier實例時傳入一個Runnable實例自定義邏輯),即在每個線程執行完成后調用CyclicBarrier的await方法并等待(即所謂的關卡),當n個線程都完成后,自定義的Runnable實例會自動被執行(如果存在這樣的Runnable實例的話),然后所有線程繼續下一次執行。這個現實中的例子沒有想到比較合適的。。。。
- Exchanger(交換者)
Exchanger是一種特殊的CyclicBarrier,它只有兩個線程參與,一個生產者,一個消費者,有兩個隊列共同參與,生產者和消費者各自有一個隊列,其中生產者向它的隊列添加數據,而消費者從它包含的隊列中拿數據,當生產者中的隊列滿時調用exchange方法,傳入自己原有的隊列,期待交換得到消費者中空的隊列;而當消費者中的隊列滿時同樣調用exchange方法,傳入自己的原有隊列,期待獲取到生產者中已經填滿的隊列。這樣,生產者和消費者可以和諧的生產消費,并且它們的步驟是一致的(不管哪一方比另一方快都會等待另一方)。
最后,Java5中還提供了一些atomic類以實現簡單場景下高效非lock方式的線程安全,以及BlockingQueue、Synchronizer、CompletionService、ConcurrentHashMap等工具類。
在這里需要特別添加對ConcurrentHashMap的描述,因為Guava中的Stripe就是對ConcurrentHashMap實現思想的抽象。在《
Java Core系列之ConcurrentHashMap實現(JDK 1.7)》一文中已經詳細講述了ConcurrentHashMap的實現,我們都知道ConcurrentHashMap的實現是基于Segment的,它內部包含了多個Segment,因而它內部的鎖是基于Segment而不是整個Map,從而減小了鎖的粒度,提升了性能。而這種分段鎖不僅僅在HashMap用到。
Stripe的應用場景
雖然JDK中已經為我們提供了很多用于并發編程的工具類,但是它并沒有提供對以下應用場景的支持:有n個資源,我們希望對每個資源的操作都是線程安全的,這里我們不能用Semaphore,因為Semaphore是一個池的概念,它所管理的資源是同質的,比如從數據庫的連接池中獲取Connection操作的一種實現方式是內部保存一個Semaphore變量,在每次獲取Connection時,先調用Semaphore的acquire方法以保證連接池中還有空閑的Connection,如果有,則可以隨機的選擇一個Connection實例,當Connection實例返回時,該Connection實例必須從空閑列表中移除,從而保證只有一個線程獲取到Connection,以保證一次只有一個線程使用一個Connection(在Java中數據庫的Connection是線程安全,但是我們在使用時依然會用連接池的方式創建多個Connection而不是在一個應用程序中只用一個Connection是因為有些數據庫廠商在實現Connection時,一個Connection內的所有操作都時串行的,而不是并行的,比如MySQL的Connection實現,因而為了提升并行性,采用多個Connection方式)。而這里的需求是對每個資源的操作都是線程安全的,比如對JDK中HashMap的實現采用一個數組鏈表的結構(參考《
Java Core系列之HashMap實現》),如果我們將鏈表作為一個資源單位(這里的鏈表資源和上述的數據庫連接資源是不一樣的,對數據庫連接每個線程只需要拿到任意一個Connection實例即可,而這里的鏈表資源則是不同鏈表是不一樣的,因而對每個操作,我們需要獲取特定的鏈表,然后對鏈表以線程安全的方式操作,因為這里多個線程會對同一個鏈表同時操作),那么為了保證對各個單獨鏈表操作的線程安全(如HashMap的put操作,不考慮rehash的情況,有些其他操作需要更大粒度的線程安全,比如contains等),其中一種簡單的實現方式是為每條鏈表關聯一個鎖,對每條鏈表的讀寫操作使用其關聯鎖即可。然而如果鏈表很多,就需要使用很多鎖,會消耗很多資源,雖然它的鎖粒度最小,并發性很高。然而如果各個鏈表之間沒有很高的并發性,我們就可以讓多個鏈表共享一個鎖以減少鎖的使用量,雖然增大了鎖的粒度,但是如果這些鏈表的并發程度并不是很高,那增大的鎖的粒度對并發性并沒有很大的影響。
在實際應用中,我們有一個Cache系統,它包含key和payload的鍵值對(Map),在Cache中Map的實現已經是線程安全了,然而我們不僅僅是向Cache中寫數據要保證線程安全,在操作payload時,也需要保證線程安全。因為我們在Cache中的數據量很大,為每個payload配置一個單獨的鎖顯然不現實,也不需要因為它們沒有那么高的并發行,因而我們需要一種機制將key分成不同的group,而每個group共享一個鎖(這就是ConcurrentHashMap的實現思路)。通過key即可獲得一個鎖,并且每個相同的key獲得的鎖實例是相同的(獲得相同鎖實例的key它們不一定相等,因為這是一對多的關系)。
Stripe的簡單實現
根據以上應用場景,Stripe的實現很簡單,只需要內部保存一個Lock數組,對每個給定的key,計算其hash值,根據hash值計算其鎖對應的數組下標,而該下標下的Lock實例既是和該key關聯的Lock實例。這里通過hash值把key和Lock實例關聯起來,為了擴展性,在實現時還可以把計算數組下標的邏輯抽象成一個接口,用戶可以通過傳入自定義該接口的實現類實例加入用戶自定義的關聯邏輯,默認采用hash值關聯方式。
Stripe在Guava中的實現
在Guava中,Stripe以抽象類的形式存在,它定義了通過給定key或index獲得相應Lock/Semaphore/ReadWriteLock實例:
public abstract class Striped<L> {
/**
* Returns the stripe that corresponds to the passed key. It is always guaranteed that if
* {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
*
* @param key an arbitrary, non-null key
* @return the stripe that the passed key corresponds to
*/ public abstract L get(Object key);
/**
* Returns the stripe at the specified index. Valid indexes are 0, inclusively, to
* {@code size()}, exclusively.
*
* @param index the index of the stripe to return; must be in {@code [0
size())}
* @return the stripe at the specified index
*/ public abstract L getAt(
int index);
/**
* Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
*/ abstract int indexFor(Object key);
/**
* Returns the total number of stripes in this instance.
*/ public abstract int size();
/**
* Returns the stripes that correspond to the passed objects, in ascending (as per
* {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
* by this method are guaranteed to not deadlock each other.
*
* <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
* {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number
* of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays
* are needed for a pair of them to match). Please consider carefully the implications of the
* number of stripes, the intended concurrency level, and the typical number of keys used in a
* {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
* in Bins model</a> for mathematical formulas that can be used to estimate the probability of
* collisions.
*
* @param keys arbitrary non-null keys
* @return the stripes corresponding to the objects (one per each object, derived by delegating
* to {@link #get(Object)}; may contain duplicates), in an increasing index order.
*/ public Iterable<L> bulkGet(Iterable<?> keys);
}
可以使用一下幾個靜態工廠方法創建相應的Striped實例,其中lazyWeakXXX創建的Striped實例中鎖以弱引用的方式存在(在什么樣的場景中使用呢?):
/**
* Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks.
* Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<Lock>}
*/
public static Striped<Lock> lock(int stripes);
/**
* Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks.
* Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<Lock>}
*/
public static Striped<Lock> lazyWeakLock(int stripes);
/**
* Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
* with the specified number of permits.
*
* @param stripes the minimum number of stripes (semaphores) required
* @param permits the number of permits in each semaphore
* @return a new {@code Striped<Semaphore>}
*/
public static Striped<Semaphore> semaphore(int stripes, final int permits);
/**
* Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
* with the specified number of permits.
*
* @param stripes the minimum number of stripes (semaphores) required
* @param permits the number of permits in each semaphore
* @return a new {@code Striped<Semaphore>}
*/
public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits);
/**
* Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
* read-write locks. Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<ReadWriteLock>}
*/
public static Striped<ReadWriteLock> readWriteLock(int stripes);
/**
* Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
* read-write locks. Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<ReadWriteLock>}
*/
public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes);
Striped有兩個具體實現類,CompactStriped和LazyStriped,他們都繼承自PowerOfTwoStriped(用于表達內部保存的stripes值是2的指數值)。PowerOfTwoStriped實現了indexFor()方法,它使用hash值做映射函數:
private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
/** Capacity (power of two) minus one, for fast mod evaluation */
final int mask;
@Override final int indexFor(Object key) {
int hash = smear(key.hashCode());
return hash & mask;
}
}
private static int smear(int hashCode) {
hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
}
CompactStriped類使用一個數組保存所有的Lock/Semaphore/ReadWriteLock實例,在初始化時就建立所有的鎖實例;而LazyStriped類使用一個值為WeakReference的ConcurrentMap做為數據結構,index值為key,Lock/Semaphore/ReadWriteLock的WeakReference為值,所有鎖實例在用到時動態創建。在CompactStriped中創建鎖實例時對ReentrantLock/Semaphore創建采用PaddedXXX版本,不知道為何要做Pad。
Striped類實現的類圖如下:
posted on 2013-12-25 10:03
DLevin 閱讀(4216)
評論(3) 編輯 收藏 所屬分類:
Guava