<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    weidagang2046的專欄

    物格而后知致
    隨筆 - 8, 文章 - 409, 評(píng)論 - 101, 引用 - 0
    數(shù)據(jù)加載中……

    Reactor模式和NIO

    Reactor模式和NIO

    板橋里人 jdon.com 2002/11/08

    本文可看成是對(duì)Doug Lea Scalable IO in Java一文的翻譯。

    當(dāng)前分布式計(jì)算 Web Services盛行天下,這些網(wǎng)絡(luò)服務(wù)的底層都離不開(kāi)對(duì)socket的操作。他們都有一個(gè)共同的結(jié)構(gòu):
    1. Read request
    2. Decode request
    3. Process service
    4. Encode reply
    5. Send reply

    經(jīng)典的網(wǎng)絡(luò)服務(wù)的設(shè)計(jì)如下圖,在每個(gè)線程中完成對(duì)數(shù)據(jù)的處理:

    但這種模式在用戶負(fù)載增加時(shí),性能將下降非常的快。我們需要重新尋找一個(gè)新的方案,保持?jǐn)?shù)據(jù)處理的流暢,很顯然,事件觸發(fā)機(jī)制是最好的解決辦法,當(dāng)有事件發(fā)生時(shí),會(huì)觸動(dòng)handler,然后開(kāi)始數(shù)據(jù)的處理。

    Reactor模式類似于AWT中的Event處理:

    Reactor模式參與者

    1.Reactor 負(fù)責(zé)響應(yīng)IO事件,一旦發(fā)生,廣播發(fā)送給相應(yīng)的Handler去處理,這類似于AWT的thread
    2.Handler 是負(fù)責(zé)非堵塞行為,類似于AWT ActionListeners;同時(shí)負(fù)責(zé)將handlers與event事件綁定,類似于AWT addActionListener

    如圖:

    Java的NIO為reactor模式提供了實(shí)現(xiàn)的基礎(chǔ)機(jī)制,它的Selector當(dāng)發(fā)現(xiàn)某個(gè)channel有數(shù)據(jù)時(shí),會(huì)通過(guò)SlectorKey來(lái)告知我們,在此我們實(shí)現(xiàn)事件和handler的綁定。

    我們來(lái)看看Reactor模式代碼:


    public class Reactor implements Runnable{

      final Selector selector;
      final ServerSocketChannel serverSocket;

      Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
        serverSocket.socket().bind(address);

        serverSocket.configureBlocking(false);
        //向selector注冊(cè)該channel
         SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

        logger.debug("-->Start serverSocket.register!");

        //利用sk的attache功能綁定Acceptor 如果有事情,觸發(fā)Acceptor
        sk.attach(new Acceptor());
        logger.debug("-->attach(new Acceptor()!");
      }


      public void run() { // normally in a new Thread
        try {
        while (!Thread.interrupted())
        {
          selector.select();
          Set selected = selector.selectedKeys();
          Iterator it = selected.iterator();
          //Selector如果發(fā)現(xiàn)channel有OP_ACCEPT或READ事件發(fā)生,下列遍歷就會(huì)進(jìn)行。
          while (it.hasNext())
            //來(lái)一個(gè)事件 第一次觸發(fā)一個(gè)accepter線程
            //以后觸發(fā)SocketReadHandler
            dispatch((SelectionKey)(it.next()));
            selected.clear();
          }
        }catch (IOException ex) {
            logger.debug("reactor stop!"+ex);
        }
      }

      //運(yùn)行Acceptor或SocketReadHandler
      void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null){
          // r.run();

        }
      }

      class Acceptor implements Runnable { // inner
        public void run() {
        try {
          logger.debug("-->ready for accept!");
          SocketChannel c = serverSocket.accept();
          if (c != null)
            //調(diào)用Handler來(lái)處理channel
            new SocketReadHandler(selector, c);
          }
        catch(IOException ex) {
          logger.debug("accept stop!"+ex);
        }
        }
      }
    }

    以上代碼中巧妙使用了SocketChannel的attach功能,將Hanlder和可能會(huì)發(fā)生事件的channel鏈接在一起,當(dāng)發(fā)生事件時(shí),可以立即觸發(fā)相應(yīng)鏈接的Handler。

    再看看Handler代碼:

    public class SocketReadHandler implements Runnable {

      public static Logger logger = Logger.getLogger(SocketReadHandler.class);

      private Test test=new Test();

      final SocketChannel socket;
      final SelectionKey sk;

       static final int READING = 0, SENDING = 1;
      int state = READING;

      public SocketReadHandler(Selector sel, SocketChannel c)
        throws IOException {

        socket = c;

        socket.configureBlocking(false);
         sk = socket.register(sel, 0);

        //將SelectionKey綁定為本Handler 下一步有事件觸發(fā)時(shí),將調(diào)用本類的run方法。
        //參看dispatch(SelectionKey k)
        sk.attach(this);

        //同時(shí)將SelectionKey標(biāo)記為可讀,以便讀取。
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
      }

      public void run() {
        try{
        // test.read(socket,input);
          readRequest() ;
        }catch(Exception ex){
        logger.debug("readRequest error"+ex);
        }
      }


    /**
    * 處理讀取data
    * @param key
    * @throws Exception
    */
    private void readRequest() throws Exception {

      ByteBuffer input = ByteBuffer.allocate(1024);
      input.clear();
      try{

        int bytesRead = socket.read(input);

        ......

        //激活線程池 處理這些request
        requestHandle(new Request(socket,btt));

        .....


      }catch(Exception e) {
      }

    }

    注意在Handler里面又執(zhí)行了一次attach,這樣,覆蓋前面的Acceptor,下次該Handler又有READ事件發(fā)生時(shí),將直接觸發(fā)Handler.從而開(kāi)始了數(shù)據(jù)的讀 處理 寫 發(fā)出等流程處理。

    將數(shù)據(jù)讀出后,可以將這些數(shù)據(jù)處理線程做成一個(gè)線程池,這樣,數(shù)據(jù)讀出后,立即扔到線程池中,這樣加速處理速度:

    更進(jìn)一步,我們可以使用多個(gè)Selector分別處理連接和讀事件。

    一個(gè)高性能的Java網(wǎng)絡(luò)服務(wù)機(jī)制就要形成,激動(dòng)人心的集群并行計(jì)算即將實(shí)現(xiàn)。

    轉(zhuǎn)自:http://www.jdon.com/concurrent/reactor.htm

    posted on 2005-06-11 21:09 weidagang2046 閱讀(190) 評(píng)論(0)  編輯  收藏 所屬分類: Java

    主站蜘蛛池模板: 免费看美女裸露无档网站| 亚洲视频在线观看地址| 久久夜色精品国产噜噜噜亚洲AV| 亚洲精品福利你懂| 亚洲国产无线乱码在线观看 | 国产免费131美女视频| 国产成人精品男人免费| 亚洲日本一区二区三区在线不卡| 亚洲精品国产免费| 久99精品视频在线观看婷亚洲片国产一区一级在线 | 亚洲日韩在线第一页| 亚洲精品欧洲精品| 久久99久久成人免费播放| 无码一区二区三区免费视频| 怡红院亚洲怡红院首页| 成人亚洲国产va天堂| 日本免费一区二区久久人人澡| 看Aⅴ免费毛片手机播放| 日韩精品内射视频免费观看| 免费在线观看亚洲| 亚洲天然素人无码专区| 久久午夜夜伦鲁鲁片免费无码影视| 中国性猛交xxxxx免费看| 日本成人在线免费观看| 亚洲最新黄色网址| 国产在线观看片a免费观看| 亚洲精品美女久久777777| 人人爽人人爽人人片av免费| 成人永久免费福利视频网站| 亚洲香蕉久久一区二区三区四区| 国产亚洲sss在线播放| 波多野结衣在线免费观看| 中文字幕亚洲综合久久| 天天影院成人免费观看| 亚洲黄色一级毛片| 女人被免费视频网站| 亚洲中文字幕乱码一区| 亚洲AV无码乱码在线观看牲色| 国产大片免费观看中文字幕| 亚洲色大成网站WWW国产| 免费观看的a级毛片的网站|