java多線程設計模式
zdmilan 的 Blog

java語言已經內置了多線程支持,所有實現Runnable接口的類都可被啟動一個新線程,新線程會執行該實例的run()方法,當run()方法執行完畢后,線程就結束了。一旦一個線程執行完畢,這個實例就不能再重新啟動,只能重新生成一個新實例,再啟動一個新線程。

Thread類是實現了Runnable接口的一個實例,它代表一個線程的實例,并且,啟動線程的唯一方法就是通過Thread類的start()實例方法:

Thread t = new Thread();
t.start();

start()方法是一個native方法,它將啟動一個新線程,并執行run()方法。Thread類默認的run()方法什么也不做就退出了。注意:直接調用run()方法并不會啟動一個新線程,它和調用一個普通的java方法沒有什么區別。

因此,有兩個方法可以實現自己的線程:

方法1:自己的類extend Thread,并復寫run()方法,就可以啟動新線程并執行自己定義的run()方法。例如:

public class MyThread extends Thread {
public run() {
System.out.println("MyThread.run()");
}
}

在合適的地方啟動線程:new MyThread().start();

方法2:如果自己的類已經extends另一個類,就無法直接extends Thread,此時,必須實現一個Runnable接口:

public class MyThread extends OtherClass implements Runnable {
public run() {
System.out.println("MyThread.run()");
}
}

為了啟動MyThread,需要首先實例化一個Thread,并傳入自己的MyThread實例:

MyThread myt = new MyThread();
Thread t = new Thread(myt);
t.start();

事實上,當傳入一個Runnable target參數給Thread后,Thread的run()方法就會調用target.run(),參考JDK源代碼:

public void run() {
if (target != null) {
target.run();
}
}

線程還有一些Name, ThreadGroup, isDaemon等設置,由于和線程設計模式關聯很少,這里就不多說了。

由于同一進程內的多個線程共享內存空間,在Java中,就是共享實例,當多個線程試圖同時修改某個實例的內容時,就會造成沖突,因此,線程必須實現共享互斥,使多線程同步。

最簡單的同步是將一個方法標記為synchronized,對同一個實例來說,任一時刻只能有一個synchronized方法在執行。當一個方法正在執行某個synchronized方法時,其他線程如果想要執行這個實例的任意一個synchronized方法,都必須等待當前執行 synchronized方法的線程退出此方法后,才能依次執行。

但是,非synchronized方法不受影響,不管當前有沒有執行synchronized方法,非synchronized方法都可以被多個線程同時執行。

此外,必須注意,只有同一實例的synchronized方法同一時間只能被一個線程執行,不同實例的synchronized方法是可以并發的。例如,class A定義了synchronized方法sync(),則不同實例a1.sync()和a2.sync()可以同時由兩個線程來執行。

多線程同步的實現最終依賴鎖機制。我們可以想象某一共享資源是一間屋子,每個人都是一個線程。當A希望進入房間時,他必須獲得門鎖,一旦A獲得門鎖,他進去后就立刻將門鎖上,于是B,C,D...就不得不在門外等待,直到A釋放鎖出來后,B,C,D...中的某一人搶到了該鎖(具體搶法依賴于 JVM的實現,可以先到先得,也可以隨機挑選),然后進屋又將門鎖上。這樣,任一時刻最多有一人在屋內(使用共享資源)。

Java語言規范內置了對多線程的支持。對于Java程序來說,每一個對象實例都有一把“鎖”,一旦某個線程獲得了該鎖,別的線程如果希望獲得該鎖,只能等待這個線程釋放鎖之后。獲得鎖的方法只有一個,就是synchronized關鍵字。例如:

public class SharedResource {
private int count = 0;

public int getCount() { return count; }

public synchronized void setCount(int count) { this.count = count; }

}

同步方法public synchronized void setCount(int count) { this.count = count; } 事實上相當于:

public void setCount(int count) {
synchronized(this) { // 在此獲得this鎖
this.count = count;
} // 在此釋放this鎖
}

