<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    hengheng123456789

      BlogJava :: 首頁 :: 聯系 :: 聚合  :: 管理
      297 Posts :: 68 Stories :: 144 Comments :: 0 Trackbacks
     

    MINA Beginning

    http://mina.apache.org/

    http://mina.apache.org/documentation.html

    1.         傳統Socket:阻塞式通信

    java傳統socket技術中,每建立一個Socket連接時,須同時創建一個新線程對該Socket進行單獨通信(采用阻塞的方式通信)。

    這種方式具有很高的響應速度,并且控制起來也很簡單,在連接數較少的時候非常有效,但是如果對每一個連接都產生一個線程無疑是對系統資源的一種浪費,如果連接數較多將會出現資源不足的情況。下面的代碼就說明了這一點。

    a)         server code:

    package Socket;

    import java.io.BufferedReader;

    import java.io.IOException;

    import java.io.InputStreamReader;

    import java.net.ServerSocket;

    import java.net.Socket;

    public class MultiUserServer extends Thread {

           private Socket client;

           public MultiUserServer(Socket c) {

                  this.client = c;

           }

           public void run() {

                  try {

                         BufferedReader in = new BufferedReader(new InputStreamReader(client

                                       .getInputStream()));

                         // Mutil User but can't parallel

                         while (true) {

                                String str = in.readLine();

                                System.out.println("receive message: " + str);

                                if (str.equals("end"))

                                       break;

                         }

                         client.close();

                  } catch (IOException ex) {

                  }

           }

           public static void main(String[] args) throws IOException {

                  int port = 10086;

                  if (args.length > 0)

                         port = Integer.parseInt(args[0]);

                  ServerSocket server = new ServerSocket(port);

                  System.out.println("the server socket application is created!");

                  while (true) {

                         // transfer location change Single User or Multi User

                         MultiUserServer mu = new MultiUserServer(server.accept());

                         mu.start();

                  }

           }

    }

    b)        client code:

    package Socket;

    import java.io.BufferedReader;

    import java.io.InputStreamReader;

    import java.io.PrintWriter;

    import java.net.Socket;

    public class Client {

           static Socket server;

           public static void main(String[] args) throws Exception {

                  String host = "192.168.0. 10";

                  int port = 10086;

                  if (args.length > 1) {

                         host = args[0];

                         port = Integer.parseInt(args[1]);

                  }

                  System.out.println("connetioning:" + host + ":" + port);

                  server = new Socket(host, port);

                  PrintWriter out = new PrintWriter(server.getOutputStream());

                  BufferedReader wt = new BufferedReader(new InputStreamReader(System.in));

                  while (true) {

                         String str = wt.readLine();

                         out.println(str);

                         out.flush();

                         if (str.equals("end")) {

                                break;

                         }

                  }

                  server.close();

           }

    }

    2.         nio socket: 非阻塞通訊模式

    a)         NIO 設計背后的基石:反應器模式

    反應器模式: 用于事件多路分離和分派的體系結構模式。

    反應器模式的核心功能如下:

    n         將事件多路分用

    n         將事件分派到各自相應的事件處理程序

    b)        NIO 的非阻塞 I/O 機制是圍繞 選擇器 通道構建的。

    選擇器(Selector) Channel 的多路復用器。 Selector 類將傳入客戶機請求多路分用并將它們分派到各自的請求處理程序。

    通道(Channel ):表示服務器和客戶機之間的一種通信機制,一個通道負責處理一類請求/事件。

    簡單的來說:

    NIO是一個基于事件的IO架構,最基本的思想就是:有事件我會通知你,你再去做與此事件相關的事情。而且NIO主線程只有一個,不像傳統的模型,需要多個線程以應對客戶端請求,也減輕了JVM的工作量。

    c)        Channel注冊至Selector以后,經典的調用方法如下:

            while (somecondition) {

                int n = selector.select(TIMEOUT);

                if (n == 0)

                    continue;

                for (Iterator iter = selector.selectedKeys().iterator(); iter

                        .hasNext();) {

                    if (key.isAcceptable())

                        doAcceptable(key);

                    if (key.isConnectable())

                        doConnectable(key);

                    if (key.isValid() && key.isReadable())

                        doReadable(key);

                    if (key.isValid() && key.isWritable())

                        doWritable(key);

                    iter.remove();

                }

            }

    NIO 有一個主要的類Selector,這個類似一個觀察者,只要我們把需要探知的socketchannel告訴Selector,我們接著做別的事情,當有事件發生時,他會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛注冊過的socketchannel,然后,我們從這個Channel中讀取數據,放心,包準能夠讀到,接著我們可以處理這些數據。

    Selector內部原理實際是在做一個對所注冊的channel的輪詢訪問,不斷的輪詢(目前就這一個算法),一旦輪詢到一個channel有所注冊的事情發生,比如數據來了,他就會站起來報告,交出一把鑰匙,讓我們通過這把鑰匙來讀取這個channel的內容。

    d)        Sample01

    package NIO;

    // ==================== Program Discription =====================

    // 程序名稱:示例12-14 : SocketChannelDemo.java

    // 程序目的:學習Java NIO#SocketChannel

    // ==============================================================

    import java.net.InetSocketAddress;

    import java.net.ServerSocket;

    import java.nio.ByteBuffer;

    import java.nio.channels.SelectableChannel;

    import java.nio.channels.SelectionKey;

    import java.nio.channels.Selector;

    import java.nio.channels.ServerSocketChannel;

    import java.nio.channels.SocketChannel;

    import java.util.Iterator;

    public class SocketChannelDemo {

           public static int PORT_NUMBER = 23;// 監聽端口

           static String line = "";

           ServerSocketChannel serverChannel;

           ServerSocket serverSocket;

           Selector selector;

           private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

           public static void main(String[] args) throws Exception {

                  SocketChannelDemo server = new SocketChannelDemo();

                  server.init(args);

                  server.startWork();

           }

           public void init(String[] argv) throws Exception {

                  int port = PORT_NUMBER;

                  if (argv.length > 0) {

                         port = Integer.parseInt(argv[0]);

                  }

                  System.out.println("Listening on port " + port);

                  // 分配一個ServerSocketChannel

                  serverChannel = ServerSocketChannel.open();

                  // ServerSocketChannel里獲得一個對應的Socket

                  serverSocket = serverChannel.socket();

                  // 生成一個Selector

                  selector = Selector.open();

                  // Socket綁定到端口上

                  serverSocket.bind(new InetSocketAddress(port));

                  // serverChannel為非bolck

                  serverChannel.configureBlocking(false);

                  // 通過Selector注冊ServerSocetChannel

                  serverChannel.register(selector, SelectionKey.OP_ACCEPT);

           }

           public void startWork() throws Exception {

                  while (true) {

                         int n = selector.select();// 獲得IO準備就緒的channel數量

                         if (n == 0) {

                                continue; // 沒有channel準備就緒,繼續執行

                         }

                         // 用一個iterator返回Selectorselectedkeys

                         Iterator it = selector.selectedKeys().iterator();

                         // 處理每一個SelectionKey

                         while (it.hasNext()) {

                                SelectionKey key = (SelectionKey) it.next();

                                // 判斷是否有新的連接到達

                                if (key.isAcceptable()) {

                                       // 返回SelectionKeyServerSocketChannel

                                       ServerSocketChannel server = (ServerSocketChannel) key

                                                     .channel();

                                       SocketChannel channel = server.accept();

                                       registerChannel(selector, channel, SelectionKey.OP_READ);

                                       doWork(channel);

                                }

                                // 判斷是否有數據在此channel里需要讀取

                                if (key.isReadable()) {

                                       processData(key);

                                }

                                // 刪除 selectedkeys

                                it.remove();

                         }

                  }

           }

           protected void registerChannel(Selector selector,

                         SelectableChannel channel, int ops) throws Exception {

                  if (channel == null) {

                         return;

                  }

                  channel.configureBlocking(false);

                  channel.register(selector, ops);

           }

           // 處理接收的數據

           protected void processData(SelectionKey key) throws Exception {

                  SocketChannel socketChannel = (SocketChannel) key.channel();

                  int count;

                  buffer.clear(); // 清空buffer

                  // 讀取所有的數據

                  while ((count = socketChannel.read(buffer)) > 0) {

                         buffer.flip();

                         // send the data, dont assume it goes all at once

                         while (buffer.hasRemaining()) {

                                char c = (char) buffer.get();

                                line += c;

                                // 如果收到回車鍵,則在返回的字符前增加[echo]$字樣,并且server端打印出字符串

                                if (c == (char) 13) {

                                       buffer.clear();

                                       buffer.put("[echo]$".getBytes());

                                       buffer.flip();

                                       System.out.println(line); //

                                       line = "";

                                }

                                socketChannel.write(buffer);// Socket里寫數據

                         }

                         buffer.clear(); // 清空buffer

                  }

                  if (count < 0) {

                         // count<0,說明已經讀取完畢

                         socketChannel.close();

                  }

           }

           private void doWork(SocketChannel channel) throws Exception {

                  buffer.clear();

                  buffer

                                .put("Hello,I am working,please input some thing,and i will echo to you![echo]$"

                                              .getBytes());

                  buffer.flip();

                  channel.write(buffer);

           }

    }

    運行此程序,然后在控制臺輸入命令telnet localhost 23

    e)         Server code:

    public class NonBlockingServer

    {

        public Selector sel = null;

        public ServerSocketChannel server = null;

        public SocketChannel socket = null;

        public int port = 4900;

        String result = null;

        public NonBlockingServer()

        {

                  System.out.println("Inside default ctor");

        }

           public NonBlockingServer(int port)

        {

                  System.out.println("Inside the other ctor");

                  this.port = port;

        }

        public void initializeOperations() throws IOException,UnknownHostException

        {

                  System.out.println("Inside initialization");

                  sel = Selector.open();

                  server = ServerSocketChannel.open();

                  server.configureBlocking(false);

                  InetAddress ia = InetAddress.getLocalHost();

                  InetSocketAddress isa = new InetSocketAddress(ia,port);

                  server.socket().bind(isa);

        }

           public void startServer() throws IOException

        {

                  System.out.println("Inside startserver");

            initializeOperations();

                  System.out.println("Abt to block on select()");

                  SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );     

                  while (acceptKey.selector().select() > 0 )

                  {    

              

                         Set readyKeys = sel.selectedKeys();

                         Iterator it = readyKeys.iterator();

                         while (it.hasNext()) {

                                SelectionKey key = (SelectionKey)it.next();

                                it.remove();

                    

                                if (key.isAcceptable()) {

                                       System.out.println("Key is Acceptable");

                                       ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                                       socket = (SocketChannel) ssc.accept();

                                       socket.configureBlocking(false);

                                       SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);

                                }

                                if (key.isReadable()) {

                                       System.out.println("Key is readable");

                                       String ret = readMessage(key);

                                       if (ret.length() > 0) {

                                              writeMessage(socket,ret);

                                       }

                                }

                                if (key.isWritable()) {

                                       System.out.println("THe key is writable");

                                       String ret = readMessage(key);

                                       socket = (SocketChannel)key.channel();

                                       if (result.length() > 0 ) {

                                              writeMessage(socket,ret);

                                       }

                                }

                         }

                  }

        }

        public void writeMessage(SocketChannel socket,String ret)

        {

                  System.out.println("Inside the loop");

                  if (ret.equals("quit") || ret.equals("shutdown")) {

                         return;

                  }

                  try

                  {

                         String s = "This is content from server!-----------------------------------------";

                         Charset set = Charset.forName("us-ascii");

                         CharsetDecoder dec = set.newDecoder();

                         CharBuffer charBuf = dec.decode(ByteBuffer.wrap(s.getBytes()));

                         System.out.println(charBuf.toString());

                         int nBytes = socket.write(ByteBuffer.wrap((charBuf.toString()).getBytes()));

                         System.out.println("nBytes = "+nBytes);

                                result = null;

                  }

                  catch(Exception e)

                  {

                         e.printStackTrace();

                  }

        }

        public String readMessage(SelectionKey key)

        {

                  int nBytes = 0;

                  socket = (SocketChannel)key.channel();

            ByteBuffer buf = ByteBuffer.allocate(1024);

                  try

                  {

                nBytes = socket.read(buf);

                         buf.flip();

                         Charset charset = Charset.forName("us-ascii");

                         CharsetDecoder decoder = charset.newDecoder();

                         CharBuffer charBuffer = decoder.decode(buf);

                         result = charBuffer.toString();

              

            }

                  catch(IOException e)

                  {

                         e.printStackTrace();

                  }

                  return result;

        }

        public static void main(String args[])

        {

               NonBlockingServer nb;

               if (args.length < 1)

               {

                      nb = new NonBlockingServer();

               }

               else

               {

                      int port = Integer.parseInt(args[0]);

                      nb = new NonBlockingServer(port);

               }

                   

                  try

                  {

                         nb.startServer();

                         System.out.println("the nonBlocking server is started!");

                  }

                  catch (IOException e)

                  {

                         e.printStackTrace();

                         System.exit(-1);

                  }

           }

    }

    2.2.4.2    Client code:

    public class Client {

           public SocketChannel client = null;

           public InetSocketAddress isa = null;

           public RecvThread rt = null;

           private String host;

           private int port;

           public Client(String host, int port) {

                  this.host = host;

                  this.port = port;

           }

           public void makeConnection() {

                  String proxyHost = "192.168.254.212";

                  String proxyPort = "1080";

                  System.getProperties().put("socksProxySet", "true");

                  System.getProperties().put("socksProxyHost", proxyHost);

                  System.getProperties().put("socksProxyPort", proxyPort);

                  int result = 0;

                  try {

                         client = SocketChannel.open();

                         isa = new InetSocketAddress(host, port);

                         client.connect(isa);

                         client.configureBlocking(false);

                         receiveMessage();

                  } catch (UnknownHostException e) {

                         e.printStackTrace();

                  } catch (IOException e) {

                         e.printStackTrace();

                  }

                  long begin = System.currentTimeMillis();

                  sendMessage();

                  long end = System.currentTimeMillis();

                  long userTime = end - begin;

                  System.out.println("use tiem: " + userTime);

                  try {

                         interruptThread();

                         client.close();

                         System.exit(0);

                  } catch (IOException e) {

                         e.printStackTrace();

                  }

           }

           public int sendMessage() {

                  System.out.println("Inside SendMessage");

                  String msg = null;

                  ByteBuffer bytebuf;

                  int nBytes = 0;

                  try {

                         msg = "It's message from client!";

                         System.out.println("msg is "+msg);

                         bytebuf = ByteBuffer.wrap(msg.getBytes());

                         for (int i = 0; i < 1000; i++) {

                                nBytes = client.write(bytebuf);

                                System.out.println(i + " finished");

                         }

                         interruptThread();

                         try {

                                Thread.sleep(5000);

                         } catch (Exception e) {

                                e.printStackTrace();

                         }

                         client.close();

                         return -1;

                  } catch (IOException e) {

                         e.printStackTrace();

                  }

                  return nBytes;

           }

           public void receiveMessage() {

                  rt = new RecvThread("Receive THread", client);

                  rt.start();

           }

           public void interruptThread() {

                  rt.val = false;

           }

           public static void main(String args[]) {

                  if (args.length < 2) {

                         System.err.println("You should put 2 args: host,port");

                  } else {

                         String host = args[0];

                         int port = Integer.parseInt(args[1]);

                         Client cl = new Client(host, port);

                         cl.makeConnection();

                  }

                  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

                  String msg;

           }

           public class RecvThread extends Thread {

                  public SocketChannel sc = null;

                  public boolean val = true;

                  public RecvThread(String str, SocketChannel client) {

                         super(str);

                         sc = client;

                  }

                  public void run() {

                         int nBytes = 0;

                         ByteBuffer buf = ByteBuffer.allocate(2048);

                         try {

                                while (val) {

                                       while ((nBytes = nBytes = client.read(buf)) > 0) {

                                              buf.flip();

                                              Charset charset = Charset.forName("us-ascii");

                                              CharsetDecoder decoder = charset.newDecoder();

                                              CharBuffer charBuffer = decoder.decode(buf);

                                              String result = charBuffer.toString();

                                              System.out.println("the server return: " + result);

                                              buf.flip();

                                       }

                                }

                         } catch (IOException e) {

                                e.printStackTrace();

                         }

                  }

           }

    }

    Reactor模式和NIO

    當前分布式計算 Web Services盛行天下,這些網絡服務的底層都離不開對socket的操作。他們都有一個共同的結構:

    u        Read request

    u        Decode request

    u        Process service

    u        Encode reply

    u        Send reply

    經典的網絡服務的設計如下圖,在每個線程中完成對數據的處理:

    但這種模式在用戶負載增加時,性能將下降非常的快。我們需要重新尋找一個新的方案,保持數據處理的流暢,很顯然,事件觸發機制是最好的解決辦法,當有事件發生時,會觸動handler,然后開始數據的處理。

    Reactor模式類似于AWT中的Event處理:

    Reactor模式參與者

    1.Reactor 負責響應IO事件,一旦發生,廣播發送給相應的Handler去處理,這類似于AWTthread
    2.Handler
    是負責非堵塞行為,類似于AWT ActionListeners;同時負責將handlersevent事件綁定,類似于AWT addActionListener

    如圖:

    JavaNIOreactor模式提供了實現的基礎機制,它的Selector當發現某個channel有數據時,會通過SlectorKey來告知我們,在此我們實現事件和handler的綁定。

    我們來看看Reactor模式代碼:


    public class Reactor implements Runnable{

      final Selector selector;
      final ServerSocketChannel serverSocket;

      Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
        serverSocket.socket().bind(address);

        serverSocket.configureBlocking(false);
        //selector注冊該channel
         SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

        logger.debug("-->Start serverSocket.register!");

        //利用skattache功能綁定Acceptor 如果有事情,觸發Acceptor
        sk.attach(new Acceptor());
        logger.debug("-->attach(new Acceptor()!");
      }


      public void run() { // normally in a new Thread
        try {
        while (!Thread.interrupted())
        {
          selector.select();
          Set selected = selector.selectedKeys();
          Iterator it = selected.iterator();
          //Selector如果發現channelOP_ACCEPTREAD事件發生,下列遍歷就會進行。
          while (it.hasNext())
            //來一個事件第一次觸發一個accepter線程
            //以后觸發SocketReadHandler
            dispatch((SelectionKey)(it.next()));
            selected.clear();
          }
        }catch (IOException ex) {
            logger.debug("reactor stop!"+ex);
        }
      }

      //運行AcceptorSocketReadHandler
      void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null){
          // r.run();

        }
      }

      class Acceptor implements Runnable { // inner
        public void run() {
        try {
          logger.debug("-->ready for accept!");
          SocketChannel c = serverSocket.accept();
          if (c != null)
            //調用Handler來處理channel
            new SocketReadHandler(selector, c);
          }
        catch(IOException ex) {
          logger.debug("accept stop!"+ex);
        }
        }
      }
    }

    以上代碼中巧妙使用了SocketChannelattach功能,將Hanlder和可能會發生事件的channel鏈接在一起,當發生事件時,可以立即觸發相應鏈接的Handler

    再看看Handler代碼:

    public class SocketReadHandler implements Runnable {

      public static Logger logger = Logger.getLogger(SocketReadHandler.class);

      private Test test=new Test();

      final SocketChannel socket;
      final SelectionKey sk;

       static final int READING = 0, SENDING = 1;
      int state = READING;

      public SocketReadHandler(Selector sel, SocketChannel c)
        throws IOException {

        socket = c;

        socket.configureBlocking(false);
         sk = socket.register(sel, 0);

        //SelectionKey綁定為本Handler 下一步有事件觸發時,將調用本類的run方法。
        //參看dispatch(SelectionKey k)
        sk.attach(this);

        //同時將SelectionKey標記為可讀,以便讀取。
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
      }

      public void run() {
        try{
        // test.read(socket,input);
          readRequest() ;
        }catch(Exception ex){
        logger.debug("readRequest error"+ex);
        }
      }


    /**
    *
    處理讀取data
    * @param key
    * @throws Exception
    */
    private void readRequest() throws Exception {

      ByteBuffer input = ByteBuffer.allocate(1024);
      input.clear();
      try{

        int bytesRead = socket.read(input);

        ......

        //激活線程池處理這些request
        requestHandle(new Request(socket,btt));

        .....


      }catch(Exception e) {
      }

    }

    注意在Handler里面又執行了一次attach,這樣,覆蓋前面的Acceptor,下次該Handler又有READ事件發生時,將直接觸發Handler.從而開始了數據的讀 處理 寫 發出等流程處理。

    將數據讀出后,可以將這些數據處理線程做成一個線程池,這樣,數據讀出后,立即扔到線程池中,這樣加速處理速度:

    更進一步,我們可以使用多個Selector分別處理連接和讀事件。

    一個高性能的Java網絡服務機制就要形成,激動人心的集群并行計算即將實現。

    3.         Socket網絡框架 MINA

    a)         Overview

    MINA是一個網絡應用框架,在不犧牲性能和可擴展性的前提下用于解決如下問題:

    n         快速開發自己的應用。

    n         高可維護性,高可復用性:網絡I/O編碼,消息的編/解碼,業務邏輯互相分離。

    n         相對容易的進行單元測試。

    b)        MINA架構:

    IoSessionManager: Where real I/O occurs

    IoFilters: Filters I/O events • requests

    IoHandler: Your protocol logic

    IoSession: Represents a connection

    n         IoFilters

    IoFilterMINA的功能擴展提供了接口。它攔截所有的IO事件進行事件的預處理和河畜處理(AOP)。我們可以把它想象成Servletfilters

    IoFilter能夠實現以下幾種目的:

    事件日志

    性能檢測

    數據轉換(e.g. SSL support)codec

    防火墻…等等

    n         codec: ProtocolCodecFactory

    MINA提供了方便的Protocol支持。如上說講,codecIoFilters中設置。

    通過它的EncoderDecoder,可以方便的擴展并支持各種基于Socket的網絡協議,比如HTTP服務器、FTP服務器、Telnet服務器等等。

    要實現自己的編碼/解碼器(codec)只需要實現interface: ProtocolCodecFactory即可.

    MINA 1.0版本,MINA已經實現了幾個常用的(codec factory):

    DemuxingProtocolCodecFactory,

    NettyCodecFactory,

    ObjectSerializationCodecFactory,

    TextLineCodecFactory

    其中:

    n         TextLineCodecFactory:

    A ProtocolCodecFactory that performs encoding and decoding between a text line data and a Java

    string object. This codec is useful especially when you work with a text-based protocols such as SMTP and IMAP.

    n         ObjectSerializationCodecFactory:

    A ProtocolCodecFactory that serializes and deserializes Java objects. This codec is very useful when

    you have to prototype your application rapidly without any specific codec.

    n         DemuxingProtocolCodecFactory

    A composite ProtocolCodecFactory that consists of multiple MessageEncoders and MessageDecoders. ProtocolEncoder and ProtocolDecoder this factory returns demultiplex incoming messages and buffers to appropriate MessageEncoders and MessageDecoders.

    n         NettyCodecFactory:

    A MINA ProtocolCodecFactory that provides encoder and decoder for Netty2 Messages and MessageRecognizers.

    n         IoHandler :business logic

    MINA中,所有的業務邏輯都在實現了IoHandlerclass完成。

    Interface Handle:

     all protocol events fired by MINA. There are 6 event handler methods, and they are all invoked by MINA automatically.

     當事件發生時,將觸發IoHandler中的方法:

     sessionCreated:當一個session創建的時候調用;

     sessionOpened:在sessionCreated調用之后被調用;

    sessionClosed:當IO連接被關閉時被調用;

     sessionIdle:當在遠程實體和用戶程序之間沒有數據傳輸的時候被調用;

    exceptionCaught:當IoAcceptor 或者IoHandler.中出現異常時被調用;

    messageReceived:當接受到消息時調用;

    messageSent:當發出請求時調用。

    MINA 1.0中,IoHandler的實現類:

    ChainedIoHandler

     DemuxingIoHandler,

    IoHandlerAdapter

     SingleSessionIoHandlerDelegate

     StreamIoHandler

    具體細節可參考javadoc

    c)        MINA的高級主題:線程模式

    MINA通過它靈活的filter機制來提供多種線程模型。

    沒有線程池過濾器被使用時MINA運行在一個單線程模式。

    如果添加了一個IoThreadPoolFilterIoAcceptor,將得到一個leader-follower模式的線程池。

    如果再添加一個ProtocolThreadPoolFilterserver將有兩個線程池:

    一個(IoThreadPoolFilter)被用于對message對象進行轉換,另外一個(ProtocolThreadPoolFilter)被用于處理業務邏輯。

    SimpleServiceRegistry加上IoThreadPoolFilterProtocolThreadPoolFilter的缺省實現即可適用于需要高伸縮性的應用。如果想使用自己的線程模型,請參考SimpleServiceRegistry的源代碼,并且自己

    初始化Acceptor

    IoThreadPoolFilter threadPool = new IoThreadPoolFilter();threadPool.start();

    IoAcceptor acceptor = new SocketAcceptor();

    acceptor.getFilterChain().addLast( "threadPool", threadPool);

    ProtocolThreadPoolFilter threadPool2 = new ProtocolThreadPoolFilter();

    threadPool2.start();

    ProtocolAcceptor acceptor2 = new IoProtocolAcceptor( acceptor );

    acceptor2.getFilterChain().addLast( "threadPool", threadPool2 );

    ...

    threadPool2.stop();

    threadPool.stop();

    d)        采用MINA進行socket開發,一般步驟如下:

    n         Begin:

    IoAcceptor acceptor = new SocketAcceptor(); //建立client接收器

    or client:

    SocketConnector connector = new SocketConnector(); //建立一個連接器

    n         server的屬性配置:

            SocketAcceptorConfig cfg = new SocketAcceptorConfig();

            cfg.setReuseAddress(true);

            cfg.getFilterChain().addLast(

                        "codec",

                        new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) ); //對象序列化 codec factory

            cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

    n         綁定addressbusiness logic

    server:

            acceptor.bind(

                    new InetSocketAddress( SERVER_PORT ),

                    new ServerSessionHandler( ), cfg ); // 綁定addresshandler

    client:

            connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                            new ClientSessionHandler(msg), cfg );

    n         實現自己的業務邏輯: IoHandler

    n         如有必要,實現自己的CODEC

    下面的代碼演示了采用ObjectSerializationCodecFactory給服務端傳送文件:

    e)         Client

    public class Client

    {

        private static final String HOSTNAME = "192.168.0.81";

        private static final int PORT = 8080;

        private static final int CONNECT_TIMEOUT = 30; // seconds

        public static void main( String[] args ) throws Throwable

        {

            System.out.println("in nio client");

            SocketConnector connector = new SocketConnector();       

            // Configure the service.

            SocketConnectorConfig cfg = new SocketConnectorConfig();

            cfg.setConnectTimeout( CONNECT_TIMEOUT );

              cfg.getFilterChain().addLast(

                        "codec",

                        new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );

            cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

            IoSession session;

            if(args.length > 1)

            {

                connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                        new ClientSessionHandler(args), cfg );

            }

            else

            {

                String[] files = {"E:/music/lcl/juhuatai.mp3",

                                        "E:/music/lcl/jimosazhouleng.mp3"};

                connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                        new ClientSessionHandler(files), cfg );

            }

        }

    }

    f)         Clint handleclient端的業務代碼)

    public class ClientSessionHandler extends IoHandlerAdapter

        private String[] files;

        public ClientSessionHandler(String[] files)

        {

            this.files = files;

        }

        public void sessionOpened( IoSession session )

        {

            for (int i = 0; i < this.files.length; i++)

            {

                Thread sendMessageThread = new SendMessageThread("Thread" + i, session,files[i]);

                sendMessageThread.start();

            }

        }

        public void messageReceived( IoSession session, Object message )

        {

            System.out.println("in messageReceived!");

        }

        public void exceptionCaught( IoSession session, Throwable cause )

        {

            session.close();

        }

        public class SendMessageThread extends Thread

        {

            private IoSession session;

            private String filename;

            public SendMessageThread(String name, IoSession session, String filename)

            {

                super(name);

                this.session = session;

                this.filename = filename;

            }

            public void run()

            {

                System.out.println("start thread: " + this.getName());

                try {               

                    ByteBuffer buf = ByteBuffer.allocate(Constants.BUF_SIZE);

                    

                    FileChannel fc = new FileInputStream(filename).getChannel();

                    int index;

                    while ((index = NioFileUtil.readFile(fc, buf)) > 0)

                    {

                      buf.flip();

                      byte[] bs;

                      if (index == buf.capacity())

                      {

                          bs = buf.array();

                      }

                      else

                      {

                          bs = new byte[index];

                          int i = 0;

                         while (buf.hasRemaining())

                          {

                              bs[i++] = buf.get();

                          }

                      }

                      Message msg = new Message(filename,Constants.CMD_SEND, bs);

                      session.write(msg);

                    }

                    Message msg = new Message(filename, Constants.CMD_FINISHED, null);

                    session.write(msg);        

                } catch (Exception e) {

                    e.printStackTrace();

                }          

            }

        }

    }

    g)        Server

    public class Server

    {

        private static final int SERVER_PORT = 8080;

        public static void main( String[] args ) throws Throwable

        {

            IoAcceptor acceptor = new SocketAcceptor();

            // Prepare the service configuration.

            SocketAcceptorConfig cfg = new SocketAcceptorConfig();

            cfg.setReuseAddress( true );

            cfg.getFilterChain().addLast(

                        "codec",

                        new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );

            cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

            acceptor.bind(

                    new InetSocketAddress( SERVER_PORT ),

                    new ServerSessionHandler( ), cfg );

            System.out.println( "nioFileServer Listening on port " + SERVER_PORT );

        }

    }

    h)        Server handle:(Server端業務代碼)

    public class ServerSessionHandler extends IoHandlerAdapter

    {   

        public void sessionOpened( IoSession session )

        {

            // set idle time to 60 seconds

            System.out.println("in sessionOpened");

            session.setIdleTime( IdleStatus.BOTH_IDLE, 60 );

            session.setAttribute("times",new Integer(0));

        }

        public void messageReceived( IoSession session, Object message )

        {

            System.out.println("in messageReceived");

               Message msg = (Message) message;

               System.out.println("the file name is: " + msg.getFileName() + ""n");

               this.process(session, msg);

              

        }

        private void process(IoSession session, Message message)

        {

            String[] temparray = message.getFileName().split("[//]");

            String filename ="d:/" + temparray[temparray.length - 1];

            if (session.containsAttribute(message.getFileName()))

            {

                FileChannel channel = (FileChannel)session.getAttribute(message.getFileName());

                if (message.getType().equals(Constants.CMD_SEND))

                {

                    try {

                        NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));

                    } catch (Exception e) {

                        e.printStackTrace();

                    }               

                }

                else

                {

                    try {

                        channel.close();

                        channel = null;

                        session.removeAttribute(message.getFileName());

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                }

            }

            else

            {

                try {

                    FileChannel channel = new FileOutputStream(filename).getChannel();

                    NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));

                    session.setAttribute(message.getFileName(), channel);

                } catch (Exception e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                }          

            }

        }

        public void sessionIdle( IoSession session, IdleStatus status )

        {

            SessionLog.info( session, "Disconnecting the idle." );

            // disconnect an idle client

            session.close();

        }

        public void exceptionCaught( IoSession session, Throwable cause )

        {

            // close the connection on exceptional situation

            session.close();

        }

    }

    i)          文件操作:

    public class NioFileUtil {

        public static void writeFile(FileChannel fileChannel, ByteBuffer buf) throws Exception

        {

            buf.clear();

            fileChannel.write(buf);    

        }

        public static int readFile(FileChannel fileChannel,ByteBuffer buf) throws IOException

        {

            buf.rewind();

            int index = fileChannel.read(buf);

            return index;

        } 

    }

    j)          常量:

    public class Constants {

        public static final String CMD_FINISHED = "FINISHED";

        public static final String CMD_SEND = "SEND";    

        public static final int BUF_SIZE = 10240;

        private Constants(){}   

    }

    Demo

    Introduction

    org.apache.mina.example.chat

    Chat server which demonstates using the text line codec and Spring integration.

    org.apache.mina.example.chat.client

    Swing based chat client.

    org.apache.mina.example.echoserver

    Echo server which demonstates low-level I/O layer and SSL support.

    org.apache.mina.example.echoserver.ssl

    SSL support classes.

    org.apache.mina.example.httpserver.codec

    A HTTP server implemented with protocol codec (needs more work).

    org.apache.mina.example.httpserver.stream

    A simplistic HTTP server which demonstates stream-based I/O support.

    org.apache.mina.example.netcat

    NetCat client (Network + Unix cat command) which demonstates low-level I/O layer.

    org.apache.mina.example.proxy

    A TCP/IP tunneling proxy example.

    org.apache.mina.example.reverser

    Reverser server which reverses all text lines demonstating high-level protocol layer.

    org.apache.mina.example.sumup

    SumUp Server and Client which sums up all ADD requests.

    org.apache.mina.example.sumup.codec

    Protocol codec implementation for SumUp protocol.

    org.apache.mina.example.sumup.message

    Protocol mmessage classes for SumUp protocol.

    org.apache.mina.example.tennis

    Two tennis players play a game which demonstates in-VM pipes.

    n         友情提示:

    下載并運行MINAdemo程序還頗非周折:

    運行MINA demo applition

    1:JDK5

    產生錯誤:

    Exception in thread "main" java.lang.NoClassDefFoundError: edu/emory/mathcs/backport/java/util/concurrent/Executor

           at org.apache.mina.example.reverser.Main.main(Main.java:44)

    察看minaQA email:

    http://www.mail-archive.com/mina-dev@directory.apache.org/msg02252.html

    原來需要下載:backport-util-concurrent.jar并加入classpath

    http://dcl.mathcs.emory.edu/util/backport-util-concurrent/

    繼續運行還是報錯:

    Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory

    原來MINA采用了slf4j項目作為log,繼續下載

    slf4j-simple.jar等,并加入classpath:

    http://www.slf4j.org/download.html

    posted on 2007-09-03 15:38 哼哼 閱讀(2975) 評論(0)  編輯  收藏 所屬分類: JAVA-Web
    主站蜘蛛池模板: 7777久久亚洲中文字幕| 99免费观看视频| 亚洲同性男gay网站在线观看| 亚洲精品一级无码鲁丝片| 免费高清在线爱做视频| 国产精品色拉拉免费看| 国产好大好硬好爽免费不卡| jizz中国免费| 日韩少妇内射免费播放| 国产成人精品免费视频大全麻豆| 韩日电影在线播放免费版| 亚洲性色高清完整版在线观看| 亚洲乱色熟女一区二区三区丝袜| 亚洲成a人片在线观看久| 国产免费MV大全视频网站| 亚洲AV无码一区二区大桥未久| 亚洲精品中文字幕无乱码麻豆| 亚洲男女一区二区三区| 免费人成在线观看播放国产 | 在线A亚洲老鸭窝天堂| 国产成人青青热久免费精品| 免费观看一区二区三区| jyzzjyzz国产免费观看| 国产精品一区二区三区免费| www免费黄色网| 一级毛片免费全部播放| 一级毛片成人免费看a| 一本大道一卡二大卡三卡免费| 国产成人综合亚洲| 亚洲国产精品成人综合久久久 | 黄色网址在线免费| 国产精品亚洲天堂| 国产成人精品日本亚洲语音 | 你懂的免费在线观看| 亚洲高清免费视频| a毛片免费播放全部完整| 美女在线视频观看影院免费天天看| 中文字幕在线观看免费| 国产精品免费AV片在线观看| 亚洲电影免费观看| 水蜜桃视频在线观看免费播放高清|