Posted on 2011-10-19 17:34
瘋狂 閱讀(6324)
評(píng)論(3) 編輯 收藏 所屬分類:
網(wǎng)絡(luò)通訊 、
讀代碼
關(guān)于xsocket可見(jiàn)于 我的另外一篇文章http://www.tkk7.com/freeman1984/archive/2011/04/25/302706.html,或者查看官網(wǎng)http://xsocket.org/
當(dāng)然閱讀xsocket需要一些線程,nio,niosocket,和java.util.concurrent(鎖,線程池等)包的一些知識(shí)。要不讀起來(lái)很費(fèi)勁,建議先去了解下這些知識(shí)。可以在我的文章分類concurrent里面有一些,其他去網(wǎng)上找找。
本文只讀了一個(gè)主要的流程,對(duì)于一些其他的代碼例如:ssl相關(guān),讀數(shù)據(jù)相關(guān)沒(méi)有涉及,看有時(shí)間能補(bǔ)上。
首先xsocket的幾個(gè)關(guān)鍵的類
Server: 服務(wù)器端初始化線程池創(chuàng)建IoAcceptor
IoAcceptor:采用while循環(huán)接收客戶端連接,并創(chuàng)建IoSocketDispatcher和IoChainableHandler
IoSocketDispatcher:負(fù)責(zé)注冊(cè)SelectionKey以及事件的分發(fā),并交給IoChainableHandler處理,通過(guò)一個(gè)while循環(huán)來(lái)處理注冊(cè)的SelectionKey事件。
IHandler:事件處理,數(shù)據(jù)的讀寫等等。
INonBlockingConnection客戶端接口。
(1)首先看下Server創(chuàng)建:
構(gòu)造方法
protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext,
boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize)
這個(gè)方法主要是初始化線程池,構(gòu)件acceptor
defaultWorkerPool = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
workerpool = defaultWorkerPool;

if (sslContext != null)
{//是否使用ssl
acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);

} else
{
acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
}
其中線程池:使用jdk1.5以后的ThreadPoolExecutor,線程池最小默認(rèn)2,最大100,QUEUE的大小默認(rèn)也是100
線程池最小:MIN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));
(2)構(gòu)件acceptor細(xì)節(jié),最后server啟動(dòng)的時(shí)候會(huì)啟動(dòng)acceptor監(jiān)聽(tīng)客戶端

public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException
{
.
serverChannel = ServerSocketChannel.open();

serverChannel.configureBlocking(true);
serverChannel.socket().setSoTimeout(0); // accept method never times out
serverChannel.socket().setReuseAddress(isReuseAddress);
.
serverChannel.socket().bind(address, backlog);
dispatcherPool = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
.
}

(3)啟動(dòng)server
server.start();
服務(wù)的啟動(dòng)用了一個(gè)單獨(dú)的線程,這里面使用到了CountDownLatch可參見(jiàn)另外一篇關(guān)于CountDownLatch用法的文章:
http://www.tkk7.com/freeman1984/archive/2011/07/04/353654.html
使用CountDownLatch來(lái)控制server的啟動(dòng)時(shí)間,操作多少時(shí)間為啟動(dòng)就,默認(rèn)是60秒,這里就不講CountDownLatch的代碼了
整個(gè)啟動(dòng)的方法如下,能看懂CountDownLatch的用法基本上就理解了。

public static void start(IServer server, int timeoutSec) throws SocketTimeoutException
{

// start server within a dedicated thread
Thread t = new Thread(server);
t.setName("xServer");
t.start();//請(qǐng)看下面的run方法分析

}
看看他的server線程的run方法:

public void run()
{

acceptor.listen();//啟動(dòng)前面創(chuàng)建的acceptor開(kāi)始監(jiān)聽(tīng)客戶端連接

}
接著查看listen()方法:

public void listen() throws IOException
{
callback.onConnected();//通知server已經(jīng)啟動(dòng)
accept();//接受客戶端連接
}
}
查看 accept();很明了,使用一個(gè)while循環(huán)監(jiān)聽(tīng)客戶端連接,并建立可客戶端相關(guān)的處理類:

