JDK7的NIO2特性或許是我最期待的,我一直想基于它寫一個高性能的Java Http Server.現(xiàn)在這個想法終于可以實施了。
本人基于目前最新的JDK7 b76開發(fā)了一個HTTP Server性能確實不錯。
在windows平臺上NIO2采用AccpetEx來異步接受連接,并且讀寫全都關(guān)聯(lián)到IOCP完成端口。不僅如此,為了方便開發(fā)者使用,連IOCP工作線程都封裝好了,你只要提供線程池就OK。
但是要注意,IOCP工作線程的線程池必須是 Fix的,因為你發(fā)出的讀寫請求都關(guān)聯(lián)到相應(yīng)的線程上,如果線程死了,那讀寫完成情況是不知道的。
作為一個Http Server,傳送文件是必不可少的功能,那一般文件的傳送都是要把程序里的buffer拷貝到內(nèi)核的buffer,由內(nèi)核發(fā)送出去的。windows平臺上為這種情況提供了很好的解決方案,使用TransmitFile接口
BOOL TransmitFile(
SOCKET hSocket,
HANDLE hFile,
DWORD nNumberOfBytesToWrite,
DWORD nNumberOfBytesPerSend,
LPOVERLAPPED lpOverlapped,
LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers,
DWORD dwFlags
);
你只要把文件句柄發(fā)送給內(nèi)核就行了,內(nèi)核幫你搞定其余的,真正做到Zero-Copy.
但是很不幸,NIO2里AsynchronousSocketChannel沒有提供這樣的支持。而為HTTP Server的性能考量,本人只好自己增加這個支持。
要無縫支持,這個必須得表現(xiàn)的跟 Read /Write一樣,有完成的通知,通知傳送多少數(shù)據(jù),等等。
仔細(xì)讀完sun的IOCP實現(xiàn)以后發(fā)現(xiàn)這部分工作他們封裝得很好,基本只要往他們的框架里加?xùn)|西就好了。
為了能訪問他們的框架代碼,我定義自己的TransmitFile支持類在sun.nio.ch包里,以獲得最大的權(quán)限。
package sun.nio.ch;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.WritePendingException;
import java.util.concurrent.Future;
/**
* @author Yvon
*
*/
public class WindowsTransmitFileSupport {
//Sun's NIO2 channel implementation class
private WindowsAsynchronousSocketChannelImpl channel;
//nio2 framework core data structure
PendingIoCache ioCache;
//some field retrieve from sun channel implementation class
private Object writeLock;
private Field writingF;
private Field writeShutdownF;
private Field writeKilledF; // f
WindowsTransmitFileSupport()
{
//dummy one for JNI code
}
/**
*
*/
public WindowsTransmitFileSupport(
AsynchronousSocketChannel
channel) {
this.channel = (WindowsAsynchronousSocketChannelImpl)channel;
try {
// Initialize the fields
Field f = WindowsAsynchronousSocketChannelImpl.class
.getDeclaredField("ioCache");
f.setAccessible(true);
ioCache = (PendingIoCache) f.get(channel);
f = AsynchronousSocketChannelImpl.class
.getDeclaredField("writeLock");
f.setAccessible(true);
writeLock = f.get(channel);
writingF = AsynchronousSocketChannelImpl.class
.getDeclaredField("writing");
writingF.setAccessible(true);
writeShutdownF = AsynchronousSocketChannelImpl.class
.getDeclaredField("writeShutdown");
writeShutdownF.setAccessible(true);
writeKilledF = AsynchronousSocketChannelImpl.class
.getDeclaredField("writeKilled");
writeKilledF.setAccessible(true);
} catch (NoSuchFieldException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Implements the task to initiate a write and the handler to consume the
* result when the send file completes.
*/
private class SendFileTask<V, A> implements Runnable, Iocp.ResultHandler {
private final PendingFuture<V, A> result;
private final long file;//file is windows file HANDLE
SendFileTask(long file, PendingFuture<V, A> result) {
this.result = result;
this.file = file;
}
@Override
// @SuppressWarnings("unchecked")
public void run() {
long overlapped = 0L;
boolean pending = false;
boolean shutdown = false;
try {
channel.begin();
// get an OVERLAPPED structure (from the cache or allocate)
overlapped = ioCache.add(result);
int n = transmitFile0(channel.handle, file, overlapped);
if (n == IOStatus.UNAVAILABLE) {
// I/O is pending
pending = true;
return;
}
if (n == IOStatus.EOF) {
// special case for shutdown output
shutdown = true;
throw new ClosedChannelException();
}
// write completed immediately
throw new InternalError("Write completed immediately");
} catch (Throwable x) {
// write failed. Enable writing before releasing waiters.
channel.enableWriting();
if (!shutdown && (x instanceof ClosedChannelException))
x = new AsynchronousCloseException();
if (!(x instanceof IOException))
x = new IOException(x);
result.setFailure(x);
} finally {
// release resources if I/O not pending
if (!pending) {
if (overlapped != 0L)
ioCache.remove(overlapped);
}
channel.end();
}
// invoke completion handler
Invoker.invoke(result);
}
/**
* Executed when the I/O has completed
*/
@Override
@SuppressWarnings("unchecked")
public void completed(int bytesTransferred, boolean canInvokeDirect) {
// release waiters if not already released by timeout
synchronized (result) {
if (result.isDone())
return;
channel.enableWriting();
result.setResult((V) Integer.valueOf(bytesTransferred));
}
if (canInvokeDirect) {
Invoker.invokeUnchecked(result);
} else {
Invoker.invoke(result);
}
}
@Override
public void failed(int error, IOException x) {
// return direct buffer to cache if substituted
// release waiters if not already released by timeout
if (!channel.isOpen())
x = new AsynchronousCloseException();
synchronized (result) {
if (result.isDone())
return;
channel.enableWriting();
result.setFailure(x);
}
Invoker.invoke(result);
}
}
public <V extends Number, A> Future<V> sendFile(long file, A att,
CompletionHandler<V, ? super A> handler) {
boolean closed = false;
if (channel.isOpen()) {
if (channel.remoteAddress == null)
throw new NotYetConnectedException();
// check and update state
synchronized (writeLock) {
try{
if (writeKilledF.getBoolean(channel))
throw new IllegalStateException(
"Writing not allowed due to timeout or cancellation");
if (writingF.getBoolean(channel))
throw new WritePendingException();
if (writeShutdownF.getBoolean(channel)) {
closed = true;
} else {
writingF.setBoolean(channel, true);
}
}catch(Exception e)
{
IllegalStateException ise=new IllegalStateException(" catch exception when write");
ise.initCause(e);
throw ise;
}
}
} else {
closed = true;
}
// channel is closed or shutdown for write
if (closed) {
Throwable e = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(e);
Invoker.invoke(channel, handler, att, null, e);
return null;
}
return implSendFile(file,att,handler);
}
<V extends Number, A> Future<V> implSendFile(long file, A attachment,
CompletionHandler<V, ? super A> handler) {
// setup task
PendingFuture<V, A> result = new PendingFuture<V, A>(channel, handler,
attachment);
SendFileTask<V,A> sendTask=new SendFileTask<V,A>(file,result);
result.setContext(sendTask);
// initiate I/O (can only be done from thread in thread pool)
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
sendTask.run();
} else {
Invoker.invokeOnThreadInThreadPool(channel, sendTask);
}
return result;
}
private native int transmitFile0(long handle, long file,
long overlapped);
}
這個操作跟默認(rèn)實現(xiàn)的里的write操作是很像的,只是最后調(diào)用的本地方法不一樣。。
接下來,我們怎么使用呢,這個類是定義在sun的包里的,直接用的話,會報IllegalAccessError,因為我們的類加載器跟初始化加載器是不一樣的。
解決辦法一個是通過啟動參數(shù)-Xbootclasspath,讓我們的包被初始加載器加載。我個人不喜歡這種辦法,所以就采用JNI來定義我們的windows TransmitFile支持類。
這樣我們的工作算是完成了,注意,發(fā)送文件的時候傳得是文件句柄,這樣做的好處是你可以更好的控制,一般是在發(fā)送前,打開文件句柄,完成后在回調(diào)通知方法里關(guān)閉文件句柄。
有興趣的同學(xué)可以看看我的HTTP server項目:
http://code.google.com/p/jabhttpd/
目前基本功能實現(xiàn)得差不多,做了些簡單的測試,性能比較滿意。這個服務(wù)器不打算支持servlet api,基本是專門給做基于長連接模式通信的定做的。