Posted on 2011-10-19 17:34
瘋狂 閱讀(6323)
評論(3) 編輯 收藏 所屬分類:
網(wǎng)絡通訊 、
讀代碼
關于xsocket可見于 我的另外一篇文章http://www.tkk7.com/freeman1984/archive/2011/04/25/302706.html,或者查看官網(wǎng)http://xsocket.org/
當然閱讀xsocket需要一些線程,nio,niosocket,和java.util.concurrent(鎖,線程池等)包的一些知識。要不讀起來很費勁,建議先去了解下這些知識。可以在我的文章分類concurrent里面有一些,其他去網(wǎng)上找找。
本文只讀了一個主要的流程,對于一些其他的代碼例如:ssl相關,讀數(shù)據(jù)相關沒有涉及,看有時間能補上。
首先xsocket的幾個關鍵的類
Server: 服務器端初始化線程池創(chuàng)建IoAcceptor
IoAcceptor:采用while循環(huán)接收客戶端連接,并創(chuàng)建IoSocketDispatcher和IoChainableHandler
IoSocketDispatcher:負責注冊SelectionKey以及事件的分發(fā),并交給IoChainableHandler處理,通過一個while循環(huán)來處理注冊的SelectionKey事件。
IHandler:事件處理,數(shù)據(jù)的讀寫等等。
INonBlockingConnection客戶端接口。
(1)首先看下Server創(chuàng)建:
構(gòu)造方法
protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext,
boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize)
這個方法主要是初始化線程池,構(gòu)件acceptor
defaultWorkerPool = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
workerpool = defaultWorkerPool;

if (sslContext != null)
{//是否使用ssl
acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);

} else
{
acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
}
其中線程池:使用jdk1.5以后的ThreadPoolExecutor,線程池最小默認2,最大100,QUEUE的大小默認也是100
線程池最小:MIN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));
(2)構(gòu)件acceptor細節(jié),最后server啟動的時候會啟動acceptor監(jiān)聽客戶端

public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException
{
.
serverChannel = ServerSocketChannel.open();

serverChannel.configureBlocking(true);
serverChannel.socket().setSoTimeout(0); // accept method never times out
serverChannel.socket().setReuseAddress(isReuseAddress);
.
serverChannel.socket().bind(address, backlog);
dispatcherPool = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
.
}

(3)啟動server
server.start();
服務的啟動用了一個單獨的線程,這里面使用到了CountDownLatch可參見另外一篇關于CountDownLatch用法的文章:
http://www.tkk7.com/freeman1984/archive/2011/07/04/353654.html
使用CountDownLatch來控制server的啟動時間,操作多少時間為啟動就,默認是60秒,這里就不講CountDownLatch的代碼了
整個啟動的方法如下,能看懂CountDownLatch的用法基本上就理解了。

public static void start(IServer server, int timeoutSec) throws SocketTimeoutException
{

// start server within a dedicated thread
Thread t = new Thread(server);
t.setName("xServer");
t.start();//請看下面的run方法分析

}
看看他的server線程的run方法:

public void run()
{

acceptor.listen();//啟動前面創(chuàng)建的acceptor開始監(jiān)聽客戶端連接

}
接著查看listen()方法:

public void listen() throws IOException
{
callback.onConnected();//通知server已經(jīng)啟動
accept();//接受客戶端連接
}
}
查看 accept();很明了,使用一個while循環(huán)監(jiān)聽客戶端連接,并建立可客戶端相關的處理類:

while (isOpen.get())
{

// blocking accept call
SocketChannel channel = serverChannel.accept();

// create IoSocketHandler
//創(chuàng)建事件分發(fā)器
IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
//創(chuàng)建io處理
IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
// notify call back
callback.onConnectionAccepted(ioHandler);//很關鍵的一個地方,會注冊SelectionKey.OP_READ,此時客戶端發(fā)來的消息就可以北服務端獲取
}

查看callback.onConnectionAccepted(ioHandler);
此方法會初始化server端的ioHandler,查看初始化的代碼:
dispatcher.register(this, SelectionKey.OP_READ);首先會注冊read選擇器,
如果有read事件發(fā)生dispatcher就會處理。看看dispatcher的run方法(通過一個循環(huán)來不停的處理已經(jīng)注冊的事件)

while(isOpen.get())
{

int eventCount = selector.select(5000);
handledTasks = performRegisterHandlerTasks();//處理事件
handledTasks += performKeyUpdateTasks();

if (eventCount > 0)
{
handleReadWriteKeys();//處理讀寫,調(diào)用我們自己定義的hander來處理onData等事件
}
handledTasks += performDeregisterHandlerTasks();

}
(4)客戶端
NonBlockingConnection,例如:
new NonBlockingConnection("localhost", 8090,new MyHandler() )
構(gòu)造方法的主要代碼:
.
SocketChannel channel = openSocket(localAddress, options);//實際調(diào)用:SocketChannel channel = SocketChannel.open();
IoConnector connector = getDefaultConnector();
IIoConnectorCallback callback = new AsyncIoConnectorCallback(remoteAddress, channel, sslContext, isSecured, connectTimeoutMillis);
connector.connectAsync(channel, remoteAddress, connectTimeoutMillis, callback);

}
建立連接,生成IoConnector用來管理連接,然后connector開始啟動,做一些初始化的工作:
其中connector.connectAsync(…方法會執(zhí)行會產(chǎn)生一個RegisterTask任務到IoConnector,這個RegisterTask做的事情如下:
selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注冊SelectionKey.OP_CONNECT
當IoConnector運行會執(zhí)行這個任務:
看下他的run方法主要代碼:

while(isOpen.get())
{

handledTasks = performTaskQueue();//首先運行上一步創(chuàng)建的RegisterTask注冊SelectionKey.OP_CONNECT
int eventCount = selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已經(jīng)準備好

if (eventCount > 0)
{
handleConnect();//如果準備好就處理連接事件

} else
{
checkForLooping(handledTasks);
}

}
handleConnect()的代碼:

private void handleConnect()
{
Set<SelectionKey> selectedEventKeys = selector.selectedKeys();

Iterator<SelectionKey> it = selectedEventKeys.iterator();

while (it.hasNext())
{
SelectionKey eventKey = it.next();
it.remove();

RegisterTask registerTask = (RegisterTask) eventKey.attachment();


if (eventKey.isValid() && eventKey.isConnectable())
{

try
{
boolean isConnected = ((SocketChannel) eventKey.channel()).finishConnect();//已經(jīng)通訊連接

if (isConnected)
{
eventKey.cancel();
registerTask.callback.onConnectionEstablished();//連接建立好就做下一步工作,注冊read事件。
}

}
}
}


接著看registerTask.callback.onConnectionEstablished()
主要初始化iohander并注冊read事件

private void init(IoChainableHandler ioHandler, IIoHandlerCallback handlerCallback) throws IOException, SocketTimeoutException
{
this.ioHandler = ioHandler;
ioHandler.init(handlerCallback);//這個方法里面注冊了read
isConnected.set(true);//這個時候通訊連接才真正建立起來了
}
繼續(xù)看ioHandler.init(handlerCallback)方法:

public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException
{

dispatcher.register(this, SelectionKey.OP_READ);//注冊SelectionKey.OP_READ時間,可以接受服務端的消息了
}
服務端和客戶端就可以互相通信了。本文大致講解了xsocket的代碼的流程,其中講解有誤的地方請兄弟們指出,多謝!