Posted on 2010-08-18 18:08
dennis 閱讀(4888)
評論(0) 編輯 收藏 所屬分類:
java
這個問題的由來是有一個朋友報告xmemcached在高并發下會頻繁斷開重連,導致cache不可用,具體請看這個
issue。
我一開始以為是他使用方式上有問題,溝通了半天還是搞不明白。后來聽聞他說他的代碼會中斷運行時間過長的任務,這些任務內部調用了xmemcached跟memcached交互。我才開始懷疑是不是因為中斷引起了連接的關閉。
我們都知道,nio的socket channel都是實現了
java.nio.channels.InterruptibleChannel接口,看看這個接口描述:
A channel that can be asynchronously closed and interrupted.
A channel that implements this interface is asynchronously closeable: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the channel's close method. This will cause the blocked thread to receive an AsynchronousCloseException.
A channel that implements this interface is also
interruptible: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the blocked thread's interrupt method.
This will cause the channel to be closed, the blocked thread to receive a
ClosedByInterruptException, and the blocked thread's interrupt status to be set.
If a thread's interrupt status is already set and it invokes a blocking I/O operation upon a channel then the channel will be closed and the thread will immediately receive a ClosedByInterruptException; its interrupt status will remain set.
意思是說實現了這個接口的channel,首先可以被異步關閉,阻塞的線程拋出AsynchronousCloseException,其次阻塞在該 channel上的線程如果被中斷,會引起channel關閉并拋出ClosedByInterruptException的異常。如果在調用 channel的IO方法之前,線程已經設置了中斷狀態,同樣會引起channel關閉和拋出ClosedByInterruptException。
回到xmemcached的問題,為什么中斷會引起xmemcached關閉連接?難道xmemcached會在用戶線程調用channel的IO operations。答案是肯定的,xmemcached的網絡層實現了一個小優化,當連接里的緩沖隊列為空的情況下,會直接調用 channel.write嘗試發送數據;如果隊列不為空,則放入緩沖隊列,等待Reactor去執行實際的發送工作,這個優化是為了最大化地提高發送效率。這會導致在用戶線程中調用channel.write(緩沖的消息隊列為空的時候),如果這時候用戶線程中斷,就會導致連接斷開,這就是那位朋友反饋的問題的根源。
Netty3的早期版本也有同樣的優化,但是在之后的版本,這個優化被另一個方案替代,寫入消息的時候無論如何都會放入緩沖隊列,但是Netty會判斷當前寫入的線程是不是NioWorker, 如果是的話,就直接flush整個發送隊列做IO寫入,如果不是,則加入發送緩沖區等待NioWorker線程去發送。這個是在NioWorker的 writeFromUserCode方法里實現的:
void writeFromUserCode(final NioSocketChannel channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
return;
}
if (scheduleWriteIfNecessary(channel)) {
return;
}
// 這里我們可以確認 Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
return;
}
write0(channel);
}
我估計netty的作者后來也意識到了在用戶線程調用channel的IO操作的危險性。xmemcached這個問題的解決思路也應該跟Netty差不多。但是從我的角度,我希望交給用戶去選擇,如果你確認你的用戶線程沒有調用中斷,那么允許在用戶線程去write可以達到更高的發送效率,更短的響應時間;如果你的用戶線程有調用中斷,那么最好有個選項去設置,發送的消息都將加入緩沖隊列讓Reactor去寫入,有更好的吞吐量,同時避免用戶線程涉及到 IO操作。