一:Mina概要 Apache Mina是一個(gè)能夠幫助用戶開(kāi)發(fā)高性能和高伸縮性網(wǎng)絡(luò)應(yīng)用程序的框架。它通過(guò)Java nio技術(shù)基于TCP/IP和UDP/IP協(xié)議提供了抽象的、事件驅(qū)動(dòng)的、異步的API。
如下的特性:
1、 基于Java nio的TCP/IP和UDP/IP實(shí)現(xiàn)
基于RXTX的串口通信(RS232)
VM 通道通信
2、通過(guò)filter接口實(shí)現(xiàn)擴(kuò)展,類似于Servlet filters
3、low-level(底層)和high-level(高級(jí)封裝)的api:
low-level:使用ByteBuffers
High-level:使用自定義的消息對(duì)象和解碼器
4、Highly customizable(易用的)線程模式(MINA2.0 已經(jīng)禁用線程模型了):
單線程
線程池
多個(gè)線程池
5、基于java5 SSLEngine的SSL、TLS、StartTLS支持
6、負(fù)載平衡
7、使用mock進(jìn)行單元測(cè)試
8、jmx整合
9、基于StreamIoHandler的流式I/O支持
10、IOC容器的整合:Spring、PicoContainer
11、平滑遷移到Netty平臺(tái)
二:實(shí)踐 首先講一下客戶端的通信過(guò)程:
1.通過(guò)SocketConnector同服務(wù)器端建立連接
2.鏈接建立之后I/O的讀寫交給了I/O Processor線程,I/O Processor是多線程的
3.通過(guò)I/O Processor讀取的數(shù)據(jù)經(jīng)過(guò)IoFilterChain里所有配置的IoFilter,IoFilter進(jìn)行消息的過(guò)濾,格式的轉(zhuǎn)換,在這個(gè)層面可以制定一些自定義的協(xié)議
4.最后IoFilter將數(shù)據(jù)交給Handler進(jìn)行業(yè)務(wù)處理,完成了整個(gè)讀取的過(guò)程
5.寫入過(guò)程也是類似,只是剛好倒過(guò)來(lái),通過(guò)IoSession.write寫出數(shù)據(jù),然后Handler進(jìn)行寫入的業(yè)務(wù)處理,處理完成后交給IoFilterChain,進(jìn)行消息過(guò)濾和協(xié)議的轉(zhuǎn)換,最后通過(guò)I/O Processor將數(shù)據(jù)寫出到socket通道
IoFilterChain作為消息過(guò)濾鏈
1.讀取的時(shí)候是從低級(jí)協(xié)議到高級(jí)協(xié)議的過(guò)程,一般來(lái)說(shuō)從byte字節(jié)逐漸轉(zhuǎn)換成業(yè)務(wù)對(duì)象的過(guò)程
2.寫入的時(shí)候一般是從業(yè)務(wù)對(duì)象到字節(jié)byte的過(guò)程
IoSession貫穿整個(gè)通信過(guò)程的始終
客戶端通信過(guò)程

1.創(chuàng)建服務(wù)器
package com.gewara.web.module.base;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
/**
* Mina服務(wù)器
*
* @author mike
*
* @since 2012-3-15
*/
public class HelloServer {
private static final int PORT = 8901;// 定義監(jiān)聽(tīng)端口
public static void main(String[] args) throws IOException{
// 創(chuàng)建服務(wù)端監(jiān)控線程
IoAcceptor acceptor = new NioSocketAcceptor();
// 設(shè)置日志記錄器
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
// 設(shè)置編碼過(guò)濾器
acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
// 指定業(yè)務(wù)邏輯處理器
acceptor.setHandler(new HelloServerHandler());
// 設(shè)置端口號(hào)
acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));
// 啟動(dòng)監(jiān)聽(tīng)線程
acceptor.bind();
}
}
2.創(chuàng)建服務(wù)器端業(yè)務(wù)邏輯
package com.gewara.web.module.base;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
/**
* 服務(wù)器端業(yè)務(wù)邏輯
*
* @author mike
*
* @since 2012-3-15
*/public class HelloServerHandler
extends IoHandlerAdapter {
@Override
/**
* 連接創(chuàng)建事件
*/ public void sessionCreated(IoSession session){
// 顯示客戶端的ip和端口
System.out.println(session.getRemoteAddress().toString());
}
@Override
/**
* 消息接收事件
*/ public void messageReceived(IoSession session, Object message)
throws Exception{
String str = message.toString();
if (str.trim().equalsIgnoreCase("quit")){
// 結(jié)束會(huì)話
session.close(
true);
return;
}
// 返回消息字符串
session.write("Hi Client!");
// 打印客戶端傳來(lái)的消息內(nèi)容
System.out.println("Message written

" + str);
}
}
3.創(chuàng)建客戶端
package com.gewara.web.module.base;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
/**
* Mina客戶端
*
* @author mike
*
* @since 2012-3-15
*/
public class HelloClient {
public static void main(String[] args){
// 創(chuàng)建客戶端連接器.
NioSocketConnector connector = new NioSocketConnector();
// 設(shè)置日志記錄器
connector.getFilterChain().addLast("logger", new LoggingFilter());
// 設(shè)置編碼過(guò)濾器
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
// 設(shè)置連接超時(shí)檢查時(shí)間
connector.setConnectTimeoutCheckInterval(30);
// 設(shè)置事件處理器
connector.setHandler(new HelloClientHandler());
// 建立連接
ConnectFuture cf = connector.connect(new InetSocketAddress("192.168.2.89", 8901));
// 等待連接創(chuàng)建完成
cf.awaitUninterruptibly();
// 發(fā)送消息
cf.getSession().write("Hi Server!");
// 發(fā)送消息
cf.getSession().write("quit");
// 等待連接斷開(kāi)
cf.getSession().getCloseFuture().awaitUninterruptibly();
// 釋放連接
connector.dispose();
}
}
4.客戶端業(yè)務(wù)邏輯
package com.gewara.web.module.base;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
public class HelloClientHandler extends IoHandlerAdapter {
@Override
/**
* 消息接收事件
*/
public void messageReceived(IoSession session, Object message) throws Exception{
//顯示接收到的消息
System.out.println("server message:"+message.toString());
}
}
5.先啟動(dòng)服務(wù)器端,然后啟動(dòng)客戶端
2012-03-15 14:45:41,456 INFO logging.LoggingFilter - CREATED
/192.168.2.89:2691
2012-03-15 14:45:41,456 INFO logging.LoggingFilter - OPENED
2012-03-15 14:45:41,487 INFO logging.LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=11 cap=2048: 48 69 20 53 65 72 76 65 72 21 0A]
2012-03-15 14:45:41,487 DEBUG codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED
for session 1
Message written

