Netty項(xiàng)目中,自帶了很多使用的例子,對(duì)于剛剛開(kāi)始接觸和學(xué)習(xí)Netty源碼的開(kāi)發(fā)者來(lái)說(shuō),可以通過(guò)例子來(lái)更好的理解Netty的具體實(shí)現(xiàn)。源碼可以再netty 4.0的example找到。
1 public class EchoServerHandler extends ChannelInboundByteHandlerAdapter {
2 private static final Logger logger = Logger.getLogger(
3 EchoServerHandler.class.getName());
4
5 @Override
6 public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
7 ByteBuf out = ctx.nextOutboundByteBuffer();
8 out.discardReadBytes();
9 out.writeBytes(in);
10 ctx.flush();
11 }
12
13 @Override
14 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
15 // Close the connection when an exception is raised.
16 logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
17 ctx.close();
18 }
19 }
Line 1: 聲明一個(gè)EchoServerHandler, 并且繼承了ChannelInboundByteHandlerAdapter。 這樣EchoServerHandler就可以處理client發(fā)送過(guò)來(lái)的request。
Line 6: 重寫inboundBufferUpdated方法,對(duì)client發(fā)送過(guò)來(lái)的request進(jìn)行對(duì)ByteBuffer對(duì)象的操作。關(guān)于ByteBuffer的概念將在以后章節(jié)討論。
Line 7: ctx.nextOutboundByteBuffer()將返回一個(gè)ByteBuffer對(duì)象。如果該對(duì)象不存在,那么拋出UnsupportedOperationException異常。
Line 14: 重寫exceptionCaught在server端捕獲異常。
1 public class EchoServer {
2
3 private final int port;
4
5 public EchoServer(int port) {
6 this.port = port;
7 }
8
9 public void run() throws Exception {
10 // Configure the server.
11 ServerBootstrap b = new ServerBootstrap();
12 try {
13 b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
14 .channel(NioServerSocketChannel.class)
15 .option(ChannelOption.SO_BACKLOG, 100)
16 .localAddress(new InetSocketAddress(port))
17 .childOption(ChannelOption.TCP_NODELAY, true)
18 .handler(new LoggingHandler(LogLevel.INFO))
19 .childHandler(new ChannelInitializer<SocketChannel>() {
20 @Override
21 public void initChannel(SocketChannel ch) throws Exception {
22 ch.pipeline().addLast(
23 new LoggingHandler(LogLevel.INFO),
24 new EchoServerHandler());
25 }
26 });
27
28 // Start the server.
29 ChannelFuture f = b.bind().sync();
30
31 // Wait until the server socket is closed.
32 f.channel().closeFuture().sync();
33 } finally {
34 // Shut down all event loops to terminate all threads.
35 b.shutdown();
36 }
37 }
38
39 public static void main(String[] args) throws Exception {
40 int port;
41 if (args.length > 0) {
42 port = Integer.parseInt(args[0]);
43 } else {
44 port = 8080;
45 }
46 new EchoServer(port).run();
47 }
48 }
Line 11: 通過(guò)ServerBootStrap對(duì)象,來(lái)啟動(dòng)服務(wù)器
Line 13: 通過(guò)group方法,來(lái)綁定EventLoopGroup,EventLoopGroup用來(lái)處理SocketChannel和Channel上面的所有時(shí)間和IO。
Line 16: localAddress方法用于綁定服務(wù)器地址和端口。
Line 18,19: handler方法和childhandler方法用于指定各種ChannelHandler對(duì)象,指定的ChannelHandler對(duì)象將用于處理client端來(lái)的request。
Line 21: 初始化一個(gè)Channel對(duì)象,并且綁定之前定義的EchoServerHandler類。
Line 22: 將EchoServerHandler添加到Pipeline中。
Line 29: ServerBootstrap對(duì)象準(zhǔn)備就緒,啟動(dòng)server,
Line 32: 等待直到server socket關(guān)閉
public class EchoClientHandler extends ChannelInboundByteHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.write(firstMessage);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
EchoClientHandler類的實(shí)現(xiàn)與EchoServerHandler類似,也都繼承了ChannelInboundByteHandlerAdapter。不同在于重寫了channelActive()方法。
1 public class EchoClient {
2
3 private final String host;
4 private final int port;
5 private final int firstMessageSize;
6
7 public EchoClient(String host, int port, int firstMessageSize) {
8 this.host = host;
9 this.port = port;
10 this.firstMessageSize = firstMessageSize;
11 }
12
13 public void run() throws Exception {
14 // Configure the client.
15 Bootstrap b = new Bootstrap();
16 try {
17 b.group(new NioEventLoopGroup())
18 .channel(NioSocketChannel.class)
19 .option(ChannelOption.TCP_NODELAY, true)
20 .remoteAddress(new InetSocketAddress(host, port))
21 .handler(new ChannelInitializer<SocketChannel>() {
22 @Override
23 public void initChannel(SocketChannel ch) throws Exception {
24 ch.pipeline().addLast(
25 new LoggingHandler(LogLevel.INFO),
26 new EchoClientHandler(firstMessageSize));
27 }
28 });
29
30 // Start the client.
31 ChannelFuture f = b.connect().sync();
32
33 // Wait until the connection is closed.
34 f.channel().closeFuture().sync();
35 } finally {
36 // Shut down the event loop to terminate all threads.
37 b.shutdown();
38 }
39 }
40
41 public static void main(String[] args) throws Exception {
42 // Print usage if no argument is specified.
43 if (args.length < 2 || args.length > 3) {
44 System.err.println(
45 "Usage: " + EchoClient.class.getSimpleName() +
46 " <host> <port> [<first message size>]");
47 return;
48 }
49
50 // Parse options.
51 final String host = args[0];
52 final int port = Integer.parseInt(args[1]);
53 final int firstMessageSize;
54 if (args.length == 3) {
55 firstMessageSize = Integer.parseInt(args[2]);
56 } else {
57 firstMessageSize = 256;
58 }
59
60 new EchoClient(host, port, firstMessageSize).run();
61 }
62 }
Line 20: 指定遠(yuǎn)程服務(wù)器的地址和端口(localhost:8080)
備注:因?yàn)楣P者開(kāi)始寫Netty源碼分析的時(shí)候,Netty 4.0還是處于Alpha階段,之后的API可能還會(huì)有改動(dòng),筆者將會(huì)及時(shí)更改。使用開(kāi)源已經(jīng)有好幾年的時(shí)間了,一直沒(méi)有時(shí)間和精力來(lái)具體研究某個(gè)開(kāi)源項(xiàng)目的具體實(shí)現(xiàn),這次是第一次寫開(kāi)源項(xiàng)目的源碼分析,如果文中有錯(cuò)誤的地方,歡迎讀者可以留言指出。對(duì)于轉(zhuǎn)載的讀者,請(qǐng)注明文章的出處。
希望和廣大的開(kāi)發(fā)者/開(kāi)源愛(ài)好者進(jìn)行交流,歡迎大家的留言和討論。
-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;