JDK1.4提供的無阻塞I/O(NIO)有效解決了多線程服務器存在的線程開銷問題,但在使用上略顯得復雜一些。許多基于
NIO的多線程服務器程序往往直接基于選擇器(Selector)的Reactor模式實現。這種簡單的事件機制對于較復雜的服務器應用,顯然缺乏擴展性
和可維護性,而且缺乏直觀清晰的結構層次。本文將通過一個基于事件回調的NIO多線程服務器的設計,試圖提供一個簡潔、直觀、易于擴展的NIO多線程服務
器模型。
JDK1.4的NIO有效解決了原有流式IO存在的線程開銷的問題,在NIO中使用多線程,主要目的已不是為了應對每個客戶端請求而分配獨立的服務線程,而是通過多線程充分使用用多個CPU的處理能力和處理中的等待時間,達到提高服務能力的目的。
多線程的引入,容易為本來就略顯復雜的NIO代碼進一步降低可讀性和可維護性。引入良好的設計模型,將不僅帶來高性能、高可靠的代碼,也將帶來一個愜意的開發過程。
線程模型
NIO
的選擇器采用了多路復用(Multiplexing)技術,可在一個選擇器上處理多個套接字,通過獲取讀寫通道來進行IO操作。由于網絡帶寬等原因,在通
道的讀、寫操作中是容易出現等待的,所以在讀、寫操作中引入多線程,對性能提高明顯,而且可以提高客戶端的感知服務質量。所以本文的模型將主要通過使用
讀、寫線程池來提高與客戶端的數據交換能力。
如下圖所示,服務端接受客戶端請求后,控制線程將該請求的讀通道交給讀線程池,由讀線程池分配
線程完成對客戶端數據的讀取操作;當讀線程完成讀操作后,將數據返回控制線程,進行服務端的業務處理;完成業務處理后,將需回應給客戶端的數據和寫通道提
交給寫線程池,由寫線程完成向客戶端發送回應數據的操作。
(NIO 多線程服務器模型)

同時整個服務端的流程處理,建立于事件機制上。在 [接受連接->讀->業務處理->寫 ->關閉連接 ]這個過程中,觸發器將觸發相應事件,由事件處理器對相應事件分別響應,完成服務器端的業務處理。
下面我們就來詳細看一下這個模型的各個組成部分。
相關事件定義 在這個模型中,我們定義了一些基本的事件:
(1)
onAccept:當服務端收到客戶端連接請求時,觸發該事件。通過該事件我們可以知道有新的客戶端呼入。該事件可用來控制服務端的負載。例如,服務器可
設定同時只為一定數量客戶端提供服務,當同時請求數超出數量時,可在響應該事件時直接拋出異常,以拒絕新的連接。
(2)onAccepted:當客戶端請求被服務器接受后觸發該事件。該事件表明一個新的客戶端與服務器正式建立連接。
(3)
onRead:當客戶端發來數據,并已被服務器控制線程正確讀取時,觸發該事件。該事件通知各事件處理器可以對客戶端發來的數據進行實際處理了。需要注意
的是,在本模型中,客戶端的數據讀取是由控制線程交由讀線程完成的,事件處理器不需要在該事件中進行專門的讀操作,而只需將控制線程傳來的數據進行直接處
理即可。
(4)onWrite:當客戶端可以開始接受服務端發送數據時觸發該事件,通過該事件,我們可以向客戶端發送回應數據。在本模型中,事件處理器只需要在該事件中設置
(5)onClosed:當客戶端與服務器斷開連接時觸發該事件。
(6)onError:當客戶端與服務器從連接開始到最后斷開連接期間發生錯誤時觸發該事件。通過該事件我們可以知道有什么錯誤發生。
事件回調機制的實現
在這個模型中,事件采用廣播方式,也就是所有在冊的事件處理器都能獲得事件通知。這樣可以將不同性質的業務處理,分別用不同的處理器實現,使每個處理器的業務功能盡可能單一。
如下圖:整個事件模型由監聽器、事件適配器、事件觸發器、事件處理器組成。
(事件模型)

