netty從4.x開始,已經(jīng)不再是jboss的一部分,所以引包中,發(fā)現(xiàn)還有org.jboss.netty.*等字樣說明你還用的是3.x即以下版本,現(xiàn)在已經(jīng)到5.0了該更新了,新的包名統(tǒng)一為io.netty.*開頭
5.0以后內(nèi)容:
netty初體驗,netty是個高性能的java通信框架,至于oio還是nio,它都支持,核心概念:基于事件驅(qū)動的架構(gòu),很容易讓人聯(lián)想到觀察者模式。它提供的數(shù)據(jù)結(jié)構(gòu)為ByteBuf,這個是個什么東西?可以理解為:一個數(shù)據(jù)的載體,比如我接收和發(fā)送的消息,得到的都是ByteBuf對象,它是對字節(jié)流的一個高度抽象化,并提供比NIO的ByteBuffer更多的功能,不至于同一個buffer中經(jīng)常操作,flip,compact等方法,更為簡潔實用,同時如果客戶端和服務(wù)端都采用java,那么它也可以提供自定義的object類型的數(shù)據(jù)載體。netty官方提供的example很多,客戶端和服務(wù)端如何寫,照貓畫虎即可。
把握住兩個關(guān)鍵點(diǎn)即可:*handler和傳遞的內(nèi)容(即發(fā)送和接收的消息),*handler里面包含具體事件的觸發(fā)方法:比如exceptionCaught方法(出現(xiàn)異常時)、messageReceived方法(接收消息時)、channelActive(連接剛建立時)等方法,采用最新的SimpleChannelInboundHandler<T> 這個handler,T可以為自定義的任何對象,如果不需要自定義對象,那么傳遞Object即可,如果是自定義對象或者java基本類型或String類型,那么必須得有個大前提:客戶端和服務(wù)端都必須得進(jìn)行一定的轉(zhuǎn)換,換句話說:我的客戶端和服務(wù)端必須都得用netty的相關(guān)API封裝一次(具體看netty例子)。如果不是自定義對象,而傳遞的是Object,那么在收到消息時,必須進(jìn)行強(qiáng)制轉(zhuǎn)換為ByteBuf對象,通過ChannelHandlerContext進(jìn)行發(fā)送,這個時候發(fā)送的是ByteBuf對象,如果是自定義對象,那么ChannelHandlerContext.write(自定義對象)即可,同時必須調(diào)用flush方法才能發(fā)送出去,也可調(diào)用wirteAndFlush(自定義對象)方法。
深究了兩天netty,得出的結(jié)論是:如果客戶端和服務(wù)端都基于netty,那么互發(fā)消息,各種類型協(xié)議消息,基本都不成問題。官方example很多,照貓畫虎,自定義隨便玩
但是,如果我只用netty的服務(wù)端,而客戶端是一個純粹的socket,比如其它語言的客戶端,比如純粹只是一個硬件,進(jìn)行socket連接等等,即不采用netty的API,而且不是java語言,那么就會有一些問題。
首先我用java的oio和nio寫了2個極其簡單的客戶端不使用netty,同時netty服務(wù)端要給返回響應(yīng),先看兩個極其簡單的客戶端代碼:
1
//首先oio客戶端
2
public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException
{
3
Socket socket = new Socket("127.0.0.1",8080);
4
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
5
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
6
bw.write("hello world!");
7
bw.flush();
8
9
while(true)
{
10
String str = br.readLine();
11
System.out.println(str);
Thread.sleep(2000);
}
12
}
1
//其次nio客戶端,其實也可改成阻塞,效果一樣
2
public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException
{
3
4
SocketChannel socket = SocketChannel.open();
5
boolean status = socket.connect(new InetSocketAddress("127.0.0.1",8080) );
6
System.out.println(status);
7
socket.configureBlocking(false);
8
// Selector selection = Selector.open();
9
String msg = "hello world!";
10
ByteBuffer buffer = ByteBuffer.allocate(msg.getBytes().length);
11
buffer.put(msg.getBytes());
12
buffer.flip();
13
socket.write(buffer);
14
buffer.compact();
15
16
17
18
Thread.sleep(1000000);
19
} netty服務(wù)端的handler是DiscardServerHandler extends SimpleChannelInboundHandler<Object> 里面的messageReceived方法
1
@Override
2
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception
{
3
// discard
4
ByteBuf buffer = (ByteBuf) msg;
5
byte[] bytes = new byte[buffer.readableBytes()];
6
System.out.println("readableBytes="+buffer.readableBytes());
7
buffer.readBytes(bytes);
8
String str = new String(bytes);
9
logger.info("=========="+str+"==============");
10
11
boolean close =false;
12
if(str.equals("bye"))
{
13
close = true;
14
}
15
16
System.out.println("buffer capacity="+buffer.capacity() +" str length="+str.length()
17
+"readableBytes="+buffer.readableBytes());
18
String str1 = "hi I'am server this is my info : @111111@";
19
buffer.writeBytes(str1.getBytes());
20
ctx.writeAndFlush(buffer);
21
// ChannelFuture future = ctx.writeAndFlush(buffer);
22
23
// Close the connection after sending 'Have a good day!'
24
// if the client has sent 'bye'.
25
// if (close) {
26
// future.addListener(ChannelFutureListener.CLOSE);
27
// }
28
}
服務(wù)端,客戶端也能收到消息,但服務(wù)端拋出以下異常:
警告: Unexpected exception from downstream.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:115)
at io.netty.buffer.WrappedByteBuf.release(WrappedByteBuf.java:819)
at io.netty.buffer.SimpleLeakAwareByteBuf.release(SimpleLeakAwareByteBuf.java:34)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:68)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:110)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:74)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:138)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:320)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
at java.lang.Thread.run(Unknown Source)
原因是:refCnt=0了,表明使用的這個ByteBuf已經(jīng)被回收了,代碼中調(diào)用ctx.writeAndFlush(buff)會使此次ByteBuf回收也即將refCnt置為0,那么在SimpleChannelInboundHandler里面,會接著調(diào)用代碼如下:
1
@Override
2
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
3
boolean release = true;
4
try
{
5
if (acceptInboundMessage(msg))
{
6
@SuppressWarnings("unchecked")
7
I imsg = (I) msg;
8
messageReceived(ctx, imsg);
9
} else
{
10
release = false;
11
ctx.fireChannelRead(msg);
12
}
13
} finally
{
14
if (autoRelease && release)
{
15
ReferenceCountUtil.release(msg);
16
}
17
}
18
} 里面的15行ReferenceCountUtil.release(msg);這是netty提供的一個釋放ByteBuf內(nèi)存的方法,如果不采用這個,直接調(diào)用ByteBuf.release方法也可以,但是因為調(diào)用了writeAndFlush方法,已經(jīng)將ByteBuf的refCnt置為0了,這個里面調(diào)用的時候又會在設(shè)置一次,但是發(fā)現(xiàn)已經(jīng)為0了,所以就拋出的該異常。
該問題需要在定位。。。。未完待續(xù)
posted on 2014-05-03 14:11
朔望魔刃 閱讀(5463)
評論(0) 編輯 收藏 所屬分類:
netty