紅色部分表示需要同步的代碼段,該區域為“危險區域”,如果兩個以上的線程同時執行,會引發沖突,因此,要更改SharedResource的內部狀態,必須先獲得SharedResource實例的鎖。

退出synchronized塊時,線程擁有的鎖自動釋放,于是,別的線程又可以獲取該鎖了。

為了提高性能,不一定要鎖定this,例如,SharedResource有兩個獨立變化的變量:

public class SharedResouce {
private int a = 0;
private int b = 0;

public synchronized void setA(int a) { this.a = a; }

public synchronized void setB(int b) { this.b = b; }
}

若同步整個方法,則setA()的時候無法setB(),setB()時無法setA()。為了提高性能,可以使用不同對象的鎖:

public class SharedResouce {
private int a = 0;
private int b = 0;
private Object sync_a = new Object();
private Object sync_b = new Object();

public void setA(int a) {
synchronized(sync_a) {
this.a = a;
}
}

public synchronized void setB(int b) {
synchronized(sync_b) {
this.b = b;
}
}
}

通常,多線程之間需要協調工作。例如,瀏覽器的一個顯示圖片的線程displayThread想要執行顯示圖片的任務,必須等待下載線程 downloadThread將該圖片下載完畢。如果圖片還沒有下載完,displayThread可以暫停,當downloadThread完成了任務后,再通知displayThread“圖片準備完畢,可以顯示了”,這時,displayThread繼續執行。

以上邏輯簡單的說就是:如果條件不滿足,則等待。當條件滿足時,等待該條件的線程將被喚醒。在Java中,這個機制的實現依賴于wait/notify。等待機制與鎖機制是密切關聯的。例如:

synchronized(obj) {
while(!condition) {
obj.wait();
}
obj.doSomething();
}

當線程A獲得了obj鎖后,發現條件condition不滿足,無法繼續下一處理,于是線程A就wait()。

在另一線程B中,如果B更改了某些條件,使得線程A的condition條件滿足了,就可以喚醒線程A:

synchronized(obj) {
condition = true;
obj.notify();
}

需要注意的概念是:

# 調用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫在synchronized(obj) {...} 代碼段內。

# 調用obj.wait()后,線程A就釋放了obj的鎖,否則線程B無法獲得obj鎖,也就無法在synchronized(obj) {...} 代碼段內喚醒A。

# 當obj.wait()方法返回后,線程A需要再次獲得obj鎖,才能繼續執行。

# 如果A1,A2,A3都在obj.wait(),則B調用obj.notify()只能喚醒A1,A2,A3中的一個(具體哪一個由JVM決定)。

# obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續執行obj.wait()的下一條語句,必須獲得obj鎖,因此,A1,A2,A3只有一個有機會獲得鎖繼續執行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續執行。

# 當B調用obj.notify/notifyAll的時候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個才有機會獲得鎖繼續執行。

前面講了wait/notify機制,Thread還有一個sleep()靜態方法,它也能使線程暫停一段時間。sleep與wait的不同點是: sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會使線程進入obj對象的等待集合中并等待喚醒。

但是wait()和sleep()都可以通過interrupt()方法打斷線程的暫停狀態,從而使線程立刻拋出InterruptedException。

如果線程A希望立即結束線程B,則可以對線程B對應的Thread實例調用interrupt方法。如果此刻線程B正在 wait/sleep/join,則線程B會立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結束線程。

需要注意的是,InterruptedException是線程自己從內部拋出的,并不是interrupt()方法拋出的。對某一線程調用 interrupt()時,如果該線程正在執行普通的代碼,那么該線程根本就不會拋出InterruptedException。但是,一旦該線程進入到 wait()/sleep()/join()后,就會立刻拋出InterruptedException。

GuardedSuspention模式主要思想是:

當條件不滿足時,線程等待,直到條件滿足時,等待該條件的線程被喚醒。

