在《并發(fā)容器 part 4 并發(fā)隊(duì)列與Queue簡(jiǎn)介》節(jié)中的類圖中可以看到,對(duì)于Queue來(lái)說(shuō),BlockingQueue是主要的線程安全版本。這是一個(gè)可阻塞的版本,也就是允許添加/刪除元素被阻塞,直到成功為止。
BlockingQueue相對(duì)于Queue而言增加了兩個(gè)操作:put/take。下面是一張整理的表格。
看似簡(jiǎn)單的API,非常有用。這在控制隊(duì)列的并發(fā)上非常有好處。既然加入隊(duì)列和移除隊(duì)列能夠被阻塞,這在實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型上就簡(jiǎn)單多了。
清單1 是生產(chǎn)者-消費(fèi)者模型的一個(gè)例子。這個(gè)例子是一個(gè)真實(shí)的場(chǎng)景。服務(wù)端(ICE服務(wù))接受客戶端的請(qǐng)求(accept),請(qǐng)求計(jì)算此人的好友生日,然后將計(jì)算的結(jié)果存取緩存中(Memcache)中。在這個(gè)例子中采用了ExecutorService實(shí)現(xiàn)多線程的功能,盡可能的提高吞吐量,這個(gè)在后面線程池的部分會(huì)詳細(xì)說(shuō)明。目前就可以理解為new Thread(r).start()就可以了。另外這里阻塞隊(duì)列使用的是LinkedBlockingQueue。
清單1 一個(gè)生產(chǎn)者-消費(fèi)者例子
package xylz.study.concurrency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class BirthdayService {
final int workerNumber;
final Worker[] workers;
final ExecutorService threadPool;
static volatile boolean running = true;
public BirthdayService(int workerNumber, int capacity) {
if (workerNumber <= 0) throw new IllegalArgumentException();
this.workerNumber = workerNumber;
workers = new Worker[workerNumber];
for (int i = 0; i < workerNumber; i++) {
workers[i] = new Worker(capacity);
}
//
boolean b = running;// kill the resorting
threadPool = Executors.newFixedThreadPool(workerNumber);
for (Worker w : workers) {
threadPool.submit(w);
}
}
Worker getWorker(int id) {
return workers[id % workerNumber];
}
class Worker implements Runnable {
final BlockingQueue<Integer> queue;
public Worker(int capacity) {
queue = new LinkedBlockingQueue<Integer>(capacity);
}
public void run() {
while (true) {
try {
consume(queue.take());
} catch (InterruptedException e) {
return;
}
}
}
void put(int id) {
try {
queue.put(id);
} catch (InterruptedException e) {
return;
}
}
}
public void accept(int id) {
//accept client request
getWorker(id).put(id);
}
protected void consume(int id) {
//do the work
//get the list of friends and save the birthday to cache
}
}
在清單1 中可以看到不管是put()還是get(),都拋出了一個(gè)InterruptedException。我們就從這里開(kāi)始,為什么會(huì)拋出這個(gè)異常。
上一節(jié)中提到實(shí)現(xiàn)一個(gè)并發(fā)隊(duì)列有三種方式。顯然只有第二種 Lock 才能實(shí)現(xiàn)阻塞隊(duì)列。在鎖機(jī)制中提到過(guò),Lock結(jié)合Condition就可以實(shí)現(xiàn)線程的阻塞,這在鎖機(jī)制部分的很多工具中都詳細(xì)介紹過(guò),而接下來(lái)要介紹的LinkedBlockingQueue就是采用這種方式。
LinkedBlockingQueue 原理
對(duì)比ConcurrentLinkedQueue的結(jié)構(gòu)圖,LinkedBlockingQueue多了兩個(gè)ReentrantLock和兩個(gè)Condition以及用于計(jì)數(shù)的AtomicInteger,顯然這會(huì)導(dǎo)致LinkedBlockingQueue的實(shí)現(xiàn)有點(diǎn)復(fù)雜。對(duì)照此結(jié)構(gòu),有以下幾點(diǎn)說(shuō)明:
- 但是整體上講,LinkedBlockingQueue和ConcurrentLinkedQueue的結(jié)構(gòu)類似,都是采用頭尾節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)指向下一個(gè)節(jié)點(diǎn)的結(jié)構(gòu),這表示它們?cè)诓僮魃蠎?yīng)該類似。
- LinkedBlockingQueue引入了原子計(jì)數(shù)器count,這意味著獲取隊(duì)列大小size()已經(jīng)是常量時(shí)間了,不再需要遍歷隊(duì)列。每次隊(duì)列長(zhǎng)度有變更時(shí)只需要修改count即可。
- 有了修改Node指向有了鎖,所以不需要volatile特性了。既然有了鎖Node的item為什么需要volatile在后面會(huì)詳細(xì)分析,暫且不表。
- 引入了兩個(gè)鎖,一個(gè)入隊(duì)列鎖,一個(gè)出隊(duì)列鎖。當(dāng)然同時(shí)有一個(gè)隊(duì)列不滿的Condition和一個(gè)隊(duì)列不空的Condition。其實(shí)參照鎖機(jī)制前面介紹過(guò)的生產(chǎn)者-消費(fèi)者模型就知道,入隊(duì)列就代表生產(chǎn)者,出隊(duì)列就代表消費(fèi)者。為什么需要兩個(gè)鎖?一個(gè)鎖行不行?其實(shí)一個(gè)鎖完全可以,但是一個(gè)鎖意味著入隊(duì)列和出隊(duì)列同時(shí)只能有一個(gè)在進(jìn)行,另一個(gè)必須等待其釋放鎖。而從ConcurrentLinkedQueue的實(shí)現(xiàn)原理來(lái)看,事實(shí)上head和last (ConcurrentLinkedQueue中是tail)是分離的,互相獨(dú)立的,這意味著入隊(duì)列實(shí)際上是不會(huì)修改出隊(duì)列的數(shù)據(jù)的,同時(shí)出隊(duì)列也不會(huì)修改入隊(duì)列,也就是說(shuō)這兩個(gè)操作是互不干擾的。更通俗的將,這個(gè)鎖相當(dāng)于兩個(gè)寫入鎖,入隊(duì)列是一種寫操作,操作head,出隊(duì)列是一種寫操作,操作tail。可見(jiàn)它們是無(wú)關(guān)的。但是并非完全無(wú)關(guān),后面詳細(xì)分析。
在沒(méi)有揭示入隊(duì)列和出隊(duì)列過(guò)程前,暫且猜測(cè)下實(shí)現(xiàn)原理。
根據(jù)前面學(xué)到的鎖機(jī)制原理結(jié)合ConcurrentLinkedQueue的原理,入隊(duì)列的阻塞過(guò)程大概是這樣的:
- 獲取入隊(duì)列的鎖putLock,檢測(cè)隊(duì)列大小,如果隊(duì)列已滿,那么就掛起線程,等待隊(duì)列不滿信號(hào)notFull的喚醒。
- 將元素加入到隊(duì)列尾部,同時(shí)修改隊(duì)列尾部引用last。
- 隊(duì)列大小加1。
- 釋放鎖putLock。
- 喚醒notEmpty線程(如果有掛起的出隊(duì)列線程),告訴消費(fèi)者,已經(jīng)有了新的產(chǎn)品。
對(duì)比入隊(duì)列,出隊(duì)列的阻塞過(guò)程大概是這樣的:
- 獲取出隊(duì)列的鎖takeLock,檢測(cè)隊(duì)列大小,如果隊(duì)列為空,那么就掛起線程,等待隊(duì)列不為空notEmpty的喚醒。
- 將元素從頭部移除,同時(shí)修改隊(duì)列頭部引用head。
- 隊(duì)列大小減1。
- 釋放鎖takeLock。
- 喚醒notFull線程(如果有掛起的入隊(duì)列線程),告訴生產(chǎn)者,現(xiàn)在還有空閑的空間。
下面來(lái)驗(yàn)證上面的過(guò)程。
入隊(duì)列過(guò)程(put/offer)
清單2 阻塞的入隊(duì)列過(guò)程
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
清單2 描述的是入隊(duì)列的阻塞過(guò)程。可以看到和上面描述的入隊(duì)列的過(guò)程基本相同。但是也有以下幾個(gè)問(wèn)題:
- 如果在入隊(duì)列的時(shí)候線程被中斷,那么就需要發(fā)出一個(gè)notFull的信號(hào),表示下一個(gè)入隊(duì)列的線程能夠被喚醒(如果阻塞的話)。
- 入隊(duì)列成功后如果隊(duì)列不滿需要補(bǔ)一個(gè)notFull的信號(hào)。為什么?隊(duì)列不滿的時(shí)候其它入隊(duì)列的阻塞線程難道不知道么?有可能。這是因?yàn)闉榱藴p少上下文切換的次數(shù),每次喚醒一個(gè)線程(不管是入隊(duì)列還是出隊(duì)列)都是只隨機(jī)喚醒一個(gè)(notify),而不是喚醒所有的(notifyall())。這會(huì)導(dǎo)致其它阻塞的入隊(duì)列線程不能夠即使處理隊(duì)列不滿的情況。
- 如果隊(duì)列不為空并且可能有一個(gè)元素的話就喚醒一個(gè)出隊(duì)列線程。這么做說(shuō)明之前隊(duì)列一定為空,因?yàn)樵诩尤腙?duì)列之后隊(duì)列最多只能為1,那么說(shuō)明未加入之前是0,那么就可能有被阻塞的出隊(duì)列線程,所以就喚醒一個(gè)出隊(duì)列線程。特別說(shuō)明的是為什么使用一個(gè)臨時(shí)變量c,而不用count。這是因?yàn)樽x取一個(gè)count的開(kāi)銷比讀取一個(gè)臨時(shí)一個(gè)變量大,而此處c又能夠完成確認(rèn)隊(duì)列最多只有一個(gè)元素的判斷。首先c默認(rèn)為-1,如果加入隊(duì)列后獲取原子計(jì)數(shù)器的結(jié)果為0,說(shuō)明之前隊(duì)列為空,不可能消費(fèi)(出隊(duì)列),也不可能入隊(duì)列,因?yàn)榇藭r(shí)鎖還在當(dāng)前線程上,那么加入一個(gè)后隊(duì)列就不為空了,所以就可以安全的喚醒一個(gè)消費(fèi)(出對(duì)立)線程。
- 入隊(duì)列的過(guò)程允許被中斷,所以總是拋出InterruptedException 異常。
針對(duì)第2點(diǎn),特別補(bǔ)充說(shuō)明下。本來(lái)這屬于鎖機(jī)制中條件隊(duì)列的范圍,由于沒(méi)有應(yīng)用場(chǎng)景,所以當(dāng)時(shí)沒(méi)有提。
前面提高notifyall總是比notify更可靠,因?yàn)閚otify可能丟失通知,為什么不適用notifyall呢?
先解釋下notify丟失通知的問(wèn)題。
notify丟失通知問(wèn)題
假設(shè)線程A因?yàn)槟撤N條件在條件隊(duì)列中等待,同時(shí)線程B因?yàn)榱硗庖环N條件在同一個(gè)條件隊(duì)列中等待,也就是說(shuō)線程A/B都被同一個(gè)Conditon.await()掛起,但是等待的條件不同。現(xiàn)在假設(shè)線程B的線程被滿足,線程C執(zhí)行一個(gè)notify操作,此時(shí)JVM從Conditon.await()的多個(gè)線程(A/B)中隨機(jī)挑選一個(gè)喚醒,不幸的是喚醒了A。此時(shí)A的條件不滿足,于是A繼續(xù)掛起。而此時(shí)B仍然在傻傻的等待被喚醒的信號(hào)。也就是說(shuō)本來(lái)給B的通知卻被一個(gè)無(wú)關(guān)的線程持有了,真正需要通知的線程B卻沒(méi)有得到通知,而B(niǎo)仍然在等待一個(gè)已經(jīng)發(fā)生過(guò)的通知。
如果使用notifyall,則能夠避免此問(wèn)題。notifyall會(huì)喚醒所有正在等待的線程,線程C發(fā)出的通知線程A同樣能夠收到,但是由于對(duì)于A沒(méi)用,所以A繼續(xù)掛起,而線程B也收到了此通知,于是線程B正常被喚醒。
既然notifyall能夠解決單一notify丟失通知的問(wèn)題,那么為什么不總是使用notifyall替換notify呢?
假設(shè)有N個(gè)線程在條件隊(duì)列中等待,調(diào)用notifyall會(huì)喚醒所有線程,然后這N個(gè)線程競(jìng)爭(zhēng)同一個(gè)鎖,最多只有一個(gè)線程能夠得到鎖,于是其它線程又回到掛起狀態(tài)。這意味每一次喚醒操作可能帶來(lái)大量的上下文切換(如果N比較大的話),同時(shí)有大量的競(jìng)爭(zhēng)鎖的請(qǐng)求。這對(duì)于頻繁的喚醒操作而言性能上可能是一種災(zāi)難。
如果說(shuō)總是只有一個(gè)線程被喚醒后能夠拿到鎖,那么為什么不使用notify呢?所以某些情況下使用notify的性能是要高于notifyall的。
如果滿足下面的條件,可以使用單一的notify取代notifyall操作:
相同的等待者,也就是說(shuō)等待條件變量的線程操作相同,每一個(gè)從wait放回后執(zhí)行相同的邏輯,同時(shí)一個(gè)條件變量的通知至多只能喚醒一個(gè)線程。
也就是說(shuō)理論上講在put/take中如果使用sinallAll喚醒的話,那么在清單2 中的notFull.singal就是多余的。
出隊(duì)列過(guò)程(poll/take)
再來(lái)看出隊(duì)列過(guò)程。清單3 描述了出隊(duì)列的過(guò)程。可以看到這和入隊(duì)列是對(duì)稱的。從這里可以看到,出隊(duì)列使用的是和入隊(duì)列不同的鎖,所以入隊(duì)列、出隊(duì)列這兩個(gè)操作才能并行進(jìn)行。
清單3 阻塞的出隊(duì)列過(guò)程
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
try {
while (count.get() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
為什么有異常?
有了入隊(duì)列、出隊(duì)列的過(guò)程后再來(lái)回答前面的幾個(gè)問(wèn)題。
為什么總是拋出InterruptedException 異常? 這是很大一塊內(nèi)容,其實(shí)是Java對(duì)線程中斷的處理問(wèn)題,希望能夠在系列文章的最后能夠?qū)Υ碎_(kāi)辟單獨(dú)的篇章來(lái)談?wù)劇?/p>
在鎖機(jī)制里面也是總遇到,這是因?yàn)椋琂ava里面沒(méi)有一種直接的方法中斷一個(gè)掛起的線程,所以通常情況下等于一個(gè)處于WAITING狀態(tài)的線程,允許設(shè)置一個(gè)中斷位,一旦線程檢測(cè)到這個(gè)中斷位就會(huì)從WAITING狀態(tài)退出,以一個(gè)InterruptedException 的異常返回。所以只要是對(duì)一個(gè)線程掛起操作都會(huì)導(dǎo)致InterruptedException 的可能,比如Thread.sleep()、Thread.join()、Object.wait()。盡管LockSupport.park()不會(huì)拋出一個(gè)InterruptedException 異常,但是它會(huì)將當(dāng)前線程的的interrupted狀態(tài)位置上,而對(duì)于Lock/Condition而言,當(dāng)捕捉到interrupted狀態(tài)后就認(rèn)為線程應(yīng)該終止任務(wù),所以就拋出了一個(gè)InterruptedException 異常。
又見(jiàn)volatile
還有一個(gè)不容易理解的問(wèn)題。為什么Node.item是volatile類型的?
起初我不大明白,因?yàn)閷?duì)于一個(gè)進(jìn)入隊(duì)列的Node,它的item是不變,當(dāng)且僅當(dāng)出隊(duì)列的時(shí)候會(huì)將頭結(jié)點(diǎn)元素的item 設(shè)置為null。盡管在remove(o)的時(shí)候也是設(shè)置為null,但是那時(shí)候是加了putLock/takeLock兩個(gè)鎖的,所以肯定是沒(méi)有問(wèn)題的。那么問(wèn)題出在哪?
我們知道,item的值是在put/offer的時(shí)候加入的。這時(shí)候都是有putLock鎖保證的,也就是說(shuō)它保證使用putLock鎖的讀取肯定是沒(méi)有問(wèn)題的。那么問(wèn)題就只可能出在一個(gè)不適用putLock卻需要讀取Node.item的地方。
peek操作時(shí)獲取頭結(jié)點(diǎn)的元素而不移除它。顯然他不會(huì)操作尾節(jié)點(diǎn),所以它不需要putLock鎖,也就是說(shuō)它只有takeLock鎖。清單4 描述了這個(gè)過(guò)程。
清單4 查詢隊(duì)列頭元素過(guò)程
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
清單4 描述了peek的過(guò)程,最后返回一個(gè)非null節(jié)點(diǎn)的結(jié)果是Node.item。這里讀取了Node的item值,但是整個(gè)過(guò)程卻是使用了takeLock而非putLock。換句話說(shuō)putLock對(duì)Node.item的操作,peek()線程可能不可見(jiàn)!
清單5 隊(duì)列尾部加入元素
private void insert(E x) {
last = last.next = new Node<E>(x);
}
清單5 是入隊(duì)列offer/put的一部分,這里關(guān)鍵在于last=new Node<E>(x)可能發(fā)生重排序。Node構(gòu)造函數(shù)是這樣的:Node(E x) { item = x; }。在這一步里面我們可能得到以下一種情況:
- 構(gòu)建一個(gè)Node對(duì)象n;
- 將Node的n賦給last
- 初始化n,設(shè)置item=x
在執(zhí)行步驟2 的時(shí)候一個(gè)peek線程可能拿到了新的Node n,這時(shí)候它讀取item,得到了一個(gè)null。顯然這是不可靠的。
對(duì)item采用volatile之后,JMM保證對(duì)item=x的賦值一定在last=n之前,也就是說(shuō)last得到的一個(gè)是一個(gè)已經(jīng)賦值了的新節(jié)點(diǎn)n。這就不會(huì)導(dǎo)致讀取空元素的問(wèn)題的。
出對(duì)了poll/take和peek都是使用的takeLock鎖,所以不會(huì)導(dǎo)致此問(wèn)題。
刪除操作和遍歷操作由于同時(shí)獲取了takeLock和putLock,所以也不會(huì)導(dǎo)致此問(wèn)題。
總結(jié):當(dāng)前僅當(dāng)元素加入隊(duì)列時(shí)讀取此元素才可能導(dǎo)致不一致的問(wèn)題。采用volatile正式避免此問(wèn)題。
附加功能
BlockingQueue有一個(gè)額外的功能,允許批量從隊(duì)列中異常元素。這個(gè)API是:
int drainTo(Collection<? super E> c, int maxElements); 最多從此隊(duì)列中移除給定數(shù)量的可用元素,并將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c); 移除此隊(duì)列中所有可用的元素,并將它們添加到給定 collection 中。
清單6 描述的是最多移除指定數(shù)量元素的過(guò)程。由于批量操作只需要一次獲取鎖,所以效率會(huì)比每次獲取鎖要高。但是需要說(shuō)明的,需要同時(shí)獲取takeLock/putLock兩把鎖,因?yàn)楫?dāng)移除完所有元素后這會(huì)涉及到尾節(jié)點(diǎn)的修改(last節(jié)點(diǎn)仍然指向一個(gè)已經(jīng)移走的節(jié)點(diǎn))。
由于迭代操作contains()/remove()/iterator()也是獲取了兩個(gè)鎖,所以迭代操作也是線程安全的。
清單6 批量移除操作
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
fullyLock();
try {
int n = 0;
Node<E> p = head.next;
while (p != null && n < maxElements) {
c.add(p.item);
p.item = null;
p = p.next;
++n;
}
if (n != 0) {
head.next = p;
assert head.item == null;
if (p == null)
last = head;
if (count.getAndAdd(-n) == capacity)
notFull.signalAll();
}
return n;
} finally {
fullyUnlock();
}
}
©2009-2014 IMXYLZ
|求賢若渴