<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0
    1.NioSocketAcceptor持有一個Selector對象.->調(diào)用bind方法后->AbstractPollingIoAcceptor#bindInternal. 

    protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
                
    // 創(chuàng)建了一個綁定請求的future operation.當(dāng)selector處理注冊的時候,會signal future.
                AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
        
                
    // 將請求加入注冊隊列
                registerQueue.add(request);
        
                
    // 創(chuàng)建acceptor任務(wù)并啟動(acceptor-worker線程),這個是單線程的
                startupAcceptor();
        
                
    // 這塊的細(xì)節(jié)很重要.因為Acceptor任務(wù)是一個while(){selector.select},此時accepor線程因為select操作為阻塞,因為此時沒有任何事件發(fā)生.
                
    // 所以這邊用了一個信號量.在初始化Acceptor這個任務(wù)后并啟動后,釋放這個許可(信號量初始化為1).然后lock.acquire繼續(xù)執(zhí)行.
                try {
                    lock.acquire();
        
                    
    // 這里等待了10毫秒,是要給acceptor-worker線程機(jī)會執(zhí)行任務(wù).即進(jìn)入while(select),即執(zhí)行到selector處
                    Thread.sleep(10);
                    
    // 因為acceptor任務(wù)中的selector此時因為select操作阻塞,所以這里執(zhí)行喚醒selector操作.進(jìn)而可以處理之前加入注冊隊列的請求.
                    wakeup();
                }
     finally {
                    lock.release();
                }

        
                
    // 阻塞,等到注冊隊列的請求被處理完成
                request.awaitUninterruptibly();
                
                
        }

    2.

    private class Acceptor implements Runnable {
            
    public void run() {
                
                
    // 釋放一個許可,使得主線程可以執(zhí)行后續(xù)后續(xù)調(diào)度(喚醒selector).
                lock.release();

                
    // break的條件是之前bind的serversocket全部unbind了.
                while (selectable) {
                    
    try {
                        
                        
    // selector執(zhí)行select.
                        
    // 1.有新連接出現(xiàn)則被喚醒 2.在首次阻塞的時候被主線程wakeup(處理注冊O(shè)P_ACCEPT)
                        int selected = select();

                        
    // registerHandles做的主要事情是將注冊隊列的綁定地址,執(zhí)行NioSocketAcceptor#open.
                        
    // 即(nio的一系列配置)1.ServerSocketChannel.open() 2.channel.configureBlocking(false)
                        
    // 3.ServerSocket socket = channel.socket() 4.socket.setReuseAddress(isReuseAddress())
                        
    // 5.socket.bind(localAddress, getBacklog()) 6.channel.register(selector, SelectionKey.OP_ACCEPT) 向selector注冊Acceptor事件
                        
    // 這里有兩個ServerSocket的參數(shù)可以設(shè)置 reuseAddress/backlog
                        nHandles += registerHandles();

                        
    ..檢查regiser是否成功.如果不成功則break
                        
    ..檢查取消隊列是否為空,如果為空則break(即沒有serversocket監(jiān)聽了,都unbind了).

                        
    // 表明有新連接請求進(jìn)來.
                        if (selected > 0{
                            
    // 處理新連接請求.
                            
    // 1.accept返回new NioSocketSession 2.初始化session 3.將其綁定到processor池(SimpleIoProcessorPool)的一個NioProcessor(SimpleIoProcessorPool#getProcessor,取模)
                            
    // 4.AbstractPollingIoProcessor#add->將session加入NioProcessor的新創(chuàng)建的session隊列并startupProcessor
                            
    // 注:startupProcessor方法做了引用判斷,即一個NioProcessor只會啟動一個Processor任務(wù).(所以對于session的io讀寫也是單線程的.因為session是已經(jīng)綁定了一個固定的NioProcessor中)
                            processHandles(selectedHandles());
                        }


                        
    // 檢查是否調(diào)用了unbind.如果unbind則加入取消隊列.
                        nHandles -= unregisterHandles();
                        
                        
    .
            }


    3.NioProcessor持有一個Selector對象.其初始化的時候會open selector.


    private class Processor implements Runnable {
            
    public void run() {
                
                
    int nSessions = 0;
                
    // 上一次空閑檢查時間
                lastIdleCheckTime = System.currentTimeMillis();

                
    // 無限循環(huán).說明proceeeor會始終占用線程池的一個線程.并可以這樣說,NioProcessor的數(shù)目就是線程池工作線程的數(shù)目.
                for (;;) {
                    
    try {
                        
    // 這里select有一個超時,是為了管理空閑session,超時時間是1s
                        long t0 = System.currentTimeMillis();
                        
    int selected = select(SELECT_TIMEOUT);
                        
    long t1 = System.currentTimeMillis();
                        
    long delta = (t1 - t0);

                        
    //(處理java6的nio的bug)
                        
    // 下面if這段代碼的大致意思是說如果select未超時且select未被喚醒且未有讀寫事件發(fā)生的一種情況.
                        
    // 1.說明可能select被中斷了.->然后檢查是否有channel被close了(如果有的話則key.cancel).如果是的話則繼續(xù)執(zhí)行select.
                        
    // 2.如果檢查發(fā)現(xiàn)沒有channel被close則重新注冊一個新的Selector.
                        
    //(注意這里的檢查是之前NIO的bug.Selector應(yīng)該只在2種情況有返回值,即有網(wǎng)絡(luò)事件發(fā)生或者超時。但是Selector有時卻會在沒有獲得任何selectionKey的情況返回.)
                        
    //(http://bugs.java.com/view_bug.do?bug_id=6693490)(http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933)
                        if ((selected == 0&& !wakeupCalled.get() && (delta < 100)) {
                            
    // Last chance : the select() may have been
                            
    // interrupted because we have had an closed channel.
                            if (isBrokenConnection()) {
                                LOG.warn("Broken connection");

                                
    // we can reselect immediately
                                
    // set back the flag to false
                                wakeupCalled.getAndSet(false);

                                
    continue;
                            }
     else {
                                LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                                
    // Ok, we are hit by the nasty epoll
                                
    // spinning.
                                
    // Basically, there is a race condition
                                
    // which causes a closing file descriptor not to be
                                
    // considered as available as a selected channel, but
                                
    // it stopped the select. The next time we will
                                
    // call select(), it will exit immediately for the same
                                
    // reason, and do so forever, consuming 100%
                                
    // CPU.
                                
    // We have to destroy the selector, and
                                
    // register all the socket on a new one.
                                registerNewSelector();
                            }


                            
    // Set back the flag to false
                            wakeupCalled.getAndSet(false);

                            
    // and continue the loop
                            continue;
                        }


                        
    // 處理新session
                        
    // 1.初始化NioSession.{@link NioProcessor#init},即將channel配置為非阻塞模式并向selector注冊O(shè)P_READ
                        
    // 2.fireSessionCreated/fireSessionOpened兩個事件.(注意這兩個區(qū)別,如果配置了線程模型ExecutorFilter.則sessionOpened事件在該線程模型內(nèi)執(zhí)行.因為其只覆寫了該方法,而沒有覆寫sessionOpened)
                        nSessions += handleNewSessions();

                        updateTrafficMask();

                        
    // 處理讀寫事件(對于已select的session)
                        
    // 1.處理讀的時候,即AbstractPollingIoProcessor#read,讀到的字節(jié)>0則觸發(fā)fireMessageReceived.另外對ReadBufferSize這個參數(shù)做了一些判斷(buffer會分配該大小).(即如果設(shè)置的太大則decrease,設(shè)置的太小則increase,根據(jù)讀到的字節(jié)數(shù)目.所以說為了避開這個判斷,該參數(shù)可設(shè)置在(readByte,2*readByte]這個區(qū)間)
                        
    // 2.處理寫,將session加入flush隊列.
                        if (selected > 0{
                            
    //LOG.debug("Processing "); // This log hurts one of the MDCFilter test
                            process();
                        }


                        
    // 寫未執(zhí)行的請求
                        
    // 1.通過session.write(msg)時,AbstractIoSession#write時->會觸發(fā)fireFilterWrite事件.該觸發(fā)鏈?zhǔn)茄刂鴗ail->header的方向觸發(fā)的.
                        
    // 2.HeadFilter#filterWrite,session上有一個WriteRequestQueue.將WriteRequest加入該隊列.
                        
    // 3.喚醒selector.
                        
    //(注意第一次在write的時候,即writeRequestQueue為空的時候,是直接schedule_flush并wakeup selector(所以第一次也 沒有必要向selecor注冊寫事件,第一次肯定是可寫的).而后續(xù)的寫請求則是直接將請求插入隊列而已.只有再次寫隊列為空的時候則會再次schedule_flush并wakeup.另外如果session的寫請求未執(zhí)行完畢則會向selector注冊寫事件,在可寫的時候依然會繼續(xù)執(zhí)行寫.)
                        long currentTime = System.currentTimeMillis();
                        
    // 1.遍歷flushingSessions隊列. 重置該session schedule flush flag(這個標(biāo)識表示該session有寫的request還未寫完).2.flushNow,從writeRequestQueue依次取出寫請求.
                        
    // 3.maxWrittenBytes = 1.5 * maxReadBufferSize,讀寫公平(注意這里:flushNow的while循環(huán)結(jié)束條件是writtenBytes < maxWrittenBytes.即一次flush不會超過最大寫字節(jié)數(shù).)
                        
    // (其實這個處理就是為了讀寫公平,防止因為寫的數(shù)據(jù)過多而導(dǎo)致read不能得到及時響應(yīng).因為都是在一個processor線程處理的.)
                        
    // 4.如果session中當(dāng)前請求的buffer已發(fā)送完畢,則觸發(fā)fireMessageSent事件.
                       
    // 5.如果session中請求的數(shù)據(jù)未全部發(fā)送完畢(buffer.hasRemaining),則session重新向selector注冊寫事件 OP_WRITE.
                        flush(currentTime);

                        
    // 注意這里:
                        
    // 1.當(dāng)processor正在執(zhí)行read的時候,如果客戶端端掉了連接,則NioProcessor.read這里就會拋出一個io異常:java.io.IOException: 遠(yuǎn)程主機(jī)強(qiáng)迫關(guān)閉了一個現(xiàn)有的連接
                        
    // 2.read這段代碼在try/catch異常的時候:判斷了一下異常如果是ioexception且該異常不是PortUnreachableException或者不是udp相關(guān),則執(zhí)行scheduleRemove->removingSessions.add(session)
                        
    // 而下面這句代碼則是處理removeSessions.->removeNow->對被移除的session進(jìn)行destory處理(close_channel/cancel_key)并清理session的寫隊列,fireSessionDestroyed->fireSessionClosed
                        nSessions -= removeSessions();

                        
    // if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT),即timeout內(nèi)未有讀寫事件發(fā)生則通知空閑
                        
    // 遍歷session 判讀當(dāng)前時間與上次事件發(fā)生時間的差是否大于空閑時間
                        notifyIdleSessions(currentTime);

                        
    .
        }


    4.再談io.
         IO分兩個階段:
       1.通知內(nèi)核準(zhǔn)備數(shù)據(jù)。2.數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到應(yīng)用緩沖區(qū)

       根據(jù)這2點IO類型可以分成:
           1.阻塞IO,在兩個階段上面都是阻塞的。
           2.非阻塞IO,在第1階段,程序不斷的輪詢直到數(shù)據(jù)準(zhǔn)備好,第2階段還是阻塞的
           3.IO復(fù)用,在第1階段,當(dāng)一個或者多個IO準(zhǔn)備就緒時,通知程序,第2階段還是阻塞的,在第1階段還是輪詢實現(xiàn)的,只是所有的IO都集中在一個地方,這個地方進(jìn)行輪詢
           4.信號IO,當(dāng)數(shù)據(jù)準(zhǔn)備完畢的時候,信號通知程序數(shù)據(jù)準(zhǔn)備完畢,第2階段阻塞
           5.異步IO,1,2都不阻塞
          
       當(dāng)然write是從應(yīng)用緩沖區(qū)到內(nèi)核緩沖區(qū).
       2.selector底層基礎(chǔ)實現(xiàn)就應(yīng)該是不斷的輪訓(xùn)內(nèi)核緩沖區(qū)的狀態(tài).
       3.select模型僅僅是輪訓(xùn),知道有IO事件發(fā)生了.但是并不知道是哪些channel.所以只能輪訓(xùn)所有的注冊channel,然后依次判斷讀寫;引入epoll->會把哪個channel發(fā)生了什么io事件直接通知.

    posted on 2014-03-07 17:01 landon 閱讀(1977) 評論(2)  編輯  收藏 所屬分類: ProgramSources

    FeedBack:
    # re: apache-mina-2.07源碼筆記6-nio細(xì)節(jié)
    2014-03-09 11:34 | 鵬達(dá)鎖業(yè)
    給力支持 博主。。。。。。贊一個

      回復(fù)  更多評論
      
    # re: apache-mina-2.07源碼筆記6-nio細(xì)節(jié)
    2015-11-07 16:53 | qwert
    樓主大贊,分析的詳細(xì)多了,比其他的  回復(fù)  更多評論
      
    主站蜘蛛池模板: 久久综合九色综合97免费下载| 精品一区二区三区免费观看| 久久精品国产亚洲av品善| 看一级毛片免费观看视频| www免费插插视频| 免费国产黄网站在线观看可以下载| AV大片在线无码永久免费| 大学生高清一级毛片免费| 亚洲伦乱亚洲h视频| 久久亚洲中文字幕精品有坂深雪| 亚洲av无码片区一区二区三区 | 亚洲av综合av一区| 亚洲国产中文在线二区三区免| 国产成人精品日本亚洲语音 | 久久这里只有精品国产免费10| 免费va在线观看| 亚洲午夜视频在线观看| 亚洲色中文字幕在线播放| 成年女人A毛片免费视频| 又黄又爽又成人免费视频| 亚洲精品成人区在线观看| 久久久久久亚洲Av无码精品专口| 亚洲色成人WWW永久在线观看| 精品无码一级毛片免费视频观看| 黄网站色在线视频免费观看| 免费一级毛片在线观看| 亚洲精品在线观看视频| 人人狠狠综合久久亚洲| 久久这里只精品热免费99| 国产一级淫片免费播放| 亚洲天堂久久精品| 免费夜色污私人影院网站| 又大又硬又爽又粗又快的视频免费| 免费人妻无码不卡中文字幕18禁| 亚洲一区免费观看| 高潮内射免费看片| 免费观看的毛片大全| 亚洲精品国产字幕久久不卡| 亚洲乱亚洲乱妇24p| 亚洲视频在线免费播放| 亚洲午夜无码AV毛片久久|