<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0
    -

    1.源碼包結構


    2.測試線程

         1.啟動mina-exampleechoserver#Main

         2.啟動jconsole查看線程:

             1.NioSocketAcceptor


           RunnableAbstractPollingIoAcceptor$Acceptor

            
    3.啟動com.game.landon.entrance.EchoClient,連接已啟動的echoserver.

             ->此時查看線程->多了一個
                 2.NioProcessor        

            

         RunnableAbstractPollingIoProcessor$Processor

        4.兩個線程池來源:
              1.AbstractIoService(IoSessionConfig sessionConfig,Executor executor)
                   該構造函數中判斷
                   if(executor == null){
                        this.executor = Executors.newCachedThreadPool();
                   }
       詳見源碼.
          
               2.SimpleIoProcessorPool
                    
    其構造函數判斷同上

    3.內部基本流程
         1.SocketAcceptor acceptor = new NioSocketAcceptor();

              1.初始化NioSocketAcceptor線程池 {@link AbstractIoService }
              2.初始化NioProcessor線程池  {@link SimpleIoProcessorPool }
              3. NioSocketAcceptor#init
                初始化Selector:selector = Selector.open();

         2.acceptor.bind(new InetSocketAddress(PORT));
              1.AbstractPollingIoAcceptor#bindInternal
                   #startupAcceptor
                    >NioSocketAcceptor線程池執行Acceptor這個任務.

        //1.startupAcceptor啟動的時候也會判斷是否已存在Acceptor這個任務,如果不存在才會創建.
        
    // 2.Accetpor這個任務結束條件:所有bind的端口unbind->也會將Acceptor引用置為null.
        
    // 3.每個NioSocketAcceptor只有一個selector/且只對應一個Acceptor任務,即只有一個Acceptor線程.所以我們可以說Acceptor就是單線程的.(即便是一個CachedThreadPool)


        3.Acceptor#run
              1.while(selectable)
                   1.selector.select

                   2.#registerHandles
                       ->NioSocketAcceptor#open
                       設置socket選項并向selector注冊OP_ACCEPT事件

                   3.#processHandles
                        1.NioSocketAcceptor#accept->返回NioSocketSession
                        2.SimpleIoProcessorPool#add
                             1.根據sessionId將session與pool中的某個NioProcessor綁定 {@link SimpleIoProcessorPool#getProcessor}
                             2.AbstractPollingIoProcessor#add
                             3. AbstractPollingIoProcessor#startupProcessor
                                  ->NioProcessor線程池執行Processor這個任務

        // 從這段代碼看出:
           
    //1.Processor這個任務只會創建一次.即每一個NioProcessor對象最多擁有一個Processor
           
    // 2.每個NioProcessor只會向線程池提交一次Processor任務.而Processor任務是一個無限循環的任務.
           
    // 也可以說,每個Processor就占據著線程池的一個線程.->即每個NioProcessor對象對應線程池中的一個線程
           
    // 3.而session是與某個固定的NioProcessor綁定的(取模)->也就是說每個session的處理都是單線程的.(均在NioProcessor的唯一Processor線程執行)
           
    // 4.public NioSocketAcceptor(int processorCount)構造中可指定processor的數目,其實最終是指定CacheThreadPool中多少用于prossor的線程數目.
            
    //(默認:private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1)
             // 5.每個NioProcessor中只有一個selector.
            private void startupProcessor() {
                Processor processor = processorRef.get();

               
    if (processor == null) {
                    processor = new Processor();

                   
    if (processorRef.compareAndSet(null, processor)) {
                    executor.execute(new NamePreservingRunnable(processor, threadName));
                    }

                }


               
    // Just stop the select() and start it again, so that the processor
               
    // can be activated immediately.
                wakeup();
                }

     

    /**
         * This private class is used to accept incoming connection from
         * clients. It's an infinite loop, which can be stopped when all
         * the registered handles have been removed (unbound).
         
    */

        
    private class Acceptor implements Runnable
        
    {
            
    public void run()
            
    {
                
    int nHandles = 0;
                lastIdleCheckTime = System.currentTimeMillis();

                
    // Release the lock
                lock.release();

                
    while ( selectable )
                
    {
                    
    try
                    
    {
                        
    int selected = select( SELECT_TIMEOUT );

                        nHandles += registerHandles();

                        
    if ( nHandles == 0 )
                        
    {
                            
    try
                            
    {
                                lock.acquire();

                                
    if ( registerQueue.isEmpty() && cancelQueue.isEmpty() )
                                
    {
                                    acceptor = null;
                                    
    break;
                                }

                            }

                            
    finally
                            
    {
                                lock.release();
                            }

                        }


                        
    if ( selected > 0 )
                        
    {
                            processReadySessions( selectedHandles() );
                        }


                        
    long currentTime = System.currentTimeMillis();
                        flushSessions( currentTime );
                        nHandles -= unregisterHandles();

                        notifyIdleSessions( currentTime );
                    }

                    
    catch ( ClosedSelectorException cse )
                    
    {
                        
    // If the selector has been closed, we can exit the loop
                        break;
                    }

                    
    catch ( Exception e )
                    
    {
                        ExceptionMonitor.getInstance().exceptionCaught( e );

                        
    try
                        
    {
                            Thread.sleep( 1000 );
                        }

                        
    catch ( InterruptedException e1 )
                        
    {
                        }

                    }

                }


                
    if ( selectable && isDisposing() )
                
    {
                    selectable = false;
                    
    try
                    
    {
                        destroy();
                    }

                    
    catch ( Exception e )
                    
    {
                        ExceptionMonitor.getInstance().exceptionCaught( e );
                    }

                    
    finally
                    
    {
                        disposalFuture.setValue( true );
                    }

                }

            }

        }

      
        2.Processor#run
           1.for(;;)
                1.select(SELECT_TIMEOUT)
                2.#handleNewSessions
                     1. AbstractPollingIoProcessor#addNow

                     2. AbstractPollingIoProcessor#init
                          NioProcessor#init->session.getChannel向selector注冊OP_READ事件

                    3.#updateTrafficmask

                    4.#process
                         //process reads/process writes

                    5.#flush
                    6.#nofifyIdleSessions

    /**
         * The main loop. This is the place in charge to poll the Selector, and to
         * process the active sessions. It's done in
         * - handle the newly created sessions
         * -
        
    */

       
    private class Processor implements Runnable {
           
    public void run() {
               
    assert (processorRef.get() == this);

               
    int nSessions = 0;
                lastIdleCheckTime = System.currentTimeMillis();

               
    for (;;) {
                   
    try {
                       
    // This select has a timeout so that we can manage
                       
    // idle session when we get out of the select every
                       
    // second. (note : this is a hack to avoid creating
                       
    // a dedicated thread).
                        long t0 = System.currentTimeMillis();
                       
    int selected = select(SELECT_TIMEOUT);
                       
    long t1 = System.currentTimeMillis();
                       
    long delta = (t1 - t0);

                       
    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                           
    // Last chance : the select() may have been
                           
    // interrupted because we have had an closed channel.
                            if (isBrokenConnection()) {
                                LOG.warn("Broken connection");

                               
    // we can reselect immediately
                               
    // set back the flag to false
                                wakeupCalled.getAndSet(false);

                               
    continue;
                            }
    else {
                                LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                               
    // Ok, we are hit by the nasty epoll
                               
    // spinning.
                               
    // Basically, there is a race condition
                               
    // which causes a closing file descriptor not to be
                               
    // considered as available as a selected channel, but
                               
    // it stopped the select. The next time we will
                               
    // call select(), it will exit immediately for the same
                               
    // reason, and do so forever, consuming 100%
                               
    // CPU.
                               
    // We have to destroy the selector, and
                               
    // register all the socket on a new one.
                                registerNewSelector();
                            }


                           
    // Set back the flag to false
                            wakeupCalled.getAndSet(false);

                           
    // and continue the loop
                            continue;
                        }


                       
    // Manage newly created session first
                        nSessions += handleNewSessions();

                        updateTrafficMask();

                       
    // Now, if we have had some incoming or outgoing events,
                       
    // deal with them
                        if (selected > 0) {
                           
    //LOG.debug("Processing "); // This log hurts one of the MDCFilter test
                            process();
                        }


                       
    // Write the pending requests
                        long currentTime = System.currentTimeMillis();
                        flush(currentTime);

                       
    // And manage removed sessions
                        nSessions -= removeSessions();

                       
    // Last, not least, send Idle events to the idle sessions
                        notifyIdleSessions(currentTime);

                       
    // Get a chance to exit the infinite loop if there are no
                       
    // more sessions on this Processor
                        if (nSessions == 0) {
                            processorRef.set(null);

                           
    if (newSessions.isEmpty() && isSelectorEmpty()) {
                               
    // newSessions.add() precedes startupProcessor
                                assert (processorRef.get() != this);
                               
    break;
                            }


                           
    assert (processorRef.get() != this);

                           
    if (!processorRef.compareAndSet(null, this)) {
                               
    // startupProcessor won race, so must exit processor
                                assert (processorRef.get() != this);
                               
    break;
                            }


                           
    assert (processorRef.get() == this);
                        }


                       
    // Disconnect all sessions immediately if disposal has been
                       
    // requested so that we exit this loop eventually.
                        if (isDisposing()) {
                           
    for (Iterator<S> i = allSessions(); i.hasNext();) {
                                scheduleRemove(i.next());
                            }


                            wakeup();
                        }

                    }
    catch (ClosedSelectorException cse) {
                       
    // If the selector has been closed, we can exit the loop
                        break;
                    }
    catch (Throwable t) {
                        ExceptionMonitor.getInstance().exceptionCaught(t);

                       
    try {
                            Thread.sleep(1000);
                        }
    catch (InterruptedException e1) {
                            ExceptionMonitor.getInstance().exceptionCaught(e1);
                        }

                    }

                }


               
    try {
                   
    synchronized (disposalLock) {
                       
    if (disposing) {
                            doDispose();
                        }

                    }

                }
    catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);
                }
    finally {
                    disposalFuture.setValue(
    true);
                }

            }

        }

     

     
    4.相關類圖關系

        1.IoService關系類圖

        2.IoProcessor關系類圖
        



        




        3.IoSession關系類圖 
            


     5.總結:
        本篇只是引入篇,著重介紹了mina2內部的兩個acceptor線程池和processor線程池.關于nio相關請看我之前的文章.

    http://www.tkk7.com/landon/archive/2013/08/16/402947.html


     

    posted on 2013-11-18 17:24 landon 閱讀(2049) 評論(2)  編輯  收藏 所屬分類: Sources

    FeedBack:
    # re: apache-mina-2.07源碼筆記1-初步
    2013-11-18 18:41 | foo
    你這筆記可讀性也太差了  回復  更多評論
      
    # re: apache-mina-2.07源碼筆記1-初步[未登錄]
    2013-11-19 14:26 | landon
    確實有點.不過自己明白即可.哈哈.@foo
      回復  更多評論
      
    主站蜘蛛池模板: 91禁漫免费进入| 亚洲日本国产综合高清| 国产精品免费视频网站| 免费无遮挡无码永久视频| 一区二区三区精品高清视频免费在线播放| 亚洲大片免费观看| 国产亚洲人成网站观看| 亚洲成人高清在线| 无码国模国产在线观看免费| 中文字幕无码播放免费| 日本免费人成网ww555在线| 一级一级毛片免费播放| 亚洲av综合日韩| 亚洲精品无码专区在线| 亚洲 欧洲 日韩 综合在线| 亚洲精品**中文毛片| 亚洲av网址在线观看| 亚洲精品乱码久久久久久蜜桃不卡 | 91亚洲va在线天线va天堂va国产| 国产亚洲欧洲Aⅴ综合一区| 亚洲av再在线观看| 免费女人18毛片a级毛片视频| 成年女人免费视频播放77777 | 亚洲国产精品无码久久久不卡| 亚洲日韩精品无码专区网站 | 精品视频免费在线| 国产亚洲精品美女2020久久 | 波多野结衣中文一区二区免费| 浮力影院第一页小视频国产在线观看免费| 国产a视频精品免费观看| 最近中文字幕完整版免费高清| 免费无码又爽又刺激高潮视频| 免费无码毛片一区二区APP| 日韩插啊免费视频在线观看| 国产激情免费视频在线观看| 久久久久久AV无码免费网站下载| 久爱免费观看在线网站| 99久久99久久精品免费观看| 四虎在线视频免费观看视频| 国产无人区码卡二卡三卡免费| 在线观看AV片永久免费|