- 監聽器(Serverlistener):這是一個事件接口,定義需監聽的服務器事件,如果您需要定義更多的事件,可在這里進行擴展。
public interface Serverlistener { public void onError(String error);
public void onAccept() throws Exception;
public void onAccepted(Request request) throws Exception;
public void onRead(Request request) throws Exception;
public void onWrite(Request request, Response response) throws Exception;
public void onClosed(Request request) throws Exception; }
|
- 事件適配器(EventAdapter):對Serverlistener接口實現一個適配器(EventAdapter),這樣的好處是最終的事件處理器可以只處理所關心的事件。
public abstract class EventAdapter implements Serverlistener { public EventAdapter() { } public void onError(String error) {} public void onAccept() throws Exception {} public void onAccepted(Request request) throws Exception {} public void onRead(Request request) throws Exception {} public void onWrite(Request request, Response response) throws Exception {} public void onClosed(Request request) throws Exception {} }
|
- 事件觸發器(Notifier):用于在適當的時候通過觸發服務器事件,通知在冊的事件處理器對事件做出響應。觸發器以Singleton模式實現,統一控制整個服務器端的事件,避免造成混亂。
public class Notifier { private static Arraylist listeners = null; private static Notifier instance = null;
private Notifier() { listeners = new Arraylist(); }
/** * 獲取事件觸發器 * @return 返回事件觸發器 */ public static synchronized Notifier getNotifier() { if (instance == null) { instance = new Notifier(); return instance; } else return instance; }
/** * 添加事件監聽器 * @param l 監聽器 */ public void addlistener(Serverlistener l) { synchronized (listeners) { if (!listeners.contains(l)) listeners.add(l); } }
public void fireOnAccept() throws Exception { for (int i = listeners.size() - 1; i >= 0; i--) ( (Serverlistener) listeners.get(i)).onAccept(); }
....// other fire method }
|
- 事件處理器(Handler):繼承事件適配器,對感興趣的事件進行響應處理,實現業務處理。以下是一個簡單的事件處理器實現,它響應onRead事件,在終端打印出從客戶端讀取的數據。
public class ServerHandler extends EventAdapter { public ServerHandler() { }
public void onRead(Request request) throws Exception { System.out.println("Received: " + new String(data)); } }
|
- 事件處理器的注冊。為了能讓事件處理器獲得服務線程的事件通知,事件處理器需在觸發器中注冊。
ServerHandler handler = new ServerHandler(); Notifier.addlistener(handler);
|
實現NIO多線程服務器
NIO多線程服務器主要由主控服務線程、讀線程和寫線程組成。
(線程模型)

- 主控服務線程(Server):主控線程將創建讀、寫線程池,實現監聽、接受客戶端請求,同時將讀、寫通道提交由相應的讀線程(Reader)和寫服務線程(Writer),由讀寫線程分別完成對客戶端數據的讀取和對客戶端的回應操作。
public class Server implements Runnable { ....
private static int MAX_THREADS = 4; public Server(int port) throws Exception { ....
// 創建無阻塞網絡套接 selector = Selector.open(); sschannel = ServerSocketChannel.open(); sschannel.configureBlocking(false); address = new InetSocketAddress(port); ServerSocket ss = sschannel.socket(); ss.bind(address); sschannel.register(selector, SelectionKey.OP_ACCEPT); }
public void run() { System.out.println("Server started ..."); System.out.println("Server listening on port: " + port); // 監聽 while (true) { try { int num = 0; num = selector.select();
if (num > 0) { Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); it.remove(); // 處理IO事件 if ( (key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); notifier.fireOnAccept();
SocketChannel sc = ssc.accept(); sc.configureBlocking(false);
// 觸發接受連接事件 Request request = new Request(sc); notifier.fireOnAccepted(request);
// 注冊讀操作,以進行下一步的讀操作 sc.register(selector, SelectionKey.OP_READ, request); } else if ( (key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ) { Reader.processRequest(key); // 提交讀服務線程讀取客戶端數據 key.cancel(); } else if ( (key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE ) { Writer.processRequest(key); // 提交寫服務線程向客戶端發送回應數據 key.cancel(); } } } else { addRegister(); // 在Selector中注冊新的寫通道 } } catch (Exception e) { notifier.fireOnError("Error occured in Server: " + e.getMessage()); continue; } } } .... }
|
- 讀線程(Reader):使用線程池技術,通過多個線程讀取客戶端數據,以充分利用網絡數據傳輸的時間,提高讀取效率。
public class Reader extends Thread { public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); } // 讀取客戶端數據,并觸發onRead事件 read(key); } catch (Exception e) { continue; } } } .... }
|
- 寫線程(Writer):和讀操作一樣,使用線程池,負責將服務器端的數據發送回客戶端。
public final class Writer extends Thread { public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); }
// 向客戶端發送數據,然后關閉連接,并分別觸發onWrite,onClosed事件 write(key); } catch (Exception e) { continue; } } } .... }
|
具體應用
NIO多線程模型的實現告一段落,現在我們可以暫且將NIO的各個API和煩瑣的調用方法拋于腦后,專心于我們的實際應用中。
我們用一個簡單的TimeServer(時間查詢服務器)來看看該模型能帶來多么簡潔的開發方式。
在
這個TimeServer中,將提供兩種語言(中文、英文)的時間查詢服務。我們將讀取客戶端的查詢命令(GB/EN),并回應相應語言格式的當前時間。
在應答客戶的請求的同時,服務器將進行日志記錄。做為示例,對日志記錄,我們只是簡單地將客戶端的訪問時間和IP地址輸出到服務器的終端上。
- 實現時間查詢服務的事件處理器(TimeHandler):
public class TimeHandler extends EventAdapter { public TimeHandler() { }
public void onWrite(Request request, Response response) throws Exception { String command = new String(request.getDataInput()); String time = null; Date date = new Date();
// 判斷查詢命令 if (command.equals("GB")) { // 中文格式 DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL, DateFormat.FulL, Locale.CHINA); time = cnDate.format(date); } else { // 英文格式 DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL, DateFormat.FulL, Locale.US); time = enDate.format(date); }
response.send(time.getBytes()); } }
|
- 實現日志記錄服務的事件處理器(LogHandler):
public class LogHandler extends EventAdapter { public LogHandler() { }
public void onClosed(Request request) throws Exception { String log = new Date().toString() + " from " + request.getAddress().toString(); System.out.println(log); }
public void onError(String error) { System.out.println("Error: " + error); } }
|
- 啟動程序:
public class Start {
public static void main(String[] args) { try { LogHandler loger = new LogHandler(); TimeHandler timer = new TimeHandler(); Notifier notifier = Notifier.getNotifier(); notifier.addlistener(loger); notifier.addlistener(timer);
System.out.println("Server starting ..."); Server server = new Server(5100); Thread tServer = new Thread(server); tServer.start(); } catch (Exception e) { System.out.println("Server error: " + e.getMessage()); System.exit(-1); } } }
|
小結
通過例子我們可以看到,基于事件回調的NIO多線程服務器模型,提供了清晰直觀的實現方式,可讓開發者從NIO及多線程的技術細節中擺脫出來,集中精力關注具體的業務實現。
posted on 2007-01-19 00:31
苦笑枯 閱讀(451)
評論(0) 編輯 收藏 所屬分類:
Java