Overview
Cindy是一個Java異步I/O框架,提供了一個統一高效的模型,同時支持TCP、UDP以及Pipe,并能夠方便的在異步和同步操作之間進行
切換。目前其實現是基于Java
NIO,并計劃通過JNI來支持各操作系統上本身提供的異步I/O功能,應用可以方便的通過運行期屬性來方便的切換到更為高效的實現上。
為什么不使用Java IO?
Java
IO包采用阻塞模型來處理網絡操作。假設應用調用了read方法讀取來自網絡流上的數據,而數據尚未到達本機,則對read方法的調用將一直等到數據到達
并成功接收后才返回。由于IO包的所有操作會一直阻塞當前線程,為了不影響其他事務的處理,在一般情況下應用總是會用一個獨立線程或線程池來處理這些
I/O操作。
Java
IO包阻塞模型的優點就是非常簡單易用,如果配合上線程池,效率也非常不錯,所以得到了廣泛的應用。但這種簡單模型也有其固有的缺點:擴展性不足。如果應
用只需要進行少量的網絡操作,那么開啟若干個單獨的I/O線程無傷大雅;但是如果是實現一個服務端的應用,需要同時處理成千上萬個網絡連接,采用阻塞模型
的話就要同時開啟上千個線程。雖然現在強勁的服務器能夠負擔起這么多線程,但系統花在線程調度上的時間也會遠遠多于用于處理網絡操作上的時間。
為什么要使用Java NIO?
采用IO包的阻塞模型,如果數據量不大的話,則線程的大部分時間都會浪費在等待上。對于稀缺的服務器資源而言,這是一種極大的浪費。
在Java
1.4中引入的NIO包里,最引人注目的就是提供了非阻塞I/O的實現。和IO包提供的阻塞模型不同的是,對一個非阻塞的連接進行操作,如果此時相應的狀
態還未就緒,則調用會立即返回,而不是等待狀態就緒后才返回。假設應用調用了read方法讀取來自網絡流上的數據,而此刻數據尚未到達本機,則對read
方法的調用將立即返回,并通知應用目前只能讀到0個字節。應用可以根據自身的策略來進行處理,比如讀取其他網絡連接的數據等等,這就使得一個線程管理多個
連接成為可能。
NIO包還提供了Selector機制,將一個非阻塞連接注冊在Selector上,應用就不需去輪詢該連接當前是否可以讀取或寫入數據,在相應狀
態就緒后Selector會通知該連接。由于一個Selector上可以注冊多個非阻塞連接,這樣就使得可以用更少的線程數來管理更多的連接。
為什么選擇Cindy,而不直接使用NIO?
Java
NIO包雖然提供了非阻塞I/O模型,但是直接使用NIO的非阻塞I/O需要成熟的網絡編程經驗,處理眾多底層的網絡異常,以及維護連接狀態,判斷連接超
時等等。對于關注于其業務邏輯的應用而言,這些復雜性都是不必要的。不同Java版本的NIO實現也會有一些Bug,Cindy會巧妙的繞開這些已知的
Bug并完成相應功能。并且NIO本身也在不斷發展中,Java 1.4的NIO包中只實現了TCP/UDP單播/Pipe,Java
5.0中引入的SSLEngine類使得基于非阻塞的流協議(TCP/Pipe)支持SSL/TLS成為可能,在未來的版本中還可能會加入非阻塞多播的實
現。Cindy會關注這些新功能,并將其納入到統一的框架中來。
Cindy雖然目前的實現是基于NIO,但它會不僅僅局限于NIO。等到一些基于操作系統本身實現的AIO(Asynchronous IO)類庫成熟后,它也會加入對這些類庫的支持,通過操作系統本身實現的AIO來提高效率。
如果應用程序只想使用一種高效的模型,而不想關心直接使用NIO所帶來的這些限制,或希望將來無需更改代碼就切換到更高效率的AIO實現上,那么
Cindy會是一個很好的選擇。并且使用Cindy,應用可以在同步和異步之間進行無縫切換,對于大部分操作是異步,可某些特殊操作需要同步的應用而言,
這極大的提高了易用性。
Hello world example
場景:服務端監聽本地的1234端口,打印任何收到的消息到控制臺上;客戶端建立TCP連接到本地的1234端口,發送完"Hello world!",然后斷開連接。
基于Java IO包的客戶端示例
基于Java IO包的阻塞模型的示例,用于對比,不做額外說明。(異常處理代碼略)
Socket socket = new Socket("localhost",1234);
OutputStream os = new BufferedOutputStream(socket.getOutputStream());
os.write("Hello world!".getBytes());
os.close();
socket.close();
基于Java IO包的服務端示例
基于Java IO包的阻塞模型的示例,用于對比,不做額外說明。(異常處理代碼略)
ServerSocket ss = new ServerSocket(1234);
while (true) {
? final Socket socket = ss.accept();
? newThread() {
??? public void run() {
????? BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
????? String line = null;
????? while ((line = br.readLine()) != null) {
??????? System.out.println(line);
????? }
}
}.start();
}
基于Cindy的同步客戶端示例
Session session = SessionFactory.createSession(SessionType.TCP);
session.setRemoteAddress(new InetSocketAddress("localhost", 1234));
session.start().complete();
Packet packet = new DefaultPacket(BufferFactory.wrap("Hello world\!".getBytes()));
session.flush(packet).complete();
session.close().complete();
Step 1: create session
Session是連接的抽象,一個Session代表著一個連接。連接有多種類型,SessionType.TCP代表著TCP連接,SessionType.UDP代表著UDP連接,SessionType.PIPE代表著Pipe連接等等。
在這里通過SessionFactory.createSession(SessionType.TCP)創建了一個TCP連接。如果要創建UDP連接,可以使用類似的代碼:
Session session = SessionFactory.createSession(SessionType.UDP);
連接創建后可以通過返回Session實例的getSessionType方法來得到該連接的類型。
Step 2: set the remote address
TCP需要先設置好要連接的地址才能開始連接,在這里是連接到本機的1234端口。對于UDP來說,這一步設置不是必須的。
Step 3: start session
參數設置完成后就可以通過session.start方法啟動Session了。*和Java
IO中的阻塞同步調用不同,由于Cindy是一個異步I/O框架,在調用session.start()方法并返回后,連接可能還沒有建立成功。*如果我
們想要切換到同步阻塞的方式,比如等連接建立成功后才返回,則可以通過start方法返回的Future對象來進行切換。
Session中所有的異步方法(如start/close/flush/send等)的返回值均為Future對象,該對象代表著所進行的異步操作。我們可以通過三種方式來處理Future對象:
- 阻塞:調用Future.complete(),則該方法會一直等到異步操作完成后才返回;或者可以調用Future.complete(int timeout)來等待異步操作完成,如果在指定時間內操作未完成,則該方法也會返回。值得注意的是操作完成(isCompleted)包括操作成功和操作失敗兩種狀態(isSucceeded)。
- 輪詢:周期性的調用Future.isCompleted()方法來查詢異步操作是否完成。
- 回調:通過Future.addListener()方法加入自定義的FutureListener,在異步操作結束后,Future會觸發FutureListener的futureCompleted事件。對任何已完成的Future對象調用addListener方法會馬上觸發FutureListener的futureCompleted事件。
在該示例中我們采用的是第一種方式——阻塞,等待連接建立完成后才返回。可以通過Future.complete()方法的返回值或者
Session.isStarted()來判斷連接是否建立成功。在這個簡單的示例里假設不會出現任何連接錯誤,所以就不再判斷異常情況了,不過在正式的
應用中應當注意相關的判斷。
Step 4: the Packet/Buffer interface
任何數據要在網絡上傳輸最終都得轉換成字節流,Buffer接口就是字節流的抽象。注意的是這里使用的Buffer是net.sf.cindy.Buffer,而沒有采用java.nio.Buffer,這個設計上的取舍可以參考以后的介紹。
而數據傳輸除了要有內容外,還得有目的地,這樣才能把內容送到正確的地址上。Packet接口就代表著要發送的數據,包括一個Buffer對象和一
個目的地址------SocketAddress。對于流協議(如TCP/Pipe)而言,目的地址在建立連接的時候就已經設置好了,所以發送過程中無
需再進行設置,用默認的null即可;對于消息協議(比如非連接型的UDP),則需要在發送時指定每個包的目的地址。
在這里要將"Hello world!"轉換為Buffer對象,可以通過BufferFactory來完成:BufferFactory.wrap("Hello world!".getBytes()),將一個字節數組包裝成Buffer。
由于是TCP連接,目的地址在建立連接時就已經指定好了,所以可以簡單的構造一個只有Buffer而沒有SocketAddress的Packet對象:new DefaultPacket(buffer)。(DefaultPacket是Packet接口的默認實現)
Step 5: send packet
Session的flush方法和start方法一樣都是異步方法,在調用flush并返回時,數據可能并沒有發送完成,在該示例中仍然采用阻塞方式等待數據發送完成后才返回:session.flush(packet).complete()。
flush和send方法的區別在于:flush方法接受的參數是Packet對象,send方法接受的參數是Object對象,通過send方法發送的對象會被Session所關聯的PacketEncoder轉換為Packet對象再進行發送。
Step 6: close session
發送完成后需要關閉連接,close方法同樣是一個異步方法,在這里等待連接完全關閉后才返回:session.close().complete()。
從這個同步的示例可以看到,雖然Cindy是一個異步I/O框架,但用它來完成同步的I/O操作也是一件非常容易的事情。
基于Cindy的異步客戶端示例
final Session session = SessionFactory.createSession(SessionType.TCP);
session.setRemoteAddress(new InetSocketAddress("localhost", 1234));
Future future = session.start();
future.addListener(new FutureListener() {
public void futureCompleted(Future future) throws Exception {
Packet packet = new DefaultPacket("Hello world!".getBytes());
session.flush(packet).complete();
session.close();
}
});
前面的三行代碼和同步示例沒有區別,只是在第四行代碼中采用了回調的方式來處理Future對象,而不是通過阻塞的方式。當連接建立完成后,FutureListener的futureCompleted事件被觸發,應用可以在該方法中做相應的事件處理。
在發送Packet的過程中,其實也可以采用回調的方式來處理。在這里仍采用同步方法處理,一方面是減少內嵌類的數量,另一方面是示例Cindy可以非常容易的切換同步和異步操作。后文對異步處理會有更詳細的介紹。
基于Cindy的服務端示例
SessionAcceptor acceptor = SessionFactory.createSessionAcceptor(SessionType.TCP);
acceptor.setListenPort(1234);
acceptor.setAcceptorHandler(new SessionAcceptorHandlerAdapter() {
public void sessionAccepted(SessionAcceptor acceptor, Session session) throws Exception {
session.setSessionHandler(new LogSessionHandler());
session.start();
}
});
acceptor.start();
Step 1: create SessionAcceptor
SessionAcceptor代表著連接的服務端,SessionAcceptor和Session是一對多關系,類似于IO包的ServerSocket和Socket。這里創建的是TCP服務端。
Step 2: set listen port
設置服務端的監聽端口,在這里是1234端口;如果要設置監聽地址,則可以通過setListenAddress來進行設置。
Step 3: set acceptor handler
每當SessionAcceptor上有連接建立成功,將會觸發該SessionAcceptor所關聯SessionAcceptorHandler的sessionAccepted事件,應用可以在該事件中為連接上的Session設置一些屬性并開始或關閉連接。
SessionAcceptorHandler接口是處理SessionAcceptor產生的各種事件,SessionHandler接口則用于
處理Session產生的各種事件。在這里我們僅僅關心對象接收事件(objectReceived),當接收到對象后,將該對象打印到控制臺上。(注:
LogSessionHandler的代碼沒有列出來)
SessionHandler/SessionAcceptorHandler的更多介紹請參閱后面章節。
PacketEncoder/PacketDecoder
PacketEncoder
在前面的Hello
world示例中,我們都是通過session.flush方法來發送數據,而該方法只接收Packet類型的參數,這就要求我們在發送任何數據前都要先
進行轉換。比如在前面示例中,我們就得先將"Hello world!"字符串轉換成一個代表"Hello
world!"的Packet,然后再進行發送。
這種做法沒有什么問題,其唯一的缺陷在于將發送邏輯和序列化邏輯耦合在一起。比如在上面的示例中,發送邏輯是將"Hello
world!"發送出去,序列化邏輯是"Hello
world"字符串的字節表示。雖然有一定的關聯,但的確是兩種不同的邏輯,比如我們可以更改序列化邏輯,通過Serializable接口來序列化
"Hello world",但這并不影響發送邏輯。
神說,要有光,就有了光。你知道,神不關心光是怎么來的。PacketEncoder的作用就是分離發送邏輯和序列化邏輯。對于應用而言,在發送時
它只需要把要發送的對象傳遞給Session,至于怎么序列化,則由Session所關聯的PacketEncoder來處理。所以在上面的Hello
world示例中,發送邏輯可以改為:
session.send("Hello world!");
序列化邏輯可以通過PacketEncoder來設置:
session.setPacketEncoder(new PacketEncoder() {
public Packet encode(Session session, Object obj) throws Exception {
returnnew DefaultPacket(BufferFactory.wrap(obj.toString().getBytes()));
}
});
如果要改變序列化邏輯,比如通過Serializable接口來序列化,則只需要更改PacketEncoder,而不需要改動發送代碼:
session.setPacketEncoder(new SerialEncoder());
通過Cindy內置的PacketEncoderChain,應用可通過Session發送任意對象,把序列化邏輯完全交給PacketEncoder。如下面的偽碼所示:
PacketEncoderChain chain = new PacketEncoderChain();
chain.addPacketEncoder(new Message1Encoder());
chain.addPacketEncoder(new Message2Encoder());
session.setPacketEncoder(chain);
session.send(new Message1());
session.send(new Message2());
PacketDecoder
PacketEncoder是用來處理序列化邏輯的,相應的,PacketDecoder則是用來處理反序列化邏輯的。
發送方通過session.send方法可以發送任意對象,Session關聯的PacketEncoder會將該對象轉換為Packet發送出
去;接收方收到了Packet后,也可以通過其關聯的PacketDecoder將該Packet轉換為一個對象,再通知應用。假設發送方用了
SerialEncoder來發送序列化對象,則接收方就可以使用SerialDecoder來進行反序列化,然后應用就可以直接對對象進行處理。
session.setPacketDecoder(new SerialDecoder());
session.setSessionHandler(new SessionHandlerAdapter() {
public void objectReceived(Session session, Object obj) throws Exception {
}
}
Cindy源代碼example目錄下net.sf.cindy.example.helloworld包下提供了一個非常簡單的示例。
應用在實現PacketDecoder的時候要注意判斷當前已接收到的內容長度。
比如TCP是一個流協議,一方發送了1000個字節,另一方可能在接收的時候只收到了前200個字節,剩下的800個字節要在下次接收時才收到,可是需要1000個字節才能構造出一個完整的對象,則應用的PacketDecoder實現可能會類似于:
public
Object decode(Session session, Packet packet) throws Exception {
Buffer content = packet.getContent();
if (content.remaining() >= 1000) {
}
returnnull; }
SessionHandler/SessionHandlerAdapter
SessionHandler接口用于處理Session的各種事件。比如當連接建立成功后,會觸發SessionHandler的
sessionStarted事件;連接關閉后,會觸發SessionHandler的sessionClosed事件;對象發送成功后會觸發
SessionHandler的objectSent事件;對象接收成功后會觸發SessionHandler的objectReceived事件等等。
SessionHandlerAdapter是SessionHandler的空實現。即如果你僅僅對SessionHandler中某幾個事件感
興趣,就不用全部實現SessionHandler中定義的各種方法,而只需要繼承自SessionHandlerAdapter,實現感興趣的事件即
可。
通過SessionHandler,可以極大的減少內嵌類的數量。如前面的異步Hello world示例,如果用SessionHandler來改寫,則會是:
Session session = SessionFactory.createSession(SessionType.TCP);
session.setRemoteAddress(new InetSocketAddress("localhost", 1234));
session.setSessionHandler(new SessionHandlerAdapter() {
public void sessionStarted(Session session) throws Exception {
Buffer buffer = BufferFactory.wrap("Hello world!".getBytes());
Packet packet = new DefaultPacket(buffer);
session.send(packet);
}
public void objectSent(Session session, Object obj) throws Exception {
session.close();
};
};
session.start();
在上面的代碼中,當sessionStarted事件觸發后,即Session成功建立后,會發送"Hello
world!"消息;當objectSent事件觸發后,即消息發送成功,調用session.close()異步關閉連接。這些處理全部都是異步的,并
且僅僅使用了一個內嵌類。
SessionHandler中一共定義了如下幾個方法:
- void sessionStarted(Session session) throws Exception //連接已建立
- void sessionClosed(Session session) throws Exception //連接已關閉
- void sessionTimeout(Session session) throws Exception //連接超時
- void objectReceived(Session session, Object obj) throws Exception //接收到了對象
- void objectSent(Session session, Object obj) throws Exception //發送了對象
- void exceptionCaught(Session session, Throwable cause) //捕捉到異常
如果通過session.setSessionTimeout方法設置了超時時間,則在指定的時間內沒有接收或發送任何數據就會觸發
sessionTimeout事件。發生了該事件并不代表著連接被關閉,應用可以選擇關閉該空閑連接或者發送某些消息來檢測網絡連接是否暢通。默認情況下
sessionTimeout為0,即從不觸發sessionTimeout事件。
如果通過PacketEncoder發送了任何對象,則objectSent事件將被觸發(注:通過Session.flush方法發送的
Packet不會觸發objectSent事件,只有通過Session.send方法發送的對象才會觸發objectSent事件);通過
PacketDecoder接收到任何對象,則objectReceived事件將被觸發,一般情況下應用都通過監聽該事件來對接收到的對象做相應處理。
exceptionCaught事件代表著session捕捉到了一個異常,這個異常可能是由于底層網絡所導致,也可能是應用在處理SessionHandler事件時所拋出來的異常。請注意,如果應用在處理exceptionCaught事件中拋出運行期異常,則該異常不會再度觸發exceptionCaught事件,否則可能出現死循環。
由于應用無法處理底層網絡所引發的異常,所以在部署穩定后,可以通過指定運行期參數-
Dnet.sf.cindy.disableInnerException來取消對底層網絡異常的分發,或者判斷異常類型——所有的內部異常都是從
SessionException基類繼承下來的,應用可以根據這個特性來判斷是底層網絡出現了異常,還是SessionHandler或
SessionFilter中拋出了異常。
SessionFilter/SessionFilterAdapter
SessionFilter與SessionHandler有些類似,均用于處理Session的各種事件,不同的則是SessionFilter
先處理這些事件,并判斷是否需要把該事件傳遞給下一個SessionFilter。等到所有SessionFilter處理完成后,事件才會傳遞給
SessionHandler由其來處理。
SessionFilterAdapter是SessionFilter的空實現,默認是把事件傳遞給下一個SessionFilter。SessionFilter中定義了如下幾個方法:
- void sessionStarted(SessionFilterChain filterChain) throws Exception;
- void sessionClosed(SessionFilterChain filterChain) throws Exception;
- void sessionTimeout(SessionFilterChain filterChain) throws Exception;
- void objectReceived(SessionFilterChain filterChain, Object obj) throws Exception;
- void objectSent(SessionFilterChain filterChain, Object obj) throws Exception;
- void exceptionCaught(SessionFilterChain filterChain, Throwable cause);
- void packetReceived(SessionFilterChain filterChain, Packet packet) throws Exception;
- void packetSend(SessionFilterChain filterChain, Packet packet) throws Exception;
- void packetSent(SessionFilterChain filterChain, Packet packet) throws Exception;
其中前六種方法和SessionHandler的作用一致,后三種方法則是用于處理接收和發送的Packet的。
可以看到SessionFilter可以算做SessionHandler的超集,那為什么需要引入SessionHandler呢?
雖然SessionFilter和SessionHandler在表現形式上有很多接近的地方,但是在應用邏輯上卻是處于不同的地位。
SessionHandler目的就是處理應用最為核心的業務邏輯,這些邏輯都是基于Object的,和網絡層沒有太大的關系;而
SessionFilter和網絡層的關聯就比較大了,一般用來處理網絡相關的一些邏輯(如包壓縮/解壓縮、包加密/解密)或者是核心業務邏輯外的一些分
支邏輯(如記錄日志、黑名單處理)。
基于SessionFilter,應用可以做很多擴展而不影響核心的業務處理(核心的業務處理應該放在SessionHandler中)。比如數據
包相關的擴展:加入SSLFilter,則所發送的數據都會被SSL編碼后才發送,接收的數據會先被解碼成明文才接收;加入ZipFilter,則可以壓
縮所發送的數據,接收時再解壓縮。比如統計的擴展:加入StatisticFilter,則可以統計發送和接收的字節數,以及發送速率。比如ACL的擴
展:加入AllowListFilter/BlockListFilter,則可以允許指定或限制某些IP地址訪問;加入LoginFilter,如果用
戶沒有登錄,則不把事件傳遞給后面處理業務邏輯的SessionFilter或SessionHandler。比如線程處理的擴展:加入
ThreadPoolFilter,可以指定讓某個線程池來進行后面事件的處理。比如日志記錄的擴展:加入LogFilter,則可以記錄相應的事件信
息。
所列舉的這些只是一些基于SessionFilter的常見應用,應用可以根據自身的業務需要來進行選擇。Cindy所推薦的實踐是將不同的業務邏輯分散到不同的SessionFilter中,在SessionHandler中只處理核心邏輯。
在這里可以示范一個ZipFilter的偽碼:
public class ZipFilter extends SessionFilterAdapter {
public void packetReceived(SessionFilterChain filterChain, Packet packet) throws Exception {
Packet unzippedPacket = unzip(packet); super.packetReceived(filterChain, unzippedPacket); }
public void packetSend(SessionFilterChain filterChain, Packet packet) throws Exception {
Packet zippedPacket = zip(packet); super.packetSend(filterChain, zippedPacket); }
}
Buffer/Packet
在前面的示例中我們已經接觸到了Buffer/Packet,Buffer是數據流的抽象,Packet是網絡包的抽象。
Java NIO中已經提供了java.nio.ByteBuffer類用于表示字節流,為什么不直接使用java.nio.ByteBuffer?
java.nio.ByteBuffer雖然是NIO中表示字節流的標準類,但是對于高負荷的網絡應用而言,其設計上存在著以下缺陷:
- ByteBuffer并不是一個接口,而是一個抽象類。最為關鍵的地方是其構造函數為包級私有,這意味著我們無法繼承自ByteBuffer構造子類。
- ByteBuffer
僅僅是其所持有內容的一個外部包裝,多個不同的ByteBuffer可以共享相同的內容。比如通過slice、duplicate等方法構造一個新的
ByteBuffer,該新ByteBuffer和原ByteBuffer共享的是同一份內容。這就意味著無法構造基于ByteBuffer的緩存機制。
比如如下代碼:
ByteBuffer buffer1 = ByteBuffer.allocate(100);
ByteBuffer buffer2 = buffer1.slice();
System.out.println(buffer1 == buffer2);
System.out.println(buffer1.equals(buffer2));
打印出來的都是false,而實際上兩個ByteBuffer共享的是同一份數據。在不經意的情況下,可能發生應用把兩個ByteBuffer返回緩存中,被緩存當成是不同的對象進行處理,可能破壞數據完整性。
為什么Cindy使用自定義的Buffer接口?
- Buffer是一個接口,如果對現有的實現類不滿意,應用可以方便的加入自己的實現
- 支持一系列的工具方法,比如indexOf/getString/putString/getUnsignedXXX等等,加入這些常用方法會給應用帶來很大的方便
- 可以非常方便的與nio中的ByteBuffer做轉換,并且效率上不會有太大損失。由于大部分的網絡類庫都是基于nio的ByteBuffer來設計的,這樣保證了兼容性
- NIO的ByteBuffer無法表示ByteBuffer數組,而Cindy中提供了工具類把Buffer數組包裝成一個Buffer
- Cindy的Buffer是可緩存的,對于高負荷的網絡應用而言,這會帶來性能上優勢
在一般情況下,應用應該直接通過BufferFactory類來構造Buffer實例。目前BufferFactory類中有如下方法:
- Buffer wrap(byte[] array)
- Buffer wrap(byte[] array, int offset, int length)
- Buffer wrap(ByteBuffer buffer)
- Buffer wrap(Buffer[] buffers)
- Buffer allocate(int capacity)
- Buffer allocate(int capacity, boolean direct)
前四種方法都是包裝方法,將現有的字節數組、ByteBuffer以及Buffer數組包裝成一個單一的Buffer對象。第五和第六種方法是構造
一個新的Buffer對象,新構造Buffer對象的內容是不確定的(java.nio.ByteBuffer.allocate得到的內容是全0),可
能來自緩存或直接生成,依賴于對Buffer緩存的配置。一般情況下都是通過第五種方法構造新的Buffer對象,通過運行期參數-
Dnet.sf.cindy.useDirectBuffer可以改變默認行為,是構造Non-Direct Buffer還是構造Direct
Buffer。
默認情況下生成的Buffer是可以被緩存,在調用了session.send/flush方法后,Buffer的內容就會被釋放掉,或者手動調用
Buffer.release也能釋放Buffer所持有的內容。通過Buffer.isReleased方法可以可以判斷當前的Buffer是否被釋放
掉。對已經釋放掉的Buffer進行get/put操作都會導致ReleasedBufferException,但是對
position/limit/capacity的操作還是有效的。如果應用不希望Buffer的內容被釋放,可以通過設置permanent屬性為
true來使得Buffer的內容不被釋放。
是否所有Buffer都需要手動調用release來釋放?
這里有一個基本的原則:誰生成,誰釋放。比如應用調用BufferFactory.allocate(1024)得到了一個Buffer實例,這個Buffer實例是由應用生成出來的,那么應用在使用完該Buffer實例后就應該負責調用buffer.release將它釋放掉。
不過框架為了提供應用的方便,所有通過session.send/flush方法發送的Buffer都會在發送完成后被釋放。如果上面得到的這個
Buffer實例通過調用session.flush方法被發送出去,那么該Buffer實例就不需要再通過release方法手工釋放了。
如果應用不手動release自己生成出來的Buffer,也不會造成內存泄漏,因為這些Buffer會通過Java的垃圾回收機制被回收掉。唯一的缺陷在于由于無法重用對象,性能可能會有少許的降低。
舉個例子,假設你實現了一個特定的PacketDecoder:
public class MyPacketDecoder implements PacketDecoder {
publicObject decode(Session session, Packet packet) throws Exception {
Buffer content = packet.getContent();
Buffer result = allocateBuffer();
decodeContent(content, result);
return result;
}
}
可以看到,在MyPacketDecoder的實現當中,并沒有對接收到的Buffer調用release方法,因為這個Buffer并不是應用生
成出來的,不應該由應用來釋放。在該MyPacketDecoder實現中生成了一個新的Buffer實例,并當作PacketDecoder的返回值。
這個新的實例是應用生成出來的,則應該由應用來釋放,所以應用應該在相應的objectReceived事件中釋放該Buffer實例,如:
public void objectReceived(Session session, Object obj) throws Exception {
Buffer buffer = (Buffer) obj;
try {
process(buffer);
} finally {
buffer.release();
}
}
Advanced Topic
JMX支持
通過運行期屬性-Dnet.sf.cindy.useJmx可以開啟JMX支持(Java 5.0中內置了JMX支持,如果運行在Java 1.4版本中,則需要手工下載相應類庫)。
下圖是通過jconsole進行JMX管理的示例:
流量控制
當網絡接收的速度大于應用的處理速度時,如果不控制接收速率,則收到的消息會在隊列中堆積,應用無法及時處理而造成內存溢出。Cindy
3.0中加入了流量控制功能,當接收隊列中消息超過指定數量時,Cindy會放慢網絡接收速度。該功能對于防止內存溢出以及應用程序調試有很大幫助。
可以通過運行期參數-Dnet.sf.cindy.dispatcher.capacity來指定隊列中最多可以堆積的消息數。默認值是1000,即隊列中消息數超過1000,網絡接收速度就會放緩,等到消息數少于1000后,接收速度就會恢復正常。
Read packet size
Read packet size是指接收時每次讀包所構造的緩沖區大小。對于TCP而言,這個值影響到的僅僅是效率;對于UDP則影響到數據正確性。
對于UDP應用,假設發送方發送的包所攜帶數據大小為1000字節,接收方的read packet size設置為600字節,則后面400個字節會被丟棄,這樣
構造出來的邏輯包可能會不正確。
對于TCP應用,如果邏輯包是定長的,這個值最好也設為邏輯包的長度。該值太少,則可能導致在過多的包組合操作(比如邏輯包長度為1000字節,
read packet
size設置為100,則可能需要把10次接收到的packet組合成一個packet才能構造出一個邏輯包);該值太大,可能造成內存的浪費(不過由于
Buffer緩存的存在會緩解這一情況)。
默認的read packet
size是8192字節,可以通過運行期屬性-Dnet.sf.cindy.session.readPacketSize來更改。如果對單獨的
Session進行更改,則可以通過session的readPacketSize屬性進行設置。
Direct ByteBuffer vs Non-Direct ByteBuffer
java.nio.ByteBuffer引入了這兩種類型的ByteBuffer,在Cindy中也有基于這兩種ByteBuffer的Buffer包裝類。那么這兩者的區別在什么地方?在什么環境下采用哪種類型的ByteBuffer會更有效率?
Non-direct ByteBuffer內存是分配在堆上的,直接由Java虛擬機負責垃圾收集,你可以把它想象成一個字節數組的包裝類,如下偽碼所示:
HeapByteBuffer extends ByteBuffer {
byte[] content;
int position, limit, capacity;
......
}
而Direct ByteBuffer是通過JNI在Java虛擬機外的內存中分配了一塊,該內存塊并不直接由Java虛擬機負責垃圾收集,但是在Direct ByteBuffer包裝類被回收時,會通過Java Reference機制來釋放該內存塊。如下偽碼所示:
DirectByteBuffer extends ByteBuffer {
long address;
int position, limit, capacity;
protected void finalize() throws Throwable{
releaseAddress();
......
}
......
}
除開以上的這些外,如果我們查找Java實現類的代碼,就可以了解到這兩者之間更深入的區別。比如在Sun的Java實現中,絕大部分
Channel類都是通過sun.nio.ch.IOUtil這個工具類和外界進行通訊的,如FileChannel/SocketChannel等等。
簡單的用偽碼把write方法給表達出來(read方法也差不多,就不多做說明了):
int write(ByteBuffer src, ......) {
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(...);
ByteBuffer direct = getTemporaryDirectBuffer(src);
writeFromNativeBuffer(direct,......);
updatePosition(src);
releaseTemporaryDirectBuffer(direct);
}
是的,在發送和接收前會把Non-direct ByteBuffer轉換為Direct
ByteBuffer,然后再進行相關的操作,最后更新原始ByteBuffer的position。這意味著什么?假設我們要從網絡中讀入一段數據,再
把這段數據發送出去的話,采用Non-direct ByteBuffer的流程是這樣的:
而采用Direct ByteBuffer的流程是這樣的:
可以看到,除開構造和析構臨時Direct ByteBuffer的時間外,起碼還能節約兩次內存拷貝的時間。那么是否在任何情況下都采用Direct Buffer呢?
答案是否定的。對于大部分應用而言,兩次內存拷貝的時間幾乎可以忽略不計,而構造和析構Direct
Buffer的時間卻相對較長。在JVM的實現當中,某些方法會緩存一部分臨時Direct ByteBuffer,意味著如果采用Direct
ByteBuffer僅僅能節約掉兩次內存拷貝的時間,而無法節約構造和析構的時間。就用Sun的實現來說,write(ByteBuffer)和
read(ByteBuffer)方法都會緩存臨時Direct
ByteBuffer,而write(ByteBuffer[])和read(ByteBuffer[])每次都生成新的臨時Direct
ByteBuffer。
根據這些區別,在選擇ByteBuffer類型上有如下的建議:
- 如果你做中小規模的應用(在這里,應用大小是按照使用ByteBuffer的次數和規模來做劃分的),并不在乎這些細節問題,請選擇Non-direct ByteBuffer
- 如果采用Direct ByteBuffer后性能并沒有出現你所期待的變化,請選擇Non-direct ByteBuffer
- 如果沒有Direct ByteBuffer Pool,盡量不要使用Direct ByteBuffer
- 除非你確定該ByteBuffer會長時間存在,并且和外界有頻繁交互,可采用Direct ByteBuffer
- 如
果采用Non-direct
ByteBuffer,那么采用非聚集(gather)的write/read(ByteBuffer)效果反而*可能*超出聚集的write/read
(ByteBuffer[]),因為聚集的write/read的臨時Direct
ByteBuffer是非緩存的(在Sun的實現上是這樣,其他的實現則不確定)
基本上,采用Non-direct
ByteBuffer總是對的!因為內存拷貝需要的開銷對大部分應用而言都可以忽略不計。在Cindy中,一般的應用只需要通過
BufferFactory.allocate方法來得到Buffer實例即可,默認設置下采用的是Non-Direct
Buffer;通過BufferFactory.wrap方法包裝字節數組或ByteBuffer得到的Buffer類也有很高的效率;只有通過
BufferFactory.wrap(Buffer[])方法目前還處于實驗階段,其效率不一定比生成一個大的Buffer,然后拷貝現有內容更快。如
果應用非常注重效率,要使用該方法上要多加注意。
默認參數設置
對Cindy中一些默認參數的配置可以通過以下兩種方式:
Cindy在啟動時會在當前classpath上尋找cindy.properties文件,如果找到后會把該屬性配置文件讀入到緩存中。在該
properties中的配置無需net.sf.cindy.前綴,即如果要配置net.sf.cindy.enableJmx=true并且使用
DirectBuffer,則只需要在cindy.properties中加入:
enableJmx=true
useDirectBuffer=true
運行時通過-D參數指定的配置,如果和上面配置文件中的Key相同,則會覆蓋配置文件的配置。通過-D參數指定配置時不能省略net.sf.cindy.前綴。
當前版本全部可以配置的屬性請參見Cindy源代碼包的readme.txt。
posted on 2007-01-19 00:09
苦笑枯 閱讀(2258)
評論(0) 編輯 收藏 所屬分類:
Java