通常,多線程之間需要協調工作。例如,瀏覽器的一個顯示圖片的線程displayThread想要執行顯示圖片的任務,必須等待下載線程downloadThread將該圖片下載完畢。如果圖片還沒有下載完,displayThread可以暫停,當downloadThread完成了任務后,再通知displayThread“圖片準備完畢,可以顯示了”,這時,displayThread繼續執行。
以上邏輯簡單的說就是:如果條件不滿足,則等待。當條件滿足時,等待該條件的線程將被喚醒。在Java中,這個機制的實現依賴于wait/notify。等待機制與鎖機制是密切關聯的。例如:
1 2 3 4 5 6 | synchronized(obj) { while(!condition) { obj.wait(); } obj.doSomething(); } |
當線程A獲得了obj鎖后,發現條件condition不滿足,無法繼續下一處理,于是線程A就wait()。
在另一線程B中,如果B更改了某些條件,使得線程A的condition條件滿足了,就可以喚醒線程A:
1 2 3 4 | synchronized(obj) { condition = true; obj.notify(); } |
需要注意的概念是:
# 調用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫在synchronized(obj) {...} 代碼段內。
# 調用obj.wait()后,線程A就釋放了obj的鎖,否則線程B無法獲得obj鎖,也就無法在synchronized(obj) {...} 代碼段內喚醒A。
# 當obj.wait()方法返回后,線程A需要再次獲得obj鎖,才能繼續執行。
# 如果A1,A2,A3都在obj.wait(),則B調用obj.notify()只能喚醒A1,A2,A3中的一個(具體哪一個由JVM決定)。
# obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續執行obj.wait()的下一條語句,必須獲得obj鎖,因此,A1,A2,A3只有一個有機會獲得鎖繼續執行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續執行。
# 當B調用obj.notify/notifyAll的時候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個才有機會獲得鎖繼續執行。
wait()/sleep()的區別
前面講了wait/notify機制,Thread還有一個sleep()靜態方法,它也能使線程暫停一段時間。sleep與wait的不同點是:sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會使線程進入obj對象的等待集合中并等待喚醒。
但是wait()和sleep()都可以通過interrupt()方法打斷線程的暫停狀態,從而使線程立刻拋出InterruptedException。
如果線程A希望立即結束線程B,則可以對線程B對應的Thread實例調用interrupt方法。如果此刻線程B正在wait/sleep/join,則線程B會立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結束線程。
需要注意的是,InterruptedException是線程自己從內部拋出的,并不是interrupt()方法拋出的。對某一線程調用interrupt()時,如果該線程正在執行普通的代碼,那么該線程根本就不會拋出InterruptedException。但是,一旦該線程進入到wait()/sleep()/join()后,就會立刻拋出InterruptedException。
GuardedSuspention
GuardedSuspention模式主要思想是:
當條件不滿足時,線程等待,直到條件滿足時,等待該條件的線程被喚醒。
我們設計一個客戶端線程和一個服務器線程,客戶端線程不斷發送請求給服務器線程,服務器線程不斷處理請求。當請求隊列為空時,服務器線程就必須等待,直到客戶端發送了請求。
先定義一個請求隊列:Queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | package me.luger.Thread;import java.util.*;public class Queue { private List queue = new LinkedList(); public synchronized Request getRequest () { while (queue. size() == 0) { try { this. wait(); } catch (InterruptedException ie ) { return null; } } return (Request) queue. remove(0); } public synchronized void putRequest (Request request ) { queue. add(request ); this. notifyAll(); }} |
藍色部分就是服務器線程的等待條件,而客戶端線程在放入了一個request后,就使服務器線程等待條件滿足,于是喚醒服務器線程。
客戶端線程:ClientThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | package me.luger.Thread;public class ClientThread extends Thread { private Queue queue ; private String clientName ; public ClientThread (Queue queue, String clientName ) { this. queue = queue ; this. clientName = clientName ; } public String toString () { return "[ClientThread-" + clientName + "]"; } public void run () { for(int i =0; i <100; i ++ ) { Request request = new Request("" (long)(Math. random()*10000)); System. out. println(this " send request: " request ); queue. putRequest(request ); try { Thread. sleep((long)(Math. random() * 10000 1000)); } catch(InterruptedException ie ) { } } System. out. println(this " shutdown."); }} |
服務器線程:ServerThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | package com.crackj2ee.thread;public class ServerThread extends Thread { private boolean stop = false; private Queue queue ; public ServerThread (Queue queue ) { this. queue = queue ; } public void shutdown () { stop = true; this. interrupt(); try { this. join(); } catch (InterruptedException ie ) { } } public void run () { while(!stop ) { Request request = queue. getRequest(); System. out. println("[ServerThread] handle request: " request ); try { Thread. sleep(2000); } catch(InterruptedException ie ) {} } System. out. println("[ServerThread] shutdown."); }} |
服務器線程在紅色部分可能會阻塞,也就是說,Queue.getRequest是一個阻塞方法。這和java標準庫的許多IO方法類似。
最后,寫一個Main來啟動他們:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | package me.luger.thread;public class Main { public static void main (String[] args ) { Queue queue = new Queue (); ServerThread server = new ServerThread (queue ); server. start(); ClientThread [] clients = new ClientThread [5]; for(int i =0; i <CLIENTS. LENGTH; { clients [i ] = new ClientThread (queue, "" i ); clients [i ]. start(); } try { Thread. sleep(100000); } catch(InterruptedException ie ) {} server. shutdown(); }} |
我們啟動了5個客戶端線程和一個服務器線程,運行結果如下:
[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......
可以觀察到ServerThread處理來自不同客戶端的請求。
思考
Q: 服務器線程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?
A: 在這個例子中可以,因為服務器線程只有一個。但是,如果服務器線程有多個(例如Web應用程序有多個線程處理并發請求,這非常普遍),就會造成嚴重問題。
Q: 能否用sleep(1000)代替wait()?
A: 絕對不可以。sleep()不會釋放鎖,因此sleep期間別的線程根本沒有辦法調用getRequest()和putRequest(),導致所有相關線程都被阻塞。
Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?
A: 不可以。因為while()是測試queue,remove()是使用queue,兩者是一個原子操作,不能放在synchronized外面。
總結
多線程設計看似簡單,實際上必須非常仔細地考慮各種鎖定/同步的條件,稍不小心,就可能出錯。并且,當線程較少時,很可能發現不了問題,一旦問題出現又難以調試。
所幸的是,已有一些被驗證過的模式可以供我們使用,我們會繼續介紹一些常用的多線程設計模式。
-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;