我們設計一個客戶端線程和一個服務器線程,客戶端線程不斷發送請求給服務器線程,服務器線程不斷處理請求。當請求隊列為空時,服務器線程就必須等待,直到客戶端發送了請求。

先定義一個請求隊列:Queue

package com.crackj2ee.thread;

import java.util.*;

public class Queue {
private List queue = new LinkedList();

public synchronized Request getRequest() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Request)queue.remove(0);
}

public synchronized void putRequest(Request request) {
queue.add(request);
this.notifyAll();
}

}

藍色部分就是服務器線程的等待條件,而客戶端線程在放入了一個request后,就使服務器線程等待條件滿足,于是喚醒服務器線程。

客戶端線程:ClientThread

package com.crackj2ee.thread;

public class ClientThread extends Thread {
private Queue queue;
private String clientName;

public ClientThread(Queue queue, String clientName) {
this.queue = queue;
this.clientName = clientName;
}

public String toString() {
return "[ClientThread-" + clientName + "]";
}

public void run() {
for(int i=0; i<100; i++) {
Request request = new Request("" + (long)(Math.random()*10000));
System.out.println(this + " send request: " + request);
queue.putRequest(request);
try {
Thread.sleep((long)(Math.random() * 10000 + 1000));
}
catch(InterruptedException ie) {
}
}
System.out.println(this + " shutdown.");
}
}

服務器線程:ServerThread

package com.crackj2ee.thread;
public class ServerThread extends Thread {
private boolean stop = false;
private Queue queue;

public ServerThread(Queue queue) {
this.queue = queue;
}

public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}

public void run() {
while(!stop) {
Request request = queue.getRequest();
System.out.println("[ServerThread] handle request: " + request);
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
}
System.out.println("[ServerThread] shutdown.");
}
}

服務器線程在紅色部分可能會阻塞,也就是說,Queue.getRequest是一個阻塞方法。這和java標準庫的許多IO方法類似。

最后,寫一個Main來啟動他們:

package com.crackj2ee.thread;

public class Main {

public static void main(String[] args) {
Queue queue = new Queue();
ServerThread server = new ServerThread(queue);
server.start();
ClientThread[] clients = new ClientThread[5];
for(int i=0; i<clients.length; i++) {
clients[i] = new ClientThread(queue, ""+i);
clients[i].start();
}
try {
Thread.sleep(100000);
}
catch(InterruptedException ie) {}
server.shutdown();
}
}

我們啟動了5個客戶端線程和一個服務器線程,運行結果如下:

[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......

可以觀察到ServerThread處理來自不同客戶端的請求。

思考

Q: 服務器線程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?

A: 在這個例子中可以,因為服務器線程只有一個。但是,如果服務器線程有多個(例如Web應用程序有多個線程處理并發請求,這非常普遍),就會造成嚴重問題。

Q: 能否用sleep(1000)代替wait()?

A: 絕對不可以。sleep()不會釋放鎖,因此sleep期間別的線程根本沒有辦法調用getRequest()和putRequest(),導致所有相關線程都被阻塞。

Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?

A: 不可以。因為while()是測試queue,remove()是使用queue,兩者是一個原子操作,不能放在synchronized外面。

總結

多線程設計看似簡單,實際上必須非常仔細地考慮各種鎖定/同步的條件,稍不小心,就可能出錯。并且,當線程較少時,很可能發現不了問題,一旦問題出現又難以調試。

所幸的是,已有一些被驗證過的模式可以供我們使用,我們會繼續介紹一些常用的多線程設計模式。


前面談了多線程應用程序能極大地改善用戶相應。例如對于一個Web應用程序,每當一個用戶請求服務器連接時,服務器就可以啟動一個新線程為用戶服務。

然而,創建和銷毀線程本身就有一定的開銷,如果頻繁創建和銷毀線程,CPU和內存開銷就不可忽略,垃圾收集器還必須負擔更多的工作。因此,線程池就是為了避免頻繁創建和銷毀線程。

每當服務器接受了一個新的請求后,服務器就從線程池中挑選一個等待的線程并執行請求處理。處理完畢后,線程并不結束,而是轉為阻塞狀態再次被放入線程池中。這樣就避免了頻繁創建和銷毀線程。

Worker Pattern實現了類似線程池的功能。首先定義Task接口:

package com.crackj2ee.thread;
public interface Task {
void execute();
}

線程將負責執行execute()方法。注意到任務是由子類通過實現execute()方法實現的,線程本身并不知道自己執行的任務。它只負責運行一個耗時的execute()方法。

具體任務由子類實現,我們定義了一個CalculateTask和一個TimerTask:

// CalculateTask.java
package com.crackj2ee.thread;
public class CalculateTask implements Task {
private static int count = 0;
private int num = count;
public CalculateTask() {
count++;
}
public void execute() {
System.out.println("[CalculateTask " + num + "] start...");
try {
Thread.sleep(3000);
}
catch(InterruptedException ie) {}
System.out.println("[CalculateTask " + num + "] done.");
}
}

// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
private static int count = 0;
private int num = count;
public TimerTask() {
count++;
}
public void execute() {
System.out.println("[TimerTask " + num + "] start...");
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
System.out.println("[TimerTask " + num + "] done.");
}
}

