1.Future模式是一種常見的多線程設計模式,用來實現'異步'.其模式的核心在于去除main中的等待時間并使得原本需要等待的時間段可以用于處理其他的業務邏輯,從而充分利用計算機資源.
2.調用方式example:
ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", port));
future.awaitUninterruptibly();
session = future.getSession();
3.相關類圖
4.源碼_IoFuture

/** *//**
* 表示一個異步io操作的completion.
* 可通過IoFutureListener監聽completion
*/

public interface IoFuture
{

/** *//**
* 返回該future關聯的session
*/
IoSession getSession();


/** *//**
* 等待異步操作完成
* 關聯的listener將被通知完成
*/
IoFuture await() throws InterruptedException;


/** *//**
* 等待異步操作完成并指定超時時間
*
* @return <tt>true</tt> if the operation is completed.
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;


/** *//**
* 等待異步操作完成并指定超時時間(毫秒)
*
* @return <tt>true</tt> if the operation is completed.
*/
boolean await(long timeoutMillis) throws InterruptedException;


/** *//**
* 等待異步操作完成且不可中斷
* 關聯的listener將被通知完成
*
* @return the current IoFuture
*/
IoFuture awaitUninterruptibly();


/** *//**
* 等待異步操作完成并指定超時時間
* 不可中斷
*
* @return <tt>true</tt> if the operation is completed.
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);


/** *//**
* 等待異步操作完成并指定超時時間(毫秒)
* 不可中斷
*
* @return <tt>true</tt> if the operation is finished.
*/
boolean awaitUninterruptibly(long timeoutMillis);


/** *//**
* 已廢棄. 替換方法為 {@link #awaitUninterruptibly()}.
*/
@Deprecated
void join();


/** *//**
* 已廢棄. 替換方法為 {@link #awaitUninterruptibly(long)}.
*/
@Deprecated
boolean join(long timeoutMillis);


/** *//**
* 判斷異步操作是否完成
*/
boolean isDone();


/** *//**
* 添加一個監聽Future事件完成的listener
*/
IoFuture addListener(IoFutureListener<?> listener);


/** *//**
* 移除一個已存在的FutureListener
*/
IoFuture removeListener(IoFutureListener<?> listener);
}

1.從源碼看,其提供了三類await方法
1.await,一直等待至操作完成
2.await(long),在指定的超時時間范圍內等待
3.awaitUninterruptibly,等待且不可中斷
2.對外提供了FutureListener.
5.源碼_DefaultIoFuture
1.其實現了接口IoFuture,并提供了默認實現.
2.其內部有一個lock:
private final Object lock;//該lock用于await方法
該lock在DefaultIoFuture構造的時候初始化為this.
public DefaultIoFuture(IoSession session) {
this.session = session;
this.lock = this;
}
3.await()方法實現

public IoFuture await() throws InterruptedException
{

synchronized (lock)
{

while (!ready)
{
waiters++;

try
{
// 等待notify
// 假定尋在死鎖
// 循環檢查潛在的死鎖
lock.wait(DEAD_LOCK_CHECK_INTERVAL);

} finally
{
waiters--;

if (!ready)
{
checkDeadLock();
}
}
}
}
return this;
}
1.ready表示異步操作是否完成{@link isDone()}

public boolean isDone()
{

synchronized (lock)
{
return ready;
}
}

2.DEAD_LOCK_CHECK_INTERVAL為檢查死鎖的周期時間,為5000L,即5s.
3.waiters表示當前正在wait的的線程數目,如果大于0,則表示有線程正在阻塞在wait方法.所以可以用這個變量判斷是否notifyAll.
4.checkDeadLock()為檢查死鎖的方法:
其核心思路是檢查當前調用線程的StackTrace,如果此時調用await的線程是I/O processor thread,則表示死鎖發生(詳見死鎖的發生條件),則拋出IllegalStateException.
5.整體代碼邏輯是在異步操作未完成以前循環的wait(5000L)并在超時后檢查死鎖直到ready.
4.await0()方法實現

