從JDK 1.4開始,Java的標(biāo)準(zhǔn)庫(kù)中就包含了NIO,
即所謂的“New
IO”。其中最重要的功能就是提供了“非阻塞”的IO,當(dāng)然包括了Socket。NonBlocking的IO就是對(duì)select(Unix平臺(tái)下)以及
WaitForMultipleObjects(Windows平臺(tái))的封裝,提供了高性能、易伸縮的服務(wù)架構(gòu)。
說(shuō)來(lái)慚愧,直到JDK1.4才有這種功能,但遲到者不一定沒(méi)有螃蟹吃,NIO就提供了優(yōu)秀的面向?qū)ο蟮慕鉀Q方案,可以很方便地編寫高性能的服務(wù)器。
話說(shuō)回來(lái),傳統(tǒng)的Server/Client實(shí)現(xiàn)是基于Thread per request,即服務(wù)器為每個(gè)客戶端請(qǐng)求建立一個(gè)線程處理,單獨(dú)負(fù)責(zé)處理一個(gè)客戶的請(qǐng)求。比如像Tomcat(新版本也會(huì)提供NIO方案)、Resin等Web服務(wù)器就是這樣實(shí)現(xiàn)的。當(dāng)然為了減少瞬間峰值問(wèn)題,服務(wù)器一般都使用線程池,規(guī)定了同時(shí)并發(fā)的最大數(shù)量,避免了線程的無(wú)限增長(zhǎng)。
但這樣有一個(gè)問(wèn)題:如果線程池的大小為100,當(dāng)有100個(gè)用戶同時(shí)通過(guò)HTTP現(xiàn)在一個(gè)大文件時(shí),服務(wù)器的線程池會(huì)用完,因?yàn)樗械木€程都在傳輸大文件了,即使第101個(gè)請(qǐng)求者僅僅請(qǐng)求一個(gè)只有10字節(jié)的頁(yè)面,服務(wù)器也無(wú)法響應(yīng)了,只有等到線程池中有空閑的線程出現(xiàn)。
另外,線程的開銷也是很大的,特別是達(dá)到了一個(gè)臨界值后,性能會(huì)顯著下降,這也限制了傳統(tǒng)的Socket方案無(wú)法應(yīng)對(duì)并發(fā)量大的場(chǎng)合,而“非阻塞”的IO就能輕松解決這個(gè)問(wèn)題。
下面只是一個(gè)簡(jiǎn)單的例子:服務(wù)器提供了下載大型文件的功能,客戶端連接上服務(wù)器的12345端口后,就可以讀取服務(wù)器發(fā)送的文件內(nèi)容信息了。注意這里的服務(wù)器只有一個(gè)主線程,沒(méi)有其他任何派生線程,讓我們看看NIO是如何用一個(gè)線程處理N個(gè)請(qǐng)求的。
NIO服務(wù)器最核心的一點(diǎn)就是反應(yīng)器模式:當(dāng)有感興趣的事件發(fā)生的,就通知對(duì)
應(yīng)的事件處理器去處理這個(gè)事件,如果沒(méi)有,則不處理。所以使用一個(gè)線程做輪詢就可以了。當(dāng)然這里這是個(gè)例子,如果要獲得更高性能,可以使用少量的線程,一
個(gè)負(fù)責(zé)接收請(qǐng)求,其他的負(fù)責(zé)處理請(qǐng)求,特別是對(duì)于多CPU時(shí)效率會(huì)更高。
關(guān)于使用NIO過(guò)程中出現(xiàn)的問(wèn)題,最為普遍的就是為什么沒(méi)有請(qǐng)求時(shí)CPU的占
用率為100%?出現(xiàn)這種問(wèn)題的主要原因是注冊(cè)了不感興趣的事件,比如如果沒(méi)有數(shù)據(jù)要發(fā)到客戶端,而又注冊(cè)了寫事件(OP_WRITE),則在
Selector.select()上就會(huì)始終有事件出現(xiàn),CPU就一直處理了,而此時(shí)select()應(yīng)該是阻塞的。
另外一個(gè)值得注意的問(wèn)題是:由于只使用了一個(gè)線程(多個(gè)線程也如此)處理用戶請(qǐng)求,所以要避免線程被阻塞,解決方法是事件的處理者必須要即刻返回,不能陷入循環(huán)中,否則會(huì)影響其他用戶的請(qǐng)求速度。
具體到本例子中,由于文件比較大,如果一次性發(fā)送整個(gè)文件(這里的一次性不是
指send整個(gè)文件內(nèi)容,而是通過(guò)while循環(huán)不間斷的發(fā)送分組包),則主線程就會(huì)阻塞,其他用戶就不能響應(yīng)了。這里的解決方法是當(dāng)有WRITE事件
時(shí),僅僅是發(fā)送一個(gè)塊(比如4K字節(jié))。發(fā)完后,繼續(xù)等待WRITE事件出現(xiàn),依次處理,直到整個(gè)文件發(fā)送完畢,這樣就不會(huì)阻塞其他用戶了。
服務(wù)器的例子:
package
nio.file;
import
java.io.FileInputStream;
import
java.io.IOException;
import
java.net.InetSocketAddress;
import
java.nio.ByteBuffer;
import
java.nio.CharBuffer;
import
java.nio.channels.FileChannel;
import
java.nio.channels.Selecti;
import
java.nio.channels.Selector;
import
java.nio.channels.ServerSocketChannel;
import
java.nio.channels.SocketChannel;
import
java.nio.charset.Charset;
import
java.nio.charset.CharsetDecoder;
import
java.util.Iterator;
/**
* 測(cè)試文件下載的NIOServer
*
*
@author
tenyears.cn
*/
public class
NIOServer
{
static
int
BLOCK =
4096
;
// 處理與客戶端的交互
public class
HandleClient
{
protected
FileChannel channel;
protected
ByteBuffer buffer;
public
HandleClient
()
throws
IOException
{
this
.channel =
new
FileInputStream
(
filename
)
.getChannel
()
;
this
.buffer = ByteBuffer.allocate
(
BLOCK
)
;
}
public
ByteBuffer readBlock
() {
try
{
buffer.clear
()
;
int
count = channel.read
(
buffer
)
;
buffer.flip
()
;
if
(
count <=
0
)
return null
;
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
return
buffer;
}
public
void
close
() {
try
{
channel.close
()
;
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
}
protected
Selector selector;
protected
String filename =
"d:\\bigfile.dat"
;
// a big file
protected
ByteBuffer clientBuffer = ByteBuffer.allocate
(
BLOCK
)
;
protected
CharsetDecoder decoder;
public
NIOServer
(
int
port
)
throws
IOException
{
selector =
this
.getSelector
(
port
)
;
Charset charset = Charset.forName
(
"GB2312"
)
;
decoder = charset.newDecoder
()
;
}
// 獲取Selector
protected
Selector getSelector
(
int
port
)
throws
IOException
{
ServerSocketChannel server = ServerSocketChannel.open
()
;
Selector sel = Selector.open
()
;
server.socket
()
.bind
(
new
InetSocketAddress
(
port
))
;
server.configureBlocking
(
false
)
;
server.register
(
sel, Selecti.OP_ACCEPT
)
;
return
sel;
}
// 監(jiān)聽端口
public
void
listen
() {
try
{
for
(
;;
) {
selector.select
()
;
Iterator iter = selector.selectedKeys
()
.iterator
()
;
while
(
iter.hasNext
()) {
Selecti key = iter.next
()
;
iter.remove
()
;
handleKey
(
key
)
;
}
}
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
// 處理事件
protected
void
handleKey
(
Selecti key
)
throws
IOException
{
if
(
key.isAcceptable
()) {
// 接收請(qǐng)求
ServerSocketChannel server =
(
ServerSocketChannel
)
key.channel
()
;
SocketChannel channel = server.accept
()
;
channel.configureBlocking
(
false
)
;
channel.register
(
selector, Selecti.OP_READ
)
;
}
else if
(
key.isReadable
()) {
// 讀信息
SocketChannel channel =
(
SocketChannel
)
key.channel
()
;
int
count = channel.read
(
clientBuffer
)
;
if
(
count >
0
) {
clientBuffer.flip
()
;
CharBuffer charBuffer = decoder.decode
(
clientBuffer
)
;
System.out.println
(
"Client >>"
+ charBuffer.toString
())
;
Selecti wKey = channel.register
(
selector,
Selecti.OP_WRITE
)
;
wKey.attach
(
new
HandleClient
())
;
}
else
channel.close
()
;
clientBuffer.clear
()
;
}
else if
(
key.isWritable
()) {
// 寫事件
SocketChannel channel =
(
SocketChannel
)
key.channel
()
;
HandleClient handle =
(
HandleClient
)
key.attachment
()
;
ByteBuffer block = handle.readBlock
()
;
if
(
block !=
null
)
channel.write
(
block
)
;
else
{
handle.close
()
;
channel.close
()
;
}
}
}
public static
void
main
(
String
[]
args
) {
int
port =
12345
;
try
{
NIOServer server =
new
NIOServer
(
port
)
;
System.out.println
(
"Listernint on "
+ port
)
;
while
(
true
) {
server.listen
()
;
}
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
}
|
該代碼中,通過(guò)一個(gè)HandleClient來(lái)獲取文件的一塊數(shù)據(jù),每一個(gè)客戶都會(huì)分配一個(gè)HandleClient的實(shí)例。
下面是客戶端請(qǐng)求的代碼,也比較簡(jiǎn)單,模擬100個(gè)用戶同時(shí)下載文件。
package
nio.file;
import
java.io.IOException;
import
java.net.InetSocketAddress;
import
java.nio.ByteBuffer;
import
java.nio.CharBuffer;
import
java.nio.channels.Selecti;
import
java.nio.channels.Selector;
import
java.nio.channels.SocketChannel;
import
java.nio.charset.Charset;
import
java.nio.charset.CharsetEncoder;
import
java.util.Iterator;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
/**
* 文件下載客戶端
*
@author
tenyears.cn
*/
public class
NIOClient
{
static
int
SIZE =
100
;
static
InetSocketAddress ip =
new
InetSocketAddress
(
"localhost"
,
12345
)
;
static
CharsetEncoder encoder = Charset.forName
(
"GB2312"
)
.newEncoder
()
;
static class
Download
implements
Runnable
{
protected
int
index;
public
Download
(
int
index
) {
this
.index = index;
}
public
void
run
() {
try
{
long
start = System.currentTimeMillis
()
;
SocketChannel client = SocketChannel.open
()
;
client.configureBlocking
(
false
)
;
Selector selector = Selector.open
()
;
client.register
(
selector, Selecti.OP_CONNECT
)
;
client.connect
(
ip
)
;
ByteBuffer buffer = ByteBuffer.allocate
(
8
*
1024
)
;
int
total =
0
;
FOR:
for
(
;;
) {
selector.select
()
;
Iterator iter = selector.selectedKeys
()
.iterator
()
;
while
(
iter.hasNext
()) {
Selecti key = iter.next
()
;
iter.remove
()
;
if
(
key.isConnectable
()) {
SocketChannel channel =
(
SocketChannel
)
key
.channel
()
;
if
(
channel.isConnectionPending
())
channel.finishConnect
()
;
channel.write
(
encoder.encode
(
CharBuffer
.wrap
(
"Hello from "
+ index
)))
;
channel.register
(
selector, Selecti.OP_READ
)
;
}
else if
(
key.isReadable
()) {
SocketChannel channel =
(
SocketChannel
)
key
.channel
()
;
int
count = channel.read
(
buffer
)
;
if
(
count >
0
) {
total += count;
buffer.clear
()
;
}
else
{
client.close
()
;
break
FOR;
}
}
}
}
double
last =
(
System.currentTimeMillis
()
- start
)
*
1.0
/
1000
;
System.out.println
(
"Thread "
+ index +
" downloaded "
+ total
+
"bytes in "
+ last +
"s."
)
;
}
catch
(
IOException e
) {
e.printStackTrace
()
;
}
}
}
public static
void
main
(
String
[]
args
)
throws
IOException
{
ExecutorService exec = Executors.newFixedThreadPool
(
SIZE
)
;
for
(
int
index =
0
; index < SIZE; index++
) {
exec.execute
(
new
Download
(
index
))
;
}
exec.shutdown
()
;
}
}
posted on 2007-01-19 00:03
苦笑枯 閱讀(469)
評(píng)論(0) 編輯 收藏 所屬分類:
Java