<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    xylz,imxylz

    關注后端架構、中間件、分布式和并發編程

       :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

    在《并發容器 part 4 并發隊列與Queue簡介》節中的類圖中可以看到,對于Queue來說,BlockingQueue是主要的線程安全版本。這是一個可阻塞的版本,也就是允許添加/刪除元素被阻塞,直到成功為止。

    BlockingQueue相對于Queue而言增加了兩個操作:put/take。下面是一張整理的表格。

    image 看似簡單的API,非常有用。這在控制隊列的并發上非常有好處。既然加入隊列和移除隊列能夠被阻塞,這在實現生產者-消費者模型上就簡單多了。

    清單1 是生產者-消費者模型的一個例子。這個例子是一個真實的場景。服務端(ICE服務)接受客戶端的請求(accept),請求計算此人的好友生日,然后將計算的結果存取緩存中(Memcache)中。在這個例子中采用了ExecutorService實現多線程的功能,盡可能的提高吞吐量,這個在后面線程池的部分會詳細說明。目前就可以理解為new Thread(r).start()就可以了。另外這里阻塞隊列使用的是LinkedBlockingQueue。

    清單1 一個生產者-消費者例子

    package xylz.study.concurrency;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;

    public class BirthdayService {

        final int workerNumber;

        final Worker[] workers;

        final ExecutorService threadPool;

        static volatile boolean running = true;

        public BirthdayService(int workerNumber, int capacity) {
            if (workerNumber <= 0) throw new IllegalArgumentException();
            this.workerNumber = workerNumber;
            workers = new Worker[workerNumber];
            for (int i = 0; i < workerNumber; i++) {
                workers[i] = new Worker(capacity);
            }
            //
            boolean b = running;// kill the resorting
            threadPool = Executors.newFixedThreadPool(workerNumber);
            for (Worker w : workers) {
                threadPool.submit(w);
            }
        }

        Worker getWorker(int id) {
            return workers[id % workerNumber];

        }

        class Worker implements Runnable {

            final BlockingQueue<Integer> queue;

            public Worker(int capacity) {
                queue = new LinkedBlockingQueue<Integer>(capacity);
            }

            public void run() {
                while (true) {
                    try {
                        consume(queue.take());
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }

            void put(int id) {
                try {
                    queue.put(id);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        public void accept(int id) {
            //accept client request
            getWorker(id).put(id);
        }

        protected void consume(int id) {
            //do the work
            //get the list of friends and save the birthday to cache
        }
    }

     

    在清單1 中可以看到不管是put()還是get(),都拋出了一個InterruptedException。我們就從這里開始,為什么會拋出這個異常。

     

    上一節中提到實現一個并發隊列有三種方式。顯然只有第二種 Lock 才能實現阻塞隊列。在鎖機制中提到過,Lock結合Condition就可以實現線程的阻塞,這在鎖機制部分的很多工具中都詳細介紹過,而接下來要介紹的LinkedBlockingQueue就是采用這種方式。

     

    LinkedBlockingQueue 原理

     

    image 對比ConcurrentLinkedQueue的結構圖,LinkedBlockingQueue多了兩個ReentrantLock和兩個Condition以及用于計數的AtomicInteger,顯然這會導致LinkedBlockingQueue的實現有點復雜。對照此結構,有以下幾點說明:

      1. 但是整體上講,LinkedBlockingQueue和ConcurrentLinkedQueue的結構類似,都是采用頭尾節點,每個節點指向下一個節點的結構,這表示它們在操作上應該類似。
      2. LinkedBlockingQueue引入了原子計數器count,這意味著獲取隊列大小size()已經是常量時間了,不再需要遍歷隊列。每次隊列長度有變更時只需要修改count即可。
      3. 有了修改Node指向有了鎖,所以不需要volatile特性了。既然有了鎖Node的item為什么需要volatile在后面會詳細分析,暫且不表。
      4. 引入了兩個鎖,一個入隊列鎖,一個出隊列鎖。當然同時有一個隊列不滿的Condition和一個隊列不空的Condition。其實參照鎖機制前面介紹過的生產者-消費者模型就知道,入隊列就代表生產者,出隊列就代表消費者。為什么需要兩個鎖?一個鎖行不行?其實一個鎖完全可以,但是一個鎖意味著入隊列和出隊列同時只能有一個在進行,另一個必須等待其釋放鎖。而從ConcurrentLinkedQueue的實現原理來看,事實上head和last (ConcurrentLinkedQueue中是tail)是分離的,互相獨立的,這意味著入隊列實際上是不會修改出隊列的數據的,同時出隊列也不會修改入隊列,也就是說這兩個操作是互不干擾的。更通俗的將,這個鎖相當于兩個寫入鎖,入隊列是一種寫操作,操作head,出隊列是一種寫操作,操作tail。可見它們是無關的。但是并非完全無關,后面詳細分析。

     

    在沒有揭示入隊列和出隊列過程前,暫且猜測下實現原理。

    根據前面學到的鎖機制原理結合ConcurrentLinkedQueue的原理,入隊列的阻塞過程大概是這樣的:

      1. 獲取入隊列的鎖putLock,檢測隊列大小,如果隊列已滿,那么就掛起線程,等待隊列不滿信號notFull的喚醒。
      2. 將元素加入到隊列尾部,同時修改隊列尾部引用last。
      3. 隊列大小加1。
      4. 釋放鎖putLock。
      5. 喚醒notEmpty線程(如果有掛起的出隊列線程),告訴消費者,已經有了新的產品。

     

    對比入隊列,出隊列的阻塞過程大概是這樣的:

      1. 獲取出隊列的鎖takeLock,檢測隊列大小,如果隊列為空,那么就掛起線程,等待隊列不為空notEmpty的喚醒。
      2. 將元素從頭部移除,同時修改隊列頭部引用head。
      3. 隊列大小減1。
      4. 釋放鎖takeLock。
      5. 喚醒notFull線程(如果有掛起的入隊列線程),告訴生產者,現在還有空閑的空間。

    下面來驗證上面的過程。

     

    入隊列過程(put/offer)

     

    清單2 阻塞的入隊列過程

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    清單2 描述的是入隊列的阻塞過程。可以看到和上面描述的入隊列的過程基本相同。但是也有以下幾個問題:

      1. 如果在入隊列的時候線程被中斷,那么就需要發出一個notFull的信號,表示下一個入隊列的線程能夠被喚醒(如果阻塞的話)。
      2. 入隊列成功后如果隊列不滿需要補一個notFull的信號。為什么?隊列不滿的時候其它入隊列的阻塞線程難道不知道么?有可能。這是因為為了減少上下文切換的次數,每次喚醒一個線程(不管是入隊列還是出隊列)都是只隨機喚醒一個(notify),而不是喚醒所有的(notifyall())。這會導致其它阻塞的入隊列線程不能夠即使處理隊列不滿的情況。
      3. 如果隊列不為空并且可能有一個元素的話就喚醒一個出隊列線程。這么做說明之前隊列一定為空,因為在加入隊列之后隊列最多只能為1,那么說明未加入之前是0,那么就可能有被阻塞的出隊列線程,所以就喚醒一個出隊列線程。特別說明的是為什么使用一個臨時變量c,而不用count。這是因為讀取一個count的開銷比讀取一個臨時一個變量大,而此處c又能夠完成確認隊列最多只有一個元素的判斷。首先c默認為-1,如果加入隊列后獲取原子計數器的結果為0,說明之前隊列為空,不可能消費(出隊列),也不可能入隊列,因為此時鎖還在當前線程上,那么加入一個后隊列就不為空了,所以就可以安全的喚醒一個消費(出對立)線程。
      4. 入隊列的過程允許被中斷,所以總是拋出InterruptedException 異常。

    針對第2點,特別補充說明下。本來這屬于鎖機制中條件隊列的范圍,由于沒有應用場景,所以當時沒有提。

    前面提高notifyall總是比notify更可靠,因為notify可能丟失通知,為什么不適用notifyall呢?

    先解釋下notify丟失通知的問題。

     

    notify丟失通知問題

    假設線程A因為某種條件在條件隊列中等待,同時線程B因為另外一種條件在同一個條件隊列中等待,也就是說線程A/B都被同一個Conditon.await()掛起,但是等待的條件不同。現在假設線程B的線程被滿足,線程C執行一個notify操作,此時JVM從Conditon.await()的多個線程(A/B)中隨機挑選一個喚醒,不幸的是喚醒了A。此時A的條件不滿足,于是A繼續掛起。而此時B仍然在傻傻的等待被喚醒的信號。也就是說本來給B的通知卻被一個無關的線程持有了,真正需要通知的線程B卻沒有得到通知,而B仍然在等待一個已經發生過的通知。

    如果使用notifyall,則能夠避免此問題。notifyall會喚醒所有正在等待的線程,線程C發出的通知線程A同樣能夠收到,但是由于對于A沒用,所以A繼續掛起,而線程B也收到了此通知,于是線程B正常被喚醒。

     

    既然notifyall能夠解決單一notify丟失通知的問題,那么為什么不總是使用notifyall替換notify呢?

    假設有N個線程在條件隊列中等待,調用notifyall會喚醒所有線程,然后這N個線程競爭同一個鎖,最多只有一個線程能夠得到鎖,于是其它線程又回到掛起狀態。這意味每一次喚醒操作可能帶來大量的上下文切換(如果N比較大的話),同時有大量的競爭鎖的請求。這對于頻繁的喚醒操作而言性能上可能是一種災難。

    如果說總是只有一個線程被喚醒后能夠拿到鎖,那么為什么不使用notify呢?所以某些情況下使用notify的性能是要高于notifyall的。

    如果滿足下面的條件,可以使用單一的notify取代notifyall操作:

    相同的等待者,也就是說等待條件變量的線程操作相同,每一個從wait放回后執行相同的邏輯,同時一個條件變量的通知至多只能喚醒一個線程。

    也就是說理論上講在put/take中如果使用sinallAll喚醒的話,那么在清單2 中的notFull.singal就是多余的。

     

    出隊列過程(poll/take)

     

    再來看出隊列過程。清單3 描述了出隊列的過程。可以看到這和入隊列是對稱的。從這里可以看到,出隊列使用的是和入隊列不同的鎖,所以入隊列、出隊列這兩個操作才能并行進行。

    清單3 阻塞的出隊列過程

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

     

    為什么有異常?

     

    有了入隊列、出隊列的過程后再來回答前面的幾個問題。

    為什么總是拋出InterruptedException 異常? 這是很大一塊內容,其實是Java對線程中斷的處理問題,希望能夠在系列文章的最后能夠對此開辟單獨的篇章來談談。

    在鎖機制里面也是總遇到,這是因為,Java里面沒有一種直接的方法中斷一個掛起的線程,所以通常情況下等于一個處于WAITING狀態的線程,允許設置一個中斷位,一旦線程檢測到這個中斷位就會從WAITING狀態退出,以一個InterruptedException 的異常返回。所以只要是對一個線程掛起操作都會導致InterruptedException 的可能,比如Thread.sleep()、Thread.join()、Object.wait()。盡管LockSupport.park()不會拋出一個InterruptedException 異常,但是它會將當前線程的的interrupted狀態位置上,而對于Lock/Condition而言,當捕捉到interrupted狀態后就認為線程應該終止任務,所以就拋出了一個InterruptedException 異常。

     

    又見volatile

     

    還有一個不容易理解的問題。為什么Node.item是volatile類型的?

    起初我不大明白,因為對于一個進入隊列的Node,它的item是不變,當且僅當出隊列的時候會將頭結點元素的item 設置為null。盡管在remove(o)的時候也是設置為null,但是那時候是加了putLock/takeLock兩個鎖的,所以肯定是沒有問題的。那么問題出在哪?

    我們知道,item的值是在put/offer的時候加入的。這時候都是有putLock鎖保證的,也就是說它保證使用putLock鎖的讀取肯定是沒有問題的。那么問題就只可能出在一個不適用putLock卻需要讀取Node.item的地方。

    peek操作時獲取頭結點的元素而不移除它。顯然他不會操作尾節點,所以它不需要putLock鎖,也就是說它只有takeLock鎖。清單4 描述了這個過程。

    清單4 查詢隊列頭元素過程

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    清單4 描述了peek的過程,最后返回一個非null節點的結果是Node.item。這里讀取了Node的item值,但是整個過程卻是使用了takeLock而非putLock。換句話說putLock對Node.item的操作,peek()線程可能不可見!

    清單5 隊列尾部加入元素

    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

     

    清單5 是入隊列offer/put的一部分,這里關鍵在于last=new Node<E>(x)可能發生重排序。Node構造函數是這樣的:Node(E x) { item = x; }。在這一步里面我們可能得到以下一種情況:

      1. 構建一個Node對象n;
      2. 將Node的n賦給last
      3. 初始化n,設置item=x

    在執行步驟2 的時候一個peek線程可能拿到了新的Node n,這時候它讀取item,得到了一個null。顯然這是不可靠的。

    對item采用volatile之后,JMM保證對item=x的賦值一定在last=n之前,也就是說last得到的一個是一個已經賦值了的新節點n。這就不會導致讀取空元素的問題的。

    出對了poll/take和peek都是使用的takeLock鎖,所以不會導致此問題。

    刪除操作和遍歷操作由于同時獲取了takeLock和putLock,所以也不會導致此問題。

    總結:當前僅當元素加入隊列時讀取此元素才可能導致不一致的問題。采用volatile正式避免此問題。

     

    附加功能

     

    BlockingQueue有一個額外的功能,允許批量從隊列中異常元素。這個API是:

    int drainTo(Collection<? super E> c, int maxElements); 最多從此隊列中移除給定數量的可用元素,并將這些元素添加到給定 collection 中。

    int drainTo(Collection<? super E> c); 移除此隊列中所有可用的元素,并將它們添加到給定 collection 中。

    清單6 描述的是最多移除指定數量元素的過程。由于批量操作只需要一次獲取鎖,所以效率會比每次獲取鎖要高。但是需要說明的,需要同時獲取takeLock/putLock兩把鎖,因為當移除完所有元素后這會涉及到尾節點的修改(last節點仍然指向一個已經移走的節點)。

    由于迭代操作contains()/remove()/iterator()也是獲取了兩個鎖,所以迭代操作也是線程安全的。

     

    清單6 批量移除操作

    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            while (p != null && n < maxElements) {
                c.add(p.item);
                p.item = null;
                p = p.next;
                ++n;
            }
            if (n != 0) {
                head.next = p;
                assert head.item == null;
                if (p == null)
                    last = head;
                if (count.getAndAdd(-n) == capacity)
                    notFull.signalAll();
            }
            return n;
        } finally {
            fullyUnlock();
        }
    }

     



    ©2009-2014 IMXYLZ |求賢若渴
    posted on 2010-07-24 00:02 imxylz 閱讀(19641) 評論(6)  編輯  收藏 所屬分類: Java Concurrency

    評論

    # re: 深入淺出 Java Concurrency (21): 并發容器 part 6 可阻塞的BlockingQueue (1) 2011-02-15 10:00 hixiaomin
    BlockingQueue接口四種形式操作,中文API有說明:
    ”BlockingQueue 方法以四種形式出現,對于不能立即滿足但可能在將來某一時刻可以滿足的操作,這四種形式的處理方式不同:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決于操作),第三種是在操作可以成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。“
    很顯然,依據條件是:"不能立即滿足但可能在將來某一時刻可以滿足的操作"所產生的不同輸出。
    這樣就很容易理解最上面表格所表述內容了。  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (21): 并發容器 part 6 可阻塞的BlockingQueue (1) 2011-02-15 10:07 xylz
    @hixiaomin
    嗯,理解非常不錯!  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (21): 并發容器 part 6 可阻塞的BlockingQueue (1)[未登錄] 2011-07-19 13:54 小牛犢
    你的版本是不是太老了,Node.item不是volatile,drainTo也只用了一把takeLock。其他感覺還不錯。
      回復  更多評論
      

    # re: 深入淺出 Java Concurrency (21): 并發容器 part 6 可阻塞的BlockingQueue (1) 2011-08-16 20:51 yintiefu
    @小牛犢
    你的是什么版本的 我的1.6.0_21.跟LZ一樣  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (21): 并發容器 part 6 可阻塞的BlockingQueue (1)[未登錄] 2011-09-05 15:47 xxx
    @yintiefu
    我用的是1.6.0_24的, 有較大的改變  回復  更多評論
      

    # re: 深入淺出 Java Concurrency (21): 并發容器 part 6 可阻塞的BlockingQueue (1)[未登錄] 2013-12-25 17:31 forever
    這里對于volatile的分析,覺得老主多慮了.之所以有volatile,是因為之前需要借助volatile的數據一致性,那時可能還沒有使用lock加鎖,但后面有了lock之后,lock之內的程序也是保證happens-before的,所以Dong Lea忘了把volatile拿掉,目前在1.6.0.27之后已經沒有了.如果按照樓主的分析,那豈不這個類有明顯的Bug.  回復  更多評論
      


    ©2009-2014 IMXYLZ
    主站蜘蛛池模板: 日本一道高清不卡免费| 国色精品va在线观看免费视频| 狠狠色香婷婷久久亚洲精品| 亚洲白色白色永久观看| 亚洲第一页在线视频| 亚洲美女中文字幕| 亚洲六月丁香六月婷婷蜜芽| 亚洲的天堂av无码| 亚洲精品一二三区| 亚洲日本天堂在线| 国产亚洲综合久久| 一级特级aaaa毛片免费观看| 黄色网址免费在线观看| 十八禁视频在线观看免费无码无遮挡骂过| 99麻豆久久久国产精品免费 | 国产亚洲无线码一区二区| 亚洲va久久久噜噜噜久久| 亚洲另类激情综合偷自拍| 亚洲人成综合在线播放| 亚洲日韩国产AV无码无码精品| 亚洲狠狠色丁香婷婷综合| 特级毛片aaaa级毛片免费| jzzjzz免费观看大片免费| 国产一级淫片a免费播放口| 中文字幕免费视频| 成年人免费观看视频网站| 亚洲AV网站在线观看| 亚洲色精品vr一区二区三区| 亚洲天天做日日做天天欢毛片| 亚洲精品国产啊女成拍色拍| 亚洲色成人四虎在线观看 | 亚洲国产精品久久久天堂| 久久综合亚洲色HEZYO社区| 亚洲一卡2卡三卡4卡无卡下载| 直接进入免费看黄的网站| 国产在线观看xxxx免费| ww在线观视频免费观看| 国产人成免费视频| 亚洲Av综合色区无码专区桃色| 激情内射亚洲一区二区三区爱妻| 国产精品亚洲精品日韩动图|