產者-消費者方案是多線程應用程序開發中最常用的構造之一 ― 因此困難也在于此。因為在一個應用程序中可以多次重復生產者-消費者行為,其代碼也可以如此。軟件開發人員 Ze'ev Bubis 和 Saffi Hartal 創建了
Consumer
類,該類通過在一些多線程應用程序中促進代碼重用以及簡化代碼調試和維護來解決這個問題。請通過單擊本文頂部或底部的
討論來參與本文的
論壇,與作者和其他讀者分享您的想法。
多線程應用程序通常利用生產者-消費者編程方案,其中由生產者線程創建重復性作業,將其傳遞給作業隊列,然后由消費者線程處理作業。雖然這種編程方法很有用,但是它通常導致重復的代碼,這對于調試和維護可能是真正的問題。
為了解決這個問題并促進代碼重用,我們創建了 Consumer
類。 Consumer
類包含所有用于作業隊列和消費者線程的代碼,以及使這兩者能夠結合在一起的邏輯。這使我們可以專注于業務邏輯 ― 關于應該如何處理作業的細節 ― 而不是專注于編寫大量冗余的代碼。同時,它還使得調試多線程應用程序的任務變得更為容易。
在本文中,我們將簡單觀察一下多線程應用程序開發中公共線程用法,同時,解釋一下生產者-消費者編程方案,并研究一個實際的示例來向您演示 Consumer
類是如何工作的。請注意,對于多線程應用程序開發或消費者-生產者方案,本文不作深入介紹;有關那些主題,請參閱 參考資料獲取文章的清單。
多線程基礎知識
多線程是一種使應用程序能同時處理多個操作的編程技術。通常有兩種不同類型的多線程操作使用多個線程:
- 適時事件,當作業必須在特定的時間或在特定的間隔內調度執行時
- 后臺處理,當后臺事件必須與當前執行流并行處理或執行時
適時事件的示例包括程序提醒、超時事件以及諸如輪詢和刷新之類的重復性操作。后臺處理的示例包括等待發送的包或等待處理的已接收的消息。
生產者-消費者關系
生產者-消費者方案很適合于后臺處理類別的情況。這些情況通常圍繞一個作業“生產者”方和一個作業“消費者”方。當然,關于作業并行執行還有其它考慮事項。在大多數情況下,對于使用同一資源的作業,應以“先來先服務”的方式按順序處理,這可以通過使用單線程的消費者輕松實現。通過使用這種方法,我們使用單個線程來訪問單個資源,而不是用多個線程來訪問單個資源。
要啟用標準消費者,當作業到來時創建一個作業隊列來存儲所有作業。生產者線程通過將新對象添加到消費者隊列來交付這個要處理的新對象。然后消費者線程從隊列取出每個對象,并依次處理。當隊列為空時,消費者進入休眠。當新的對象添加到空隊列時,消費者會醒來并處理該對象。因為大多數應用程序喜歡順序處理方式,所以消費者通常是單線程的。
問題:代碼重復
因為生產者-消費者方案很常用,所以在構建應用程序時它可能會出現幾次,這導致了代碼重復。我們認識到,這顯示了在應用程序開發過程期間多次使用了生產者-消費者方案的問題。
當第一次需要生產者-消費者行為時,通過編寫一個采用一個線程和一個隊列的類來實現該行為。當第二次需要這種行為時,我們著手從頭開始實現它,但是接著認識到以前已經做過這件事了。我們復制了代碼并修改了處理對象的方式。當第三次在該應用程序中實現生產者-消費者行為時,很明顯我們復制了太多代碼。我們決定,需要一個適用的 Consumer
類,它將處理我們所有的生產者-消費者方案。
我們的解決方案:Consumer 類
我們創建 Consumer
類的目的是:在我們的應用程序中,消除這種代碼重復 ― 為每個生產者-消費者實例編寫一個新作業隊列和消費者線程來解決這個問題。有了適當的 Consumer
類,我們所必須做的只是編寫專門用于作業處理(業務邏輯)的代碼。這使得我們的代碼更清晰、更易于維護以及更改起來更靈活。
我們對 Consumer
類有如下需求:
- 重用:我們希望這個類包括所有東西。一個線程、一個隊列以及使這兩者結合在一起的所有邏輯。這將使我們只須編寫隊列中“消費”特定作業的代碼。(因而,例如,程序員使用
Consumer
類時,將重載 onConsume(ObjectjobToBeConsumed)
方法。)
- 隊列選項:我們希望能夠設置將由
Consumer
對象使用的隊列實現。但是,這意味著我們必須確保隊列是線程安全的或使用一個不會與消費操作沖突的單線程生產者。無論使用哪種方法,都必須將隊列設計成允許不同的進程能訪問其方法。
- Consumer 線程優先級:我們希望能夠設置
Consumer
線程運行的優先級。
- Consumer 線程命名:線程擁有一個有意義的名稱會比較方便,當然這的確有助于調試。例如,如果您向 Java 虛擬機發送了一個信號,它將生成一個完整的線程轉儲 ― 所有線程及其相應堆棧跟蹤的快照。要在 Windows 平臺上生成這個線程轉儲,您必須在 Java 程序運行的窗口中按下鍵序列
<ctrl><break>
,或者單擊窗口上的“關閉”按鈕。有關如何使用完整的線程轉儲來診斷 Java 軟件問題的更多信息,請參閱 參考資料。
類代碼
在 getThread()
方法中,我們使用“惰性創建”來創建 Consumer
的線程,如清單 1 所示:
清單 1. 創建 Consumer 的線程 /**
* Lazy creation of the Consumer's thread.
*
* @return the Consumer's thread
*/
private Thread getThread()
{
if (_thread==null)
{
_thread = new Thread()
{
public void run()
{
Consumer.this.run();
}
};
}
return _thread;
|
該線程的 run()
方法運行 Consumer
的 run()
方法,它是主消費者循環,如清單 2 所示:
清單 2. run() 方法是主 Consumer 循環 /**
* Main Consumer's thread method.
*/
private void run()
{
while (!_isTerminated)
{
// job handling loop
while (true)
{
Object o;
synchronized (_queue)
{
if (_queue.isEmpty())
break;
o = _queue.remove();
}
if (o == null)
break;
onConsume(o);
}
// if we are not terminated and the queue is still empty
// then wait until new jobs arrive.
synchronized(_waitForJobsMonitor)
{
if (_isTerminated)
break;
if(_queue.isEmpty())
{
try
{
_waitForJobsMonitor.wait();
}
catch (InterruptedException ex)
{
}
}
}
}
}// run()
|
基本上, Consumer
的線程一直運行,直到隊列中不再有等待的作業為止。然后它進入休眠,只在第一次調用 add(Object)
時醒來,該方法向隊列添加一個新作業并“踢”醒該線程。
使用 wait()
和 notify()
機制來完成“睡眠”和“踢”。實際的消費者工作由 OnConsume(Object)
方法處理,如清單 3 所示:
清單 3. 喚醒和通知 Consumer /**
* Add an object to the Consumer.
* This is the entry point for the producer.
* After the item is added, the Consumer's thread
* will be notified.
*
* @param the object to be 'consumed' by this consumer
*/
public void add(Object o)
{
_queue.add(o);
kickThread();
}
/**
* Wake up the thread (without adding new stuff to consume)
*
*/
public void kickThread()
{
if (!this._thread.isInterrupted())
{
synchronized(_waitForJobsMonitor)
{
_waitForJobsMonitor.notify();
}
}
}
|
示例:MessagesProcessor
為了向您展示 Consumer
類是如何工作的,我們將使用一個簡單示例。 MessagesProcessor
類以異步方式處理進入的消息(也就是說,不干擾調用線程)。其工作是在每個消息到來時打印它。 MessagesProcessor
具有一個處理到來的消息作業的內部 Consumer
。當新作業進入空隊列時, Consumer
調用 processMessage(String)
方法來處理它,如清單 4 所示:
清單 4. MessagesProcessor 類 class MessagesProcessor
{
String _name;
// anonymous inner class that supplies the consumer
// capabilities for the MessagesProcessor
private Consumer _consumer = new Consumer()
{
// that method is called on each event retrieved
protected void onConsume(Object o)
{
if (!(o instanceof String))
{
System.out.println("illegal use, ignoring");
return;
}
MessagesProcesser.this.processMessage((String)o);
}
}.setName("MessagesProcessor").init();
public void gotMessageEvent(String s)
{
_consumer.add(s);
}
private void processMessage(String s)
{
System.out.println(_name+" processed message: "+s);
}
private void terminate()
{
_consumer.terminateWait();
_name = null;
}
MessagesProcessor()
{
_name = "Example Consumer";
}
}
|
正如您可以從上面的代碼中所看到的,定制 Consumer
相當簡單。我們使用了一個匿名內部類來繼承 Consumer
類,并重載抽象方法 onConsume()
。因此,在我們的示例中,只需調用 processMessage
。
Consumer 類的高級特性
除了開始時提出的基本需求以外,我們還為 Consumer
類提供了一些我們覺得有用的高級特性。
事件通知
- onThreadTerminate():只在終止
Consumer
前調用該方法。我們出于調試目的覆蓋了這個方法。
- goingToRest():只在 Consumer 線程進入休眠前調用該方法(也就是說,只在調用
_waitForJobsMonitor.wait()
之前調用)。只在需要消費者在進入休眠之前處理一批已處理工作的復雜情況中,可能需要這種通知。
終止
- terminate():Consumer 線程的異步終止。
- terminateWait():設置調用線程一直等待,直到消費者線程實際終止為止。
在我們的示例中,如果使用 terminate()
而不是 terminateWait()
,那么將會出現問題,因為在將 _name
設置成空值之后調用 onConsume()
方法。這將導致執行 processMessage
的線程拋出一個 NullPointerException
。
結束語:Consumer 類的好處
可在 參考資料一節下載 Consumer
類的源代碼。請自由使用源代碼,并按照您的需要擴展它。我們發現將這個類用于多線程應用程序開發有許多好處:
- 代碼重用/重復代碼的消除:如果您有
Consumer
類,就不必為您應用程序中的每個實例編寫一個新的消費者。如果在應用程序開發中頻繁使用生產者-消費者方案,這可以很大程度地節省時間。另外,請牢記重復代碼是滋生錯誤的沃土。它還使基本代碼的維護更為困難。
- 更少錯誤:使用驗證過的代碼是一種防止錯誤的好實踐,尤其是處理多線程應用程序時。因為
Consumer
類已經被調試過,所以它更安全。消費者還通過在線程和資源之間擔任安全中介來防止與線程相關的錯誤。消費者可以代表其它線程以順序的方式訪問資源。
- 漂亮、清晰的代碼:使用
Consumer
類有助于我們編寫出更簡單的代碼,這樣的代碼更容易理解和維護。如果我們不使用 Consumer
類,就必須編寫代碼來處理兩種不同的功能:消費邏輯(隊列和線程管理、同步等)和指定消費者的用法或功能的代碼。