Hi Server!
2012-03-15 14:45:41,487 INFO logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
2012-03-15 14:45:41,487 INFO logging.LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=5 cap=2048: 71 75 69 74 0A]
2012-03-15 14:45:41,487 DEBUG codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED
for session 1
2012-03-15 14:45:41,487 INFO logging.LoggingFilter - CLOSED
三:分析源碼
1.首先看服務(wù)器
// 創(chuàng)建服務(wù)端監(jiān)控線程
IoAcceptor acceptor = new NioSocketAcceptor();
// 設(shè)置日志記錄器
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
// 設(shè)置編碼過(guò)濾器
acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
// 指定業(yè)務(wù)邏輯處理器
acceptor.setHandler(new HelloServerHandler());
// 設(shè)置端口號(hào)
acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));
// 啟動(dòng)監(jiān)聽(tīng)線程
acceptor.bind();
1)先創(chuàng)建NioSocketAcceptor nio的接收器,談到Socket就要說(shuō)到Reactor模式 當(dāng)前分布式計(jì)算 Web Services盛行天下,這些網(wǎng)絡(luò)服務(wù)的底層都離不開(kāi)對(duì)socket的操作。他們都有一個(gè)共同的結(jié)構(gòu):1. Read request2. Decode request3. Process service 4. Encode reply5. Send reply 
但這種模式在用戶負(fù)載增加時(shí),性能將下降非常的快。我們需要重新尋找一個(gè)新的方案,保持?jǐn)?shù)據(jù)處理的流暢,很顯然,事件觸發(fā)機(jī)制是最好的解決辦法,當(dāng)有事件發(fā)生時(shí),會(huì)觸動(dòng)handler,然后開(kāi)始數(shù)據(jù)的處理。
Reactor模式類似于AWT中的Event處理。
Reactor模式參與者
1.Reactor 負(fù)責(zé)響應(yīng)IO事件,一旦發(fā)生,廣播發(fā)送給相應(yīng)的Handler去處理,這類似于AWT的thread
2.Handler 是負(fù)責(zé)非堵塞行為,類似于AWT ActionListeners;同時(shí)負(fù)責(zé)將handlers與event事件綁定,類似于AWT addActionListener

并發(fā)系統(tǒng)常采用reactor模式,簡(jiǎn)稱觀察者模式,代替常用的多線程處理方式,利用有限的系統(tǒng)的資源,提高系統(tǒng)的吞吐量。
可以看一下這篇文章,講解的很生動(dòng)具體,一看就明白reactor模式的好處
http://daimojingdeyu.iteye.com/blog/828696 Reactor模式是編寫高性能網(wǎng)絡(luò)服務(wù)器的必備技術(shù)之一,它具有如下的優(yōu)點(diǎn): 1)響應(yīng)快,不必為單個(gè)同步時(shí)間所阻塞,雖然Reactor本身依然是同步的; 2)編程相對(duì)簡(jiǎn)單,可以最大程度的避免復(fù)雜的多線程及同步問(wèn)題,并且避免了多線程/進(jìn)程的切換開(kāi)銷; 3)可擴(kuò)展性,可以方便的通過(guò)增加Reactor實(shí)例個(gè)數(shù)來(lái)充分利用CPU資源; 4)可復(fù)用性,reactor框架本身與具體事件處理邏輯無(wú)關(guān),具有很高的復(fù)用性; 2)其次,再說(shuō)說(shuō)NIO的基本原理和使用 NIO 有一個(gè)主要的類Selector,這個(gè)類似一個(gè)觀察者,只要我們把需要探知的socketchannel告訴Selector,我們接著做別的事情,當(dāng)有事件發(fā)生時(shí),他會(huì)通知我們,傳回一組 SelectionKey,我們讀取這些Key,就會(huì)獲得我們剛剛注冊(cè)過(guò)的socketchannel,然后,我們從這個(gè)Channel中讀取數(shù)據(jù),放心,包準(zhǔn)能夠讀到,接著我們可以處理這些數(shù)據(jù)。
Selector內(nèi)部原理實(shí)際是在做一個(gè)對(duì)所注冊(cè)的channel的輪詢?cè)L問(wèn),不斷的輪詢(目前就這一個(gè)算法),一旦輪詢到一個(gè)channel有所注冊(cè)的事情發(fā)生,比如數(shù)據(jù)來(lái)了,他就會(huì)站起來(lái)報(bào)告,交出一把鑰匙,讓我們通過(guò)這把鑰匙(SelectionKey表示 SelectableChannel 在 Selector 中的注冊(cè)的標(biāo)記。 )來(lái)讀取這個(gè)channel的內(nèi)容。