ConcurrentLinkedQueue是Queue的一個線程安全實現。先來看一段文檔說明。
一個基于鏈接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不允許使用 null 元素。
由于ConcurrentLinkedQueue只是簡單的實現了一個隊列Queue,因此從API的角度講,沒有多少值的介紹,使用起來也很簡單,和前面遇到的所有FIFO隊列都類似。出隊列只能操作頭節點,入隊列只能操作尾節點,任意節點操作就需要遍歷完整的隊列。
重點放在解釋ConcurrentLinkedQueue的原理和實現上。
在繼續探討之前,結合前面線程安全的相關知識,我來分析設計一個線程安全的隊列哪幾種方法。
第一種:使用synchronized同步隊列,就像Vector或者Collections.synchronizedList/Collection那樣。顯然這不是一個好的并發隊列,這會導致吞吐量急劇下降。
第二種:使用Lock。一種好的實現方式是使用ReentrantReadWriteLock來代替ReentrantLock提高讀取的吞吐量。但是顯然ReentrantReadWriteLock的實現更為復雜,而且更容易導致出現問題,另外也不是一種通用的實現方式,因為ReentrantReadWriteLock適合哪種讀取量遠遠大于寫入量的場合。當然了ReentrantLock是一種很好的實現,結合Condition能夠很方便的實現阻塞功能,這在后面介紹BlockingQueue的時候會具體分析。
第三種:使用CAS操作。盡管Lock的實現也用到了CAS操作,但是畢竟是間接操作,而且會導致線程掛起。一個好的并發隊列就是采用某種非阻塞算法來取得最大的吞吐量。
ConcurrentLinkedQueue采用的就是第三種策略。它采用了參考資料1 中的算法。
在鎖機制中談到過,要使用非阻塞算法來完成隊列操作,那么就需要一種“循環嘗試”的動作,就是循環操作隊列,直到成功為止,失敗就會再次嘗試。這在前面的章節中多次介紹過。
針對各種功能深入分析。
在開始之前先介紹下ConcurrentLinkedQueue的數據結構。
在上面的數據結構中,ConcurrentLinkedQueue只有頭結點、尾節點兩個元素,而對于一個節點Node而言除了保存隊列元素item外,還有一個指向下一個節點的引用next。 看起來整個數據結構還是比較簡單的。但是也有幾點是需要說明:
- 所有結構(head/tail/item/next)都是volatile類型。 這是因為ConcurrentLinkedQueue是非阻塞的,所以只有volatile才能使變量的寫操作對后續讀操作是可見的(這個是有happens-before法則保證的)。同樣也不會導致指令的重排序。
- 所有結構的操作都帶有原子操作,這是由AtomicReferenceFieldUpdater保證的,這在原子操作中介紹過。它能保證需要的時候對變量的修改操作是原子的。
- 由于隊列中任何一個節點(Node)只有下一個節點的引用,所以這個隊列是單向的,根據FIFO特性,也就是說出隊列在頭部(head),入隊列在尾部(tail)。頭部保存有進入隊列最長時間的元素,尾部是最近進入的元素。
- 沒有對隊列長度進行計數,所以隊列的長度是無限的,同時獲取隊列的長度的時間不是固定的,這需要遍歷整個隊列,并且這個計數也可能是不精確的。
- 初始情況下隊列頭和隊列尾都指向一個空節點,但是非null,這是為了方便操作,不需要每次去判斷head/tail是否為空。但是head卻不作為存取元素的節點,tail在不等于head情況下保存一個節點元素。也就是說head.item這個應該一直是空,但是tail.item卻不一定是空(如果head!=tail,那么tail.item!=null)。
對于第5點,可以從ConcurrentLinkedQueue的初始化中看到。這種頭結點也叫“偽節點”,也就是說它不是真正的節點,只是一標識,就像c中的字符數組后面的\0以后,只是用來標識結束,并不是真正字符數組的一部分。
private transient volatile Node<E> head = new Node<E>(null, null);
private transient volatile Node<E> tail = head;
有了上述5點再來解釋相關API操作就容易多了。
在上一節中列出了add/offer/remove/poll/element/peek等價方法的區別,所以這里就不再重復了。
清單1 入隊列操作
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e, null);
for (;;) {
Node<E> t = tail;
Node<E> s = t.getNext();
if (t == tail) {
if (s == null) {
if (t.casNext(s, n)) {
casTail(t, n);
return true;
}
} else {
casTail(t, s);
}
}
}
}
清單1 描述的是入隊列的過程。整個過程是這樣的。
- 獲取尾節點t,以及尾節點的下一個節點s。如果尾節點沒有被別人修改,也就是t==tail,進行2,否則進行1。
- 如果s不為空,也就是說此時尾節點后面還有元素,那么就需要把尾節點往后移,進行1。否則進行3。
- 修改尾節點的下一個節點為新節點,如果成功就修改尾節點,返回true。否則進行1。
從操作3中可以看到是先修改尾節點的下一個節點,然后才修改尾節點位置的,所以這才有操作2中為什么獲取到的尾節點的下一個節點不為空的原因。
特別需要說明的是,對尾節點的tail的操作需要換成臨時變量t和s,一方面是為了去掉volatile變量的可變性,另一方面是為了減少volatile的性能影響。
清單2 描述的出隊列的過程,這個過程和入隊列相似,有點意思。
頭結點是為了標識隊列起始,也為了減少空指針的比較,所以頭結點總是一個item為null的非null節點。也就是說head!=null并且head.item==null總是成立。所以實際上獲取的是head.next,一旦將頭結點head設置為head.next成功就將新head的item設置為null。至于以前就的頭結點h,h.item=null并且h.next為新的head,但是由于沒有對h的引用,所以最終會被GC回收。這就是整個出隊列的過程。
清單2 出隊列操作
public E poll() {
for (;;) {
Node<E> h = head;
Node<E> t = tail;
Node<E> first = h.getNext();
if (h == head) {
if (h == t) {
if (first == null)
return null;
else
casTail(t, first);
} else if (casHead(h, first)) {
E item = first.getItem();
if (item != null) {
first.setItem(null);
return item;
}
// else skip over deleted item, continue loop,
}
}
}
}
另外對于清單3 描述的獲取隊列大小的過程,由于沒有一個計數器來對隊列大小計數,所以獲取隊列的大小只能通過從頭到尾完整的遍歷隊列,顯然這個代價是很大的。所以通常情況下ConcurrentLinkedQueue需要和一個AtomicInteger搭配才能獲取隊列大小。后面介紹的BlockingQueue正是使用了這種思想。
清單3 遍歷隊列大小
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = p.getNext()) {
if (p.getItem() != null) {
// Collections.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
}
}
return count;
}
參考資料:
- Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
- 多線程基礎總結十一—ConcurrentLinkedQueue
- 對ConcurrentLinkedQueue進行的并發測試
©2009-2014 IMXYLZ
|求賢若渴