生產者/消費者模型是最基本的并發協作模型,是所有并發協作的基礎??梢赃@么說,其他的并發協作都是供求關系模型的變種。生產者,消費者之間的供求關系可以簡單的
使用管道來構造。讓我們看兩者之間的行為模式:
*生產/消費模型:消費者如果無消費對象,就會阻塞直到有消費對象到達;一個消費對象僅供一個消費者消費。
*BlockingQueue: 如果隊列為空,則讀取操作將會阻塞直至隊列有新的內容到達;隊列中對象一旦被讀取,將從隊列中移走。
由此可見,阻塞隊列天然符合生產/消費模型的供求行為模式。在前面展示condition的用法的時候,曾經
用過生產者/消費者模型來舉例。那個例子如果改用BlockingQueue來寫的話就十分簡單
...
BlockingQueue<String> q =new ArrayBlockingQueue<String> (10);
...
public void supply () {
q.put("product by "+Thread.currentThread().getId()+":"+(++productNo));
}
...
public void cunsume () {
String product =q.take();
System.out.println("consume product:"+product);
}
從BlockingQueue也可以看出,它和UNIX系統下面的Pipe十分相似。所不同的不過是兩點,首先,pipe是進程間的,命名管道甚至可以在非親緣進程間使用,而BlockingQueue
目前只是線程間的通信手段。當然,由于java本身強大的動態類裝載功能,這個缺陷對java程序之間的溝通限制并不大。其次,pipe是基于字節流的,而BlockingQueue是
基于對象的,這使得BlockingQueue更加易用,不過卻讓BlockingQueue綁定了Java語言,使進一步成為輕量級本地進程通信工具的難度增大。
從前面對生產/消費模型的行為方式可以看出,生產/消費模型著重于規范消費者的行為模式,當消費速度超過生產速度的時候,消費者就會被阻塞。而對于生產者的行為則沒有
規定。當生產速度超過消費速度,生產者的行為模式可以分為以下幾種:
#當積壓的產品達到一定數量時,生產者被阻塞
#無論有多少積壓產品,生產者都不會被阻塞
#不能有任何積壓產品,生產者在當前產品未被消費之前,會被阻塞
對于產品來說,也有不同的行為模式
#產品只有在被生產出來一段時間之后才能被消費(先花點時間晾晾干?)
#不同類別的產品被消費的優先級不同(有鉆石的話,黃金就先放一邊吧:))
根據生產者行為模式的不同Concurrent包提供了不同的BlockingQueue的實現
||Queue種類||行為描述
|ArrayBlockingQueue|有限制的blocking queue,積壓的產品不得超過制訂數量
|DelayQueue|產品只有生產出一段時間之后,才能被消費,無限制的積壓產品
|LinkedBlockingQueue|同時支持有限制的blocking queue,也能支持無限制的積壓產品(數量不能超過Integer.MAX_VALUE)
|PriorityBlockingQueue|不同產品的被消費優先級不同,無限制的積壓產品
|SynchronousQueue|不允許積壓產品
這些不同的行為模式中,較為常見的除了ArrayBlockingQueue和LinkedBlockingQueue之外,PriorityBlockingQueue也非常重要。舉例來說,如果我們利用BlockingQueue
來實現一個郵件系統(著名的qmail就是利用pipe技術構建的核心架構)。我們知道郵件有不同的級別,如果當前隊列里有加急郵件需要處理的話,系統將優先處理加急郵件。
我們將以郵件傳遞為例子,說明PriorityBlockingQueue的使用方法。(注:這里的這個郵件模型只是一個非常簡陋的模型,用來說明PriorityBlockingQueue的使用方法而已,
和實際應用有很大的差距)
首先,我們需要了解郵件傳遞過程的基本模型。在這個簡單的郵件傳送模型中涉及到下列概念
*MDA: Mail Deliver Agent, 負責接受指定用戶的郵件。
*MTA: Mail Transfer Agent, 負責接受遠程傳送過來的郵件,并將其傳送給收件人的MDA
它們和郵件用戶之間的關系如下圖