以上任務均簡單的sleep若干秒。

TaskQueue實現了一個隊列,客戶端可以將請求放入隊列,服務器線程可以從隊列中取出任務:

package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
private List queue = new LinkedList();
public synchronized Task getTask() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Task)queue.remove(0);
}
public synchronized void putTask(Task task) {
queue.add(task);
this.notifyAll();
}
}

終于到了真正的WorkerThread,這是真正執行任務的服務器線程:

package com.crackj2ee.thread;
public class WorkerThread extends Thread {
private static int count = 0;
private boolean busy = false;
private boolean stop = false;
private TaskQueue queue;
public WorkerThread(ThreadGroup group, TaskQueue queue) {
super(group, "worker-" + count);
count++;
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public boolean isIdle() {
return !busy;
}
public void run() {
System.out.println(getName() + " start.");
while(!stop) {
Task task = queue.getTask();
if(task!=null) {
busy = true;
task.execute();
busy = false;
}
}
System.out.println(getName() + " end.");
}
}

前面已經講過,queue.getTask()是一個阻塞方法,服務器線程可能在此wait()一段時間。此外,WorkerThread還有一個shutdown方法,用于安全結束線程。

最后是ThreadPool,負責管理所有的服務器線程,還可以動態增加和減少線程數:

package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
private List threads = new LinkedList();
private TaskQueue queue;
public ThreadPool(TaskQueue queue) {
super("Thread-Pool");
this.queue = queue;
}
public synchronized void addWorkerThread() {
Thread t = new WorkerThread(this, queue);
threads.add(t);
t.start();
}
public synchronized void removeWorkerThread() {
if(threads.size()>0) {
WorkerThread t = (WorkerThread)threads.remove(0);
t.shutdown();
}
}
public synchronized void currentStatus() {
System.out.println("-----------------------------------------------");
System.out.println("Thread count = " + threads.size());
Iterator it = threads.iterator();
while(it.hasNext()) {
WorkerThread t = (WorkerThread)it.next();
System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
}
System.out.println("-----------------------------------------------");
}
}

currentStatus()方法是為了方便調試,打印出所有線程的當前狀態。

最后,Main負責完成main()方法:

