<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    瘋狂

    STANDING ON THE SHOULDERS OF GIANTS
    posts - 481, comments - 486, trackbacks - 0, articles - 1
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    xsocket源碼解讀

    Posted on 2011-10-19 17:34 瘋狂 閱讀(6340) 評論(3)  編輯  收藏 所屬分類: 網絡通訊讀代碼

    關于xsocket可見于 我的另外一篇文章http://www.tkk7.com/freeman1984/archive/2011/04/25/302706.html,或者查看官網http://xsocket.org/
    當然閱讀xsocket需要一些線程,nio,niosocket,和java.util.concurrent(鎖,線程池等)包的一些知識。要不讀起來很費勁,建議先去了解下這些知識。可以在我的文章分類concurrent里面有一些,其他去網上找找。
    本文只讀了一個主要的流程,對于一些其他的代碼例如:ssl相關,讀數據相關沒有涉及,看有時間能補上。
    首先xsocket的幾個關鍵的類
    Server: 服務器端初始化線程池創建IoAcceptor
    IoAcceptor:采用while循環接收客戶端連接,并創建IoSocketDispatcher和IoChainableHandler
    IoSocketDispatcher:負責注冊SelectionKey以及事件的分發,并交給IoChainableHandler處理,通過一個while循環來處理注冊的SelectionKey事件。
    IHandler:事件處理,數據的讀寫等等。
    INonBlockingConnection客戶端接口。
    (1)首先看下Server創建:
    構造方法

    protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext,
     
    boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) 
    這個方法主要是初始化線程池,構件acceptor
            defaultWorkerPool 
    = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
         workerpool 
    = defaultWorkerPool;
         
      
    if (sslContext != null{//是否使用ssl
       acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);
       
      }
     else {
       acceptor 
    = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
      }



    其中線程池:使用jdk1.5以后的ThreadPoolExecutor,線程池最小默認2,最大100,QUEUE的大小默認也是100
    線程池最小:MIN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
    SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
    TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));

    (2)構件acceptor細節,最后server啟動的時候會啟動acceptor監聽客戶端

     public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {
            .
            serverChannel 
    = ServerSocketChannel.open();
            
            serverChannel.configureBlocking(
    true);
            serverChannel.socket().setSoTimeout(
    0);  // accept method never times out
            serverChannel.socket().setReuseAddress(isReuseAddress); 
            .
            serverChannel.socket().bind(address, backlog);
            dispatcherPool 
    = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
            .
        }


    (3)啟動server
    server.start();
    服務的啟動用了一個單獨的線程,這里面使用到了CountDownLatch可參見另外一篇關于CountDownLatch用法的文章:
    http://www.tkk7.com/freeman1984/archive/2011/07/04/353654.html
    使用CountDownLatch來控制server的啟動時間,操作多少時間為啟動就,默認是60秒,這里就不講CountDownLatch的代碼了
    整個啟動的方法如下,能看懂CountDownLatch的用法基本上就理解了。

    public static void start(IServer server, int timeoutSec) throws SocketTimeoutException {
      
      
      
    // start server within a dedicated thread 
      Thread t = new Thread(server);
      t.setName(
    "xServer");
      t.start();
    //請看下面的run方法分析
      
     }


    看看他的server線程的run方法:

    public void run() {
     
     acceptor.listen();
    //啟動前面創建的acceptor開始監聽客戶端連接
     
    }



    接著查看listen()方法:

    public void listen() throws IOException {
         callback.onConnected();
    //通知server已經啟動
         accept();//接受客戶端連接
        }

    }



    查看 accept();很明了,使用一個while循環監聽客戶端連接,并建立可客戶端相關的處理類:

     

    while (isOpen.get()) {

        
    // blocking accept call
        SocketChannel channel = serverChannel.accept();

        
    // create IoSocketHandler
        
    //創建事件分發器
        IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
        
    //創建io處理
        IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
        
    // notify call back
        callback.onConnectionAccepted(ioHandler);//很關鍵的一個地方,會注冊SelectionKey.OP_READ,此時客戶端發來的消息就可以北服務端獲取
    }


    查看callback.onConnectionAccepted(ioHandler);
    此方法會初始化server端的ioHandler,查看初始化的代碼:
    dispatcher.register(this, SelectionKey.OP_READ);首先會注冊read選擇器,
    如果有read事件發生dispatcher就會處理。看看dispatcher的run方法(通過一個循環來不停的處理已經注冊的事件)

    while(isOpen.get()) {
        
        
    int eventCount = selector.select(5000); 
       
        handledTasks 
    = performRegisterHandlerTasks();//處理事件
        handledTasks += performKeyUpdateTasks();
        
    if (eventCount > 0{
         handleReadWriteKeys();
    //處理讀寫,調用我們自己定義的hander來處理onData等事件
        }

        handledTasks 
    += performDeregisterHandlerTasks();
          
      }



      
    (4)客戶端
    NonBlockingConnection,例如:
    new NonBlockingConnection("localhost", 8090,new MyHandler() )
    構造方法的主要代碼:

     .
     SocketChannel channel 
    = openSocket(localAddress, options);//實際調用:SocketChannel channel = SocketChannel.open();
       
     IoConnector connector 
    = getDefaultConnector();
           
        IIoConnectorCallback callback 
    = new AsyncIoConnectorCallback(remoteAddress, channel, sslContext, isSecured, connectTimeoutMillis);
        connector.connectAsync(channel, remoteAddress, connectTimeoutMillis, callback);
          
    }



    建立連接,生成IoConnector用來管理連接,然后connector開始啟動,做一些初始化的工作:

    其中connector.connectAsync(…方法會執行會產生一個RegisterTask任務到IoConnector,這個RegisterTask做的事情如下:
    selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注冊SelectionKey.OP_CONNECT
    當IoConnector運行會執行這個任務:
    看下他的run方法主要代碼:

    while(isOpen.get()) {
               
                    handledTasks 
    = performTaskQueue();//首先運行上一步創建的RegisterTask注冊SelectionKey.OP_CONNECT
                    int eventCount = selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已經準備好
                    if (eventCount > 0{
                        handleConnect();
    //如果準備好就處理連接事件
                    }
     else {
                     checkForLooping(handledTasks);
                    }

             
      }



     handleConnect()的代碼:
     

     private void handleConnect() {
            Set
    <SelectionKey> selectedEventKeys = selector.selectedKeys();

            Iterator
    <SelectionKey> it = selectedEventKeys.iterator();
            
    while (it.hasNext()) {
                SelectionKey eventKey 
    = it.next();
                it.remove();

                RegisterTask registerTask 
    = (RegisterTask) eventKey.attachment();

                
    if (eventKey.isValid() && eventKey.isConnectable()) {
                    
                    
    try {
                        
    boolean isConnected = ((SocketChannel) eventKey.channel()).finishConnect();//已經通訊連接
                        if (isConnected) {
                         eventKey.cancel();
                         registerTask.callback.onConnectionEstablished();
    //連接建立好就做下一步工作,注冊read事件。
                        }

                    
                }

            }

        }


     接著看registerTask.callback.onConnectionEstablished()
     主要初始化iohander并注冊read事件

     private void init(IoChainableHandler ioHandler, IIoHandlerCallback handlerCallback) throws IOException, SocketTimeoutException {
      
    this.ioHandler = ioHandler;
      ioHandler.init(handlerCallback);
    //這個方法里面注冊了read
      isConnected.set(true);//這個時候通訊連接才真正建立起來了
     }



    繼續看ioHandler.init(handlerCallback)方法:

    public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException {
         
      dispatcher.register(
    this, SelectionKey.OP_READ);//注冊SelectionKey.OP_READ時間,可以接受服務端的消息了
    }



    服務端和客戶端就可以互相通信了。本文大致講解了xsocket的代碼的流程,其中講解有誤的地方請兄弟們指出,多謝!


    評論

    # re: xsocket源碼解讀  回復  更多評論   

    2011-10-20 08:32 by tbw
    不錯 不錯 學習了

    # re: xsocket源碼解讀  回復  更多評論   

    2012-03-05 16:18 by Miao
    請問,xsocket怎么連接代理呢??實在找不到相關資料....謝謝..

    # re: xsocket源碼解讀  回復  更多評論   

    2014-11-21 18:51 by xsank
    有幫助,寫樓主分享
    主站蜘蛛池模板: av无码免费一区二区三区| 国产午夜精品理论片免费观看| 97av免费视频| 亚洲成a人片77777老司机| 国产精品网站在线观看免费传媒| 亚洲人成影院在线无码观看| 日韩精品无码免费视频| 亚洲综合激情另类专区| 国产免费久久精品丫丫| 亚洲国产成人高清在线观看| 久久国产精品一区免费下载| 久久久久亚洲AV无码专区体验| 久久久久国产免费| 亚洲国产成人精品久久| 67194成是人免费无码| 色窝窝亚洲AV网在线观看| 亚洲AV无码之日韩精品| 99久久99这里只有免费的精品 | 免费精品无码AV片在线观看| 久久精品国产亚洲av影院| 91九色老熟女免费资源站| 亚洲人成图片网站| 全部免费a级毛片| 国产成年无码久久久免费| 亚洲视频免费观看| 香蕉视频在线观看免费国产婷婷| MM1313亚洲国产精品| 亚洲精品无码专区久久久| 精品福利一区二区三区免费视频| 亚洲夂夂婷婷色拍WW47| 亚洲乱码中文字幕手机在线| 免费一区二区三区| 亚洲中文字幕一区精品自拍| 亚洲日韩在线观看| 永久黄色免费网站| 麻豆亚洲AV成人无码久久精品 | 国产在线98福利播放视频免费| 一级做a爱过程免费视频高清| 国产av天堂亚洲国产av天堂| 97无码免费人妻超级碰碰碰碰 | 亚洲国产天堂久久久久久|