在上一節中詳細分析了LinkedBlockingQueue 的實現原理。實現一個可擴展的隊列通常有兩種方式:一種方式就像LinkedBlockingQueue一樣使用鏈表,也就是每一個元素帶有下一個元素的引用,這樣的隊列原生就是可擴展的;另外一種就是通過數組實現,一旦隊列的大小達到數組的容量的時候就將數組擴充一倍(或者一定的系數倍),從而達到擴容的目的。常見的ArrayList就屬于第二種。前面章節介紹過的HashMap確是綜合使用了這兩種方式。
對于一個Queue而言,同樣可以使用數組實現。使用數組的好處在于各個元素之間原生就是通過數組的索引關聯起來的,一次元素之間就是有序的,在通過索引操作數組就方便多了。當然也有它不利的一面,擴容起來比較麻煩,同時刪除一個元素也比較低效。
ArrayBlockingQueue 就是Queue的一種數組實現。
ArrayBlockingQueue 原理
在沒有介紹ArrayBlockingQueue原理之前可以想象下,一個數組如何實現Queue的FIFO特性。首先,數組是固定大小的,這個是毫無疑問的,那么初始化就是所有元素都為null。假設數組一段為頭,另一端為尾。那么頭和尾之間的元素就是FIFO隊列。
- 入隊列就將尾索引往右移動一個,新元素加入尾索引的位置;
- 出隊列就將頭索引往尾索引方向移動一個,同時將舊頭索引元素設為null,返回舊頭索引的元素。
- 一旦數組已滿,那么就不允許添加新元素(除非擴充容量)
- 如果尾索引移到了數組的最后(最大索引處),那么就從索引0開始,形成一個“閉合”的數組。
- 由于頭索引和尾索引之間的元素都不能為空(因為為空不知道take出來的元素為空還是隊列為空),所以刪除一個頭索引和尾索引之間的元素的話,需要移動刪除索引前面或者后面的所有元素,以便填充刪除索引的位置。
- 由于是阻塞隊列,那么顯然需要一個鎖,另外由于只是一份數據(一個數組),所以只能有一個鎖,也就是同時只能有一個線程操作隊列。
有了上述幾點分析,設計一個可阻塞的數組隊列就比較容易了。
上圖描述的ArrayBlockingQueue的數據結構。首先有一個數組E[],用來存儲所有的元素。由于ArrayBlockingQueue最終設置為一個不可擴展大小的Queue,所以這里items就是初始化就固定大小的數組(final類型);另外有兩個索引,頭索引takeIndex,尾索引putIndex;一個隊列的大小count;要支持阻塞就必須需要一個鎖lock和兩個條件(非空、非滿),這三個元素都是不可變更類型的(final)。
由于只有一把鎖,所以任何時刻對隊列的操作都只有一個線程,這意味著對索引和大小的操作都是線程安全的,所以可以看到這個takeIndex/putIndex/count就不需要原子操作和volatile語義了。
清單1 描述的是一個可阻塞的添加元素過程。這與前面介紹的消費者、生產者模型相同。如果隊列已經滿了就掛起等待,否則就插入元素,同時喚醒一個隊列已空的線程。對比清單2 可以看到是完全相反的兩個過程。這在前面幾種實現生產者-消費者模型的時候都介紹過了。
清單1 可阻塞的添加元素
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
清單2 可阻塞的移除元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
需要注意到的是,盡管每次加入、移除一個元素使用的都是signal()通知,而不是signalAll()通知。我們參考上一節中notify替換notifyAll的原則:每一個await醒來的動作相同,每次最多喚醒一個線程來操作。顯然這里符合這兩種條件,因此使用signal要比使用signalAll要高效,并且是可靠的。
上圖描述了take()/put()的索引位置示意圖。
一開始takeIndex/putIndex都在E/0位置,然后每加入一個元素offer/put,putIndex都增加1,也就是往后邊移動一位;每移除一個元素poll/take,takeIndex都增加1,也是往后邊移動一位,顯然takeIndex總是在putIndex的“后邊”,因為當隊列中沒有元素的時候takeIndex和putIndex相等,同時當前位置也沒有元素,takeIndex也就是無法再往右邊移動了;一旦putIndex/takeIndex移動到了最后面,也就是size-1的位置(這里size是指數組的長度),那么就移動到0,繼續循環。循環的前提是數組中元素的個數小于數組的長度。整個過程就是這樣的。可見putIndex同時指向頭元素的下一個位置(如果隊列已經滿了,那么就是尾元素位置,否則就是一個元素為null的位置)。
比較復雜的操作時刪除任意一個元素。清單3 描述的是刪除任意一個元素的過程。顯然刪除任何一個元素需要遍歷整個數組,也就是它的復雜度是O(n),這與根據索引從ArrayList中查找一個元素的復雜度O(1)相比開銷要大得多。參考聲明的結構圖,一旦刪除的是takeIndex位置的元素,那么只需要將takeIndex往“右邊”移動一位即可;如果刪除的是takeIndex和putIndex之間的元素怎么辦?這時候就從刪除的位置i開始,將i后面的所有元素位置都往“左”移動一位,直到putIndex為止。最終的結果是刪除位置的所有元素都“后退”了一個位置,同時putIndex也后退了一個位置。
清單3 刪除任意一個元素
public boolean remove(Object o) {
if (o == null) return false;
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int k = 0;
for (;;) {
if (k++ >= count)
return false;
if (o.equals(items[i])) {
removeAt(i);
return true;
}
i = inc(i);
}
} finally {
lock.unlock();
}
}
void removeAt(int i) {
final E[] items = this.items;
// if removing front item, just advance
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
} else {
// slide over all others up through putIndex.
for (;;) {
int nexti = inc(i);
if (nexti != putIndex) {
items[i] = items[nexti];
i = nexti;
} else {
items[i] = null;
putIndex = i;
break;
}
}
}
--count;
notFull.signal();
}
對于其他的操作,由于都是帶著Lock的操作,所以都比較簡單就不再展開了。
下一篇中將介紹另外兩個BlockingQueue, PriorityBlockingQueue和SynchronousQueue 然后對這些常見的Queue進行一個小范圍的對比。
©2009-2014 IMXYLZ
|求賢若渴