<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    the journey is the reward...

    常用鏈接

    統計

    最新評論

    MINA,xSocket同樣的性能缺陷及陷阱,Grizzly better

    MINA,Grizzly[grizzly-nio-framework],xSocket都是基于 java nio的 server framework.
    這里的性能缺陷的焦點是指當一條channel上的SelectionKey.OP_READ ready時,1.是由select thread讀完數據之后再分發給應用程序的handler,2.還是直接就分發,由handler thread來負責讀數據和handle.
    mina,xsocket是1. grizzly-nio-framework是2.
    盡管讀channel buffer中bytes是很快的,但是如果我們放大,當連接channel達到上萬數量級,甚至更多,這種延遲響應的效果將會愈加明顯.
    MINA:
    for all selectedKeys
    {
        read data then fireMessageReceived.
    }
    xSocket:
    for all selectedKeys
    {
        read data ,append it to readQueue then performOnData.
    }
    其中mina在fireMessageReceived時沒有使用threadpool來分發,所以需要應用程序在handler.messageReceived中再分發.而xsocket的performOnData默認是分發給threadpool[WorkerPool],WorkerPool雖然解決了線程池中的線程不能充到最大的問題[跟tomcat6的做法一樣],但是它的調度機制依然缺乏靈活性.
    Grizzly:
    for all selectedKeys
    {
       [NIOContext---filterChain.execute--->our filter.execute]<------run In DefaultThreadPool
    }
    grizzly的DefaultThreadPool幾乎重寫了java util concurrent threadpool,并使用自己的LinkedTransferQueue,但同樣缺乏靈活的池中線程的調度機制

    下面分別是MINA,xSocket,Grizzly的源碼分析:
    Apache MINA (mina-2.0.0-M6源碼為例):
        我們使用mina nio tcp最常用的樣例如下:
            NioSocketAcceptor acceptor = new NioSocketAcceptor(/*NioProcessorPool's size*/);
            DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();        
            //chain.addLast("codec", new ProtocolCodecFilter(
                    //new TextLineCodecFactory()));
            ......
            // Bind
            acceptor.setHandler(/*our IoHandler*/);
            acceptor.bind(new InetSocketAddress(port));
    ------------------------------------------------------------------------------------
        首先從NioSocketAcceptor(extends AbstractPollingIoAcceptor)開始,
    bind(SocketAddress)--->bindInternal--->startupAcceptor:啟動AbstractPollingIoAcceptor.Acceptor.run使用executor[Executor]的線程,注冊[interestOps:SelectionKey.OP_ACCEPT],然后wakeup selector.
    一旦有連接進來就構建NioSocketSession--對應--channal,然后session.getProcessor().add(session)將當前的channal加入到NioProcessor的selector中去[interestOps:SelectionKey.OP_READ],這樣每個連接中有請求過來就由相應的NioProcessor來處理.

    這里有幾點要說明的是:
    1.一個NioSocketAcceptor對應了多個NioProcessor,比如NioSocketAcceptor就使用了SimpleIoProcessorPool DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1.當然這個size在new NioSocketAcceptor的時候可以設定.
    2.一個NioSocketAcceptor對應一個java nio selector[OP_ACCEPT],一個NioProcessor也對應一個java nio selector[OP_READ].
    3.一個NioSocketAcceptor對應一個內部的AbstractPollingIoAcceptor.Acceptor---thread.
    4.一個NioProcessor也對應一個內部的AbstractPollingIoProcessor.Processor---thread.
    5.在new NioSocketAcceptor的時候如果你不提供Executor(線程池)的話,那么默認使用Executors.newCachedThreadPool().
    這個Executor將被NioSocketAcceptor和NioProcessor公用,也就是說上面的Acceptor---thread(一條)和Processor---thread(多條)都是源于這個Executor.
          當一個連接java nio channal--NioSession被加到ProcessorPool[i]--NioProcessor中去后就轉入了AbstractPollingIoProcessor.Processor.run,
    AbstractPollingIoProcessor.Processor.run方法是運行在上面的Executor中的一條線程中的,當前的NioProcessor將處理注冊在它的selector上的所有連接的請求[interestOps:SelectionKey.OP_READ].

    AbstractPollingIoProcessor.Processor.run的主要執行流程:
    for (;;) {      
           ......
           int selected = selector(final SELECT_TIMEOUT = 1000L);
           .......
           if (selected > 0) {
              process();
           }
           ......
    }

    process()-->for all session-channal:OP_READ -->read(session):這個read方法是AbstractPollingIoProcessor.private void read(T session)方法.
    read(session)的主要執行流程是read channal-data to buf,if readBytes>0 then IoFilterChain.fireMessageReceived(buf)/*我們的IoHandler.messageReceived將在其中被調用*/;
        到此mina Nio 處理請求的流程已經明了.
        mina處理請求的線程模型也出來了,性能問題也來了,那就是在AbstractPollingIoProcessor.Processor.run-->process-->read(per session)中,在process的時候mina是for all selected-channals 逐次read data再fireMessageReceived到我們的IoHandler.messageReceived中,而不是并發處理,這樣一來很明顯后來的請求將被延遲處理.
    我們假設:如果NioProcessorPool's size=2 現在有200個客戶端同時連接過來,假設每個NioProcessor都注冊了100個連接,對于每個NioProcessor將依次順序處理這100個請求,那么這其中的第100個請求要得到處理,那它只有等到前面的99個被處理完了.
        有人提出了改進方案,那就是在我們自己的IoHandler.messageReceived中利用線程池再進行分發dispatching,這個當然是個好主意.
        但是請求還是被延遲處理了,因為還有read data所消耗的時間,這樣第100個請求它的數據要被讀,就要等前面的99個都被讀完才行,即便是增加ProcessorPool的尺寸也不能解決這個問題.
        此外mina的陷阱(這個詞較時髦)也出來了,就是在read(session)中,在說這個陷阱之前先說明一下,我們的client端向server端發送一個消息體的時候不一定是完整的只發送一次,可能分多次發送,特別是在client端忙或要發送的消息體的長度較長的時候.而mina在這種情況下就會call我們的IoHandler.messageReceived多次,結果就是消息體被分割了若干份,等于我們在IoHandler.messageReceived中每次處理的數據都是不完整的,這會導致數據丟失,無效.
    下面是read(session)的源碼:
    private void read(T session) {
            IoSessionConfig config = session.getConfig();
            IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

            final boolean hasFragmentation =
                session.getTransportMetadata().hasFragmentation();

            try {
                int readBytes = 0;
                int ret;

                try {
                    if (hasFragmentation/*hasFragmentation一定為ture,也許mina的開發人員也意識到了傳輸數據的碎片問題,但是靠下面的處理是遠遠不夠的,因為client一旦間隔發送,ret就可能為0,退出while,不完整的readBytes將被fire*/) {
                        while ((ret = read(session, buf)) > 0) {
                            readBytes += ret;
                            if (!buf.hasRemaining()) {
                                break;
                            }
                        }
                    } else {
                        ret = read(session, buf);
                        if (ret > 0) {
                            readBytes = ret;
                        }
                    }
                } finally {
                    buf.flip();
                }

                if (readBytes > 0) {
                    IoFilterChain filterChain = session.getFilterChain();
                    filterChain.fireMessageReceived(buf);
                    buf = null;

                    if (hasFragmentation) {
                        if (readBytes << 1 < config.getReadBufferSize()) {
                            session.decreaseReadBufferSize();
                        } else if (readBytes == config.getReadBufferSize()) {
                            session.increaseReadBufferSize();
                        }
                    }
                }
                if (ret < 0) {
                    scheduleRemove(session);
                }
            } catch (Throwable e) {
                if (e instanceof IOException) {
                    scheduleRemove(session);
                }
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireExceptionCaught(e);
            }
        }
    這個陷阱大家可以測試一下,看會不會一個完整的消息被多次發送,你的IoHandler.messageReceived有沒有被多次調用.
    要保持我們應用程序消息體的完整性也很簡單只需創建一個斷點breakpoint,然后set it to the current IoSession,一旦消息體數據完整就dispatching it and remove it from the current session.
    -------------------------------------------------------------------------------------------------- 
    下面以xSocket v2_8_8源碼為例:
    tcp usage e.g:
    IServer srv = new Server(8090, new EchoHandler());
    srv.start() or run();
    -----------------------------------------------------------------------
    class EchoHandler implements IDataHandler {  
        public boolean onData(INonBlockingConnection nbc)
                 throws IOException,
                 BufferUnderflowException,
                 MaxReadSizeExceededException {
           String data = nbc.readStringByDelimiter("\r\n");
           nbc.write(data + "\r\n");
           return true;
        }
      }
    ------------------------------------------------------------------------
    說明1.Server:Acceptor:IDataHandler ------1:1:1
    Server.run-->IoAcceptor.accept()在port上阻塞,一旦有channel就從IoSocketDispatcherPool中獲取一個IoSocketDispatcher,同時構建一個IoSocketHandler和NonBlockingConnection,調用Server.LifeCycleHandler.onConnectionAccepted(ioHandler)  initialize the IoSocketHandler.注意:IoSocketDispatcherPool.size默認為2,也就是說只有2條do select的線程和相應的2個IoSocketDispatcher.這個和MINA的NioProcessor數是一樣的.
    說明2.IoSocketDispatcher[java nio Selector]:IoSocketHandler:NonBlockingConnection------1:1:1
    在IoSocketDispatcher[對應一個Selector].run中--->IoSocketDispatcher.handleReadWriteKeys:
    for all selectedKeys
    {
        IoSocketHandler.onReadableEvent/onWriteableEvent.

    IoSocketHandler.onReadableEvent的處理過程如下:
    1.readSocket();
    2.NonBlockingConnection.IoHandlerCallback.onData
    NonBlockingConnection.onData--->appendDataToReadBuffer: readQueue append data
    3.NonBlockingConnection.IoHandlerCallback.onPostData
    NonBlockingConnection.onPostData--->HandlerAdapter.onData[our dataHandler] performOnData in WorkerPool[threadpool]. 

    因為是把channel中的數據讀到readQueue中,應用程序的dataHandler.onData會被多次調用直到readQueue中的數據讀完為止.所以依然存在類似mina的陷阱.解決的方法依然類似,因為這里有NonBlockingConnection.
    ----------------------------------------------------------------------------------------------
    再下面以grizzly-nio-framework v1.9.18源碼為例:
    tcp usage e.g:
    Controller sel = new Controller();
             sel.setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler(){
                 public ProtocolChain poll() {
                     ProtocolChain protocolChain = protocolChains.poll();
                     if (protocolChain == null){
                         protocolChain = new DefaultProtocolChain();
                         //protocolChain.addFilter(our app's filter/*應用程序的處理從filter開始,類似mina.ioHandler,xSocket.dataHandler*/);
                         //protocolChain.addFilter(new ReadFilter());
                     }
                     return protocolChain;
                 }
             });
             //如果你不增加自己的SelectorHandler,Controller就默認使用TCPSelectorHandler port:18888
             sel.addSelectorHandler(our app's selectorHandler on special port);        
      sel.start();
    ------------------------------------------------------------------------------------------------------------
     說明1.Controller:ProtocolChain:Filter------1:1:n,Controller:SelectorHandler------1:n,
    SelectorHandler[對應一個Selector]:SelectorHandlerRunner------1:1,
    Controller. start()--->for per SelectorHandler start SelectorHandlerRunner to run.
    SelectorHandlerRunner.run()--->selectorHandler.select()  then handleSelectedKeys:
    for all selectedKeys
    {
       NIOContext.execute:dispatching to threadpool for ProtocolChain.execute--->our filter.execute.

    你會發現這里沒有read data from channel的動作,因為這將由你的filter來完成.所以自然沒有mina,xsocket它們的陷阱問題,分發提前了.但是你要注意SelectorHandler:Selector:SelectorHandlerRunner:Thread[SelectorHandlerRunner.run]都是1:1:1:1,也就是說只有一條線程在doSelect then handleSelectedKeys.

        相比之下雖然grizzly在并發性能上更優,但是在易用性方面卻不如mina,xsocket,比如類似mina,xsocket中表示當前連接或會話的IoSession,INonBlockingConnection對象在grizzly中由NIOContext來負責,但是NIOContext并沒有提供session/connection lifecycle event,以及常規的read/write操作,這些都需要你自己去擴展SelectorHandler和ProtocolFilter,從另一個方面也可以說明grizzly的可擴展性,靈活性更勝一籌.

     

    posted on 2010-03-05 09:37 adapterofcoms 閱讀(5474) 評論(0)  編輯  收藏 所屬分類: java techs

    主站蜘蛛池模板: 国产精品视频免费观看| 亚洲av无码乱码在线观看野外 | 久久久无码精品亚洲日韩蜜桃 | 国产成人高清亚洲| 久久亚洲AV成人出白浆无码国产| 久久亚洲精品无码aⅴ大香| 污污的视频在线免费观看| 亚洲综合色丁香婷婷六月图片| 亚洲性线免费观看视频成熟| 亚洲啪AV永久无码精品放毛片| 亚洲AV永久无码精品一区二区国产| 久久青草精品38国产免费| 亚洲精品乱码久久久久久蜜桃不卡| 特级无码毛片免费视频| 日木av无码专区亚洲av毛片| 无码国模国产在线观看免费| 亚洲 日韩经典 中文字幕| 在线观看亚洲天天一三视| 无码乱肉视频免费大全合集| jizz在线免费播放| 亚洲精品无码久久久久sm| 毛片免费vip会员在线看| a视频在线观看免费| 亚洲国产精品婷婷久久| 不卡视频免费在线观看| 亚洲夂夂婷婷色拍WW47| 亚洲丁香色婷婷综合欲色啪| www.亚洲一区| 亚洲视频在线免费| 亚洲午夜精品久久久久久app| 亚洲AV无码成人专区片在线观看| 国产91色综合久久免费分享| 一级A毛片免费观看久久精品| 亚洲日韩国产精品乱-久| 亚洲网址在线观看你懂的| 亚洲黄黄黄网站在线观看| 日韩免费观看的一级毛片| 中文字幕无码免费久久99| 99蜜桃在线观看免费视频网站| 亚洲看片无码在线视频| 久久亚洲精品无码AV红樱桃|