package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
TaskQueue queue = new TaskQueue();
ThreadPool pool = new ThreadPool(queue);
for(int i=0; i<10; i++) {
queue.putTask(new CalculateTask());
queue.putTask(new TimerTask());
}
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(8000);
pool.currentStatus();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(5000);
pool.currentStatus();
}
private static void doSleep(long ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

main()一開始放入了20個Task,然后動態添加了一些服務線程,并定期打印線程狀態,運行結果如下:

worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.

仔細觀察:一開始只有兩個服務器線程,因此線程狀態都是忙,后來線程數增多,6個線程中的兩個狀態變成idle,說明處于wait()狀態。

思考:本例的線程調度算法其實根本沒有,因為這個應用是圍繞TaskQueue設計的,不是以Thread Pool為中心設計的。因此,Task調度取決于TaskQueue的getTask()方法,你可以改進這個方法,例如使用優先隊列,使優先級高的任務先被執行。

如果所有的服務器線程都處于busy狀態,則說明任務繁忙,TaskQueue的隊列越來越長,最終會導致服務器內存耗盡。因此,可以限制 TaskQueue的等待任務數,超過最大長度就拒絕處理。許多Web服務器在用戶請求繁忙時就會拒絕用戶:HTTP 503 SERVICE UNAVAILABLE

多線程讀寫同一個對象的數據是很普遍的,通常,要避免讀寫沖突,必須保證任何時候僅有一個線程在寫入,有線程正在讀取的時候,寫入操作就必須等待。簡單說,就是要避免“寫-寫”沖突和“讀-寫”沖突。但是同時讀是允許的,因為“讀-讀”不沖突,而且很安全。

要實現以上的ReadWriteLock,簡單的使用synchronized就不行,我們必須自己設計一個ReadWriteLock類,在讀之前,必須先獲得“讀鎖”,寫之前,必須先獲得“寫鎖”。舉例說明:

DataHandler對象保存了一個可讀寫的char[]數組:

package com.crackj2ee.thread;

public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();

private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}

private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}

private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

doRead()和doWrite()方法是非線程安全的讀寫方法。為了演示,加入了sleep(),并設置讀的速度大約是寫的3倍,這符合通常的情況。

為了讓多線程能安全讀寫,我們設計了一個ReadWriteLock:

package com.crackj2ee.thread;
public class ReadWriteLock {
private int readingThreads = 0;
private int writingThreads = 0;
private int waitingThreads = 0; // waiting for write
private boolean preferWrite = true;

public synchronized void readLock() throws InterruptedException {
while(writingThreads>0 || (preferWrite && waitingThreads>0))
this.wait();
readingThreads++;
}

public synchronized void readUnlock() {
readingThreads--;
preferWrite = true;
notifyAll();
}

public synchronized void writeLock() throws InterruptedException {
waitingThreads++;
try {
while(readingThreads>0 || writingThreads>0)
this.wait();
}
finally {
waitingThreads--;
}
writingThreads++;
}

public synchronized void writeUnlock() {
writingThreads--;
preferWrite = false;
notifyAll();
}
}

readLock()用于獲得讀鎖,readUnlock()釋放讀鎖,writeLock()和writeUnlock()一樣。由于鎖用完必須釋放,因此,必須保證lock和unlock匹配。我們修改DataHandler,加入ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {
// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();
// lock:
private ReadWriteLock lock = new ReadWriteLock();

public char[] read(String name) throws InterruptedException {
System.out.println(name + " waiting for read...");
lock.readLock();
try {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
finally {
lock.readUnlock();
}
}

public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " waiting for write...");
lock.writeLock();
try {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}
finally {
lock.writeUnlock();
}
}

