1. Source package structure

2. Thread test
  1. Start echoserver#Main under mina-example.
  2. Start jconsole and inspect the threads:
     1. NioSocketAcceptor
        Its Runnable is AbstractPollingIoAcceptor$Acceptor.
  3. Start com.game.landon.entrance.EchoClient and connect it to the running echoserver. Inspecting the threads again shows one more:
     2. NioProcessor
        Its Runnable is AbstractPollingIoProcessor$Processor.
  4. Where the two thread pools come from:
     1. AbstractIoService(IoSessionConfig sessionConfig, Executor executor)
        The constructor checks:
            if (executor == null) {
                this.executor = Executors.newCachedThreadPool();
            }
        See the source for details.
     2. SimpleIoProcessorPool
        Its constructor performs the same check.
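The null check above is a common "use the caller's executor, else create one" pattern. The following is a minimal sketch of that pattern only. The class name DefaultExecutorSketch, the createdExecutor flag and the dispose() method are assumptions made for illustration; this is not MINA's AbstractIoService.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a service that accepts an optional Executor.
// It only mirrors the null check quoted in the notes above.
final class DefaultExecutorSketch {
    private final Executor executor;
    // True when this object created the pool itself and so owns its shutdown.
    private final boolean createdExecutor;

    DefaultExecutorSketch(Executor executor) {
        if (executor == null) {
            // No executor supplied: fall back to a cached thread pool.
            this.executor = Executors.newCachedThreadPool();
            this.createdExecutor = true;
        } else {
            this.executor = executor;
            this.createdExecutor = false;
        }
    }

    Executor executor() {
        return executor;
    }

    boolean createdExecutor() {
        return createdExecutor;
    }

    // Shuts the pool down only if this object created it.
    void dispose() {
        if (createdExecutor) {
            ((ExecutorService) executor).shutdownNow();
        }
    }

    public static void main(String[] args) {
        DefaultExecutorSketch withDefault = new DefaultExecutorSketch(null);
        DefaultExecutorSketch withCustom = new DefaultExecutorSketch(Runnable::run);
        System.out.println("default pool created: " + withDefault.createdExecutor());
        System.out.println("custom pool created: " + withCustom.createdExecutor());
        withDefault.dispose();
    }
}
```

Tracking who created the pool matters because a caller-supplied executor may be shared, so the service should not shut it down on the caller's behalf.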
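3. Basic internal flow
  1. SocketAcceptor acceptor = new NioSocketAcceptor();
     1. Initializes the NioSocketAcceptor thread pool {@link AbstractIoService}
     2. Initializes the NioProcessor thread pool {@link SimpleIoProcessorPool}
     3. NioSocketAcceptor#init
        Initializes the selector: selector = Selector.open();
  2. acceptor.bind(new InetSocketAddress(PORT));
     1. AbstractPollingIoAcceptor#bindInternal
        #startupAcceptor
        -> The NioSocketAcceptor thread pool executes the Acceptor task.
        // 1. startupAcceptor checks whether an Acceptor task already exists and creates one only if it does not.
        // 2. The Acceptor task ends once every bound port has been unbound; the acceptor reference is then set to null.
        // 3. Each NioSocketAcceptor has exactly one selector and exactly one Acceptor task, hence one Acceptor thread. So the Acceptor is effectively single-threaded, even though it runs on a CachedThreadPool.
  3. Acceptor#run
     1. while (selectable)
        1. selector.select
        2. #registerHandles
           -> NioSocketAcceptor#open
              Sets the socket options and registers OP_ACCEPT with the selector.
        3. #processHandles
           1. NioSocketAcceptor#accept -> returns a NioSocketSession
           2. SimpleIoProcessorPool#add
              1. Binds the session to one NioProcessor in the pool, based on the session id {@link SimpleIoProcessorPool#getProcessor}
              2. AbstractPollingIoProcessor#add
              3. AbstractPollingIoProcessor#startupProcessor
                 -> The NioProcessor thread pool executes the Processor task.
// What the startupProcessor code shows:
// 1. A NioProcessor object has at most one Processor task at any time: a new one is created only when processorRef is null.
// 2. While that Processor is running, the NioProcessor does not submit another one to the thread pool, and the Processor task itself is an endless loop.
//    In other words, each Processor occupies one thread of the pool, so each NioProcessor object corresponds to one pool thread.
// 3. A session is bound to one fixed NioProcessor (by modulo), so all processing for a given session is single-threaded: it runs on that NioProcessor's only Processor thread.
// 4. The constructor public NioSocketAcceptor(int processorCount) sets the number of processors, which in the end determines how many threads of the CachedThreadPool are used by processors.
//    (default: private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1)
// 5. Each NioProcessor has exactly one selector.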

private void startupProcessor()
{
    Processor processor = processorRef.get();

    if (processor == null)
    {
        processor = new Processor();

        if (processorRef.compareAndSet(null, processor))
        {
            executor.execute(new NamePreservingRunnable(processor, threadName));
        }
    }

    // Just stop the select() and start it again, so that the processor
    // can be activated immediately.
    wakeup();
}
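The two ideas behind startupProcessor and the session binding can be tried in isolation: a compare-and-set guard so that only one loop is submitted, and a modulo mapping from session id to a fixed processor slot. The sketch below is a simplified illustration under stated assumptions. ProcessorBindingSketch, Worker and indexFor are hypothetical names, and the floorMod formula is a stand-in chosen here, not the exact expression used in SimpleIoProcessorPool#getProcessor.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not MINA code: names and the index formula are assumptions.
final class ProcessorBindingSketch {

    // Stand-in for one NioProcessor: it owns at most one running loop at a time.
    static final class Worker {
        private final AtomicReference<Runnable> loopRef = new AtomicReference<>();
        private final AtomicInteger submissions = new AtomicInteger();

        // Same shape as startupProcessor(): only the caller that wins the
        // compareAndSet hands the loop to the executor.
        void startup() {
            if (loopRef.get() == null) {
                Runnable loop = () -> { };
                if (loopRef.compareAndSet(null, loop)) {
                    // A real implementation would call executor.execute(loop) here.
                    submissions.incrementAndGet();
                }
            }
        }

        int submissions() {
            return submissions.get();
        }
    }

    // Maps a session id to a fixed slot in the pool. floorMod keeps the
    // result in [0, poolSize) even for negative ids.
    static int indexFor(long sessionId, int poolSize) {
        return (int) Math.floorMod(sessionId, (long) poolSize);
    }

    public static void main(String[] args) {
        int poolSize = Runtime.getRuntime().availableProcessors() + 1;
        for (long id = 1; id <= 5; id++) {
            System.out.println("session " + id + " -> processor " + indexFor(id, poolSize));
        }

        Worker worker = new Worker();
        worker.startup();
        worker.startup();
        System.out.println("loops submitted: " + worker.submissions());
    }
}
```

Because the mapping depends only on the session id and the pool size, a session always lands on the same worker, which is what makes per-session processing single-threaded.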

/**
 * This private class is used to accept incoming connection from
 * clients. It's an infinite loop, which can be stopped when all
 * the registered handles have been removed (unbound).
 */
private class Acceptor implements Runnable
{
    public void run()
    {
        int nHandles = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        // Release the lock
        lock.release();

        while ( selectable )
        {
            try
            {
                int selected = select( SELECT_TIMEOUT );

                nHandles += registerHandles();

                if ( nHandles == 0 )
                {
                    try
                    {
                        lock.acquire();

                        if ( registerQueue.isEmpty() && cancelQueue.isEmpty() )
                        {
                            acceptor = null;
                            break;
                        }
                    }
                    finally
                    {
                        lock.release();
                    }
                }

                if ( selected > 0 )
                {
                    processReadySessions( selectedHandles() );
                }

                long currentTime = System.currentTimeMillis();
                flushSessions( currentTime );
                nHandles -= unregisterHandles();

                notifyIdleSessions( currentTime );
            }
            catch ( ClosedSelectorException cse )
            {
                // If the selector has been closed, we can exit the loop
                break;
            }
            catch ( Exception e )
            {
                ExceptionMonitor.getInstance().exceptionCaught( e );

                try
                {
                    Thread.sleep( 1000 );
                }
                catch ( InterruptedException e1 )
                {
                }
            }
        }

        if ( selectable && isDisposing() )
        {
            selectable = false;
            try
            {
                destroy();
            }
            catch ( Exception e )
            {
                ExceptionMonitor.getInstance().exceptionCaught( e );
            }
            finally
            {
                disposalFuture.setValue( true );
            }
        }
    }
}
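The accept path in the outline (open a selector, register OP_ACCEPT, select, accept) can be reproduced with plain java.nio. The sketch below is only an illustration of that sequence under simplifying assumptions: it binds to an ephemeral loopback port, connects one client itself, and performs a single select pass. AcceptLoopSketch and acceptOnce are hypothetical names, and none of this is MINA code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical single-pass accept loop using plain NIO.
final class AcceptLoopSketch {

    // Binds to an ephemeral loopback port, connects one client, and returns
    // how many connections one select/accept pass picked up.
    static int acceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            // One selector, one interest set: OP_ACCEPT on the listening channel.
            server.register(selector, SelectionKey.OP_ACCEPT);

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            int accepted = 0;

            try (SocketChannel client =
                     SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                // Wait up to one second for the pending connection to show up.
                if (selector.select(1000) > 0) {
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isAcceptable()) {
                            ServerSocketChannel ready = (ServerSocketChannel) key.channel();
                            try (SocketChannel session = ready.accept()) {
                                if (session != null) {
                                    accepted++;
                                }
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted = " + acceptOnce());
    }
}
```

A real acceptor keeps looping and hands each accepted channel to a processor instead of closing it; the single pass here only keeps the example short.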
  2. Processor#run
     1. for (;;)
        1. select(SELECT_TIMEOUT)
        2. #handleNewSessions
           1. AbstractPollingIoProcessor#addNow
           2. AbstractPollingIoProcessor#init
              NioProcessor#init -> registers the session's channel (session.getChannel) with the selector for OP_READ
        3. #updateTrafficMask
        4. #process
           // process reads / process writes
        5. #flush
        6. #notifyIdleSessions

/**
 * The main loop. This is the place in charge to poll the Selector, and to
 * process the active sessions. It's done in
 * - handle the newly created sessions
 * -
 */
private class Processor implements Runnable
{
    public void run()
    {
        assert (processorRef.get() == this);

        int nSessions = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        for (;;)
        {
            try
            {
                // This select has a timeout so that we can manage
                // idle session when we get out of the select every
                // second. (note : this is a hack to avoid creating
                // a dedicated thread).
                long t0 = System.currentTimeMillis();
                int selected = select(SELECT_TIMEOUT);
                long t1 = System.currentTimeMillis();
                long delta = (t1 - t0);

                if ((selected == 0) && !wakeupCalled.get() && (delta < 100))
                {
                    // Last chance : the select() may have been
                    // interrupted because we have had an closed channel.
                    if (isBrokenConnection())
                    {
                        LOG.warn("Broken connection");

                        // we can reselect immediately
                        // set back the flag to false
                        wakeupCalled.getAndSet(false);

                        continue;
                    } else
                    {
                        LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                        // Ok, we are hit by the nasty epoll
                        // spinning.
                        // Basically, there is a race condition
                        // which causes a closing file descriptor not to be
                        // considered as available as a selected channel, but
                        // it stopped the select. The next time we will
                        // call select(), it will exit immediately for the same
                        // reason, and do so forever, consuming 100%
                        // CPU.
                        // We have to destroy the selector, and
                        // register all the socket on a new one.
                        registerNewSelector();
                    }

                    // Set back the flag to false
                    wakeupCalled.getAndSet(false);

                    // and continue the loop
                    continue;
                }

                // Manage newly created session first
                nSessions += handleNewSessions();

                updateTrafficMask();

                // Now, if we have had some incoming or outgoing events,
                // deal with them
                if (selected > 0)
                {
                    //LOG.debug("Processing"); // This log hurts one of the MDCFilter test
                    process();
                }

                // Write the pending requests
                long currentTime = System.currentTimeMillis();
                flush(currentTime);

                // And manage removed sessions
                nSessions -= removeSessions();

                // Last, not least, send Idle events to the idle sessions
                notifyIdleSessions(currentTime);

                // Get a chance to exit the infinite loop if there are no
                // more sessions on this Processor
                if (nSessions == 0)
                {
                    processorRef.set(null);

                    if (newSessions.isEmpty() && isSelectorEmpty())
                    {
                        // newSessions.add() precedes startupProcessor
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() != this);

                    if (!processorRef.compareAndSet(null, this))
                    {
                        // startupProcessor won race, so must exit processor
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() == this);
                }

                // Disconnect all sessions immediately if disposal has been
                // requested so that we exit this loop eventually.
                if (isDisposing())
                {
                    for (Iterator<S> i = allSessions(); i.hasNext();)
                    {
                        scheduleRemove(i.next());
                    }

                    wakeup();
                }
            } catch (ClosedSelectorException cse)
            {
                // If the selector has been closed, we can exit the loop
                break;
            } catch (Throwable t)
            {
                ExceptionMonitor.getInstance().exceptionCaught(t);

                try
                {
                    Thread.sleep(1000);
                } catch (InterruptedException e1)
                {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        try
        {
            synchronized (disposalLock)
            {
                if (disposing)
                {
                    doDispose();
                }
            }
        } catch (Throwable t)
        {
            ExceptionMonitor.getInstance().exceptionCaught(t);
        } finally
        {
            disposalFuture.setValue(true);
        }
    }
}
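The first branch of the Processor loop decides whether a select() call returned for no good reason: nothing was selected, nobody called wakeup(), and the call came back in under 100 ms even though it was given a timeout. That condition can be isolated as a pure function, which makes the heuristic easier to reason about. The sketch below is an illustration only; SpinCheckSketch, looksLikeSpuriousReturn and the constant name are hypothetical, and only the 100 ms threshold and the three-part condition are taken from the listing.

```java
// Hypothetical helper that isolates the "suspicious select() return" test.
final class SpinCheckSketch {

    // Threshold from the listing: a timed select() that returns faster than
    // this with nothing selected is treated as suspicious.
    static final long MIN_PLAUSIBLE_WAIT_MILLIS = 100;

    // Returns true when select() came back empty, was not woken up on
    // purpose, and returned well before its timeout could have expired.
    static boolean looksLikeSpuriousReturn(int selected, boolean wakeupCalled, long elapsedMillis) {
        return selected == 0 && !wakeupCalled && elapsedMillis < MIN_PLAUSIBLE_WAIT_MILLIS;
    }

    public static void main(String[] args) {
        // Empty and immediate, with no wakeup: candidate for a broken
        // connection check or a selector rebuild.
        System.out.println(looksLikeSpuriousReturn(0, false, 3));
        // Empty, but the full timeout elapsed: a normal idle tick.
        System.out.println(looksLikeSpuriousReturn(0, false, 1000));
        // Empty because wakeup() was called: expected behaviour.
        System.out.println(looksLikeSpuriousReturn(0, true, 3));
    }
}
```

In the listing, a positive result leads to one of two actions: if a broken connection is found the loop simply reselects, otherwise the selector is replaced through registerNewSelector().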
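4. Related class diagrams
  1. IoService class diagram
  2. IoProcessor class diagram
  3. IoSession class diagram
5. Summary:
  This post is only an introduction. It focuses on the two thread pools inside mina2: the acceptor thread pool and the processor thread pool. For NIO background, see my earlier post:
  http://www.tkk7.com/landon/archive/2013/08/16/402947.html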
posted on 2013-11-18 17:24 by landon, category: Sources