其中MTA使用Queue傳送郵件給MDA。因此,不同的用戶會使用不同的Mail Queue。下面是MailQueue的代碼
public class MailQueue<E> extends PriorityBlockingQueue<E>{
public E take () throws InterruptedException {
E ren =super.take();
Utils._log("take:"+ren);
return ren;
}
public void put (E o) {
super.put(o);
Utils._log("put:"+o);
}
}
為了能夠根據收件人的Mail Address找到相應的Mail Queue, 使用一個MailQueueFactory來產生MailQueue
public class MailQueueFactory {
//A ConcurrentHashMap is used here instead of Hashtable
static ConcurrentHashMap<MailAccount,MailQueue<Mail>> mailQueues =
new ConcurrentHashMap<MailAccount,MailQueue<Mail>>();
public static BlockingQueue<Mail> getMailQueue (MailDeliverer e) {
return getMailQueue(e.getMailAccount());
}
public static BlockingQueue<Mail> getReceiveMailQueue (Mail m) {
return getMailQueue (m.getReceiver());
}
public static BlockingQueue<Mail> getMailQueue (MailAccount e) {
mailQueues.putIfAbsent (e,new MailQueue<Mail>());
MailQueue<Mail> mailQ =mailQueues.get(e);
return mailQ;
}
}
需要注意的是,我們在MailQueueFactory里面使用了ConcurrentHashMap,而不是傳統的Hashtable, 雖然Hashtable是thread-safe,但是缺乏putIfAbsent這樣的
原子函數,如果不小心設計的話,會造成對同一個MailQueue重復初始化,從而導致死鎖問題。
下面看Mail的定義
public class Mail implements Comparable{
public final static int emergencyMail =0;
public final static int normalMail =1;
static AtomicInteger serialCounter =new AtomicInteger(0);
private int mailLevel;
private int serialNumber =serialCounter.addAndGet(1);
private MailAccount receiver =null;
private MailAccount sender =null;
private Date sendTime =new Date();
public Mail (String from, String to, int level) {
...
}
//Get functions
...
public int compareTo(Object o) {
if (o instanceof Mail) {
return compareTo ((Mail)o);
}
return 0;
}
public int compareTo (Mail o) {
if (o.mailLevel==this.mailLevel) { //Same level, compare the serial no
if (o.serialNumber==this.serialNumber)
return 0;
if (o.serialNumber>this.serialNumber)
return -1;
return 1;
}
if (this.mailLevel==emergencyMail) return -1;
return 1;
}
//Other functions
...
}
這里值得注意的是AtomicInteger的使用,它被用來做內部serialNumber的產生。另外就是compareTo函數的使用,PriorityBlockingQueue使用Comparable接口來判定元素的優先級別。這里所定義的優先級如下:
*如果郵件類別相同,則序列號小的郵件有較大的優先級
*如果郵件類別不同,則emergencyMail有較大的優先級
最后是Deliver Agent 和 Transfer Agent的代碼
public class MailDeliverer {
MailAccount mailAccount =null;
public MailDeliverer (MailAccount account) {
this.mailAccount =account;
}
public MailAccount getMailAccount() {
return mailAccount;
}
public Mail retrieveMail () {
Mail mail =null;
while (mail==null) {
try {
mail =MailQueueFactory.getMailQueue(this).take();
}catch (Exception e) {
Utils._log("Encounter Exception",e);
}
}
return mail;
}
}
public class MailTransfer {
private static MailTransfer instance =new MailTransfer ();
private MailTransfer () { }
public static MailTransfer getInstance () {
return instance;
}
public void processMail (Mail m) {
BlockingQueue mailQ =MailQueueFactory.getReceiveMailQueue(m);
try {
mailQ.put(m);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}