while (isOpen.get())
{

// blocking accept call
SocketChannel channel = serverChannel.accept();

// create IoSocketHandler
//創(chuàng)建事件分發(fā)器
IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
//創(chuàng)建io處理
IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
// notify call back
callback.onConnectionAccepted(ioHandler);//很關(guān)鍵的一個(gè)地方,會(huì)注冊(cè)SelectionKey.OP_READ,此時(shí)客戶端發(fā)來(lái)的消息就可以北服務(wù)端獲取
}

查看callback.onConnectionAccepted(ioHandler);
此方法會(huì)初始化server端的ioHandler,查看初始化的代碼:
dispatcher.register(this, SelectionKey.OP_READ);首先會(huì)注冊(cè)read選擇器,
如果有read事件發(fā)生dispatcher就會(huì)處理。看看dispatcher的run方法(通過(guò)一個(gè)循環(huán)來(lái)不停的處理已經(jīng)注冊(cè)的事件)

while(isOpen.get())
{

int eventCount = selector.select(5000);
handledTasks = performRegisterHandlerTasks();//處理事件
handledTasks += performKeyUpdateTasks();

if (eventCount > 0)
{
handleReadWriteKeys();//處理讀寫,調(diào)用我們自己定義的hander來(lái)處理onData等事件
}
handledTasks += performDeregisterHandlerTasks();

}
(4)客戶端
NonBlockingConnection,例如:
new NonBlockingConnection("localhost", 8090,new MyHandler() )
構(gòu)造方法的主要代碼:
.
SocketChannel channel = openSocket(localAddress, options);//實(shí)際調(diào)用:SocketChannel channel = SocketChannel.open();
IoConnector connector = getDefaultConnector();
IIoConnectorCallback callback = new AsyncIoConnectorCallback(remoteAddress, channel, sslContext, isSecured, connectTimeoutMillis);
connector.connectAsync(channel, remoteAddress, connectTimeoutMillis, callback);

}
建立連接,生成IoConnector用來(lái)管理連接,然后connector開(kāi)始啟動(dòng),做一些初始化的工作:
其中connector.connectAsync(…方法會(huì)執(zhí)行會(huì)產(chǎn)生一個(gè)RegisterTask任務(wù)到IoConnector,這個(gè)RegisterTask做的事情如下:
selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注冊(cè)SelectionKey.OP_CONNECT
當(dāng)IoConnector運(yùn)行會(huì)執(zhí)行這個(gè)任務(wù):
看下他的run方法主要代碼:

while(isOpen.get())
{

handledTasks = performTaskQueue();//首先運(yùn)行上一步創(chuàng)建的RegisterTask注冊(cè)SelectionKey.OP_CONNECT
int eventCount = selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已經(jīng)準(zhǔn)備好

if (eventCount > 0)
{
handleConnect();//如果準(zhǔn)備好就處理連接事件

} else
{
checkForLooping(handledTasks);
}

}
handleConnect()的代碼:

private void handleConnect()
{
Set<SelectionKey> selectedEventKeys = selector.selectedKeys();

Iterator<SelectionKey> it = selectedEventKeys.iterator();

while (it.hasNext())
{
SelectionKey eventKey = it.next();
it.remove();

RegisterTask registerTask = (RegisterTask) eventKey.attachment();


if (eventKey.isValid() && eventKey.isConnectable())
{

try
{
boolean isConnected = ((SocketChannel) eventKey.channel()).finishConnect();//已經(jīng)通訊連接

if (isConnected)
{
eventKey.cancel();
registerTask.callback.onConnectionEstablished();//連接建立好就做下一步工作,注冊(cè)read事件。
}

}
}
}


接著看registerTask.callback.onConnectionEstablished()
主要初始化iohander并注冊(cè)read事件

private void init(IoChainableHandler ioHandler, IIoHandlerCallback handlerCallback) throws IOException, SocketTimeoutException
{
this.ioHandler = ioHandler;
ioHandler.init(handlerCallback);//這個(gè)方法里面注冊(cè)了read
isConnected.set(true);//這個(gè)時(shí)候通訊連接才真正建立起來(lái)了
}
繼續(xù)看ioHandler.init(handlerCallback)方法:

public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException
{

dispatcher.register(this, SelectionKey.OP_READ);//注冊(cè)SelectionKey.OP_READ時(shí)間,可以接受服務(wù)端的消息了
}
服務(wù)端和客戶端就可以互相通信了。本文大致講解了xsocket的代碼的流程,其中講解有誤的地方請(qǐng)兄弟們指出,多謝!