實現(xiàn)了Messagerecognizer和Message之后,要實現(xiàn)Server和Client是非常容易的事情,通過下面的代碼,你會很容易理解如何去實現(xiàn)協(xié)議的處理流程。
實現(xiàn)服務(wù)端兩個主要的類,一個是Server類,另一個是ServerSessionListener. Server類負(fù)責(zé)啟動主程序并監(jiān)聽連接。而ServerSessionListener用于處理和發(fā)送消息。
public
class ServerSessionListener implements SessionListener {
?
???????
public ServerSessionListener() {
??????? }
?
???????
public
void connectionEstablished(Session session) {
?????????????? System.out.println(session.getSocketAddress() + " connected");
?
??????????????
//?設(shè)置空閑時間為60秒
?????????????? session.getConfig().setIdleTime(60);
?
??????????????
// 設(shè)置sum的初始值為0。
?????????????? session.setAttachment(new Integer(0));
??????? }
?
???????
public
void connectionClosed(Session session) {
?????????????? System.out.println(session.getSocketAddress() + " closed");
?
?????? }
??????? // 當(dāng)收到client發(fā)來的消息時,此方法被調(diào)用
?
??????
public
void messageReceived(Session session, Message message) {
?
????????????? System.out.println(session.getSocketAddress() + " RCVD: " + message);
?
??????????????
// client端只發(fā)送AddMessage. 其它情況要另作處理
??????????????
// 在這里只是簡單的進(jìn)行類型轉(zhuǎn)換處理
?????????????? AddMessage am = (AddMessage) message;
?
??????????????
// 將收到的消息里的值加上當(dāng)前sum的值.
??????????????
int sum = ((Integer) session.getAttachment()).intValue();
??????????????
int value = am.getValue();
??????????????
long expectedSum = (long) sum + value;
??????????????
if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
??????????????????????
// 如果溢位返回錯誤消息
???????
?????????????? ResultMessage rm = newResultMessage();
???????
?????????????? rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。
?????????????????????? rm.setOk(false);
?????????????????????? session.write(rm);
?????????????? } else {
??????????????????????
//? 加總
?????????????????????? sum = (int) expectedSum;
?????????????????????? session.setAttachment(new Integer(sum));
?
??????????????????????
// 返回結(jié)果消息
?????????????????????? ResultMessage rm = newResultMessage();
?????????????????????? rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。
?????????????????????? rm.setOk(true);
?????????????????????? rm.setValue(sum);
?????????????????????? session.write(rm);
?????????????? }
??????? }
?
???????
public
void messageSent(Session session, Message message) {
?????????????? System.out.println(session.getSocketAddress() + " SENT: " + message);
??????? }
?
???????
public
void sessionIdle(Session session) {
?????????????? System.out.println(session.getSocketAddress()
?????????????????????????????? + " disconnecting the idle");
?
??????????????
// 關(guān)閉空閑的會話。
?????????????? session.close();
??????? }
??????? // 異常發(fā)生時,將調(diào)用此方法
???????
public
void exceptionCaught(Session session, Throwable cause) {
?????????????? System.out.println(Thread.currentThread().getName()
?????????????????????????????? + session.getSocketAddress() + " exception:");
?
????????????? cause.printStackTrace(System.out);
?
?
?????????????
if (cause instanceof MessageParseException) {
?
?????????????????????
//?印出錯誤信息內(nèi)容,便于調(diào)試
?????????????????????? MessageParseException mpe = (MessageParseException) cause;
?????????????????????? ByteBuffer buf = mpe.getBuffer();
?????????????????????? System.out.println(buf);
?????????????????????? System.out.print("Buffer Content: ");
??????????????????????
while (buf.remaining() > 0) {
?????????????????????????????? System.out.print(buf.get() & 0xFF);
?????????????????????????????? System.out.print(' ');
?????????????????????? }
?????????????????????? System.out.println();
?????????????? }
?
??????????????
// 關(guān)閉會話
?????????????? session.close();
??????? }
}
?
服務(wù)端運(yùn)行后,其輸出的內(nèi)容示例如下:
listening on port 8080
/127.0.0.1:4753 connected
/127.0.0.1:4753 RCVD: 0:ADD(4)
/127.0.0.1:4753 RCVD: 1:ADD(6)
/127.0.0.1:4753 RCVD: 2:ADD(2)
/127.0.0.1:4753 RCVD: 3:ADD(7)
/127.0.0.1:4753 RCVD: 4:ADD(8)
/127.0.0.1:4753 RCVD: 5:ADD(1)
/127.0.0.1:4753 SENT: 0:RESULT(4)
/127.0.0.1:4753 SENT: 1:RESULT(10)
/127.0.0.1:4753 SENT: 2:RESULT(12)
/127.0.0.1:4753 SENT: 3:RESULT(19)
/127.0.0.1:4753 SENT: 4:RESULT(27)
/127.0.0.1:4753 SENT: 5:RESULT(28)
/127.0.0.1:4753 closed
實現(xiàn)客戶端
跟服務(wù)端對應(yīng),主要由Client和ClientSessionListener組成。
public
class Client {
???????
private
static
final String HOSTNAME = "localhost";
?
???????
private
static
final
int PORT = 8080;
?
???????
private
static
final
int CONNECT_TIMEOUT = 30; // seconds
?
???????
private
static
final
int DISPATCHER_THREAD_POOL_SIZE = 4;
?
???????
public
static
void main(String[] args) throws Throwable {
??????????????
// 預(yù)備要加總的值。
??????????????
int[] values = newint[args.length];
??????????????
for (int i = 0; i < args.length; i++) {
?????????????????????? values[i] = Integer.parseInt(args[i]);
?????????????? }
?
??????????????
// 初始化 I/O processor 和 event dispatcher
?????????????? IoProcessor ioProcessor = new IoProcessor();
?????????????? ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
?
??????????????
// 開始缺省數(shù)量的I/O工作線程
?????????????? ioProcessor.start();
?
??????????????
// 啟動指定數(shù)量的event dispatcher線程
??????? eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE
?????????????? eventDispatcher.start();
?
??????????????
// 準(zhǔn)備 message recognizer
?????????????? MessageRecognizer recognizer = new SumUpMessageRecognizer(
?????????????????????????????? SumUpMessageRecognizer.CLIENT_MODE);
?
??????????????
// 準(zhǔn)備客戶端會話。
?????????????? Session session = new Session(ioProcessor, new InetSocketAddress(
?????????????????????????????? HOSTNAME, PORT), recognizer, eventDispatcher);
??????????????
??????????????
?????????????? session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);
??????????????
??????????????
// 開始會話,并使用ClientSessionListener監(jiān)聽。
?????????????? ClientSessionListener listener = new ClientSessionListener(values);
?????????????? session.addSessionListener(listener);
?????????????? session.start();
??????????????
??????????????
// 一直等到加總完成
??????????????
while ( !listener.isComplete() ) {
?????????????????????? Thread.sleep(1000);
?????????????? }
??????????????
??????????????
// 停止 I/O processor 和 event dispatcher
?????????????? eventDispatcher.stop();
?????????????? ioProcessor.stop();
?
?????? }
?}
public
class ClientSessionListener implements SessionListener {
?
?
??????
private
final
int[] values;
?
??????
private
boolean complete;
?
?
??????
public ClientSessionListener(int[] values) {
?
?????????????
this.values = values;
?
?????? }
?
??????
?
??????
public
boolean isComplete() {
?
?????????????
return complete;
?
?????? }
??????? // 當(dāng)連接建立好后會調(diào)用此方法。
?
??????
public
void connectionEstablished(Session session) {
?
????????????? System.out.println("connected to " + session.getSocketAddress());
?
?
?????????????
// 發(fā)送加總請求。
?
?????????????
for (int i = 0; i < values.length; i++) {
?
????????????????????? AddMessage m = new AddMessage();
?
????????????????????? m.setSequence(i);
?
????????????????????? m.setValue(values[i]);
?
????????????????????? session.write(m);
?
????????????? }
?
?????? }
?
?
??????
public
void connectionClosed(Session session) {
?
????????????? System.out.println("disconnected from " + session.getSocketAddress());
??????? }
??????? // 當(dāng)收到server的回應(yīng)信息時,會調(diào)用此方法
???????
public
void messageReceived(Session session, Message message) {
?????????????? System.out.println("RCVD: " + message);
?
??????????????
// 服務(wù)端只發(fā)送ResultMessage. 其它情況下
?
?????????????
// 要通過instanceOf來判斷它的類型.
?
????????????? ResultMessage rm = (ResultMessage) message;
?
?????????????
if (rm.isOk()) {
?
?????????????????????
// 如果ResultMessage是OK的.
?
?????????????????????
// 根據(jù)ResultMessage的sequence值來判斷如果,
?
?????????????????????
// 一次消息的sequence值,則
?
?????????????????????
if (rm.getSequence() == values.length - 1) {
?
?????????????????????????????
// 打印出結(jié)果.
?
????????????????????????????? System.out.println("The sum: " + rm.getValue());
???????????????????????????????// 關(guān)閉會話
?????????????????????????????? session.close();
?????????????????????????????? complete = true;
?????????????????????? }
?????????????? } else {
??????????????????????
//?如有錯誤,則打印錯誤信息,并結(jié)束會話.
?
????????????????????? System.out.println("server error, disconnecting...");
?????????????????????? session.close();
?????????????????????? complete = true;
?????????????? }
??????? }
?
???????
public
void messageSent(Session session, Message message) {
?????????????? System.out.println("SENT: " + message);
??????? }
?
???????
public
void sessionIdle(Session session) {
??????????????
??????? }
?
???????
public
void exceptionCaught(Session session, Throwable cause) {
?????????????? cause.printStackTrace(System.out);
?
??????????????
if (cause instanceof ConnectException) {
??????????????????????
// 如果連接server失敗, 則間隔5秒重試連接.
?
????????????????????? System.out.println("sleeping...");
?
?????????????????????
try {
?
????????????????????????????? Thread.sleep(5000);
?
????????????????????? } catch (InterruptedException e) {
?
????????????????????? }
?
?????????????????????
?
????????????????????? System.out.println("reconnecting... " + session.getSocketAddress());
?
????????????????????? session.start();
?
????????????? } else {
?
????????????????????? session.close();
?
????????????? }
?
?????? }
?}
通過上面的例子,你也許會發(fā)現(xiàn)實現(xiàn)一個自定義的協(xié)議原來如此簡單。你如果用Netty試著去實現(xiàn)自己的smtp或pop協(xié)議,我想也不會是一件難事了。
?
Netty2的首頁在http://gleamynode.net/dev/projects/netty2/index.html,你可以在這找到本文的全部源碼。