原帖地址:http://eyesmore.javaeye.com/blog/243648
在多線程交互的中2,經常有一個線程需要得到另個一線程的計算結果,我們常用的是Future異步模式來加以解決。
Future顧名思意,有點像期貨市場的“期權”,是“對未來的一種憑證”,例如當我們買了某個房地產開發商的期房,交錢之后,開發商會給我們一個憑證
(期權),這個憑證告訴我們等明年某個時候拿這個憑證就可以拿到我們所需要的房子,但是現在房子還沒建好。市場上之所以有“期貨”,也正由于有這種需求,
才有這種供給。
這種應用在GUI上用的比較多,在設計模式中一般稱為“虛擬代理模式”。
例如:現在有個這樣的需求,Client向Server提交一個Request(int count,char
c),希望獲取一個由count個字符c構造出來的字符串。比如發送Request(10,'K'),那么反饋字符串“KKKKKKKKKK”,但是我們
假設這個生成字符串的過程很費時間。
于是,為了獲取比較好的交互性,我們的Server收到請求后,先構造一個FutureData,并把這個所謂的“期權(未來憑證)”反饋給
Client;于此同時,通過另一個并發線程去構造一個真正的字符串RealData,并在構造完畢后,RealData給FutureData報告一個
消息,說數據(期房)已經準備好了,此時Client可以通過期權拿到期房,但是假如我們的Client比較著急,還沒等房子假好的時,就想要房子,怎么
辦呢?這個時候我們可以阻塞Client所在的線程,讓Client等待,直到最后RealData通知FutureData說房子好了,才返回。
這里的要點:
(1)Server先給Client一個“期
權”,同時開一個線程去干活建房子(未來的“現房”);
(2)當“現房”RealData準備好了的
時候,如何告訴FutureData說已經準備好了。(本處采用“回調過程”(借用觀察者模式,來實現回調))
(3)如果客戶比較著急,現房還沒準備好的時
候,就要取房,怎么辦? 本處采用“阻塞”。
Data(公共數據接口)
- package com.umpay.future;
-
- public interface Data {
- public abstract String getContent();
- }
package com.umpay.future;
public interface Data {
public abstract String getContent();
}
FutureData(期權)
- package com.umpay.future.extend;
-
- import java.util.Observable;
- import java.util.Observer;
-
- import com.umpay.future.Data;
-
- public class FutureData2 implements Data,Observer {
-
- /**
- * 存
放真實數據,并且標志真正的數據是否已經準備完畢
- * 被多線程享受
- * 如果realData2==null,表示數據還準備好
- * */
- private volatile RealData2 realData2 = null;
- /**
- * 查
看真正的數據是否準備完畢
- * */
- public boolean isFinished() {
- return realData2 != null;
- }
-
- /**
- * 如
果數據已經準備好,則返回真正的數據;
- * 否
則,阻塞調用線程,直到數據準備完畢后,才返回真實數據;
- * */
- public String getContent() {
- synchronized (mutex) {
- while(!isFinished()) {//只要數據沒有準備完畢,就阻塞調用線程
- try {
- mutex.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- return realData2.getContent();
- }
- }
-
- /**
- * 當 RealData2 準
備完數據后,RealData2 應該通知 FutureData2 數據準備完畢。
- * 并在輸入參數 realData 傳入真實數據,在參數 event 傳入事件(比如數據如期準備好
了,或出了什么異常)
- *
- * @param realData 真實的數據
- * @param event 事件類型
- * */
- public void update(Observable realData, Object event) {
- System.out.println("通知...."+event);
- if(!(realData instanceof RealData2)) {
- throw new IllegalArgumentException("主題的數據類型必須是RealData2");
- }
- if(!(event instanceof String)) {
- throw new IllegalArgumentException("事件的數據類型必須是String");
- }
- synchronized (mutex) {
- if(isFinished()) {
- mutex.notifyAll();
- return;//如果數據
已經準備好了,直接返回.
- }
- if("Finished".equals(event)) {
- realData2 = (RealData2)realData;//數據準備好了的時候,便可以通知數據準備好了
- mutex.notifyAll();//喚醒被阻塞的線程
- }
- }
- }
-
- private Object mutex = new Object();
- }
package com.umpay.future.extend;
import java.util.Observable;
import java.util.Observer;
import com.umpay.future.Data;
public class FutureData2 implements Data,Observer {
/**
* 存放真實數據,并且標志真正的數據是否已經準備完畢
* 被多線程享受
* 如果realData2==null,表示數據還準備好
* */
private volatile RealData2 realData2 = null;
/**
* 查看真正的數據是否準備完畢
* */
public boolean isFinished() {
return realData2 != null;
}
/**
* 如果數據已經準備好,則返回真正的數據;
* 否則,阻塞調用線程,直到數據準備完畢后,才返回真實數據;
* */
public String getContent() {
synchronized (mutex) {
while(!isFinished()) {//只要數據沒有準備完畢,就阻塞調用線程
try {
mutex.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData2.getContent();
}
}
/**
* 當 RealData2 準備完數據后,RealData2 應該通知 FutureData2 數據準備完畢。
* 并在輸入參數 realData 傳入真實數據,在參數 event 傳入事件(比如數據如期準備好了,或出了什么異常)
*
* @param realData 真實的數據
* @param event 事件類型
* */
public void update(Observable realData, Object event) {
System.out.println("通知...."+event);
if(!(realData instanceof RealData2)) {
throw new IllegalArgumentException("主題的數據類型必須是RealData2");
}
if(!(event instanceof String)) {
throw new IllegalArgumentException("事件的數據類型必須是String");
}
synchronized (mutex) {
if(isFinished()) {
mutex.notifyAll();
return;//如果數據已經準備好了,直接返回.
}
if("Finished".equals(event)) {
realData2 = (RealData2)realData;//數據準備好了的時候,便可以通知數據準備好了
mutex.notifyAll();//喚醒被阻塞的線程
}
}
}
private Object mutex = new Object();
}
RealData(實際數據)
- package com.umpay.future.extend;
-
- import java.util.Observable;
-
- import com.umpay.future.Data;
-
- public class RealData2 extends Observable implements Data {
-
- private String content;
-
- public RealData2() {
-
- }
-
- public void createRealData2(int count, char c) {
- System.out.println(" making RealData(" + count + ", " + c
- + ") BEGIN");
- char[] buffer = new char[count];
- for (int i = 0; i < count; i++) {
- buffer[i] = c;
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- }
- System.out.println(" making RealData(" + count + ", " + c
- + ") END");
- this.content = new String(buffer);
-
- //真實數據準備完畢了,通知FutureData2說數據已經準備好了.
- setChanged();//必須先設置本對象的狀態發生了變化,并且通知所有的觀察者
- notifyObservers("Finished");
- }
-
-
- public String getContent() {
- return content;
- }
- }
package com.umpay.future.extend;
import java.util.Observable;
import com.umpay.future.Data;
public class RealData2 extends Observable implements Data {
private String content;
public RealData2() {
}
public void createRealData2(int count, char c) {
System.out.println(" making RealData(" + count + ", " + c
+ ") BEGIN");
char[] buffer = new char[count];
for (int i = 0; i < count; i++) {
buffer[i] = c;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
System.out.println(" making RealData(" + count + ", " + c
+ ") END");
this.content = new String(buffer);
//真實數據準備完畢了,通知FutureData2說數據已經準備好了.
setChanged();//必須先設置本對象的狀態發生了變化,并且通知所有的觀察者
notifyObservers("Finished");
}
public String getContent() {
return content;
}
}
服務端代碼:
- package com.umpay.future.extend;
-
- import com.umpay.future.Data;
-
- public class HostServer2 {
-
- public Data request(final int count, final char c) {
- System.out.println(" request(" + count + ", " + c + ") BEGIN");
-
- // (1) 建立FutureData的實體
- final FutureData2 future2 = new FutureData2();
-
- // (2) 為了建立RealData的實體,啟動新的線程
- new Thread() {
- public void run() {
- RealData2 realdata2 = new RealData2();
- realdata2.addObserver(future2);//以便當RealData2把數據準備完畢后,通過該回調口子,通知FutureData2表示數據已經貯備好了
- realdata2.createRealData2(count, c);
- }
- }.start();
-
- System.out.println(" request(" + count + ", " + c + ") END");
-
- // (3) 取回FutureData實體,作為傳回值
- return future2;
- }
-
- }
package com.umpay.future.extend;
import com.umpay.future.Data;
public class HostServer2 {
public Data request(final int count, final char c) {
System.out.println(" request(" + count + ", " + c + ") BEGIN");
// (1) 建立FutureData的實體
final FutureData2 future2 = new FutureData2();
// (2) 為了建立RealData的實體,啟動新的線程
new Thread() {
public void run() {
RealData2 realdata2 = new RealData2();
realdata2.addObserver(future2);//以便當RealData2把數據準備完畢后,通過該回調口子,通知FutureData2表示數據已經貯備好了
realdata2.createRealData2(count, c);
}
}.start();
System.out.println(" request(" + count + ", " + c + ") END");
// (3) 取回FutureData實體,作為傳回值
return future2;
}
}
客戶端代碼:
- package com.umpay.future;
-
- import com.umpay.future.extend.HostServer2;
-
- public class MainClient {
- public static void main(String[] args) {
- // testHostServer();
- testHostServer2();
- }
-
- static void testHostServer() {
- System.out.println("main BEGIN");
- HostServer hostServer = new HostServer();
- Data data1 = hostServer.request(10, 'A');
- Data data2 = hostServer.request(20, 'B');
- Data data3 = hostServer.request(30, 'C');
-
- System.out.println("main otherJob BEGIN");
- // try {
- // Thread.sleep(2000);
- // } catch (InterruptedException e) {
- // }
- System.out.println("main otherJob END");
-
- System.out.println("data1 = " + data1.getContent());
- System.out.println("data2 = " + data2.getContent());
- System.out.println("data3 = " + data3.getContent());
- System.out.println("main END");
-
- }
-
- static void testHostServer2() {
- System.out.println("main BEGIN");
- HostServer2 hostServer2 = new HostServer2();
- Data data1 = hostServer2.request(10, 'A');
- Data data2 = hostServer2.request(20, 'B');
- Data data3 = hostServer2.request(30, 'C');
-
- System.out.println("main otherJob BEGIN");
- // try {
- // Thread.sleep(2000);
- // } catch (InterruptedException e) {
- // }
- System.out.println("main otherJob END");
-
- System.out.println("data1 = " + data1.getContent());
- System.out.println("data2 = " + data2.getContent());
- System.out.println("data3 = " + data3.getContent());
- System.out.println("main END");
-
- }
- }
package com.umpay.future;
import com.umpay.future.extend.HostServer2;
public class MainClient {
public static void main(String[] args) {
// testHostServer();
testHostServer2();
}
static void testHostServer() {
System.out.println("main BEGIN");
HostServer hostServer = new HostServer();
Data data1 = hostServer.request(10, 'A');
Data data2 = hostServer.request(20, 'B');
Data data3 = hostServer.request(30, 'C');
System.out.println("main otherJob BEGIN");
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// }
System.out.println("main otherJob END");
System.out.println("data1 = " + data1.getContent());
System.out.println("data2 = " + data2.getContent());
System.out.println("data3 = " + data3.getContent());
System.out.println("main END");
}
static void testHostServer2() {
System.out.println("main BEGIN");
HostServer2 hostServer2 = new HostServer2();
Data data1 = hostServer2.request(10, 'A');
Data data2 = hostServer2.request(20, 'B');
Data data3 = hostServer2.request(30, 'C');
System.out.println("main otherJob BEGIN");
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// }
System.out.println("main otherJob END");
System.out.println("data1 = " + data1.getContent());
System.out.println("data2 = " + data2.getContent());
System.out.println("data3 = " + data3.getContent());
System.out.println("main END");
}
}