<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0

    1.Future模式是一種常見的多線程設計模式,用來實現'異步'.其模式的核心在于去除main中的等待時間并使得原本需要等待的時間段可以用于處理其他的業務邏輯,從而充分利用計算機資源.

    2.調用方式example:
                ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", port));
                future.awaitUninterruptibly();
                session = future.getSession();

    3.相關類圖

        

    4.源碼_IoFuture

    /**
     * 表示一個異步io操作的completion.
     * 可通過IoFutureListener監聽completion
     
    */

    public interface IoFuture {
        
    /**
         * 返回該future關聯的session
         
    */

        IoSession getSession();

        
    /**
         * 等待異步操作完成
         * 關聯的listener將被通知完成
         
    */

        IoFuture await() 
    throws InterruptedException;

        
    /**
         * 等待異步操作完成并指定超時時間
         *
         * 
    @return <tt>true</tt> if the operation is completed.
         
    */

        
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

        
    /**
         * 等待異步操作完成并指定超時時間(毫秒)
         *
         * 
    @return <tt>true</tt> if the operation is completed.
         
    */

        
    boolean await(long timeoutMillis) throws InterruptedException;

        
    /**
         * 等待異步操作完成且不可中斷
         * 關聯的listener將被通知完成
         * 
         * 
    @return the current IoFuture
         
    */

        IoFuture awaitUninterruptibly();

        
    /**
         * 等待異步操作完成并指定超時時間
         * 不可中斷
         *
         * 
    @return <tt>true</tt> if the operation is completed.
         
    */

        
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

        
    /**
         * 等待異步操作完成并指定超時時間(毫秒)
         * 不可中斷
         *
         * 
    @return <tt>true</tt> if the operation is finished.
         
    */

        
    boolean awaitUninterruptibly(long timeoutMillis);

        
    /**
         * 已廢棄. 替換方法為 {
    @link #awaitUninterruptibly()}.
         
    */

        @Deprecated
        
    void join();

        
    /**
         * 已廢棄. 替換方法為 {
    @link #awaitUninterruptibly(long)}.
         
    */

        @Deprecated
        
    boolean join(long timeoutMillis);

        
    /**
         * 判斷異步操作是否完成
         
    */

        
    boolean isDone();

        
    /**
         * 添加一個監聽Future事件完成的listener
         
    */

        IoFuture addListener(IoFutureListener
    <?> listener);

        
    /**
         * 移除一個已存在的FutureListener
         
    */

        IoFuture removeListener(IoFutureListener
    <?> listener);
    }

        1.從源碼看,其提供了三類await方法

              1.await,一直等待至操作完成
              2.await(long),在指定的超時時間范圍內等待
              3.awaitUninterruptibly,等待且不可中斷

         2.對外提供了FutureListener.

    5.源碼_DefaultIoFuture

         1.其實現了接口IoFuture,并提供了默認實現.

         2.其內部有一個lock:
             private final Object lock;//該lock用于await方法

             該lock在DefaultIoFuture構造的時候初始化為this.

               public DefaultIoFuture(IoSession session) {
                  this.session = session;
                 this.lock = this;
                 }

         3.await()方法實現

    public IoFuture await() throws InterruptedException {
             
    synchronized (lock) {
                 
    while (!ready) {
                     waiters
    ++;
                     
    try {
                         
    // 等待notify
                         
    // 假定尋在死鎖
                         
    // 循環檢查潛在的死鎖
                         lock.wait(DEAD_LOCK_CHECK_INTERVAL);
                     }
     finally {
                         waiters
    --;
                         
    if (!ready) {
                             checkDeadLock();
                         }

                          }

                     }

                    }

                
    return this;
        }


            1.ready表示異步操作是否完成
    {@link isDone()}
       

    public boolean isDone() {
                
    synchronized (lock) {
                   
    return ready;
                }

           }

     
             2.DEAD_LOCK_CHECK_INTERVAL為檢查死鎖的周期時間,為5000L,即5s.
             3.waiters表示當前正在wait的的線程數目,如果大于0,則表示有線程正在阻塞在wait方法.所以可以用這個變量判斷是否notifyAll.

             4.checkDeadLock()為檢查死鎖的方法:
               其核心思路是檢查當前調用線程的StackTrace,如果此時調用await的線程是I/O processor thread,則表示死鎖發生(詳見死鎖的發生條件),則拋出IllegalStateException.

             5.整體代碼邏輯是在異步操作未完成以前循環的wait(5000L)并在超時后檢查死鎖直到ready.

         4.await0()方法實現

        private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
                       
    long endTime = System.currentTimeMillis() + timeoutMillis;

                       
    if (endTime < 0) {
                            endTime
    = Long.MAX_VALUE;
                        }


                   
    synchronized (lock) {
                       
    if (ready) {
                           
    return ready;
                        }
    else if (timeoutMillis <= 0) {
               
    // 這里如果timeoutMillis小于等于0,則直接返回ready flag
                            return ready;
                        }


                        waiters
    ++;

                       
    try {
                 
    // 用一個無限循環來實現awaitUninterruptibly
                            for (;;) {
                               
    try {
                                   
    long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
                                    lock.wait(timeOut);
                                }
    catch (InterruptedException e) {
                   
    // 這里說明當前線程在wait的時候被中斷了(調用了interrupt).如果可被中斷則繼續向上層拋出InterruptedException.否則catch住不做任何操作,繼續執行                //interruptable為false的時候表示Uninterruptibly.
                                    if (interruptable) {
                     
    // 可中斷則直接向上層拋出異常,則整個方法調用就結束了.反之則會繼續執行catch后代碼
                                        throw e;
                                    }

                                }


                               
    if (ready) {
                                   
    return true;
                                }

               
                      
    // 該判斷主要用來判斷超時
                                if (endTime < System.currentTimeMillis()) {
                                   
    return ready;
                                }

                            }

                        }
    finally {
                            waiters
    --;
                           
    if (!ready) {
                                checkDeadLock();
                            }

                        }

                    }

                }

              1.該方法是內部的一個private方法.其中第一個參數指定超時時間,第二個參數指明在wait的時候是否可被中斷.
               2.await(long timeout, TimeUnit unit)/await(long timeoutMillis)/awaitUninterruptibly的實現最終都調用了await0.
               3.整體代碼的實現即用一個無限循環+捕捉中斷異常來實現awaitUninterruptibly.用endTime來實現超時.

              4.仍然進行了潛在死鎖的檢查.
                另外有一個問題是每次wait的時間都是Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL).如果timeoutMillis為6s,則實際wait的時間是5 + 5  = 10s.不知道作者的這個設計是什么.

              5.await(long timeoutMillis)調用了await0(timeoutMillis, true)/awaitUninterruptibly()調用了await0(Long.MAX_VALUE, false).

         5.setValue()方法實現

    public void setValue(Object newValue) {
             
    synchronized (lock) {
                 
    // 僅一次
                  if (ready) {
                     
    return;
                  }

      
      
    // 設置操作結果
                  result = newValue;
      
    // 操作完成
                  ready = true;
                 
    if (waiters > 0) {
         
    // 喚醒await的線程
                      lock.notifyAll();
                  }

              }


            
    // 通知監聽器操作完成
              notifyListeners();
          }

        
              這個方法是設置異步操作的結果表示操作完成.

    6.源碼_IoFutureListener

    /**
     * 泛型接口,監聽異步io操作完成
    */

    1.public interface IoFutureListener<extends IoFuture> extends EventListener {
        
    /**
         *  自實現的一個CLOSE_LISTENER.用于操作完成執行關閉session
         
    */

        
    static IoFutureListener<IoFuture> CLOSE = new IoFutureListener<IoFuture>() {
            
    public void operationComplete(IoFuture future) {
                future.getSession().close(
    true);
            }

        }
    ;

        
    /** 
         * 操作完成時的回調接口
         
    */

        
    void operationComplete(F future);
    }


        1.DefaultIoFuture的源碼中提供了添
    加/移除/通知的方法.不過有一個比較疑惑的地方是:其提供了一個firstListener和一個otherListeners,不知道為什么有這樣的需求.
        2.DefaultIoFuture#addListener中對是否ready做了判斷.只要ready就會通知.也就是在操作已經完成的情況下添加一個監聽器,則也會執行回調方法.


    7.
    when i/o operation complete?

         1.ConnectFuture
          {@link TailFilter#sessionCreated},調用了future.setSession(session)->調用setValue.->表示異步connect完成.
         2.CloseFuture
          {@link DefaultIoFilterChain#fireSessionClosed},調用了session.getCloseFuture().setClosed()->表示異步關閉完成
         3.WriteFuture
          {@link DefaultIoFilterChain#fireMessageSent},調用了request.getFuture().setWritten()>表示異步寫完成
         4.ReadFuture
          {@link TailFilter#messageReceived},如果isUseReadOperation{@link IoSessionConfig},則執行setRead(Object message)->表示異步讀完成.

    8.總結
        本篇介紹了mina內部異步的實現方式Future.著重介紹了await/awaitUninterruptly的實現方法等.

    posted on 2013-11-28 17:50 landon 閱讀(2255) 評論(1)  編輯  收藏 所屬分類: Sources

    FeedBack:
    # re: apache-mina-2.07源碼筆記3-Future
    2013-11-29 15:38 | 馮磊
    看著不錯 感謝分享  回復  更多評論
      
    主站蜘蛛池模板: 国产午夜免费福利红片| 久久精品国产亚洲AV麻豆王友容| 九九久久精品国产免费看小说| 亚洲小说区图片区另类春色| 亚洲一区免费在线观看| 在线观看亚洲精品专区| 亚洲AV无码久久精品蜜桃| 久久不见久久见中文字幕免费| 丰满妇女做a级毛片免费观看| 1区1区3区4区产品亚洲| 暖暖免费高清日本一区二区三区 | 人人狠狠综合久久亚洲| 亚洲色欲色欲www在线丝| 久久午夜免费视频| 在线观看H网址免费入口| 国产成人亚洲精品无码AV大片| 久久精品国产96精品亚洲| 在线观看免费亚洲| 中文字幕在线免费| eeuss影院www天堂免费| 亚洲人成77777在线观看网| 国产成A人亚洲精V品无码| 日本特黄特色免费大片| 日本一卡精品视频免费| 一级毛片不卡免费看老司机| 在线aⅴ亚洲中文字幕| 成年女人18级毛片毛片免费观看| 不卡视频免费在线观看| 亚洲天然素人无码专区| 亚洲人成在线观看| 成人亚洲性情网站WWW在线观看| 成人免费a级毛片无码网站入口| 久久久久成人精品免费播放动漫| 污网站在线免费观看| 久久精品国产亚洲av麻豆图片| 亚洲AV成人精品网站在线播放| 亚洲男女内射在线播放| 日韩视频免费一区二区三区| 99久久综合国产精品免费| 久久久久高潮毛片免费全部播放| 久久成人18免费网站|