當(dāng)前JDK對(duì)并發(fā)編程的支持
Sun在Java5中引入了concurrent包,它對(duì)Java的并發(fā)編程提供了強(qiáng)大的支持。首先,它提供了Lock接口,可用了更細(xì)粒度的控制鎖的區(qū)域,它的實(shí)現(xiàn)類(lèi)有ReentrantLock,ReadLock,WriteLock,其中ReadLock和WriteLock共同用于實(shí)現(xiàn)ReetrantReadWriteLock(它繼承自ReadWriteLock,但是沒(méi)有實(shí)現(xiàn)Lock接口,ReadWriteLock接口也沒(méi)有繼承Lock接口)。而且,它還提供了一些常用并發(fā)場(chǎng)景下的類(lèi)工具:Semaphore、CountDownLatch和CyclicBarrier。它們個(gè)字的應(yīng)用場(chǎng)景:
- Semaphore(信號(hào)量)
有n個(gè)非線(xiàn)程安全的資源(資源池),這些資源使用一個(gè)Semaphore(計(jì)數(shù)信號(hào)量)保護(hù),每個(gè)線(xiàn)程在使用這些資源時(shí)需要首先獲得一個(gè)信號(hào)量(acquire)表示當(dāng)前資源池還有可用資源,然后線(xiàn)程從該資源池中獲取并移除一個(gè)資源,在使用完后,將該資源交回給資源池,并釋放已經(jīng)獲得信號(hào)量(release)(這里的“移除”、“交回”并不一定需要顯示操作,只是一種形象的描述,之所以這么描述是應(yīng)為這里的各個(gè)資源是一樣的,因而對(duì)一個(gè)線(xiàn)程它每次拿到的資源不一定是同一個(gè)資源,用于區(qū)分Stripe的使用場(chǎng)景),其中Pool是一種典型的應(yīng)用。
- CountDownLatch(閉鎖)
有n個(gè)Task,它們執(zhí)行完成后需要執(zhí)行另外一個(gè)收尾的Task(Aggregated Task),比如在做Report計(jì)算中,有n個(gè)Report要計(jì)算,而在所有Report計(jì)算完成后需要生成一個(gè)基于所有Report結(jié)果的一個(gè)總的Report,而這個(gè)總的Report需要等到所有Report計(jì)算出結(jié)果后才能開(kāi)始,此時(shí)就可以定義一個(gè)CountDownLatch,其初始值是n,在總的Report計(jì)算前調(diào)用CountDownLatch的await方法等待其他Report執(zhí)行完成,而其他Report在完成后都會(huì)調(diào)用CountDownLatch中的countDown方法。
- CyclicBarrier(關(guān)卡)
每個(gè)線(xiàn)程執(zhí)行完成后需要等待,直到n個(gè)線(xiàn)程都執(zhí)行完成后,才能繼續(xù)執(zhí)行,在n個(gè)線(xiàn)程執(zhí)行完成之后,而下一次執(zhí)行開(kāi)始之前可以添加自定義邏輯(通過(guò)構(gòu)建CyclicBarrier實(shí)例時(shí)傳入一個(gè)Runnable實(shí)例自定義邏輯),即在每個(gè)線(xiàn)程執(zhí)行完成后調(diào)用CyclicBarrier的await方法并等待(即所謂的關(guān)卡),當(dāng)n個(gè)線(xiàn)程都完成后,自定義的Runnable實(shí)例會(huì)自動(dòng)被執(zhí)行(如果存在這樣的Runnable實(shí)例的話(huà)),然后所有線(xiàn)程繼續(xù)下一次執(zhí)行。這個(gè)現(xiàn)實(shí)中的例子沒(méi)有想到比較合適的。。。。
- Exchanger(交換者)
Exchanger是一種特殊的CyclicBarrier,它只有兩個(gè)線(xiàn)程參與,一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,有兩個(gè)隊(duì)列共同參與,生產(chǎn)者和消費(fèi)者各自有一個(gè)隊(duì)列,其中生產(chǎn)者向它的隊(duì)列添加數(shù)據(jù),而消費(fèi)者從它包含的隊(duì)列中拿數(shù)據(jù),當(dāng)生產(chǎn)者中的隊(duì)列滿(mǎn)時(shí)調(diào)用exchange方法,傳入自己原有的隊(duì)列,期待交換得到消費(fèi)者中空的隊(duì)列;而當(dāng)消費(fèi)者中的隊(duì)列滿(mǎn)時(shí)同樣調(diào)用exchange方法,傳入自己的原有隊(duì)列,期待獲取到生產(chǎn)者中已經(jīng)填滿(mǎn)的隊(duì)列。這樣,生產(chǎn)者和消費(fèi)者可以和諧的生產(chǎn)消費(fèi),并且它們的步驟是一致的(不管哪一方比另一方快都會(huì)等待另一方)。
最后,Java5中還提供了一些atomic類(lèi)以實(shí)現(xiàn)簡(jiǎn)單場(chǎng)景下高效非lock方式的線(xiàn)程安全,以及BlockingQueue、Synchronizer、CompletionService、ConcurrentHashMap等工具類(lèi)。
在這里需要特別添加對(duì)ConcurrentHashMap的描述,因?yàn)镚uava中的Stripe就是對(duì)ConcurrentHashMap實(shí)現(xiàn)思想的抽象。在《
Java Core系列之ConcurrentHashMap實(shí)現(xiàn)(JDK 1.7)》一文中已經(jīng)詳細(xì)講述了ConcurrentHashMap的實(shí)現(xiàn),我們都知道ConcurrentHashMap的實(shí)現(xiàn)是基于Segment的,它內(nèi)部包含了多個(gè)Segment,因而它內(nèi)部的鎖是基于Segment而不是整個(gè)Map,從而減小了鎖的粒度,提升了性能。而這種分段鎖不僅僅在HashMap用到。
Stripe的應(yīng)用場(chǎng)景
雖然JDK中已經(jīng)為我們提供了很多用于并發(fā)編程的工具類(lèi),但是它并沒(méi)有提供對(duì)以下應(yīng)用場(chǎng)景的支持:有n個(gè)資源,我們希望對(duì)每個(gè)資源的操作都是線(xiàn)程安全的,這里我們不能用Semaphore,因?yàn)镾emaphore是一個(gè)池的概念,它所管理的資源是同質(zhì)的,比如從數(shù)據(jù)庫(kù)的連接池中獲取Connection操作的一種實(shí)現(xiàn)方式是內(nèi)部保存一個(gè)Semaphore變量,在每次獲取Connection時(shí),先調(diào)用Semaphore的acquire方法以保證連接池中還有空閑的Connection,如果有,則可以隨機(jī)的選擇一個(gè)Connection實(shí)例,當(dāng)Connection實(shí)例返回時(shí),該Connection實(shí)例必須從空閑列表中移除,從而保證只有一個(gè)線(xiàn)程獲取到Connection,以保證一次只有一個(gè)線(xiàn)程使用一個(gè)Connection(在Java中數(shù)據(jù)庫(kù)的Connection是線(xiàn)程安全,但是我們?cè)谑褂脮r(shí)依然會(huì)用連接池的方式創(chuàng)建多個(gè)Connection而不是在一個(gè)應(yīng)用程序中只用一個(gè)Connection是因?yàn)橛行?shù)據(jù)庫(kù)廠(chǎng)商在實(shí)現(xiàn)Connection時(shí),一個(gè)Connection內(nèi)的所有操作都時(shí)串行的,而不是并行的,比如MySQL的Connection實(shí)現(xiàn),因而為了提升并行性,采用多個(gè)Connection方式)。而這里的需求是對(duì)每個(gè)資源的操作都是線(xiàn)程安全的,比如對(duì)JDK中HashMap的實(shí)現(xiàn)采用一個(gè)數(shù)組鏈表的結(jié)構(gòu)(參考《
Java Core系列之HashMap實(shí)現(xiàn)》),如果我們將鏈表作為一個(gè)資源單位(這里的鏈表資源和上述的數(shù)據(jù)庫(kù)連接資源是不一樣的,對(duì)數(shù)據(jù)庫(kù)連接每個(gè)線(xiàn)程只需要拿到任意一個(gè)Connection實(shí)例即可,而這里的鏈表資源則是不同鏈表是不一樣的,因而對(duì)每個(gè)操作,我們需要獲取特定的鏈表,然后對(duì)鏈表以線(xiàn)程安全的方式操作,因?yàn)檫@里多個(gè)線(xiàn)程會(huì)對(duì)同一個(gè)鏈表同時(shí)操作),那么為了保證對(duì)各個(gè)單獨(dú)鏈表操作的線(xiàn)程安全(如HashMap的put操作,不考慮rehash的情況,有些其他操作需要更大粒度的線(xiàn)程安全,比如contains等),其中一種簡(jiǎn)單的實(shí)現(xiàn)方式是為每條鏈表關(guān)聯(lián)一個(gè)鎖,對(duì)每條鏈表的讀寫(xiě)操作使用其關(guān)聯(lián)鎖即可。然而如果鏈表很多,就需要使用很多鎖,會(huì)消耗很多資源,雖然它的鎖粒度最小,并發(fā)性很高。然而如果各個(gè)鏈表之間沒(méi)有很高的并發(fā)性,我們就可以讓多個(gè)鏈表共享一個(gè)鎖以減少鎖的使用量,雖然增大了鎖的粒度,但是如果這些鏈表的并發(fā)程度并不是很高,那增大的鎖的粒度對(duì)并發(fā)性并沒(méi)有很大的影響。
在實(shí)際應(yīng)用中,我們有一個(gè)Cache系統(tǒng),它包含key和payload的鍵值對(duì)(Map),在Cache中Map的實(shí)現(xiàn)已經(jīng)是線(xiàn)程安全了,然而我們不僅僅是向Cache中寫(xiě)數(shù)據(jù)要保證線(xiàn)程安全,在操作payload時(shí),也需要保證線(xiàn)程安全。因?yàn)槲覀冊(cè)贑ache中的數(shù)據(jù)量很大,為每個(gè)payload配置一個(gè)單獨(dú)的鎖顯然不現(xiàn)實(shí),也不需要因?yàn)樗鼈儧](méi)有那么高的并發(fā)行,因而我們需要一種機(jī)制將key分成不同的group,而每個(gè)group共享一個(gè)鎖(這就是ConcurrentHashMap的實(shí)現(xiàn)思路)。通過(guò)key即可獲得一個(gè)鎖,并且每個(gè)相同的key獲得的鎖實(shí)例是相同的(獲得相同鎖實(shí)例的key它們不一定相等,因?yàn)檫@是一對(duì)多的關(guān)系)。
Stripe的簡(jiǎn)單實(shí)現(xiàn)
根據(jù)以上應(yīng)用場(chǎng)景,Stripe的實(shí)現(xiàn)很簡(jiǎn)單,只需要內(nèi)部保存一個(gè)Lock數(shù)組,對(duì)每個(gè)給定的key,計(jì)算其hash值,根據(jù)hash值計(jì)算其鎖對(duì)應(yīng)的數(shù)組下標(biāo),而該下標(biāo)下的Lock實(shí)例既是和該key關(guān)聯(lián)的Lock實(shí)例。這里通過(guò)hash值把key和Lock實(shí)例關(guān)聯(lián)起來(lái),為了擴(kuò)展性,在實(shí)現(xiàn)時(shí)還可以把計(jì)算數(shù)組下標(biāo)的邏輯抽象成一個(gè)接口,用戶(hù)可以通過(guò)傳入自定義該接口的實(shí)現(xiàn)類(lèi)實(shí)例加入用戶(hù)自定義的關(guān)聯(lián)邏輯,默認(rèn)采用hash值關(guān)聯(lián)方式。
Stripe在Guava中的實(shí)現(xiàn)
在Guava中,Stripe以抽象類(lèi)的形式存在,它定義了通過(guò)給定key或index獲得相應(yīng)Lock/Semaphore/ReadWriteLock實(shí)例:
public abstract class Striped<L> {
/**
* Returns the stripe that corresponds to the passed key. It is always guaranteed that if
* {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
*
* @param key an arbitrary, non-null key
* @return the stripe that the passed key corresponds to
*/ public abstract L get(Object key);
/**
* Returns the stripe at the specified index. Valid indexes are 0, inclusively, to
* {@code size()}, exclusively.
*
* @param index the index of the stripe to return; must be in {@code [0
size())}
* @return the stripe at the specified index
*/ public abstract L getAt(
int index);
/**
* Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
*/ abstract int indexFor(Object key);
/**
* Returns the total number of stripes in this instance.
*/ public abstract int size();
/**
* Returns the stripes that correspond to the passed objects, in ascending (as per
* {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
* by this method are guaranteed to not deadlock each other.
*
* <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
* {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number
* of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays
* are needed for a pair of them to match). Please consider carefully the implications of the
* number of stripes, the intended concurrency level, and the typical number of keys used in a
* {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
* in Bins model</a> for mathematical formulas that can be used to estimate the probability of
* collisions.
*
* @param keys arbitrary non-null keys
* @return the stripes corresponding to the objects (one per each object, derived by delegating
* to {@link #get(Object)}; may contain duplicates), in an increasing index order.
*/ public Iterable<L> bulkGet(Iterable<?> keys);
}
可以使用一下幾個(gè)靜態(tài)工廠(chǎng)方法創(chuàng)建相應(yīng)的Striped實(shí)例,其中l(wèi)azyWeakXXX創(chuàng)建的Striped實(shí)例中鎖以弱引用的方式存在(在什么樣的場(chǎng)景中使用呢?):
/**
* Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks.
* Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<Lock>}
*/
public static Striped<Lock> lock(int stripes);
/**
* Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks.
* Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<Lock>}
*/
public static Striped<Lock> lazyWeakLock(int stripes);
/**
* Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
* with the specified number of permits.
*
* @param stripes the minimum number of stripes (semaphores) required
* @param permits the number of permits in each semaphore
* @return a new {@code Striped<Semaphore>}
*/
public static Striped<Semaphore> semaphore(int stripes, final int permits);
/**
* Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
* with the specified number of permits.
*
* @param stripes the minimum number of stripes (semaphores) required
* @param permits the number of permits in each semaphore
* @return a new {@code Striped<Semaphore>}
*/
public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits);
/**
* Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
* read-write locks. Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<ReadWriteLock>}
*/
public static Striped<ReadWriteLock> readWriteLock(int stripes);
/**
* Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
* read-write locks. Every lock is reentrant.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<ReadWriteLock>}
*/
public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes);
Striped有兩個(gè)具體實(shí)現(xiàn)類(lèi),CompactStriped和LazyStriped,他們都繼承自PowerOfTwoStriped(用于表達(dá)內(nèi)部保存的stripes值是2的指數(shù)值)。PowerOfTwoStriped實(shí)現(xiàn)了indexFor()方法,它使用hash值做映射函數(shù):
private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
/** Capacity (power of two) minus one, for fast mod evaluation */
final int mask;
@Override final int indexFor(Object key) {
int hash = smear(key.hashCode());
return hash & mask;
}
}
private static int smear(int hashCode) {
hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
}
CompactStriped類(lèi)使用一個(gè)數(shù)組保存所有的Lock/Semaphore/ReadWriteLock實(shí)例,在初始化時(shí)就建立所有的鎖實(shí)例;而LazyStriped類(lèi)使用一個(gè)值為WeakReference的ConcurrentMap做為數(shù)據(jù)結(jié)構(gòu),index值為key,Lock/Semaphore/ReadWriteLock的WeakReference為值,所有鎖實(shí)例在用到時(shí)動(dòng)態(tài)創(chuàng)建。在CompactStriped中創(chuàng)建鎖實(shí)例時(shí)對(duì)ReentrantLock/Semaphore創(chuàng)建采用PaddedXXX版本,不知道為何要做Pad。
Striped類(lèi)實(shí)現(xiàn)的類(lèi)圖如下:
posted on 2013-12-25 10:03
DLevin 閱讀(4217)
評(píng)論(3) 編輯 收藏 所屬分類(lèi):
Guava