As concurrency rises, the traditional NIO design of using a single Selector to manage and trigger events for a large number of connections has hit a bottleneck, so the new versions of the various NIO frameworks have all moved to a structure with multiple coexisting Selectors, where several Selectors share the management of a large number of connections in a balanced way. Here we take the implementations in Mina and Grizzly as examples.
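Before diving into either framework, here is a minimal sketch of the pattern both of them implement (all names are hypothetical, not taken from Mina or Grizzly): several Selectors, each driven by its own thread, with new channels spread across them round-robin.

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Mina or Grizzly code: N Selectors, each
// driven by its own thread, with connections spread across them.
public class MultiSelectorSketch {

    private final Selector[] selectors;
    private final AtomicInteger nextIndex = new AtomicInteger();

    public MultiSelectorSketch(int count) throws IOException {
        selectors = new Selector[count];
        for (int i = 0; i < count; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(() -> runLoop(selector), "selector-" + i).start();
        }
    }

    // Round-robin: each channel is owned by exactly one Selector.
    public void register(SelectableChannel channel, int ops) throws IOException {
        Selector selector =
                selectors[Math.abs(nextIndex.getAndIncrement()) % selectors.length];
        channel.configureBlocking(false);
        // wakeup() reduces, but does not eliminate, the chance of
        // register() blocking against an in-progress select(); real
        // frameworks queue registrations on the owning thread instead.
        selector.wakeup();
        channel.register(selector, ops);
    }

    private void runLoop(Selector selector) {
        while (selector.isOpen()) {
            try {
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // dispatch(key): the actual read/write happens here
                }
            } catch (IOException e) {
                return; // a real framework would log and recover
            }
        }
    }
}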
In Mina 2.0, Selector management is handled by org.apache.mina.transport.socket.nio.NioProcessor. Each NioProcessor object holds one Selector and is responsible for the concrete select and wakeup calls, channel registration and cancellation, registration and testing of read/write interest, the actual IO reads and writes, and so on. The core code is as follows:
public NioProcessor(Executor executor) {
    super(executor);
    try {
        // Open a new selector
        selector = Selector.open();
    } catch (IOException e) {
        throw new RuntimeIoException("Failed to open a selector.", e);
    }
}

protected int select(long timeout) throws Exception {
    return selector.select(timeout);
}

protected boolean isInterestedInRead(NioSession session) {
    SelectionKey key = session.getSelectionKey();
    return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
}

protected boolean isInterestedInWrite(NioSession session) {
    SelectionKey key = session.getSelectionKey();
    return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
}

protected int read(NioSession session, IoBuffer buf) throws Exception {
    return session.getChannel().read(buf.buf());
}

protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
    if (buf.remaining() <= length) {
        // Everything remaining fits; write it in one go
        return session.getChannel().write(buf.buf());
    } else {
        // Temporarily shrink the limit so at most 'length' bytes are written
        int oldLimit = buf.limit();
        buf.limit(buf.position() + length);
        try {
            return session.getChannel().write(buf.buf());
        } finally {
            buf.limit(oldLimit);
        }
    }
}
These methods are all invoked through AbstractPollingIoProcessor, where you can see the core logic of an NIO framework: registration, select, and dispatch. Since that is off topic for this article, I won't expand on it. NioProcessor is initialized in the constructor of NioSocketAcceptor:
public NioSocketAcceptor() {
    super(new DefaultSocketSessionConfig(), NioProcessor.class);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
It delegates straight to the constructor of the parent class AbstractPollingIoAcceptor, where we can see that by default a SimpleIoProcessorPool is started to wrap NioProcessor:
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
        Class<? extends IoProcessor<T>> processorClass) {
    this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
}
This is in fact a composite pattern: SimpleIoProcessorPool and NioProcessor both implement the IoProcessor interface, one a pool composed of processors and the other a standalone class. The SimpleIoProcessorPool constructor being invoked looks like this:
private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;

public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
    this(processorType, null, DEFAULT_SIZE);
}
As you can see, the default pool size is the number of CPUs plus one, i.e. cpu+1 Selector objects are created. The overloaded constructor it delegates to creates an array, starts a CachedThreadPool to run the NioProcessors, and instantiates the concrete Processor objects via reflection; I won't list it here.
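Paraphrased, that overloaded constructor behaves roughly like the sketch below (my reconstruction of the behavior just described, not the actual Mina source; it also shows the composite mentioned above, since the pool itself implements IoProcessor and delegates to its array):

// Paraphrased sketch of the behavior, not the actual Mina source.
@SuppressWarnings("unchecked")
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
        Executor executor, int size) {
    // One shared executor runs every processor's select loop.
    this.executor = (executor != null) ? executor : Executors.newCachedThreadPool();
    this.pool = new IoProcessor[size];
    try {
        for (int i = 0; i < size; i++) {
            // Reflection: invoke the (Executor) constructor, so each
            // processor opens its own Selector as shown earlier.
            pool[i] = processorType.getConstructor(Executor.class)
                                   .newInstance(this.executor);
        }
    } catch (Exception e) {
        throw new RuntimeException("Failed to create an IoProcessor", e);
    }
}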
When a new connection is established, Mina creates a NioSocketSession, passing in the SimpleIoProcessorPool above; when the connection is initialized, the session is added to the SimpleIoProcessorPool:
protected NioSession accept(IoProcessor<NioSession> processor,
        ServerSocketChannel handle) throws Exception {
    SelectionKey key = handle.keyFor(selector);
    if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
        return null;
    }
    // accept the connection from the client
    SocketChannel ch = handle.accept();
    if (ch == null) {
        return null;
    }
    return new NioSocketSession(this, processor, ch);
}

private void processHandles(Iterator<H> handles) throws Exception {
    while (handles.hasNext()) {
        H handle = handles.next();
        handles.remove();
        // Associates a new created connection to a processor,
        // and get back a session
        T session = accept(processor, handle);
        if (session == null) {
            break;
        }
        initSession(session, null, null);
        // add the session to the SocketIoProcessor
        session.getProcessor().add(session);
    }
}
The add operation increments an integer and takes it modulo the array size; the NioProcessor at the resulting index is then registered into the session:
private IoProcessor<T> nextProcessor() {
    checkDisposal();
    return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
}

// when a session is added to the pool, it is bound to one processor:
if (p == null) {
    p = nextProcessor();
    IoProcessor<T> oldp =
            (IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
    if (oldp != null) {
        p = oldp;
    }
}
This way every connection is associated with one NioProcessor, and therefore with one Selector object, avoiding the situation where all connections share a single overloaded Selector and the server's responsiveness degrades. But note that NioSocketAcceptor also has a Selector of its own. What is that one for? It is the Selector dedicated to OP_ACCEPT events, used only for accepting new connections and kept separate from the Selectors that handle read/write events. So by default Mina opens cpu+2 Selectors in total.
Having looked at Mina 2.0, let's see how Grizzly 2.0 handles this. Grizzly is more conservative: by default it starts just two Selectors, one dedicated to accept and the other managing the connections' IO read/write events. In Grizzly 2.0, Selectors are managed through the SelectorRunner class, which wraps the Selector object along with the core dispatch and registration logic; you can think of it as the counterpart of Mina's NioProcessor. The core code is as follows:
protected boolean doSelect() {
    selectorHandler = transport.getSelectorHandler();
    selectionKeyHandler = transport.getSelectionKeyHandler();
    strategy = transport.getStrategy();
    try {
        if (isResume) {
            // If resume SelectorRunner - finish postponed keys
            isResume = false;
            if (keyReadyOps != 0) {
                if (!iterateKeyEvents()) return false;
            }
            if (!iterateKeys()) return false;
        }
        lastSelectedKeysCount = 0;
        selectorHandler.preSelect(this);
        readyKeys = selectorHandler.select(this);
        if (stateHolder.getState(false) == State.STOPPING) return false;
        lastSelectedKeysCount = readyKeys.size();
        if (lastSelectedKeysCount != 0) {
            iterator = readyKeys.iterator();
            if (!iterateKeys()) return false;
        }
        selectorHandler.postSelect(this);
    } catch (ClosedSelectorException e) {
        notifyConnectionException(key,
                "Selector was unexpectedly closed", e,
                Severity.TRANSPORT, Level.SEVERE, Level.FINE);
    } catch (Exception e) {
        notifyConnectionException(key,
                "doSelect exception", e,
                Severity.UNKNOWN, Level.SEVERE, Level.FINE);
    } catch (Throwable t) {
        logger.log(Level.SEVERE, "doSelect exception", t);
        transport.notifyException(Severity.FATAL, t);
    }
    return true;
}
This is essentially what a reactor implementation looks like. The AbstractNIOTransport class maintains an array of SelectorRunners, and TCPNIOTransport, the class Grizzly uses to create a TCP server, extends AbstractNIOTransport; its start method calls startSelectorRunners to create and start the SelectorRunner array:
private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;

@Override
public void start() throws IOException {
    if (selectorRunnersCount <= 0) {
        selectorRunnersCount = DEFAULT_SELECTOR_RUNNERS_COUNT;
    }
    startSelectorRunners();
}

protected void startSelectorRunners() throws IOException {
    selectorRunners = new SelectorRunner[selectorRunnersCount];
    synchronized (selectorRunners) {
        for (int i = 0; i < selectorRunnersCount; i++) {
            SelectorRunner runner =
                    new SelectorRunner(this, SelectorFactory.instance().create());
            runner.start();
            selectorRunners[i] = runner;
        }
    }
}
So Grizzly does not use a dedicated pool object to manage the SelectorRunners; it manages them directly in an array, with a default size of 2. SelectorRunner implements the Runnable interface, and its start method submits it to a thread pool to run. I mentioned earlier that Grizzly handles accept on a separate Selector — where does that show up? The answer is in the RoundRobinConnectionDistributor class, which dispatches registration events to the appropriate SelectorRunner. Its dispatching works like this:
public Future<RegisterChannelResult> registerChannelAsync(
        SelectableChannel channel, int interestOps, Object attachment,
        CompletionHandler completionHandler) throws IOException {
    SelectorRunner runner = getSelectorRunner(interestOps);
    return transport.getSelectorHandler().registerChannelAsync(
            runner, channel, interestOps, attachment, completionHandler);
}

private SelectorRunner getSelectorRunner(int interestOps) {
    SelectorRunner[] runners = getTransportSelectorRunners();
    int index;
    if (interestOps == SelectionKey.OP_ACCEPT || runners.length == 1) {
        index = 0;
    } else {
        index = (counter.incrementAndGet() % (runners.length - 1)) + 1;
    }
    return runners[index];
}
The getSelectorRunner method gives the game away: OP_ACCEPT registrations always use the first SelectorRunner in the array, while everything else is assigned one of the remaining SelectorRunners via the modulo result plus one.
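To make the arithmetic concrete, here is a standalone snippet evaluating the same expression with an array of three runners:

// With runners.length == 3: OP_ACCEPT always maps to runner 0,
// while other interest ops alternate between runners 1 and 2.
AtomicInteger counter = new AtomicInteger();
int length = 3;
for (int i = 0; i < 4; i++) {
    int index = (counter.incrementAndGet() % (length - 1)) + 1;
    System.out.println(index); // prints 2, 1, 2, 1
}

One visible difference from Mina's nextProcessor, incidentally, is the absence of Math.abs around the counter here.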
Having analyzed how Mina 2.0 and Grizzly 2.0 manage Selectors, we can draw a few lessons:
1. When handling large numbers of connections, multiple Selectors beat a single Selector.
2. With multiple Selectors, the Selectors handling OP_READ and OP_WRITE should be separated from the one handling OP_ACCEPT; that is, connection acceptance deserves its own dedicated Selector object, so that IO read/write events don't slow down the accept rate.
3. On the number of Selectors: Mina defaults to cpu+2 while Grizzly uses 2 in total. I lean toward Mina's strategy, but I think the CPU count should be taken into account: with more than 8 CPUs, the extra Selector threads may incur significant thread-switching overhead, so Mina's default is not always appropriate. Fortunately the value is configurable, as shown below.
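For example, Mina 2.0's NioSocketAcceptor(int processorCount) constructor lets you pick the pool size yourself; the cap of 8 below is my own heuristic, not a framework default:

// Use cpu+1 processors (and thus Selectors), but cap the count on
// many-core machines to limit thread-switching overhead; the cap of 8
// is a suggested heuristic, not a Mina default.
int processorCount = Math.min(Runtime.getRuntime().availableProcessors() + 1, 8);
NioSocketAcceptor acceptor = new NioSocketAcceptor(processorCount);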