private char[] doRead() {
char[] ret = new char[buffer.length];
for(int i=0; i<buffer.length; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
if(data!=null) {
buffer = new char[data.length];
for(int i=0; i<buffer.length; i++) {
buffer[i] = data[i];
sleep(10);
}
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

public方法read()和write()完全封裝了底層的ReadWriteLock,因此,多線程可以安全地調用這兩個方法:

// ReadingThread不斷讀取數據:
package com.crackj2ee.thread;
public class ReadingThread extends Thread {
private DataHandler handler;
public ReadingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
for(;;) {
try {
char[] data = handler.read(getName());
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
}

// WritingThread不斷寫入數據,每次寫入的都是10個相同的字符:
package com.crackj2ee.thread;
public class WritingThread extends Thread {
private DataHandler handler;
public WritingThread(DataHandler handler) {
this.handler = handler;
}
public void run() {
char[] data = new char[10];
for(;;) {
try {
fill(data);
handler.write(getName(), data);
Thread.sleep((long)(Math.random()*1000+100));
}
catch(InterruptedException ie) {
break;
}
}
}
// 產生一個A-Z隨機字符,填入char[10]:
private void fill(char[] data) {
char c = (char)(Math.random()*26+'A');
for(int i=0; i<data.length; i++)
data[i] = c;
}
}

最后Main負責啟動這些線程:

package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
DataHandler handler = new DataHandler();
Thread[] ts = new Thread[] {
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new ReadingThread(handler),
new WritingThread(handler),
new WritingThread(handler)
};
for(int i=0; i<ts.length; i++) {
ts[i].start();
}
}
}

我們啟動了5個讀線程和2個寫線程,運行結果如下:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read...
Thread-4 waiting for read...
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read...
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read...
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read...
Thread-5 waiting for write...
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write...
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read...
Thread-3 reads data: AAAAAAAAAA
......

可以看到,每次讀/寫都是完整的原子操作,因為我們每次寫入的都是10個相同字符。并且,每次讀出的都是最近一次寫入的內容。

如果去掉ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {

// store data:
private char[] buffer = "AAAAAAAAAA".toCharArray();

public char[] read(String name) throws InterruptedException {
char[] data = doRead();
System.out.println(name + " reads data: " + new String(data));
return data;
}
public void write(String name, char[] data) throws InterruptedException {
System.out.println(name + " wrote data: " + new String(data));
doWrite(data);
}

private char[] doRead() {
char[] ret = new char[10];
for(int i=0; i<10; i++) {
ret[i] = buffer[i];
sleep(3);
}
return ret;
}
private void doWrite(char[] data) {
for(int i=0; i<10; i++) {
buffer[i] = data[i];
sleep(10);
}
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}

運行結果如下:

Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE

可以看到在Thread-6寫入EEEEEEEEEE的過程中,3個線程讀取的內容是不同的。

思考

java的synchronized提供了最底層的物理鎖,要在synchronized的基礎上,實現自己的邏輯鎖,就必須仔細設計ReadWriteLock。

Q: lock.readLock()為什么不放入try{ } 內?
A: 因為readLock()會拋出InterruptedException,導致readingThreads++不執行,而readUnlock()在 finally{ } 中,導致readingThreads--執行,從而使readingThread狀態出錯。writeLock()也是類似的。

Q: preferWrite有用嗎?
A: 如果去掉preferWrite,線程安全不受影響。但是,如果讀取線程很多,上一個線程還沒有讀取完,下一個線程又開始讀了,就導致寫入線程長時間無法獲得writeLock;如果寫入線程等待的很多,一個接一個寫,也會導致讀取線程長時間無法獲得readLock。preferWrite的作用是讓讀 /寫交替執行,避免由于讀線程繁忙導致寫無法進行和由于寫線程繁忙導致讀無法進行。

Q: notifyAll()換成notify()行不行?
A: 不可以。由于preferWrite的存在,如果一個線程剛讀取完畢,此時preferWrite=true,再notify(),若恰好喚醒的是一個讀線程,則while(writingThreads>0 || (preferWrite && waitingThreads>0))可能為true導致該讀線程繼續等待,而等待寫入的線程也處于wait()中,結果所有線程都處于wait ()狀態,誰也無法喚醒誰。因此,notifyAll()比notify()要來得安全。程序驗證notify()帶來的死鎖:

Thread-0 waiting for read...
Thread-1 waiting for read...
Thread-2 waiting for read...
Thread-3 waiting for read...
Thread-4 waiting for read...
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read...
Thread-1 waiting for read...
Thread-3 waiting for read...
Thread-0 waiting for read...
Thread-4 waiting for read...
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write...
Thread-6 waiting for write...
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read...
(運行到此不動了)

注意到這種死鎖是由于所有線程都在等待別的線程喚醒自己,結果都無法醒過來。這和兩個線程希望獲得對方已有的鎖造成死鎖不同。因此多線程設計的難度遠遠高于單線程應用。