<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    xylz,imxylz

    關注后端架構、中間件、分布式和并發編程

       :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

    上一節中詳細分析了LinkedBlockingQueue 的實現原理。實現一個可擴展的隊列通常有兩種方式:一種方式就像LinkedBlockingQueue一樣使用鏈表,也就是每一個元素帶有下一個元素的引用,這樣的隊列原生就是可擴展的;另外一種就是通過數組實現,一旦隊列的大小達到數組的容量的時候就將數組擴充一倍(或者一定的系數倍),從而達到擴容的目的。常見的ArrayList就屬于第二種。前面章節介紹過的HashMap確是綜合使用了這兩種方式。

    對于一個Queue而言,同樣可以使用數組實現。使用數組的好處在于各個元素之間原生就是通過數組的索引關聯起來的,一次元素之間就是有序的,在通過索引操作數組就方便多了。當然也有它不利的一面,擴容起來比較麻煩,同時刪除一個元素也比較低效。

    ArrayBlockingQueue 就是Queue的一種數組實現。

     

    ArrayBlockingQueue 原理

     

    在沒有介紹ArrayBlockingQueue原理之前可以想象下,一個數組如何實現Queue的FIFO特性。首先,數組是固定大小的,這個是毫無疑問的,那么初始化就是所有元素都為null。假設數組一段為頭,另一端為尾。那么頭和尾之間的元素就是FIFO隊列。

      1. 入隊列就將尾索引往右移動一個,新元素加入尾索引的位置;
      2. 出隊列就將頭索引往尾索引方向移動一個,同時將舊頭索引元素設為null,返回舊頭索引的元素。
      3. 一旦數組已滿,那么就不允許添加新元素(除非擴充容量)
      4. 如果尾索引移到了數組的最后(最大索引處),那么就從索引0開始,形成一個“閉合”的數組。
      5. 由于頭索引和尾索引之間的元素都不能為空(因為為空不知道take出來的元素為空還是隊列為空),所以刪除一個頭索引和尾索引之間的元素的話,需要移動刪除索引前面或者后面的所有元素,以便填充刪除索引的位置。
      6. 由于是阻塞隊列,那么顯然需要一個鎖,另外由于只是一份數據(一個數組),所以只能有一個鎖,也就是同時只能有一個線程操作隊列。

    有了上述幾點分析,設計一個可阻塞的數組隊列就比較容易了。

    image

    上圖描述的ArrayBlockingQueue的數據結構。首先有一個數組E[],用來存儲所有的元素。由于ArrayBlockingQueue最終設置為一個不可擴展大小的Queue,所以這里items就是初始化就固定大小的數組(final類型);另外有兩個索引,頭索引takeIndex,尾索引putIndex;一個隊列的大小count;要支持阻塞就必須需要一個鎖lock和兩個條件(非空、非滿),這三個元素都是不可變更類型的(final)。

    由于只有一把鎖,所以任何時刻對隊列的操作都只有一個線程,這意味著對索引和大小的操作都是線程安全的,所以可以看到這個takeIndex/putIndex/count就不需要原子操作和volatile語義了。

    清單1 描述的是一個可阻塞的添加元素過程。這與前面介紹的消費者、生產者模型相同。如果隊列已經滿了就掛起等待,否則就插入元素,同時喚醒一個隊列已空的線程。對比清單2 可以看到是完全相反的兩個過程。這在前面幾種實現生產者-消費者模型的時候都介紹過了。

     

    清單1 可阻塞的添加元素

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

     

     清單2 可阻塞的移除元素

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

    需要注意到的是,盡管每次加入、移除一個元素使用的都是signal()通知,而不是signalAll()通知。我們參考上一節中notify替換notifyAll的原則:每一個await醒來的動作相同,每次最多喚醒一個線程來操作。顯然這里符合這兩種條件,因此使用signal要比使用signalAll要高效,并且是可靠的。

    image 上圖描述了take()/put()的索引位置示意圖。

    一開始takeIndex/putIndex都在E/0位置,然后每加入一個元素offer/put,putIndex都增加1,也就是往后邊移動一位;每移除一個元素poll/take,takeIndex都增加1,也是往后邊移動一位,顯然takeIndex總是在putIndex的“后邊”,因為當隊列中沒有元素的時候takeIndex和putIndex相等,同時當前位置也沒有元素,takeIndex也就是無法再往右邊移動了;一旦putIndex/takeIndex移動到了最后面,也就是size-1的位置(這里size是指數組的長度),那么就移動到0,繼續循環。循環的前提是數組中元素的個數小于數組的長度。整個過程就是這樣的。可見putIndex同時指向頭元素的下一個位置(如果隊列已經滿了,那么就是尾元素位置,否則就是一個元素為null的位置)。

     

    比較復雜的操作時刪除任意一個元素。清單3 描述的是刪除任意一個元素的過程。顯然刪除任何一個元素需要遍歷整個數組,也就是它的復雜度是O(n),這與根據索引從ArrayList中查找一個元素的復雜度O(1)相比開銷要大得多。參考聲明的結構圖,一旦刪除的是takeIndex位置的元素,那么只需要將takeIndex往“右邊”移動一位即可;如果刪除的是takeIndex和putIndex之間的元素怎么辦?這時候就從刪除的位置i開始,將i后面的所有元素位置都往“左”移動一位,直到putIndex為止。最終的結果是刪除位置的所有元素都“后退”了一個位置,同時putIndex也后退了一個位置。

     

    清單3 刪除任意一個元素

    public boolean remove(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }

        } finally {
            lock.unlock();
        }
    }
    void removeAt(int i) {
        final E[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

     

    對于其他的操作,由于都是帶著Lock的操作,所以都比較簡單就不再展開了。

    下一篇中將介紹另外兩個BlockingQueue, PriorityBlockingQueue和SynchronousQueue 然后對這些常見的Queue進行一個小范圍的對比。

     



    ©2009-2014 IMXYLZ |求賢若渴
    posted on 2010-07-27 22:04 imxylz 閱讀(12938) 評論(0)  編輯  收藏 所屬分類: Java Concurrency

    ©2009-2014 IMXYLZ
    主站蜘蛛池模板: 一级毛片无遮挡免费全部| 亚洲福利视频一区二区三区| 亚洲精品无码专区| 三年片在线观看免费观看高清电影 | 中文字幕免费在线看线人动作大片 | 国内永久免费crm系统z在线 | 亚洲av无码日韩av无码网站冲| 91免费资源网站入口| 亚洲婷婷在线视频| 成年轻人网站色免费看| 亚洲色偷偷综合亚洲av78 | 亚洲视频精品在线| 免费观看国产网址你懂的| 亚洲一区在线视频观看| 成人午夜免费福利| 美女被免费网站在线视频免费| 亚洲AV中文无码乱人伦| 国产免费播放一区二区| 亚洲成人精品久久| 午夜宅男在线永久免费观看网| 亚洲av无码专区国产不乱码| 国产国产人免费人成免费视频| 国产精品偷伦视频免费观看了| 亚洲无线电影官网| 亚洲人成网站免费播放| 国产天堂亚洲精品| 日本亚洲视频在线| 一二三四影视在线看片免费| 羞羞视频免费网站入口| 亚洲av无码一区二区三区不卡 | 91精品全国免费观看青青| 亚洲无删减国产精品一区| 成人性生交视频免费观看| 又硬又粗又长又爽免费看| 亚洲天堂视频在线观看| 午夜两性色视频免费网站| 99久久免费国产精品热| 亚洲性线免费观看视频成熟| 亚洲人成网站色在线入口| h片在线免费观看| jizz在线免费播放|