private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException
{
long endTime = System.currentTimeMillis() + timeoutMillis;


if (endTime < 0)
{
endTime = Long.MAX_VALUE;
}


synchronized (lock)
{

if (ready)
{
return ready;

} else if (timeoutMillis <= 0)
{
// 這里如果timeoutMillis小于等于0,則直接返回ready flag
return ready;
}

waiters++;


try
{
// 用一個無限循環來實現awaitUninterruptibly

for (;;)
{

try
{
long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
lock.wait(timeOut);

} catch (InterruptedException e)
{
// 這里說明當前線程在wait的時候被中斷了(調用了interrupt).如果可被中斷則繼續向上層拋出InterruptedException.否則catch住不做任何操作,繼續執行 //interruptable為false的時候表示Uninterruptibly.

if (interruptable)
{
// 可中斷則直接向上層拋出異常,則整個方法調用就結束了.反之則會繼續執行catch后代碼
throw e;
}
}


if (ready)
{
return true;
}
// 該判斷主要用來判斷超時

if (endTime < System.currentTimeMillis())
{
return ready;
}
}

} finally
{
waiters--;

if (!ready)
{
checkDeadLock();
}
}
}
} 1.該方法是內部的一個private方法.其中第一個參數指定超時時間,第二個參數指明在wait的時候是否可被中斷.
2.await(long timeout, TimeUnit unit)/await(long timeoutMillis)/awaitUninterruptibly的實現最終都調用了await0.
3.整體代碼的實現即用一個無限循環+捕捉中斷異常來實現awaitUninterruptibly.用endTime來實現超時.
4.仍然進行了潛在死鎖的檢查.
另外有一個問題是每次wait的時間都是Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL).如果timeoutMillis為6s,則實際wait的時間是5 + 5 = 10s.不知道作者的這個設計是什么.
5.await(long timeoutMillis)調用了await0(timeoutMillis, true)/awaitUninterruptibly()調用了await0(Long.MAX_VALUE, false).
5.setValue()方法實現

public void setValue(Object newValue)
{

synchronized (lock)
{
// 僅一次

if (ready)
{
return;
}
// 設置操作結果
result = newValue;
// 操作完成
ready = true;

if (waiters > 0)
{
// 喚醒await的線程
lock.notifyAll();
}
}

// 通知監聽器操作完成
notifyListeners();
}

這個方法是設置異步操作的結果表示操作完成.
6.源碼_IoFutureListener

/** *//**
* 泛型接口,監聽異步io操作完成
*/

1.public interface IoFutureListener<F extends IoFuture> extends EventListener
{

/** *//**
* 自實現的一個CLOSE_LISTENER.用于操作完成執行關閉session
*/

static IoFutureListener<IoFuture> CLOSE = new IoFutureListener<IoFuture>()
{

public void operationComplete(IoFuture future)
{
future.getSession().close(true);
}
};


/** *//**
* 操作完成時的回調接口
*/
void operationComplete(F future);
}

1.DefaultIoFuture的源碼中提供了添加/移除/通知的方法.不過有一個比較疑惑的地方是:其提供了一個firstListener和一個otherListeners,不知道為什么有這樣的需求.
2.DefaultIoFuture#addListener中對是否ready做了判斷.只要ready就會通知.也就是在操作已經完成的情況下添加一個監聽器,則也會執行回調方法.
7.when i/o operation complete?
1.ConnectFuture
{@link TailFilter#sessionCreated},調用了future.setSession(session)->調用setValue.->表示異步connect完成.
2.CloseFuture
{@link DefaultIoFilterChain#fireSessionClosed},調用了session.getCloseFuture().setClosed()->表示異步關閉完成
3.WriteFuture
{@link DefaultIoFilterChain#fireMessageSent},調用了request.getFuture().setWritten()>表示異步寫完成
4.ReadFuture
{@link TailFilter#messageReceived},如果isUseReadOperation{@link IoSessionConfig},則執行setRead(Object message)->表示異步讀完成.
8.總結
本篇介紹了mina內部異步的實現方式Future.著重介紹了await/awaitUninterruptly的實現方法等.
posted on 2013-11-28 17:50
landon 閱讀(2255)
評論(1) 編輯 收藏 所屬分類:
Sources