<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    上善若水
    In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
    posts - 146,comments - 147,trackbacks - 0

    當前JDK對并發(fā)編程的支持

    Sun在Java5中引入了concurrent包,它對Java的并發(fā)編程提供了強大的支持。首先,它提供了Lock接口,可用了更細粒度的控制鎖的區(qū)域,它的實現(xiàn)類有ReentrantLock,ReadLock,WriteLock,其中ReadLock和WriteLock共同用于實現(xiàn)ReetrantReadWriteLock(它繼承自ReadWriteLock,但是沒有實現(xiàn)Lock接口,ReadWriteLock接口也沒有繼承Lock接口)。而且,它還提供了一些常用并發(fā)場景下的類工具:Semaphore、CountDownLatch和CyclicBarrier。它們個字的應用場景:
    1. Semaphore(信號量)
      有n個非線程安全的資源(資源池),這些資源使用一個Semaphore(計數(shù)信號量)保護,每個線程在使用這些資源時需要首先獲得一個信號量(acquire)表示當前資源池還有可用資源,然后線程從該資源池中獲取并移除一個資源,在使用完后,將該資源交回給資源池,并釋放已經(jīng)獲得信號量(release)(這里的“移除”、“交回”并不一定需要顯示操作,只是一種形象的描述,之所以這么描述是應為這里的各個資源是一樣的,因而對一個線程它每次拿到的資源不一定是同一個資源,用于區(qū)分Stripe的使用場景),其中Pool是一種典型的應用。
    2. CountDownLatch(閉鎖)
      有n個Task,它們執(zhí)行完成后需要執(zhí)行另外一個收尾的Task(Aggregated Task),比如在做Report計算中,有n個Report要計算,而在所有Report計算完成后需要生成一個基于所有Report結果的一個總的Report,而這個總的Report需要等到所有Report計算出結果后才能開始,此時就可以定義一個CountDownLatch,其初始值是n,在總的Report計算前調用CountDownLatch的await方法等待其他Report執(zhí)行完成,而其他Report在完成后都會調用CountDownLatch中的countDown方法。
    3. CyclicBarrier(關卡)
      每個線程執(zhí)行完成后需要等待,直到n個線程都執(zhí)行完成后,才能繼續(xù)執(zhí)行,在n個線程執(zhí)行完成之后,而下一次執(zhí)行開始之前可以添加自定義邏輯(通過構建CyclicBarrier實例時傳入一個Runnable實例自定義邏輯),即在每個線程執(zhí)行完成后調用CyclicBarrier的await方法并等待(即所謂的關卡),當n個線程都完成后,自定義的Runnable實例會自動被執(zhí)行(如果存在這樣的Runnable實例的話),然后所有線程繼續(xù)下一次執(zhí)行。這個現(xiàn)實中的例子沒有想到比較合適的。。。。
    4. Exchanger(交換者)
      Exchanger是一種特殊的CyclicBarrier,它只有兩個線程參與,一個生產者,一個消費者,有兩個隊列共同參與,生產者和消費者各自有一個隊列,其中生產者向它的隊列添加數(shù)據(jù),而消費者從它包含的隊列中拿數(shù)據(jù),當生產者中的隊列滿時調用exchange方法,傳入自己原有的隊列,期待交換得到消費者中空的隊列;而當消費者中的隊列滿時同樣調用exchange方法,傳入自己的原有隊列,期待獲取到生產者中已經(jīng)填滿的隊列。這樣,生產者和消費者可以和諧的生產消費,并且它們的步驟是一致的(不管哪一方比另一方快都會等待另一方)。
    最后,Java5中還提供了一些atomic類以實現(xiàn)簡單場景下高效非lock方式的線程安全,以及BlockingQueue、Synchronizer、CompletionService、ConcurrentHashMap等工具類。

    在這里需要特別添加對ConcurrentHashMap的描述,因為Guava中的Stripe就是對ConcurrentHashMap實現(xiàn)思想的抽象。在《Java Core系列之ConcurrentHashMap實現(xiàn)(JDK 1.7)》一文中已經(jīng)詳細講述了ConcurrentHashMap的實現(xiàn),我們都知道ConcurrentHashMap的實現(xiàn)是基于Segment的,它內部包含了多個Segment,因而它內部的鎖是基于Segment而不是整個Map,從而減小了鎖的粒度,提升了性能。而這種分段鎖不僅僅在HashMap用到。

    Stripe的應用場景

    雖然JDK中已經(jīng)為我們提供了很多用于并發(fā)編程的工具類,但是它并沒有提供對以下應用場景的支持:有n個資源,我們希望對每個資源的操作都是線程安全的,這里我們不能用Semaphore,因為Semaphore是一個池的概念,它所管理的資源是同質的,比如從數(shù)據(jù)庫的連接池中獲取Connection操作的一種實現(xiàn)方式是內部保存一個Semaphore變量,在每次獲取Connection時,先調用Semaphore的acquire方法以保證連接池中還有空閑的Connection,如果有,則可以隨機的選擇一個Connection實例,當Connection實例返回時,該Connection實例必須從空閑列表中移除,從而保證只有一個線程獲取到Connection,以保證一次只有一個線程使用一個Connection(在Java中數(shù)據(jù)庫的Connection是線程安全,但是我們在使用時依然會用連接池的方式創(chuàng)建多個Connection而不是在一個應用程序中只用一個Connection是因為有些數(shù)據(jù)庫廠商在實現(xiàn)Connection時,一個Connection內的所有操作都時串行的,而不是并行的,比如MySQL的Connection實現(xiàn),因而為了提升并行性,采用多個Connection方式)。而這里的需求是對每個資源的操作都是線程安全的,比如對JDK中HashMap的實現(xiàn)采用一個數(shù)組鏈表的結構(參考《Java Core系列之HashMap實現(xiàn)》),如果我們將鏈表作為一個資源單位(這里的鏈表資源和上述的數(shù)據(jù)庫連接資源是不一樣的,對數(shù)據(jù)庫連接每個線程只需要拿到任意一個Connection實例即可,而這里的鏈表資源則是不同鏈表是不一樣的,因而對每個操作,我們需要獲取特定的鏈表,然后對鏈表以線程安全的方式操作,因為這里多個線程會對同一個鏈表同時操作),那么為了保證對各個單獨鏈表操作的線程安全(如HashMap的put操作,不考慮rehash的情況,有些其他操作需要更大粒度的線程安全,比如contains等),其中一種簡單的實現(xiàn)方式是為每條鏈表關聯(lián)一個鎖,對每條鏈表的讀寫操作使用其關聯(lián)鎖即可。然而如果鏈表很多,就需要使用很多鎖,會消耗很多資源,雖然它的鎖粒度最小,并發(fā)性很高。然而如果各個鏈表之間沒有很高的并發(fā)性,我們就可以讓多個鏈表共享一個鎖以減少鎖的使用量,雖然增大了鎖的粒度,但是如果這些鏈表的并發(fā)程度并不是很高,那增大的鎖的粒度對并發(fā)性并沒有很大的影響。

    在實際應用中,我們有一個Cache系統(tǒng),它包含key和payload的鍵值對(Map),在Cache中Map的實現(xiàn)已經(jīng)是線程安全了,然而我們不僅僅是向Cache中寫數(shù)據(jù)要保證線程安全,在操作payload時,也需要保證線程安全。因為我們在Cache中的數(shù)據(jù)量很大,為每個payload配置一個單獨的鎖顯然不現(xiàn)實,也不需要因為它們沒有那么高的并發(fā)行,因而我們需要一種機制將key分成不同的group,而每個group共享一個鎖(這就是ConcurrentHashMap的實現(xiàn)思路)。通過key即可獲得一個鎖,并且每個相同的key獲得的鎖實例是相同的(獲得相同鎖實例的key它們不一定相等,因為這是一對多的關系)。

    Stripe的簡單實現(xiàn)

    根據(jù)以上應用場景,Stripe的實現(xiàn)很簡單,只需要內部保存一個Lock數(shù)組,對每個給定的key,計算其hash值,根據(jù)hash值計算其鎖對應的數(shù)組下標,而該下標下的Lock實例既是和該key關聯(lián)的Lock實例。這里通過hash值把key和Lock實例關聯(lián)起來,為了擴展性,在實現(xiàn)時還可以把計算數(shù)組下標的邏輯抽象成一個接口,用戶可以通過傳入自定義該接口的實現(xiàn)類實例加入用戶自定義的關聯(lián)邏輯,默認采用hash值關聯(lián)方式。

    Stripe在Guava中的實現(xiàn)

    在Guava中,Stripe以抽象類的形式存在,它定義了通過給定key或index獲得相應Lock/Semaphore/ReadWriteLock實例:
    public abstract class Striped<L> {
      /**
       * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
       * {
    @code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
       *
       * 
    @param key an arbitrary, non-null key
       * 
    @return the stripe that the passed key corresponds to
       
    */
      public abstract L get(Object key);

      /**
       * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to
       * {
    @code size()}, exclusively.
       *
       * 
    @param index the index of the stripe to return; must be in {@code [0size())}
       * 
    @return the stripe at the specified index
       
    */
      public abstract L getAt(int index);

      /**
       * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
       
    */
      abstract int indexFor(Object key);

      /**
       * Returns the total number of stripes in this instance.
       
    */
      public abstract int size();

      /**
       * Returns the stripes that correspond to the passed objects, in ascending (as per
       * {
    @link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
       * by this method are guaranteed to not deadlock each other.
       *
       * <p>It should be noted that using a {
    @code Striped<L>} with relatively few stripes, and
       * {
    @code bulkGet(keys)} with a relative large number of keys can cause an excessive number
       * of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays
       * are needed for a pair of them to match). Please consider carefully the implications of the
       * number of stripes, the intended concurrency level, and the typical number of keys used in a
       * {
    @code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
       * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
       * collisions.
       *
       * 
    @param keys arbitrary non-null keys
       * 
    @return the stripes corresponding to the objects (one per each object, derived by delegating
       *         to {
    @link #get(Object)}; may contain duplicates), in an increasing index order.
       
    */
      public Iterable<L> bulkGet(Iterable<?> keys);
    }
    可以使用一下幾個靜態(tài)工廠方法創(chuàng)建相應的Striped實例,其中l(wèi)azyWeakXXX創(chuàng)建的Striped實例中鎖以弱引用的方式存在(在什么樣的場景中使用呢?):
    /**
     * Creates a {
    @code Striped<Lock>} with eagerly initialized, strongly referenced locks.
     * Every lock is reentrant.
     *
     * 
    @param stripes the minimum number of stripes (locks) required
     * 
    @return a new {@code Striped<Lock>}
     
    */
    public static Striped<Lock> lock(int stripes);
    /**
     * Creates a {
    @code Striped<Lock>} with lazily initialized, weakly referenced locks.
     * Every lock is reentrant.
     *
     * 
    @param stripes the minimum number of stripes (locks) required
     * 
    @return a new {@code Striped<Lock>}
     
    */
    public static Striped<Lock> lazyWeakLock(int stripes);
    /**
     * Creates a {
    @code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
     * with the specified number of permits.
     *
     * 
    @param stripes the minimum number of stripes (semaphores) required
     * 
    @param permits the number of permits in each semaphore
     * 
    @return a new {@code Striped<Semaphore>}
     
    */
    public static Striped<Semaphore> semaphore(int stripes, final int permits);
    /**
     * Creates a {
    @code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
     * with the specified number of permits.
     *
     * 
    @param stripes the minimum number of stripes (semaphores) required
     * 
    @param permits the number of permits in each semaphore
     * 
    @return a new {@code Striped<Semaphore>}
       
    */
    public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits);
    /**
     * Creates a {
    @code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
     * read-write locks. Every lock is reentrant.
     *
     * 
    @param stripes the minimum number of stripes (locks) required
     * 
    @return a new {@code Striped<ReadWriteLock>}
     
    */
    public static Striped<ReadWriteLock> readWriteLock(int stripes);
    /**
     * Creates a {
    @code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
     * read-write locks. Every lock is reentrant.
     *
     * 
    @param stripes the minimum number of stripes (locks) required
     * 
    @return a new {@code Striped<ReadWriteLock>}
     
    */
    public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes);

    Striped有兩個具體實現(xiàn)類,CompactStriped和LazyStriped,他們都繼承自PowerOfTwoStriped(用于表達內部保存的stripes值是2的指數(shù)值)。PowerOfTwoStriped實現(xiàn)了indexFor()方法,它使用hash值做映射函數(shù):
      private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
        /** Capacity (power of two) minus one, for fast mod evaluation */
        final int mask;

        @Override final int indexFor(Object key) {
          int hash = smear(key.hashCode());
          return hash & mask;
        }
      }
      private static int smear(int hashCode) {
        hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
        return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
      }
    CompactStriped類使用一個數(shù)組保存所有的Lock/Semaphore/ReadWriteLock實例,在初始化時就建立所有的鎖實例;而LazyStriped類使用一個值為WeakReference的ConcurrentMap做為數(shù)據(jù)結構,index值為key,Lock/Semaphore/ReadWriteLock的WeakReference為值,所有鎖實例在用到時動態(tài)創(chuàng)建。在CompactStriped中創(chuàng)建鎖實例時對ReentrantLock/Semaphore創(chuàng)建采用PaddedXXX版本,不知道為何要做Pad。

    Striped類實現(xiàn)的類圖如下:
    posted on 2013-12-25 10:03 DLevin 閱讀(4216) 評論(3)  編輯  收藏 所屬分類: Guava

    FeedBack:
    # re: 深入Guava源碼之Stripe
    2013-12-26 18:08 | acha
    請問你的額UML是怎么做出來的?  回復  更多評論
      
    # re: 深入Guava源碼之Stripe
    2014-01-02 09:45 | DLevin
    用StarUML畫的~@acha
      回復  更多評論
      
    # re: 深入Guava源碼之Stripe
    2014-01-16 11:15 | 水水水水
    sss@acha
      回復  更多評論
      

    只有注冊用戶登錄后才能發(fā)表評論。


    網(wǎng)站導航:
     
    主站蜘蛛池模板: 亚洲mv国产精品mv日本mv| 国产成人精品免费视频网页大全| 亚洲丝袜中文字幕| 久久久久亚洲AV综合波多野结衣| 天天摸天天操免费播放小视频| 亚洲高清免费在线观看| 日韩av无码免费播放| www在线观看播放免费视频日本| 亚洲午夜理论片在线观看| 亚洲国产电影在线观看| 日韩va亚洲va欧洲va国产| 亚洲欧洲自拍拍偷精品 美利坚| 在线观看永久免费视频网站| 一个人看www在线高清免费看| 最近免费mv在线电影| 热re99久久6国产精品免费| 少妇性饥渴无码A区免费 | a毛看片免费观看视频| 日本高清不卡中文字幕免费| 亚洲av成本人无码网站| 美女视频黄免费亚洲| 亚洲av永久无码精品三区在线4| 久久精品亚洲中文字幕无码麻豆| 亚洲国产精品无码久久久蜜芽| 黑人大战亚洲人精品一区| 久久国产成人精品国产成人亚洲| 日本中文一区二区三区亚洲| 国产一区二区三区免费视频| 国产成人3p视频免费观看| 日韩一区二区免费视频| 国产伦一区二区三区免费| 免费jjzz在在线播放国产| 四虎永久在线免费观看| 免费在线一级毛片| 亚洲精品尤物yw在线影院| 久久精品夜色噜噜亚洲A∨| 亚洲日韩激情无码一区| 久久亚洲国产伦理| 亚洲精品熟女国产| 亚洲综合无码一区二区痴汉| 亚洲成a人无码亚洲成av无码|