<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    瘋狂

    STANDING ON THE SHOULDERS OF GIANTS
    posts - 481, comments - 486, trackbacks - 0, articles - 1
      BlogJava :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

    xsocket源碼解讀

    Posted on 2011-10-19 17:34 瘋狂 閱讀(6324) 評(píng)論(3)  編輯  收藏 所屬分類: 網(wǎng)絡(luò)通訊讀代碼

    關(guān)于xsocket可見(jiàn)于 我的另外一篇文章http://www.tkk7.com/freeman1984/archive/2011/04/25/302706.html,或者查看官網(wǎng)http://xsocket.org/
    當(dāng)然閱讀xsocket需要一些線程,nio,niosocket,和java.util.concurrent(鎖,線程池等)包的一些知識(shí)。要不讀起來(lái)很費(fèi)勁,建議先去了解下這些知識(shí)。可以在我的文章分類concurrent里面有一些,其他去網(wǎng)上找找。
    本文只讀了一個(gè)主要的流程,對(duì)于一些其他的代碼例如:ssl相關(guān),讀數(shù)據(jù)相關(guān)沒(méi)有涉及,看有時(shí)間能補(bǔ)上。
    首先xsocket的幾個(gè)關(guān)鍵的類
    Server: 服務(wù)器端初始化線程池創(chuàng)建IoAcceptor
    IoAcceptor:采用while循環(huán)接收客戶端連接,并創(chuàng)建IoSocketDispatcher和IoChainableHandler
    IoSocketDispatcher:負(fù)責(zé)注冊(cè)SelectionKey以及事件的分發(fā),并交給IoChainableHandler處理,通過(guò)一個(gè)while循環(huán)來(lái)處理注冊(cè)的SelectionKey事件。
    IHandler:事件處理,數(shù)據(jù)的讀寫等等。
    INonBlockingConnection客戶端接口。
    (1)首先看下Server創(chuàng)建:
    構(gòu)造方法

    protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext,
     
    boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) 
    這個(gè)方法主要是初始化線程池,構(gòu)件acceptor
            defaultWorkerPool 
    = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
         workerpool 
    = defaultWorkerPool;
         
      
    if (sslContext != null{//是否使用ssl
       acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);
       
      }
     else {
       acceptor 
    = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
      }



    其中線程池:使用jdk1.5以后的ThreadPoolExecutor,線程池最小默認(rèn)2,最大100,QUEUE的大小默認(rèn)也是100
    線程池最小:MIN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
    SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
    TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));

    (2)構(gòu)件acceptor細(xì)節(jié),最后server啟動(dòng)的時(shí)候會(huì)啟動(dòng)acceptor監(jiān)聽(tīng)客戶端

     public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {
            .
            serverChannel 
    = ServerSocketChannel.open();
            
            serverChannel.configureBlocking(
    true);
            serverChannel.socket().setSoTimeout(
    0);  // accept method never times out
            serverChannel.socket().setReuseAddress(isReuseAddress); 
            .
            serverChannel.socket().bind(address, backlog);
            dispatcherPool 
    = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
            .
        }


    (3)啟動(dòng)server
    server.start();
    服務(wù)的啟動(dòng)用了一個(gè)單獨(dú)的線程,這里面使用到了CountDownLatch可參見(jiàn)另外一篇關(guān)于CountDownLatch用法的文章:
    http://www.tkk7.com/freeman1984/archive/2011/07/04/353654.html
    使用CountDownLatch來(lái)控制server的啟動(dòng)時(shí)間,操作多少時(shí)間為啟動(dòng)就,默認(rèn)是60秒,這里就不講CountDownLatch的代碼了
    整個(gè)啟動(dòng)的方法如下,能看懂CountDownLatch的用法基本上就理解了。

    public static void start(IServer server, int timeoutSec) throws SocketTimeoutException {
      
      
      
    // start server within a dedicated thread 
      Thread t = new Thread(server);
      t.setName(
    "xServer");
      t.start();
    //請(qǐng)看下面的run方法分析
      
     }


    看看他的server線程的run方法:

    public void run() {
     
     acceptor.listen();
    //啟動(dòng)前面創(chuàng)建的acceptor開(kāi)始監(jiān)聽(tīng)客戶端連接
     
    }



    接著查看listen()方法:

    public void listen() throws IOException {
         callback.onConnected();
    //通知server已經(jīng)啟動(dòng)
         accept();//接受客戶端連接
        }

    }



    查看 accept();很明了,使用一個(gè)while循環(huán)監(jiān)聽(tīng)客戶端連接,并建立可客戶端相關(guān)的處理類:

     

    while (isOpen.get()) {

        
    // blocking accept call
        SocketChannel channel = serverChannel.accept();

        
    // create IoSocketHandler
        
    //創(chuàng)建事件分發(fā)器
        IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
        
    //創(chuàng)建io處理
        IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
        
    // notify call back
        callback.onConnectionAccepted(ioHandler);//很關(guān)鍵的一個(gè)地方,會(huì)注冊(cè)SelectionKey.OP_READ,此時(shí)客戶端發(fā)來(lái)的消息就可以北服務(wù)端獲取
    }


    查看callback.onConnectionAccepted(ioHandler);
    此方法會(huì)初始化server端的ioHandler,查看初始化的代碼:
    dispatcher.register(this, SelectionKey.OP_READ);首先會(huì)注冊(cè)read選擇器,
    如果有read事件發(fā)生dispatcher就會(huì)處理。看看dispatcher的run方法(通過(guò)一個(gè)循環(huán)來(lái)不停的處理已經(jīng)注冊(cè)的事件)

    while(isOpen.get()) {
        
        
    int eventCount = selector.select(5000); 
       
        handledTasks 
    = performRegisterHandlerTasks();//處理事件
        handledTasks += performKeyUpdateTasks();
        
    if (eventCount > 0{
         handleReadWriteKeys();
    //處理讀寫,調(diào)用我們自己定義的hander來(lái)處理onData等事件
        }

        handledTasks 
    += performDeregisterHandlerTasks();
          
      }



      
    (4)客戶端
    NonBlockingConnection,例如:
    new NonBlockingConnection("localhost", 8090,new MyHandler() )
    構(gòu)造方法的主要代碼:

     .
     SocketChannel channel 
    = openSocket(localAddress, options);//實(shí)際調(diào)用:SocketChannel channel = SocketChannel.open();
       
     IoConnector connector 
    = getDefaultConnector();
           
        IIoConnectorCallback callback 
    = new AsyncIoConnectorCallback(remoteAddress, channel, sslContext, isSecured, connectTimeoutMillis);
        connector.connectAsync(channel, remoteAddress, connectTimeoutMillis, callback);
          
    }



    建立連接,生成IoConnector用來(lái)管理連接,然后connector開(kāi)始啟動(dòng),做一些初始化的工作:

    其中connector.connectAsync(…方法會(huì)執(zhí)行會(huì)產(chǎn)生一個(gè)RegisterTask任務(wù)到IoConnector,這個(gè)RegisterTask做的事情如下:
    selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注冊(cè)SelectionKey.OP_CONNECT
    當(dāng)IoConnector運(yùn)行會(huì)執(zhí)行這個(gè)任務(wù):
    看下他的run方法主要代碼:

    while(isOpen.get()) {
               
                    handledTasks 
    = performTaskQueue();//首先運(yùn)行上一步創(chuàng)建的RegisterTask注冊(cè)SelectionKey.OP_CONNECT
                    int eventCount = selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已經(jīng)準(zhǔn)備好
                    if (eventCount > 0{
                        handleConnect();
    //如果準(zhǔn)備好就處理連接事件
                    }
     else {
                     checkForLooping(handledTasks);
                    }

             
      }



     handleConnect()的代碼:
     

     private void handleConnect() {
            Set
    <SelectionKey> selectedEventKeys = selector.selectedKeys();

            Iterator
    <SelectionKey> it = selectedEventKeys.iterator();
            
    while (it.hasNext()) {
                SelectionKey eventKey 
    = it.next();
                it.remove();

                RegisterTask registerTask 
    = (RegisterTask) eventKey.attachment();

                
    if (eventKey.isValid() && eventKey.isConnectable()) {
                    
                    
    try {
                        
    boolean isConnected = ((SocketChannel) eventKey.channel()).finishConnect();//已經(jīng)通訊連接
                        if (isConnected) {
                         eventKey.cancel();
                         registerTask.callback.onConnectionEstablished();
    //連接建立好就做下一步工作,注冊(cè)read事件。
                        }

                    
                }

            }

        }


     接著看registerTask.callback.onConnectionEstablished()
     主要初始化iohander并注冊(cè)read事件

     private void init(IoChainableHandler ioHandler, IIoHandlerCallback handlerCallback) throws IOException, SocketTimeoutException {
      
    this.ioHandler = ioHandler;
      ioHandler.init(handlerCallback);
    //這個(gè)方法里面注冊(cè)了read
      isConnected.set(true);//這個(gè)時(shí)候通訊連接才真正建立起來(lái)了
     }



    繼續(xù)看ioHandler.init(handlerCallback)方法:

    public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException {
         
      dispatcher.register(
    this, SelectionKey.OP_READ);//注冊(cè)SelectionKey.OP_READ時(shí)間,可以接受服務(wù)端的消息了
    }



    服務(wù)端和客戶端就可以互相通信了。本文大致講解了xsocket的代碼的流程,其中講解有誤的地方請(qǐng)兄弟們指出,多謝!


    評(píng)論

    # re: xsocket源碼解讀  回復(fù)  更多評(píng)論   

    2011-10-20 08:32 by tbw
    不錯(cuò) 不錯(cuò) 學(xué)習(xí)了

    # re: xsocket源碼解讀  回復(fù)  更多評(píng)論   

    2012-03-05 16:18 by Miao
    請(qǐng)問(wèn),xsocket怎么連接代理呢??實(shí)在找不到相關(guān)資料....謝謝..

    # re: xsocket源碼解讀  回復(fù)  更多評(píng)論   

    2014-11-21 18:51 by xsank
    有幫助,寫樓主分享
    主站蜘蛛池模板: 亚洲国产成人精品无码区在线观看| 亚洲成av人片不卡无码久久| 成人无遮挡毛片免费看| 欧洲美熟女乱又伦免费视频| 日本一道一区二区免费看| 亚洲人成国产精品无码| 亚洲AV无码不卡在线播放| 亚洲白嫩在线观看| 亚洲高清国产拍精品熟女| 思思久久99热免费精品6| 黄色网址在线免费| 四虎国产精品免费久久| 全亚洲最新黄色特级网站| 亚洲精品V欧洲精品V日韩精品| 久久久久亚洲AV无码观看| 亚洲中文字幕久久无码| 一区二区视频免费观看| 最近中文字幕mv免费高清在线 | 亚洲成人在线免费观看| 在线不卡免费视频| 久久精品国产亚洲5555| 亚洲成人福利在线| 黄页网站在线免费观看| 一级毛片全部免费播放| 日韩在线免费播放| 亚洲AV中文无码字幕色三| 久久夜色精品国产噜噜亚洲a| 一级毛片试看60分钟免费播放| 97精品免费视频| 免费成人午夜视频| 亚洲性天天干天天摸| 久久亚洲精品无码av| 久久免费看少妇高潮V片特黄| 日本免费一区尤物| 亚洲AV成人无码久久精品老人| 亚洲色成人网站WWW永久四虎| 伊人免费在线观看| 国产精品公开免费视频| 蜜芽亚洲av无码精品色午夜| 免费看黄福利app导航看一下黄色录像| 久久99国产乱子伦精品免费|