java語言已經(jīng)內(nèi)置了多線程支持,所有實(shí)現(xiàn)Runnable接口的類都可被啟動(dòng)一個(gè)新線程,新線程會(huì)執(zhí)行該實(shí)例的run()方法,當(dāng)run()方法執(zhí)行完畢后,線程就結(jié)束了。一旦一個(gè)線程執(zhí)行完畢,這個(gè)實(shí)例就不能再重新啟動(dòng),只能重新生成一個(gè)新實(shí)例,再啟動(dòng)一個(gè)新線程。
Thread類是實(shí)現(xiàn)了Runnable接口的一個(gè)實(shí)例,它代表一個(gè)線程的實(shí)例,并且,啟動(dòng)線程的唯一方法就是通過Thread類的start()實(shí)例方法:
Thread t = new Thread();
t.start();
start()方法是一個(gè)native方法,它將啟動(dòng)一個(gè)新線程,并執(zhí)行run()方法。Thread類默認(rèn)的run()方法什么也不做就退出了。注意:直接調(diào)用run()方法并不會(huì)啟動(dòng)一個(gè)新線程,它和調(diào)用一個(gè)普通的java方法沒有什么區(qū)別。
因此,有兩個(gè)方法可以實(shí)現(xiàn)自己的線程:
方法1:自己的類extend Thread,并復(fù)寫run()方法,就可以啟動(dòng)新線程并執(zhí)行自己定義的run()方法。例如:
public class MyThread extends Thread {
public run() {
System.out.println("MyThread.run()");
}
}
在合適的地方啟動(dòng)線程:new MyThread().start();
方法2:如果自己的類已經(jīng)extends另一個(gè)類,就無法直接extends Thread,此時(shí),必須實(shí)現(xiàn)一個(gè)Runnable接口:
public class MyThread extends OtherClass implements Runnable {
public run() {
System.out.println("MyThread.run()");
}
}
為了啟動(dòng)MyThread,需要首先實(shí)例化一個(gè)Thread,并傳入自己的MyThread實(shí)例:
MyThread myt = new MyThread();
Thread t = new Thread(myt);
t.start();
事實(shí)上,當(dāng)傳入一個(gè)Runnable target參數(shù)給Thread后,Thread的run()方法就會(huì)調(diào)用target.run(),參考JDK源代碼:
public void run() {
if (target != null) {
target.run();
}
}
線程還有一些Name, ThreadGroup, isDaemon等設(shè)置,由于和線程設(shè)計(jì)模式關(guān)聯(lián)很少,這里就不多說了。
由于同一進(jìn)程內(nèi)的多個(gè)線程共享內(nèi)存空間,在Java中,就是共享實(shí)例,當(dāng)多個(gè)線程試圖同時(shí)修改某個(gè)實(shí)例的內(nèi)容時(shí),就會(huì)造成沖突,因此,線程必須實(shí)現(xiàn)共享互斥,使多線程同步。
最簡單的同步是將一個(gè)方法標(biāo)記為synchronized,對同一個(gè)實(shí)例來說,任一時(shí)刻只能有一個(gè)synchronized方法在執(zhí)行。當(dāng)一個(gè)方法正在執(zhí)行某個(gè)synchronized方法時(shí),其他線程如果想要執(zhí)行這個(gè)實(shí)例的任意一個(gè)synchronized方法,都必須等待當(dāng)前執(zhí)行 synchronized方法的線程退出此方法后,才能依次執(zhí)行。
但是,非synchronized方法不受影響,不管當(dāng)前有沒有執(zhí)行synchronized方法,非synchronized方法都可以被多個(gè)線程同時(shí)執(zhí)行。
此外,必須注意,只有同一實(shí)例的synchronized方法同一時(shí)間只能被一個(gè)線程執(zhí)行,不同實(shí)例的synchronized方法是可以并發(fā)的。例如,class A定義了synchronized方法sync(),則不同實(shí)例a1.sync()和a2.sync()可以同時(shí)由兩個(gè)線程來執(zhí)行。
多線程同步的實(shí)現(xiàn)最終依賴鎖機(jī)制。我們可以想象某一共享資源是一間屋子,每個(gè)人都是一個(gè)線程。當(dāng)A希望進(jìn)入房間時(shí),他必須獲得門鎖,一旦A獲得門鎖,他進(jìn)去后就立刻將門鎖上,于是B,C,D...就不得不在門外等待,直到A釋放鎖出來后,B,C,D...中的某一人搶到了該鎖(具體搶法依賴于 JVM的實(shí)現(xiàn),可以先到先得,也可以隨機(jī)挑選),然后進(jìn)屋又將門鎖上。這樣,任一時(shí)刻最多有一人在屋內(nèi)(使用共享資源)。
Java語言規(guī)范內(nèi)置了對多線程的支持。對于Java程序來說,每一個(gè)對象實(shí)例都有一把“鎖”,一旦某個(gè)線程獲得了該鎖,別的線程如果希望獲得該鎖,只能等待這個(gè)線程釋放鎖之后。獲得鎖的方法只有一個(gè),就是synchronized關(guān)鍵字。例如:
public class SharedResource {
private int count = 0;
public int getCount() { return count; }
public synchronized void setCount(int count) { this.count = count; }
}
同步方法public synchronized void setCount(int count) { this.count = count; } 事實(shí)上相當(dāng)于:
public void setCount(int count) {
synchronized(this) { // 在此獲得this鎖
this.count = count;
} // 在此釋放this鎖
}
紅色部分表示需要同步的代碼段,該區(qū)域?yàn)椤拔kU(xiǎn)區(qū)域”,如果兩個(gè)以上的線程同時(shí)執(zhí)行,會(huì)引發(fā)沖突,因此,要更改SharedResource的內(nèi)部狀態(tài),必須先獲得SharedResource實(shí)例的鎖。
退出synchronized塊時(shí),線程擁有的鎖自動(dòng)釋放,于是,別的線程又可以獲取該鎖了。
為了提高性能,不一定要鎖定this,例如,SharedResource有兩個(gè)獨(dú)立變化的變量:
public class SharedResouce {
private int a = 0;
private int b = 0;
public synchronized void setA(int a) { this.a = a; }
public synchronized void setB(int b) { this.b = b; }
}
若同步整個(gè)方法,則setA()的時(shí)候無法setB(),setB()時(shí)無法setA()。為了提高性能,可以使用不同對象的鎖:
public class SharedResouce {
private int a = 0;
private int b = 0;
private Object sync_a = new Object();
private Object sync_b = new Object();
public void setA(int a) {
synchronized(sync_a) {
this.a = a;
}
}
public synchronized void setB(int b) {
synchronized(sync_b) {
this.b = b;
}
}
}
通常,多線程之間需要協(xié)調(diào)工作。例如,瀏覽器的一個(gè)顯示圖片的線程displayThread想要執(zhí)行顯示圖片的任務(wù),必須等待下載線程 downloadThread將該圖片下載完畢。如果圖片還沒有下載完,displayThread可以暫停,當(dāng)downloadThread完成了任務(wù)后,再通知displayThread“圖片準(zhǔn)備完畢,可以顯示了”,這時(shí),displayThread繼續(xù)執(zhí)行。
以上邏輯簡單的說就是:如果條件不滿足,則等待。當(dāng)條件滿足時(shí),等待該條件的線程將被喚醒。在Java中,這個(gè)機(jī)制的實(shí)現(xiàn)依賴于wait/notify。等待機(jī)制與鎖機(jī)制是密切關(guān)聯(lián)的。例如:
synchronized(obj) {
while(!condition) {
obj.wait();
}
obj.doSomething();
}
當(dāng)線程A獲得了obj鎖后,發(fā)現(xiàn)條件condition不滿足,無法繼續(xù)下一處理,于是線程A就wait()。
在另一線程B中,如果B更改了某些條件,使得線程A的condition條件滿足了,就可以喚醒線程A:
synchronized(obj) {
condition = true;
obj.notify();
}
需要注意的概念是:
# 調(diào)用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫在synchronized(obj) {...} 代碼段內(nèi)。
# 調(diào)用obj.wait()后,線程A就釋放了obj的鎖,否則線程B無法獲得obj鎖,也就無法在synchronized(obj) {...} 代碼段內(nèi)喚醒A。
# 當(dāng)obj.wait()方法返回后,線程A需要再次獲得obj鎖,才能繼續(xù)執(zhí)行。
# 如果A1,A2,A3都在obj.wait(),則B調(diào)用obj.notify()只能喚醒A1,A2,A3中的一個(gè)(具體哪一個(gè)由JVM決定)。
# obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續(xù)執(zhí)行obj.wait()的下一條語句,必須獲得obj鎖,因此,A1,A2,A3只有一個(gè)有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續(xù)執(zhí)行。
# 當(dāng)B調(diào)用obj.notify/notifyAll的時(shí)候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個(gè)才有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行。
前面講了wait/notify機(jī)制,Thread還有一個(gè)sleep()靜態(tài)方法,它也能使線程暫停一段時(shí)間。sleep與wait的不同點(diǎn)是: sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會(huì)使線程進(jìn)入obj對象的等待集合中并等待喚醒。
但是wait()和sleep()都可以通過interrupt()方法打斷線程的暫停狀態(tài),從而使線程立刻拋出InterruptedException。
如果線程A希望立即結(jié)束線程B,則可以對線程B對應(yīng)的Thread實(shí)例調(diào)用interrupt方法。如果此刻線程B正在wait/sleep/join,則線程B會(huì)立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結(jié)束線程。
需要注意的是,InterruptedException是線程自己從內(nèi)部拋出的,并不是interrupt()方法拋出的。對某一線程調(diào)用 interrupt()時(shí),如果該線程正在執(zhí)行普通的代碼,那么該線程根本就不會(huì)拋出InterruptedException。但是,一旦該線程進(jìn)入到 wait()/sleep()/join()后,就會(huì)立刻拋出InterruptedException。
GuardedSuspention模式主要思想是:
當(dāng)條件不滿足時(shí),線程等待,直到條件滿足時(shí),等待該條件的線程被喚醒。
我們設(shè)計(jì)一個(gè)客戶端線程和一個(gè)服務(wù)器線程,客戶端線程不斷發(fā)送請求給服務(wù)器線程,服務(wù)器線程不斷處理請求。當(dāng)請求隊(duì)列為空時(shí),服務(wù)器線程就必須等待,直到客戶端發(fā)送了請求。
先定義一個(gè)請求隊(duì)列:Queue
package com.crackj2ee.thread;
import java.util.*;
public class Queue {
private List queue = new LinkedList();
public synchronized Request getRequest() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Request)queue.remove(0);
}
public synchronized void putRequest(Request request) {
queue.add(request);
this.notifyAll();
}
}
藍(lán)色部分就是服務(wù)器線程的等待條件,而客戶端線程在放入了一個(gè)request后,就使服務(wù)器線程等待條件滿足,于是喚醒服務(wù)器線程。
客戶端線程:ClientThread
package com.crackj2ee.thread;
public class ClientThread extends Thread {
private Queue queue;
private String clientName;
public ClientThread(Queue queue, String clientName) {
this.queue = queue;
this.clientName = clientName;
}
public String toString() {
return "[ClientThread-" + clientName + "]";
}
public void run() {
for(int i=0; i<100; i++) {
Request request = new Request("" + (long)(Math.random()*10000));
System.out.println(this + " send request: " + request);
queue.putRequest(request);
try {
Thread.sleep((long)(Math.random() * 10000 + 1000));
}
catch(InterruptedException ie) {
}
}
System.out.println(this + " shutdown.");
}
}
服務(wù)器線程:ServerThread
package com.crackj2ee.thread;
public class ServerThread extends Thread {
private boolean stop = false;
private Queue queue;
public ServerThread(Queue queue) {
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public void run() {
while(!stop) {
Request request = queue.getRequest();
System.out.println("[ServerThread] handle request: " + request);
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
}
System.out.println("[ServerThread] shutdown.");
}
}
服務(wù)器線程在紅色部分可能會(huì)阻塞,也就是說,Queue.getRequest是一個(gè)阻塞方法。這和java標(biāo)準(zhǔn)庫的許多IO方法類似。
最后,寫一個(gè)Main來啟動(dòng)他們:
package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
Queue queue = new Queue();
ServerThread server = new ServerThread(queue);
server.start();
ClientThread[] clients = new ClientThread[5];
for(int i=0; i<clients.length; i++) {
clients[i] = new ClientThread(queue, ""+i);
clients[i].start();
}
try {
Thread.sleep(100000);
}
catch(InterruptedException ie) {}
server.shutdown();
}
}
我們啟動(dòng)了5個(gè)客戶端線程和一個(gè)服務(wù)器線程,運(yùn)行結(jié)果如下:
[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......
可以觀察到ServerThread處理來自不同客戶端的請求。
思考
Q: 服務(wù)器線程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?
A: 在這個(gè)例子中可以,因?yàn)榉?wù)器線程只有一個(gè)。但是,如果服務(wù)器線程有多個(gè)(例如Web應(yīng)用程序有多個(gè)線程處理并發(fā)請求,這非常普遍),就會(huì)造成嚴(yán)重問題。
Q: 能否用sleep(1000)代替wait()?
A: 絕對不可以。sleep()不會(huì)釋放鎖,因此sleep期間別的線程根本沒有辦法調(diào)用getRequest()和putRequest(),導(dǎo)致所有相關(guān)線程都被阻塞。
Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?
A: 不可以。因?yàn)閣hile()是測試queue,remove()是使用queue,兩者是一個(gè)原子操作,不能放在synchronized外面。
總結(jié)
多線程設(shè)計(jì)看似簡單,實(shí)際上必須非常仔細(xì)地考慮各種鎖定/同步的條件,稍不小心,就可能出錯(cuò)。并且,當(dāng)線程較少時(shí),很可能發(fā)現(xiàn)不了問題,一旦問題出現(xiàn)又難以調(diào)試。
所幸的是,已有一些被驗(yàn)證過的模式可以供我們使用,我們會(huì)繼續(xù)介紹一些常用的多線程設(shè)計(jì)模式。
前面談了多線程應(yīng)用程序能極大地改善用戶相應(yīng)。例如對于一個(gè)Web應(yīng)用程序,每當(dāng)一個(gè)用戶請求服務(wù)器連接時(shí),服務(wù)器就可以啟動(dòng)一個(gè)新線程為用戶服務(wù)。
然而,創(chuàng)建和銷毀線程本身就有一定的開銷,如果頻繁創(chuàng)建和銷毀線程,CPU和內(nèi)存開銷就不可忽略,垃圾收集器還必須負(fù)擔(dān)更多的工作。因此,線程池就是為了避免頻繁創(chuàng)建和銷毀線程。
每當(dāng)服務(wù)器接受了一個(gè)新的請求后,服務(wù)器就從線程池中挑選一個(gè)等待的線程并執(zhí)行請求處理。處理完畢后,線程并不結(jié)束,而是轉(zhuǎn)為阻塞狀態(tài)再次被放入線程池中。這樣就避免了頻繁創(chuàng)建和銷毀線程。
Worker Pattern實(shí)現(xiàn)了類似線程池的功能。首先定義Task接口:
package com.crackj2ee.thread;
public interface Task {
void execute();
}
線程將負(fù)責(zé)執(zhí)行execute()方法。注意到任務(wù)是由子類通過實(shí)現(xiàn)execute()方法實(shí)現(xiàn)的,線程本身并不知道自己執(zhí)行的任務(wù)。它只負(fù)責(zé)運(yùn)行一個(gè)耗時(shí)的execute()方法。
具體任務(wù)由子類實(shí)現(xiàn),我們定義了一個(gè)CalculateTask和一個(gè)TimerTask:
// CalculateTask.java
package com.crackj2ee.thread;
public class CalculateTask implements Task {
private static int count = 0;
private int num = count;
public CalculateTask() {
count++;
}
public void execute() {
System.out.println("[CalculateTask " + num + "] start...");
try {
Thread.sleep(3000);
}
catch(InterruptedException ie) {}
System.out.println("[CalculateTask " + num + "] done.");
}
}
// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
private static int count = 0;
private int num = count;
public TimerTask() {
count++;
}
public void execute() {
System.out.println("[TimerTask " + num + "] start...");
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
System.out.println("[TimerTask " + num + "] done.");
}
}
以上任務(wù)均簡單的sleep若干秒。
TaskQueue實(shí)現(xiàn)了一個(gè)隊(duì)列,客戶端可以將請求放入隊(duì)列,服務(wù)器線程可以從隊(duì)列中取出任務(wù):
package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
private List queue = new LinkedList();
public synchronized Task getTask() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Task)queue.remove(0);
}
public synchronized void putTask(Task task) {
queue.add(task);
this.notifyAll();
}
}
終于到了真正的WorkerThread,這是真正執(zhí)行任務(wù)的服務(wù)器線程:
package com.crackj2ee.thread;
public class WorkerThread extends Thread {
private static int count = 0;
private boolean busy = false;
private boolean stop = false;
private TaskQueue queue;
public WorkerThread(ThreadGroup group, TaskQueue queue) {
super(group, "worker-" + count);
count++;
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public boolean isIdle() {
return !busy;
}
public void run() {
System.out.println(getName() + " start.");
while(!stop) {
Task task = queue.getTask();
if(task!=null) {
busy = true;
task.execute();
busy = false;
}
}
System.out.println(getName() + " end.");
}
}
前面已經(jīng)講過,queue.getTask()是一個(gè)阻塞方法,服務(wù)器線程可能在此wait()一段時(shí)間。此外,WorkerThread還有一個(gè)shutdown方法,用于安全結(jié)束線程。
最后是ThreadPool,負(fù)責(zé)管理所有的服務(wù)器線程,還可以動(dòng)態(tài)增加和減少線程數(shù):
package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
private List threads = new LinkedList();
private TaskQueue queue;
public ThreadPool(TaskQueue queue) {
super("Thread-Pool");
this.queue = queue;
}
public synchronized void addWorkerThread() {
Thread t = new WorkerThread(this, queue);
threads.add(t);
t.start();
}
public synchronized void removeWorkerThread() {
if(threads.size()>0) {
WorkerThread t = (WorkerThread)threads.remove(0);
t.shutdown();
}
}
public synchronized void currentStatus() {
System.out.println("-----------------------------------------------");
System.out.println("Thread count = " + threads.size());
Iterator it = threads.iterator();
while(it.hasNext()) {
WorkerThread t = (WorkerThread)it.next();
System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
}
System.out.println("-----------------------------------------------");
}
}
currentStatus()方法是為了方便調(diào)試,打印出所有線程的當(dāng)前狀態(tài)。
最后,Main負(fù)責(zé)完成main()方法:
package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
TaskQueue queue = new TaskQueue();
ThreadPool pool = new ThreadPool(queue);
for(int i=0; i<10; i++) {
queue.putTask(new CalculateTask());
queue.putTask(new TimerTask());
}
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(8000);
pool.currentStatus();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(5000);
pool.currentStatus();
}
private static void doSleep(long ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}
main()一開始放入了20個(gè)Task,然后動(dòng)態(tài)添加了一些服務(wù)線程,并定期打印線程狀態(tài),運(yùn)行結(jié)果如下:
worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.
仔細(xì)觀察:一開始只有兩個(gè)服務(wù)器線程,因此線程狀態(tài)都是忙,后來線程數(shù)增多,6個(gè)線程中的兩個(gè)狀態(tài)變成idle,說明處于wait()狀態(tài)。
思考:本例的線程調(diào)度算法其實(shí)根本沒有,因?yàn)檫@個(gè)應(yīng)用是圍繞TaskQueue設(shè)計(jì)的,不是以Thread Pool為中心設(shè)計(jì)的。因此,Task調(diào)度取決于TaskQueue的getTask()方法,你可以改進(jìn)這個(gè)方法,例如使用優(yōu)先隊(duì)列,使優(yōu)先級高的任務(wù)先被執(zhí)行。
如果所有的服務(wù)器線程都處于busy狀態(tài),則說明任務(wù)繁忙,TaskQueue的隊(duì)列越來越長,最終會(huì)導(dǎo)致服務(wù)器內(nèi)存耗盡。因此,可以限制TaskQueue的等待任務(wù)數(shù),超過最大長度就拒絕處理。許多Web服務(wù)器在用戶請求繁忙時(shí)就會(huì)拒絕用戶:HTTP 503 SERVICE UNAVAILABLE
多線程讀寫同一個(gè)對象的數(shù)據(jù)是很普遍的,通常,要避免讀寫沖突,必須保證任何時(shí)候僅有一個(gè)線程在寫入,有線程正在讀取的時(shí)候,寫入操作就必須等待。簡單說,就是要避免“寫-寫”沖突和“讀-寫”沖突。但是同時(shí)讀是允許的,因?yàn)椤白x-讀”不沖突,而且很安全。
要實(shí)現(xiàn)以上的ReadWriteLock,簡單的使用synchronized就不行,我們必須自己設(shè)計(jì)一個(gè)ReadWriteLock類,在讀之前,必須先獲得“讀鎖”,寫之前,必須先獲得“寫鎖”。舉例說明:
DataHandler對象保存了一個(gè)可讀寫的char[]數(shù)組:
package com.crackj2ee.thread;
public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();
private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}
doRead()和doWrite()方法是非線程安全的讀寫方法。為了演示,加入了sleep(),并設(shè)置讀的速度大約是寫的3倍,這符合通常的情況。
為了讓多線程能安全讀寫,我們設(shè)計(jì)了一個(gè)ReadWriteLock:
package com.crackj2ee.thread;
public class ReadWriteLock {
private int readingThreads = 0;
private int writingThreads = 0;
private int waitingThreads = 0; // waiting for write
private boolean preferWrite = true;
public synchronized void readLock() throws InterruptedException {
while(writingThreads>0 || (preferWrite && waitingThreads>0))
this.wait();
readingThreads++;
}
public synchronized void readUnlock() {
readingThreads--;
preferWrite = true;
notifyAll();
}
public synchronized void writeLock() throws InterruptedException {
waitingThreads++;
try {
while(readingThreads>0 || writingThreads>0)
this.wait();
}
finally {
waitingThreads--;
}
writingThreads++;
}
public synchronized void writeUnlock() {
writingThreads--;
preferWrite = false;
notifyAll();
}
}
readLock()用于獲得讀鎖,readUnlock()釋放讀鎖,writeLock()和writeUnlock()一樣。由于鎖用完必須釋放,因此,必須保證lock和unlock匹配。我們修改DataHandler,加入ReadWriteLock:
package com.crackj2ee.thread;
public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();
// lock:
private ReadWriteLock lock = new ReadWriteLock();
public char[] read(String name) throws InterruptedException {
System.out.println(name + " waiting for read...");
lock.readLock();
try {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
finally {
lock.readUnlock();
}
}
public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " waiting for write...");
lock.writeLock();
try {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}
finally {
lock.writeUnlock();
}
}
private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}
public方法read()和write()完全封裝了底層的ReadWriteLock,因此,多線程可以安全地調(diào)用這兩個(gè)方法:
// ReadingThread不斷讀取數(shù)據(jù):
package com.crackj2ee.thread;
public class ReadingThread extends Thread {
private DataHandler handler;
public ReadingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
for(;;) {
try {
char[] data = handler.read(getName());
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
}
// WritingThread不斷寫入數(shù)據(jù),每次寫入的都是10個(gè)相同的字符:
package com.crackj2ee.thread;
public class WritingThread extends Thread {
private DataHandler handler;
public WritingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
char[] data = new char[10];
for(;;) {
try {
fill(data);
handler.write(getName(), data);
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
// 產(chǎn)生一個(gè)A-Z隨機(jī)字符,填入char[10]:
private void fill(char[] data) {
char c = (char)(Math.random()*26+'A');
for(int i=0; i<data.length; i++)
data[i] = c;
}
}
最后Main負(fù)責(zé)啟動(dòng)這些線程:
package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
DataHandler handler = new DataHandler();
Thread[] ts = new Thread[] {
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new WritingThread(handler),
new WritingThread(handler)
};
for(int i=0; i<ts.length; i++) {
ts[i].start();
}
}
}
我們啟動(dòng)了5個(gè)讀線程和2個(gè)寫線程,運(yùn)行結(jié)果如下:
Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read...
Thread-4 waiting for read...
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read...
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read...
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-5 waiting for write...
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write...
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read...
Thread-3 reads data: AAAAAAAAAA
......
可以看到,每次讀/寫都是完整的原子操作,因?yàn)槲覀兠看螌懭氲亩际?0個(gè)相同字符。并且,每次讀出的都是最近一次寫入的內(nèi)容。
如果去掉ReadWriteLock:
package com.crackj2ee.thread;
public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();
public char[] read(String name) throws InterruptedException {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}
private char[] doRead() {
char[] ret = new char[10];
for(int i=0; i<10; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
for(int i=0; i<10; i++) {
buffer[i] = data[i];
sleep(10);
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}
運(yùn)行結(jié)果如下:
Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE
可以看到在Thread-6寫入EEEEEEEEEE的過程中,3個(gè)線程讀取的內(nèi)容是不同的。
思考
java的synchronized提供了最底層的物理鎖,要在synchronized的基礎(chǔ)上,實(shí)現(xiàn)自己的邏輯鎖,就必須仔細(xì)設(shè)計(jì)ReadWriteLock。
Q: lock.readLock()為什么不放入try{ } 內(nèi)?
A: 因?yàn)閞eadLock()會(huì)拋出InterruptedException,導(dǎo)致readingThreads++不執(zhí)行,而readUnlock()在finally{ } 中,導(dǎo)致readingThreads--執(zhí)行,從而使readingThread狀態(tài)出錯(cuò)。writeLock()也是類似的。
Q: preferWrite有用嗎?
A: 如果去掉preferWrite,線程安全不受影響。但是,如果讀取線程很多,上一個(gè)線程還沒有讀取完,下一個(gè)線程又開始讀了,就導(dǎo)致寫入線程長時(shí)間無法獲得writeLock;如果寫入線程等待的很多,一個(gè)接一個(gè)寫,也會(huì)導(dǎo)致讀取線程長時(shí)間無法獲得readLock。preferWrite的作用是讓讀 /寫交替執(zhí)行,避免由于讀線程繁忙導(dǎo)致寫無法進(jìn)行和由于寫線程繁忙導(dǎo)致讀無法進(jìn)行。
Q: notifyAll()換成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一個(gè)線程剛讀取完畢,此時(shí)preferWrite=true,再notify(),若恰好喚醒的是一個(gè)讀線程,則while(writingThreads>0 || (preferWrite && waitingThreads>0))可能為true導(dǎo)致該讀線程繼續(xù)等待,而等待寫入的線程也處于wait()中,結(jié)果所有線程都處于wait ()狀態(tài),誰也無法喚醒誰。因此,notifyAll()比notify()要來得安全。程序驗(yàn)證notify()帶來的死鎖:
Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read...
Thread-1 waiting for read...
Thread-3 waiting for read...
Thread-0 waiting for read...
Thread-4 waiting for read...
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read...
(運(yùn)行到此不動(dòng)了)
注意到這種死鎖是由于所有線程都在等待別的線程喚醒自己,結(jié)果都無法醒過來。這和兩個(gè)線程希望獲得對方已有的鎖造成死鎖不同。因此多線程設(shè)計(jì)的難度遠(yuǎn)遠(yuǎn)高于單線程應(yīng)用。
posted on 2005-10-24 22:22
zjw_albert 閱讀(111)
評論(0) 編輯 收藏