<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 9, comments - 4, trackbacks - 0, articles - 21

    生產者/消費者模型

    Posted on 2007-10-27 16:23 一步一步努力向上爬 閱讀(3548) 評論(0)  編輯  收藏 所屬分類: 設計模式
    生產者/消費者模型是最基本的并發協作模型,是所有并發協作的基礎??梢赃@么說,其他的并發協作都是供求關系模型的變種。生產者,消費者之間的供求關系可以簡單的 使用管道來構造。讓我們看兩者之間的行為模式: *生產/消費模型:消費者如果無消費對象,就會阻塞直到有消費對象到達;一個消費對象僅供一個消費者消費。 *BlockingQueue: 如果隊列為空,則讀取操作將會阻塞直至隊列有新的內容到達;隊列中對象一旦被讀取,將從隊列中移走。 由此可見,阻塞隊列天然符合生產/消費模型的供求行為模式。在前面展示condition的用法的時候,曾經 用過生產者/消費者模型來舉例。那個例子如果改用BlockingQueue來寫的話就十分簡單
        ...
    BlockingQueue<String> q =new ArrayBlockingQueue<String> (10);
    ...
    public void supply () {
    q.put("product by "+Thread.currentThread().getId()+":"+(++productNo));
    }
    ...
    public void cunsume () {
    String product =q.take();
    System.out.println("consume product:"+product);
    }
    從BlockingQueue也可以看出,它和UNIX系統下面的Pipe十分相似。所不同的不過是兩點,首先,pipe是進程間的,命名管道甚至可以在非親緣進程間使用,而BlockingQueue 目前只是線程間的通信手段。當然,由于java本身強大的動態類裝載功能,這個缺陷對java程序之間的溝通限制并不大。其次,pipe是基于字節流的,而BlockingQueue是 基于對象的,這使得BlockingQueue更加易用,不過卻讓BlockingQueue綁定了Java語言,使進一步成為輕量級本地進程通信工具的難度增大。

    從前面對生產/消費模型的行為方式可以看出,生產/消費模型著重于規范消費者的行為模式,當消費速度超過生產速度的時候,消費者就會被阻塞。而對于生產者的行為則沒有 規定。當生產速度超過消費速度,生產者的行為模式可以分為以下幾種: #當積壓的產品達到一定數量時,生產者被阻塞 #無論有多少積壓產品,生產者都不會被阻塞 #不能有任何積壓產品,生產者在當前產品未被消費之前,會被阻塞 對于產品來說,也有不同的行為模式 #產品只有在被生產出來一段時間之后才能被消費(先花點時間晾晾干?) #不同類別的產品被消費的優先級不同(有鉆石的話,黃金就先放一邊吧:))

    根據生產者行為模式的不同Concurrent包提供了不同的BlockingQueue的實現 ||Queue種類||行為描述 |ArrayBlockingQueue|有限制的blocking queue,積壓的產品不得超過制訂數量 |DelayQueue|產品只有生產出一段時間之后,才能被消費,無限制的積壓產品 |LinkedBlockingQueue|同時支持有限制的blocking queue,也能支持無限制的積壓產品(數量不能超過Integer.MAX_VALUE) |PriorityBlockingQueue|不同產品的被消費優先級不同,無限制的積壓產品 |SynchronousQueue|不允許積壓產品

    這些不同的行為模式中,較為常見的除了ArrayBlockingQueue和LinkedBlockingQueue之外,PriorityBlockingQueue也非常重要。舉例來說,如果我們利用BlockingQueue 來實現一個郵件系統(著名的qmail就是利用pipe技術構建的核心架構)。我們知道郵件有不同的級別,如果當前隊列里有加急郵件需要處理的話,系統將優先處理加急郵件。 我們將以郵件傳遞為例子,說明PriorityBlockingQueue的使用方法。(注:這里的這個郵件模型只是一個非常簡陋的模型,用來說明PriorityBlockingQueue的使用方法而已, 和實際應用有很大的差距)

    首先,我們需要了解郵件傳遞過程的基本模型。在這個簡單的郵件傳送模型中涉及到下列概念 *MDA: Mail Deliver Agent, 負責接受指定用戶的郵件。 *MTA: Mail Transfer Agent, 負責接受遠程傳送過來的郵件,并將其傳送給收件人的MDA 它們和郵件用戶之間的關系如下圖

    其中MTA使用Queue傳送郵件給MDA。因此,不同的用戶會使用不同的Mail Queue。下面是MailQueue的代碼
    public class MailQueue<E> extends PriorityBlockingQueue<E>{
    public E take () throws InterruptedException {
    E ren =super.take();
    Utils._log("take:"+ren);
    return ren;
    }

    public void put (E o) {
    super.put(o);
    Utils._log("put:"+o);
    }
    }
    為了能夠根據收件人的Mail Address找到相應的Mail Queue, 使用一個MailQueueFactory來產生MailQueue
    public class MailQueueFactory {
    //A ConcurrentHashMap is used here instead of Hashtable
    static ConcurrentHashMap<MailAccount,MailQueue<Mail>> mailQueues =
    new ConcurrentHashMap<MailAccount,MailQueue<Mail>>();
    public static BlockingQueue<Mail> getMailQueue (MailDeliverer e) {
    return getMailQueue(e.getMailAccount());
    }

    public static BlockingQueue<Mail> getReceiveMailQueue (Mail m) {
    return getMailQueue (m.getReceiver());
    }

    public static BlockingQueue<Mail> getMailQueue (MailAccount e) {
    mailQueues.putIfAbsent (e,new MailQueue<Mail>());
    MailQueue<Mail> mailQ =mailQueues.get(e);

    return mailQ;
    }
    }
    需要注意的是,我們在MailQueueFactory里面使用了ConcurrentHashMap,而不是傳統的Hashtable, 雖然Hashtable是thread-safe,但是缺乏putIfAbsent這樣的 原子函數,如果不小心設計的話,會造成對同一個MailQueue重復初始化,從而導致死鎖問題。 下面看Mail的定義
    public class Mail implements Comparable{
    public final static int emergencyMail =0;
    public final static int normalMail =1;

    static AtomicInteger serialCounter =new AtomicInteger(0);

    private int mailLevel;
    private int serialNumber =serialCounter.addAndGet(1);
    private MailAccount receiver =null;
    private MailAccount sender =null;
    private Date sendTime =new Date();

    public Mail (String from, String to, int level) {
    ...
    }

    //Get functions
    ...

    public int compareTo(Object o) {
    if (o instanceof Mail) {
    return compareTo ((Mail)o);
    }
    return 0;
    }

    public int compareTo (Mail o) {
    if (o.mailLevel==this.mailLevel) { //Same level, compare the serial no
    if (o.serialNumber==this.serialNumber)
    return 0;
    if (o.serialNumber>this.serialNumber)
    return -1;
    return 1;
    }
    if (this.mailLevel==emergencyMail) return -1;
    return 1;
    }
    //Other functions
    ...
    }
    這里值得注意的是AtomicInteger的使用,它被用來做內部serialNumber的產生。另外就是compareTo函數的使用,PriorityBlockingQueue使用Comparable接口來判定元素的優先級別。這里所定義的優先級如下: *如果郵件類別相同,則序列號小的郵件有較大的優先級 *如果郵件類別不同,則emergencyMail有較大的優先級 最后是Deliver Agent 和 Transfer Agent的代碼
    public class MailDeliverer {
    MailAccount mailAccount =null;

    public MailDeliverer (MailAccount account) {
    this.mailAccount =account;
    }

    public MailAccount getMailAccount() {
    return mailAccount;
    }

    public Mail retrieveMail () {
    Mail mail =null;
    while (mail==null) {
    try {
    mail =MailQueueFactory.getMailQueue(this).take();
    }catch (Exception e) {
    Utils._log("Encounter Exception",e);
    }
    }
    return mail;
    }
    }

    public class MailTransfer {
    private static MailTransfer instance =new MailTransfer ();
    private MailTransfer () { }

    public static MailTransfer getInstance () {
    return instance;
    }

    public void processMail (Mail m) {
    BlockingQueue mailQ =MailQueueFactory.getReceiveMailQueue(m);
    try {
    mailQ.put(m);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 99久久人妻精品免费二区| 一级毛片在线播放免费| 91短视频免费在线观看| 亚洲AV一宅男色影视| 免费观看一区二区三区| 亚洲av综合avav中文| 麻豆成人久久精品二区三区免费| 亚洲av无码一区二区三区不卡| 国产精品白浆在线观看免费| 亚洲综合视频在线| 国产国产人免费视频成69堂| 亚洲av无码电影网| 噜噜嘿在线视频免费观看| 亚洲熟女综合一区二区三区| 又色又污又黄无遮挡的免费视| 皇色在线免费视频| 99久久亚洲综合精品成人网| 99久久这里只精品国产免费| 亚洲精品天堂无码中文字幕| 亚洲av无码乱码在线观看野外 | 亚洲va在线va天堂成人| 日本不卡在线观看免费v| 免费无毒a网站在线观看| 亚洲成av人影院| 69xx免费观看视频| 在线91精品亚洲网站精品成人| 亚洲精品专区在线观看| 无码午夜成人1000部免费视频| 亚洲一区免费视频| 少妇亚洲免费精品| 中文字幕成人免费视频| 亚洲国产成人精品无码区二本| 中文字幕亚洲一区二区va在线| 日本xxxx色视频在线观看免费| 亚洲五月综合网色九月色| 免费在线视频一区| 37pao成人国产永久免费视频| 午夜亚洲国产理论片二级港台二级| 亚洲精品白浆高清久久久久久 | 1000部夫妻午夜免费| 九九精品国产亚洲AV日韩|