ConcurrentLinkedQueue是Queue的一個線程安全實現(xiàn)。先來看一段文檔說明。
一個基于鏈接節(jié)點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當?shù)倪x擇。此隊列不允許使用 null 元素。
由于ConcurrentLinkedQueue只是簡單的實現(xiàn)了一個隊列Queue,因此從API的角度講,沒有多少值的介紹,使用起來也很簡單,和前面遇到的所有FIFO隊列都類似。出隊列只能操作頭節(jié)點,入隊列只能操作尾節(jié)點,任意節(jié)點操作就需要遍歷完整的隊列。
重點放在解釋ConcurrentLinkedQueue的原理和實現(xiàn)上。
在繼續(xù)探討之前,結(jié)合前面線程安全的相關(guān)知識,我來分析設計一個線程安全的隊列哪幾種方法。
第一種:使用synchronized同步隊列,就像Vector或者Collections.synchronizedList/Collection那樣。顯然這不是一個好的并發(fā)隊列,這會導致吞吐量急劇下降。
第二種:使用Lock。一種好的實現(xiàn)方式是使用ReentrantReadWriteLock來代替ReentrantLock提高讀取的吞吐量。但是顯然ReentrantReadWriteLock的實現(xiàn)更為復雜,而且更容易導致出現(xiàn)問題,另外也不是一種通用的實現(xiàn)方式,因為ReentrantReadWriteLock適合哪種讀取量遠遠大于寫入量的場合。當然了ReentrantLock是一種很好的實現(xiàn),結(jié)合Condition能夠很方便的實現(xiàn)阻塞功能,這在后面介紹BlockingQueue的時候會具體分析。
第三種:使用CAS操作。盡管Lock的實現(xiàn)也用到了CAS操作,但是畢竟是間接操作,而且會導致線程掛起。一個好的并發(fā)隊列就是采用某種非阻塞算法來取得最大的吞吐量。
ConcurrentLinkedQueue采用的就是第三種策略。它采用了參考資料1 中的算法。
在鎖機制中談到過,要使用非阻塞算法來完成隊列操作,那么就需要一種“循環(huán)嘗試”的動作,就是循環(huán)操作隊列,直到成功為止,失敗就會再次嘗試。這在前面的章節(jié)中多次介紹過。
針對各種功能深入分析。
在開始之前先介紹下ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)。
在上面的數(shù)據(jù)結(jié)構(gòu)中,ConcurrentLinkedQueue只有頭結(jié)點、尾節(jié)點兩個元素,而對于一個節(jié)點Node而言除了保存隊列元素item外,還有一個指向下一個節(jié)點的引用next。 看起來整個數(shù)據(jù)結(jié)構(gòu)還是比較簡單的。但是也有幾點是需要說明:
- 所有結(jié)構(gòu)(head/tail/item/next)都是volatile類型。 這是因為ConcurrentLinkedQueue是非阻塞的,所以只有volatile才能使變量的寫操作對后續(xù)讀操作是可見的(這個是有happens-before法則保證的)。同樣也不會導致指令的重排序。
- 所有結(jié)構(gòu)的操作都帶有原子操作,這是由AtomicReferenceFieldUpdater保證的,這在原子操作中介紹過。它能保證需要的時候?qū)ψ兞康男薷牟僮魇窃拥摹?
- 由于隊列中任何一個節(jié)點(Node)只有下一個節(jié)點的引用,所以這個隊列是單向的,根據(jù)FIFO特性,也就是說出隊列在頭部(head),入隊列在尾部(tail)。頭部保存有進入隊列最長時間的元素,尾部是最近進入的元素。
- 沒有對隊列長度進行計數(shù),所以隊列的長度是無限的,同時獲取隊列的長度的時間不是固定的,這需要遍歷整個隊列,并且這個計數(shù)也可能是不精確的。
- 初始情況下隊列頭和隊列尾都指向一個空節(jié)點,但是非null,這是為了方便操作,不需要每次去判斷head/tail是否為空。但是head卻不作為存取元素的節(jié)點,tail在不等于head情況下保存一個節(jié)點元素。也就是說head.item這個應該一直是空,但是tail.item卻不一定是空(如果head!=tail,那么tail.item!=null)。
對于第5點,可以從ConcurrentLinkedQueue的初始化中看到。這種頭結(jié)點也叫“偽節(jié)點”,也就是說它不是真正的節(jié)點,只是一標識,就像c中的字符數(shù)組后面的\0以后,只是用來標識結(jié)束,并不是真正字符數(shù)組的一部分。
private transient volatile Node<E> head = new Node<E>(null, null);
private transient volatile Node<E> tail = head;
有了上述5點再來解釋相關(guān)API操作就容易多了。
在上一節(jié)中列出了add/offer/remove/poll/element/peek等價方法的區(qū)別,所以這里就不再重復了。
清單1 入隊列操作
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e, null);
for (;;) {
Node<E> t = tail;
Node<E> s = t.getNext();
if (t == tail) {
if (s == null) {
if (t.casNext(s, n)) {
casTail(t, n);
return true;
}
} else {
casTail(t, s);
}
}
}
}
清單1 描述的是入隊列的過程。整個過程是這樣的。
- 獲取尾節(jié)點t,以及尾節(jié)點的下一個節(jié)點s。如果尾節(jié)點沒有被別人修改,也就是t==tail,進行2,否則進行1。
- 如果s不為空,也就是說此時尾節(jié)點后面還有元素,那么就需要把尾節(jié)點往后移,進行1。否則進行3。
- 修改尾節(jié)點的下一個節(jié)點為新節(jié)點,如果成功就修改尾節(jié)點,返回true。否則進行1。
從操作3中可以看到是先修改尾節(jié)點的下一個節(jié)點,然后才修改尾節(jié)點位置的,所以這才有操作2中為什么獲取到的尾節(jié)點的下一個節(jié)點不為空的原因。
特別需要說明的是,對尾節(jié)點的tail的操作需要換成臨時變量t和s,一方面是為了去掉volatile變量的可變性,另一方面是為了減少volatile的性能影響。
清單2 描述的出隊列的過程,這個過程和入隊列相似,有點意思。
頭結(jié)點是為了標識隊列起始,也為了減少空指針的比較,所以頭結(jié)點總是一個item為null的非null節(jié)點。也就是說head!=null并且head.item==null總是成立。所以實際上獲取的是head.next,一旦將頭結(jié)點head設置為head.next成功就將新head的item設置為null。至于以前就的頭結(jié)點h,h.item=null并且h.next為新的head,但是由于沒有對h的引用,所以最終會被GC回收。這就是整個出隊列的過程。
清單2 出隊列操作
public E poll() {
for (;;) {
Node<E> h = head;
Node<E> t = tail;
Node<E> first = h.getNext();
if (h == head) {
if (h == t) {
if (first == null)
return null;
else
casTail(t, first);
} else if (casHead(h, first)) {
E item = first.getItem();
if (item != null) {
first.setItem(null);
return item;
}
// else skip over deleted item, continue loop,
}
}
}
}
另外對于清單3 描述的獲取隊列大小的過程,由于沒有一個計數(shù)器來對隊列大小計數(shù),所以獲取隊列的大小只能通過從頭到尾完整的遍歷隊列,顯然這個代價是很大的。所以通常情況下ConcurrentLinkedQueue需要和一個AtomicInteger搭配才能獲取隊列大小。后面介紹的BlockingQueue正是使用了這種思想。
清單3 遍歷隊列大小
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = p.getNext()) {
if (p.getItem() != null) {
// Collections.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
}
}
return count;
}
參考資料:
- Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
- 多線程基礎(chǔ)總結(jié)十一—ConcurrentLinkedQueue
- 對ConcurrentLinkedQueue進行的并發(fā)測試
©2009-2014 IMXYLZ
|求賢若渴