<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    底層架構(gòu)-遠(yuǎn)程通訊-Mina

    一:Mina概要
        Apache Mina是一個(gè)能夠幫助用戶開(kāi)發(fā)高性能和高伸縮性網(wǎng)絡(luò)應(yīng)用程序的框架。它通過(guò)Java nio技術(shù)基于TCP/IP和UDP/IP協(xié)議提供了抽象的、事件驅(qū)動(dòng)的、異步的API。
    如下的特性:
    1、  基于Java nio的TCP/IP和UDP/IP實(shí)現(xiàn)
    基于RXTX的串口通信(RS232)
    VM 通道通信
    2、通過(guò)filter接口實(shí)現(xiàn)擴(kuò)展,類似于Servlet filters
    3、low-level(底層)和high-level(高級(jí)封裝)的api:
           low-level:使用ByteBuffers
           High-level:使用自定義的消息對(duì)象和解碼器
    4、Highly customizable(易用的)線程模式(MINA2.0 已經(jīng)禁用線程模型了):
           單線程
           線程池
           多個(gè)線程池
    5、基于java5 SSLEngine的SSL、TLS、StartTLS支持
    6、負(fù)載平衡
    7、使用mock進(jìn)行單元測(cè)試
    8、jmx整合
    9、基于StreamIoHandler的流式I/O支持
    10、IOC容器的整合:Spring、PicoContainer
    11、平滑遷移到Netty平臺(tái)

    二:實(shí)踐
        首先講一下客戶端的通信過(guò)程:
    1.通過(guò)SocketConnector同服務(wù)器端建立連接 
    2.鏈接建立之后I/O的讀寫交給了I/O Processor線程,I/O Processor是多線程的 
    3.通過(guò)I/O Processor讀取的數(shù)據(jù)經(jīng)過(guò)IoFilterChain里所有配置的IoFilter,IoFilter進(jìn)行消息的過(guò)濾,格式的轉(zhuǎn)換,在這個(gè)層面可以制定一些自定義的協(xié)議 
    4.最后IoFilter將數(shù)據(jù)交給Handler進(jìn)行業(yè)務(wù)處理,完成了整個(gè)讀取的過(guò)程 
    5.寫入過(guò)程也是類似,只是剛好倒過(guò)來(lái),通過(guò)IoSession.write寫出數(shù)據(jù),然后Handler進(jìn)行寫入的業(yè)務(wù)處理,處理完成后交給IoFilterChain,進(jìn)行消息過(guò)濾和協(xié)議的轉(zhuǎn)換,最后通過(guò)I/O Processor將數(shù)據(jù)寫出到socket通道 
    IoFilterChain作為消息過(guò)濾鏈 
    1.讀取的時(shí)候是從低級(jí)協(xié)議到高級(jí)協(xié)議的過(guò)程,一般來(lái)說(shuō)從byte字節(jié)逐漸轉(zhuǎn)換成業(yè)務(wù)對(duì)象的過(guò)程 
    2.寫入的時(shí)候一般是從業(yè)務(wù)對(duì)象到字節(jié)byte的過(guò)程 
    IoSession貫穿整個(gè)通信過(guò)程的始終 
        客戶端通信過(guò)程 

       
    1.創(chuàng)建服務(wù)器
        package com.gewara.web.module.base;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;

    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

    /**
     * Mina服務(wù)器
     * 
     * 
    @author mike
     *
     * 
    @since 2012-3-15
     
    */
    public class HelloServer {

            private static final int PORT = 8901;// 定義監(jiān)聽(tīng)端口

            public static void main(String[] args) throws IOException{
                // 創(chuàng)建服務(wù)端監(jiān)控線程
                IoAcceptor acceptor = new NioSocketAcceptor();
                
                // 設(shè)置日志記錄器
                acceptor.getFilterChain().addLast("logger", new LoggingFilter());
                
                // 設(shè)置編碼過(guò)濾器
                acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
               
                // 指定業(yè)務(wù)邏輯處理器
                acceptor.setHandler(new HelloServerHandler());
                
                // 設(shè)置端口號(hào)
                acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));
                
                // 啟動(dòng)監(jiān)聽(tīng)線程
                acceptor.bind();
            }
    }
     
    2.創(chuàng)建服務(wù)器端業(yè)務(wù)邏輯
        
    package com.gewara.web.module.base;

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IoSession;

    /**
     * 服務(wù)器端業(yè)務(wù)邏輯
     * 
     * 
    @author mike
     *
     * 
    @since 2012-3-15
     
    */
    public class HelloServerHandler extends IoHandlerAdapter {
             @Override
            /**
             * 連接創(chuàng)建事件
             
    */
            public void sessionCreated(IoSession session){
                // 顯示客戶端的ip和端口
                System.out.println(session.getRemoteAddress().toString());
            }

            @Override
            /**
             * 消息接收事件
             
    */
            public void messageReceived(IoSession session, Object message) throws Exception{
                String str = message.toString();
                if (str.trim().equalsIgnoreCase("quit")){
                    // 結(jié)束會(huì)話
                    session.close(true);
                    return;
                }
                
                // 返回消息字符串
                session.write("Hi Client!");
                // 打印客戶端傳來(lái)的消息內(nèi)容
                System.out.println("Message written" + str);
            }
    }

    3.創(chuàng)建客戶端
         package com.gewara.web.module.base;

    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;

    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;

    /**
     * Mina客戶端
     * 
     * 
    @author mike
     *
     * 
    @since 2012-3-15
     
    */
    public class HelloClient {
           public static void main(String[] args){
                // 創(chuàng)建客戶端連接器.
                NioSocketConnector connector = new NioSocketConnector();
                
                // 設(shè)置日志記錄器
                connector.getFilterChain().addLast("logger", new LoggingFilter());
                
                // 設(shè)置編碼過(guò)濾器
                connector.getFilterChain().addLast("codec", 
                        new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 
                
                // 設(shè)置連接超時(shí)檢查時(shí)間
                connector.setConnectTimeoutCheckInterval(30);
                
                // 設(shè)置事件處理器
                connector.setHandler(new HelloClientHandler());
                
                // 建立連接
                ConnectFuture cf = connector.connect(new InetSocketAddress("192.168.2.89", 8901));
                
                // 等待連接創(chuàng)建完成
                cf.awaitUninterruptibly();
                
                // 發(fā)送消息  
                cf.getSession().write("Hi Server!");
               
                // 發(fā)送消息
                cf.getSession().write("quit");
                
                // 等待連接斷開(kāi)
                cf.getSession().getCloseFuture().awaitUninterruptibly();

                // 釋放連接
                connector.dispose();
            }
    }

    4.客戶端業(yè)務(wù)邏輯
    package com.gewara.web.module.base;

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IoSession;

    public class HelloClientHandler extends IoHandlerAdapter {
            @Override
            /**
             * 消息接收事件
             
    */
            public void messageReceived(IoSession session, Object message) throws Exception{
                //顯示接收到的消息
                System.out.println("server message:"+message.toString());
            }
    }

    5.先啟動(dòng)服務(wù)器端,然后啟動(dòng)客戶端
    2012-03-15 14:45:41,456  INFO  logging.LoggingFilter - CREATED
    /192.168.2.89:2691
    2012-03-15 14:45:41,456  INFO  logging.LoggingFilter - OPENED
    2012-03-15 14:45:41,487  INFO  logging.LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=11 cap=2048: 48 69 20 53 65 72 76 65 72 21 0A]
    2012-03-15 14:45:41,487  DEBUG codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1
    Message writtenHi Server!
    2012-03-15 14:45:41,487  INFO  logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
    2012-03-15 14:45:41,487  INFO  logging.LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=5 cap=2048: 71 75 69 74 0A]
    2012-03-15 14:45:41,487  DEBUG codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1
    2012-03-15 14:45:41,487  INFO  logging.LoggingFilter - CLOSED

    三:分析源碼
    1.首先看服務(wù)器
                // 創(chuàng)建服務(wù)端監(jiān)控線程
                IoAcceptor acceptor = new NioSocketAcceptor();
                
                // 設(shè)置日志記錄器
                acceptor.getFilterChain().addLast("logger", new LoggingFilter());
                
                // 設(shè)置編碼過(guò)濾器
                acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
               
                // 指定業(yè)務(wù)邏輯處理器
                acceptor.setHandler(new HelloServerHandler());
                
                // 設(shè)置端口號(hào)
                acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));
                
                // 啟動(dòng)監(jiān)聽(tīng)線程
                acceptor.bind();

      1)先創(chuàng)建NioSocketAcceptor nio的接收器,談到Socket就要說(shuō)到Reactor模式 
        當(dāng)前分布式計(jì)算 Web Services盛行天下,這些網(wǎng)絡(luò)服務(wù)的底層都離不開(kāi)對(duì)socket的操作。他們都有一個(gè)共同的結(jié)構(gòu):
    1. Read request
    2. Decode request
    3. Process service                                     
    4. Encode reply
    5. Send reply 

    但這種模式在用戶負(fù)載增加時(shí),性能將下降非常的快。我們需要重新尋找一個(gè)新的方案,保持?jǐn)?shù)據(jù)處理的流暢,很顯然,事件觸發(fā)機(jī)制是最好的解決辦法,當(dāng)有事件發(fā)生時(shí),會(huì)觸動(dòng)handler,然后開(kāi)始數(shù)據(jù)的處理。
    Reactor模式類似于AWT中的Event處理。

    Reactor模式參與者

    1.Reactor 負(fù)責(zé)響應(yīng)IO事件,一旦發(fā)生,廣播發(fā)送給相應(yīng)的Handler去處理,這類似于AWT的thread
    2.Handler 是負(fù)責(zé)非堵塞行為,類似于AWT ActionListeners;同時(shí)負(fù)責(zé)將handlers與event事件綁定,類似于AWT addActionListener


    并發(fā)系統(tǒng)常采用reactor模式,簡(jiǎn)稱觀察者模式,代替常用的多線程處理方式,利用有限的系統(tǒng)的資源,提高系統(tǒng)的吞吐量

    可以看一下這篇文章,講解的很生動(dòng)具體,一看就明白reactor模式的好處http://daimojingdeyu.iteye.com/blog/828696 
    Reactor模式是編寫高性能網(wǎng)絡(luò)服務(wù)器的必備技術(shù)之一,它具有如下的優(yōu)點(diǎn):
        1)響應(yīng)快,不必為單個(gè)同步時(shí)間所阻塞,雖然Reactor本身依然是同步的;
        2)編程相對(duì)簡(jiǎn)單,可以最大程度的避免復(fù)雜的多線程及同步問(wèn)題,并且避免了多線程/進(jìn)程的切換開(kāi)銷;
        3)可擴(kuò)展性,可以方便的通過(guò)增加Reactor實(shí)例個(gè)數(shù)來(lái)充分利用CPU資源;
        4)可復(fù)用性,reactor框架本身與具體事件處理邏輯無(wú)關(guān),具有很高的復(fù)用性; 

    2)其次,再說(shuō)說(shuō)NIO的基本原理和使用
        NIO 有一個(gè)主要的類Selector,這個(gè)類似一個(gè)觀察者,只要我們把需要探知的socketchannel告訴Selector,我們接著做別的事情,當(dāng)有事件發(fā)生時(shí),他會(huì)通知我們,傳回一組 SelectionKey,我們讀取這些Key,就會(huì)獲得我們剛剛注冊(cè)過(guò)的socketchannel,然后,我們從這個(gè)Channel中讀取數(shù)據(jù),放心,包準(zhǔn)能夠讀到,接著我們可以處理這些數(shù)據(jù)。
      Selector內(nèi)部原理實(shí)際是在做一個(gè)對(duì)所注冊(cè)的channel的輪詢?cè)L問(wèn),不斷的輪詢(目前就這一個(gè)算法),一旦輪詢到一個(gè)channel有所注冊(cè)的事情發(fā)生,比如數(shù)據(jù)來(lái)了,他就會(huì)站起來(lái)報(bào)告,交出一把鑰匙,讓我們通過(guò)這把鑰匙(SelectionKey表示 SelectableChannel 在 Selector 中的注冊(cè)的標(biāo)記。 )來(lái)讀取這個(gè)channel的內(nèi)容。










































    posted on 2012-03-15 14:49 陳睿 閱讀(5661) 評(píng)論(0)  編輯  收藏 所屬分類: 底層架構(gòu)


    只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


    網(wǎng)站導(dǎo)航:
     

    導(dǎo)航

    <2012年3月>
    26272829123
    45678910
    11121314151617
    18192021222324
    25262728293031
    1234567

    統(tǒng)計(jì)

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    最新評(píng)論

    閱讀排行榜

    評(píng)論排行榜

    主站蜘蛛池模板: 亚洲一区精品伊人久久伊人| 亚洲乱码在线观看| 免费看大黄高清网站视频在线| 国产在线观看无码免费视频| 亚洲人成电影网站色| 久久精品国产精品亚洲毛片| 亚洲一本大道无码av天堂| 成年人视频在线观看免费| 免费无码一区二区三区| 久久毛片免费看一区二区三区| 亚洲乱亚洲乱妇24p| 亚洲国产成人久久综合一区| 亚洲AV无码第一区二区三区| 亚洲中文字幕成人在线| 国产成人免费ā片在线观看| 青青久在线视频免费观看| www视频免费看| 18观看免费永久视频| 嫩草在线视频www免费观看| 国产伦精品一区二区免费| 国产成人亚洲午夜电影| 久久久久久亚洲精品影院| 亚洲一本之道高清乱码| 亚洲白色白色永久观看| 亚洲国产精品人久久| 亚洲AV午夜成人片| 日本亚洲视频在线| 国产亚洲人成网站在线观看不卡| 亚洲性在线看高清h片| 精品无码一区二区三区亚洲桃色 | 久99久精品免费视频热77| 中文字幕视频免费在线观看| 成人嫩草影院免费观看| 曰批免费视频播放在线看片二| 亚洲人成网站999久久久综合| 中文字幕亚洲综合小综合在线| 亚洲乱码日产精品BD在线观看| 亚洲第一二三四区| 99久久婷婷国产综合亚洲| 亚洲人片在线观看天堂无码| 亚洲AV性色在线观看|