<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    瘋狂

    STANDING ON THE SHOULDERS OF GIANTS
    posts - 481, comments - 486, trackbacks - 0, articles - 1
      BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

    xsocket源碼解讀

    Posted on 2011-10-19 17:34 瘋狂 閱讀(6323) 評論(3)  編輯  收藏 所屬分類: 網(wǎng)絡通訊讀代碼

    關于xsocket可見于 我的另外一篇文章http://www.tkk7.com/freeman1984/archive/2011/04/25/302706.html,或者查看官網(wǎng)http://xsocket.org/
    當然閱讀xsocket需要一些線程,nio,niosocket,和java.util.concurrent(鎖,線程池等)包的一些知識。要不讀起來很費勁,建議先去了解下這些知識。可以在我的文章分類concurrent里面有一些,其他去網(wǎng)上找找。
    本文只讀了一個主要的流程,對于一些其他的代碼例如:ssl相關,讀數(shù)據(jù)相關沒有涉及,看有時間能補上。
    首先xsocket的幾個關鍵的類
    Server: 服務器端初始化線程池創(chuàng)建IoAcceptor
    IoAcceptor:采用while循環(huán)接收客戶端連接,并創(chuàng)建IoSocketDispatcher和IoChainableHandler
    IoSocketDispatcher:負責注冊SelectionKey以及事件的分發(fā),并交給IoChainableHandler處理,通過一個while循環(huán)來處理注冊的SelectionKey事件。
    IHandler:事件處理,數(shù)據(jù)的讀寫等等。
    INonBlockingConnection客戶端接口。
    (1)首先看下Server創(chuàng)建:
    構(gòu)造方法

    protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext,
     
    boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) 
    這個方法主要是初始化線程池,構(gòu)件acceptor
            defaultWorkerPool 
    = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
         workerpool 
    = defaultWorkerPool;
         
      
    if (sslContext != null{//是否使用ssl
       acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);
       
      }
     else {
       acceptor 
    = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
      }



    其中線程池:使用jdk1.5以后的ThreadPoolExecutor,線程池最小默認2,最大100,QUEUE的大小默認也是100
    線程池最小:MIN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
    SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
    TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));

    (2)構(gòu)件acceptor細節(jié),最后server啟動的時候會啟動acceptor監(jiān)聽客戶端

     public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {
            .
            serverChannel 
    = ServerSocketChannel.open();
            
            serverChannel.configureBlocking(
    true);
            serverChannel.socket().setSoTimeout(
    0);  // accept method never times out
            serverChannel.socket().setReuseAddress(isReuseAddress); 
            .
            serverChannel.socket().bind(address, backlog);
            dispatcherPool 
    = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
            .
        }


    (3)啟動server
    server.start();
    服務的啟動用了一個單獨的線程,這里面使用到了CountDownLatch可參見另外一篇關于CountDownLatch用法的文章:
    http://www.tkk7.com/freeman1984/archive/2011/07/04/353654.html
    使用CountDownLatch來控制server的啟動時間,操作多少時間為啟動就,默認是60秒,這里就不講CountDownLatch的代碼了
    整個啟動的方法如下,能看懂CountDownLatch的用法基本上就理解了。

    public static void start(IServer server, int timeoutSec) throws SocketTimeoutException {
      
      
      
    // start server within a dedicated thread 
      Thread t = new Thread(server);
      t.setName(
    "xServer");
      t.start();
    //請看下面的run方法分析
      
     }


    看看他的server線程的run方法:

    public void run() {
     
     acceptor.listen();
    //啟動前面創(chuàng)建的acceptor開始監(jiān)聽客戶端連接
     
    }



    接著查看listen()方法:

    public void listen() throws IOException {
         callback.onConnected();
    //通知server已經(jīng)啟動
         accept();//接受客戶端連接
        }

    }



    查看 accept();很明了,使用一個while循環(huán)監(jiān)聽客戶端連接,并建立可客戶端相關的處理類:

     

    while (isOpen.get()) {

        
    // blocking accept call
        SocketChannel channel = serverChannel.accept();

        
    // create IoSocketHandler
        
    //創(chuàng)建事件分發(fā)器
        IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
        
    //創(chuàng)建io處理
        IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
        
    // notify call back
        callback.onConnectionAccepted(ioHandler);//很關鍵的一個地方,會注冊SelectionKey.OP_READ,此時客戶端發(fā)來的消息就可以北服務端獲取
    }


    查看callback.onConnectionAccepted(ioHandler);
    此方法會初始化server端的ioHandler,查看初始化的代碼:
    dispatcher.register(this, SelectionKey.OP_READ);首先會注冊read選擇器,
    如果有read事件發(fā)生dispatcher就會處理。看看dispatcher的run方法(通過一個循環(huán)來不停的處理已經(jīng)注冊的事件)

    while(isOpen.get()) {
        
        
    int eventCount = selector.select(5000); 
       
        handledTasks 
    = performRegisterHandlerTasks();//處理事件
        handledTasks += performKeyUpdateTasks();
        
    if (eventCount > 0{
         handleReadWriteKeys();
    //處理讀寫,調(diào)用我們自己定義的hander來處理onData等事件
        }

        handledTasks 
    += performDeregisterHandlerTasks();
          
      }



      
    (4)客戶端
    NonBlockingConnection,例如:
    new NonBlockingConnection("localhost", 8090,new MyHandler() )
    構(gòu)造方法的主要代碼:

     .
     SocketChannel channel 
    = openSocket(localAddress, options);//實際調(diào)用:SocketChannel channel = SocketChannel.open();
       
     IoConnector connector 
    = getDefaultConnector();
           
        IIoConnectorCallback callback 
    = new AsyncIoConnectorCallback(remoteAddress, channel, sslContext, isSecured, connectTimeoutMillis);
        connector.connectAsync(channel, remoteAddress, connectTimeoutMillis, callback);
          
    }



    建立連接,生成IoConnector用來管理連接,然后connector開始啟動,做一些初始化的工作:

    其中connector.connectAsync(…方法會執(zhí)行會產(chǎn)生一個RegisterTask任務到IoConnector,這個RegisterTask做的事情如下:
    selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注冊SelectionKey.OP_CONNECT
    當IoConnector運行會執(zhí)行這個任務:
    看下他的run方法主要代碼:

    while(isOpen.get()) {
               
                    handledTasks 
    = performTaskQueue();//首先運行上一步創(chuàng)建的RegisterTask注冊SelectionKey.OP_CONNECT
                    int eventCount = selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已經(jīng)準備好
                    if (eventCount > 0{
                        handleConnect();
    //如果準備好就處理連接事件
                    }
     else {
                     checkForLooping(handledTasks);
                    }

             
      }



     handleConnect()的代碼:
     

     private void handleConnect() {
            Set
    <SelectionKey> selectedEventKeys = selector.selectedKeys();

            Iterator
    <SelectionKey> it = selectedEventKeys.iterator();
            
    while (it.hasNext()) {
                SelectionKey eventKey 
    = it.next();
                it.remove();

                RegisterTask registerTask 
    = (RegisterTask) eventKey.attachment();

                
    if (eventKey.isValid() && eventKey.isConnectable()) {
                    
                    
    try {
                        
    boolean isConnected = ((SocketChannel) eventKey.channel()).finishConnect();//已經(jīng)通訊連接
                        if (isConnected) {
                         eventKey.cancel();
                         registerTask.callback.onConnectionEstablished();
    //連接建立好就做下一步工作,注冊read事件。
                        }

                    
                }

            }

        }


     接著看registerTask.callback.onConnectionEstablished()
     主要初始化iohander并注冊read事件

     private void init(IoChainableHandler ioHandler, IIoHandlerCallback handlerCallback) throws IOException, SocketTimeoutException {
      
    this.ioHandler = ioHandler;
      ioHandler.init(handlerCallback);
    //這個方法里面注冊了read
      isConnected.set(true);//這個時候通訊連接才真正建立起來了
     }



    繼續(xù)看ioHandler.init(handlerCallback)方法:

    public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException {
         
      dispatcher.register(
    this, SelectionKey.OP_READ);//注冊SelectionKey.OP_READ時間,可以接受服務端的消息了
    }



    服務端和客戶端就可以互相通信了。本文大致講解了xsocket的代碼的流程,其中講解有誤的地方請兄弟們指出,多謝!


    評論

    # re: xsocket源碼解讀  回復  更多評論   

    2011-10-20 08:32 by tbw
    不錯 不錯 學習了

    # re: xsocket源碼解讀  回復  更多評論   

    2012-03-05 16:18 by Miao
    請問,xsocket怎么連接代理呢??實在找不到相關資料....謝謝..

    # re: xsocket源碼解讀  回復  更多評論   

    2014-11-21 18:51 by xsank
    有幫助,寫樓主分享
    主站蜘蛛池模板: 无码国产精品一区二区免费I6| 亚洲国产aⅴ成人精品无吗| 亚洲AV无码一区二三区| 在线看片v免费观看视频777| 国产做国产爱免费视频| 韩国亚洲伊人久久综合影院| 亚洲国产精品成人综合久久久| 亚洲熟女一区二区三区| 又黄又大又爽免费视频| 永久免费毛片手机版在线看| 91免费在线播放| 日日麻批免费40分钟无码| 中文字幕免费在线看线人动作大片 | 在线观看的免费网站无遮挡| fc2成年免费共享视频网站| 极品色天使在线婷婷天堂亚洲| 国产成人精品日本亚洲直接| 亚洲成电影在线观看青青| 久久精品亚洲综合| 亚洲AV永久无码区成人网站| 亚洲色偷拍另类无码专区| 国产亚洲精久久久久久无码AV| 四虎亚洲国产成人久久精品| 成人免费视频观看无遮挡| 成人午夜免费福利视频| 国产人成免费视频网站| 最近高清中文字幕免费| 99re6热视频精品免费观看 | 亚洲三级电影网址| 国产成人精品日本亚洲| 亚洲中文字幕第一页在线| 亚洲一区二区三区无码中文字幕| 亚洲一区二区三区在线视频| 亚洲片国产一区一级在线观看 | 亚洲国产精品无码观看久久| 亚洲女子高潮不断爆白浆| 亚洲av无码一区二区三区在线播放 | 黄色三级三级三级免费看| 日韩少妇内射免费播放| 亚美影视免费在线观看| 两性色午夜免费视频|