1.CumulativeProtocolDecoder A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
即解決'粘包'->即一次接收數(shù)據(jù)不能完全體現(xiàn)一個(gè)完整的消息數(shù)據(jù)->通過(guò)應(yīng)用層數(shù)據(jù)協(xié)議,如協(xié)議中通過(guò)4字節(jié)描述消息大小或以結(jié)束符.
2.CumulativeProtocolDecoder#decode實(shí)現(xiàn)

/** *//**
* 1.緩存decode中的IoBuffer in至session的attribute
* 2.循環(huán)調(diào)用doDecode方法直到其返回false
* 3.解碼結(jié)束后緩存的buffer->壓縮
*/

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
{
// 判斷傳輸層是否存在消息分片,如果不分片則直接doDecode.(可參考TCP/IP詳解)

if (!session.getTransportMetadata().hasFragmentation())
{

while (in.hasRemaining())
{

if (!doDecode(session, in, out))
{
break;
}
}

return;
}

boolean usingSessionBuffer = true;
IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
// 如果session中有BUFFER這個(gè)attribute則直接執(zhí)行追加,否則直接用網(wǎng)絡(luò)層讀到的buffer

if (buf != null)
{
boolean appended = false;
// Make sure that the buffer is auto-expanded.

if (buf.isAutoExpand())
{

try
{
buf.put(in);
appended = true;

} catch (IllegalStateException e)
{
// 可能調(diào)用了類(lèi)似slice的方法,會(huì)使父緩沖區(qū)的自動(dòng)擴(kuò)展屬性失效(1.可參考AbstractIoBuffer#recapacityAllowed 2.可參考IoBuffer的實(shí)現(xiàn))

} catch (IndexOutOfBoundsException e)
{
// 取消了自動(dòng)擴(kuò)展屬性(可參考IoBuffer實(shí)現(xiàn))
}
}


if (appended)
{
// 追加成功的話(huà),直接flip
buf.flip();

} else
{
// 因?yàn)橛昧伺缮姆椒?父子緩沖區(qū))如slice或取消了自動(dòng)擴(kuò)展而導(dǎo)致追加失敗->重新分配一個(gè)Buffer
buf.flip();
IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
newBuf.order(buf.order());
newBuf.put(buf);
newBuf.put(in);
newBuf.flip();
buf = newBuf;

// 更新session屬性
session.setAttribute(BUFFER, buf);
}

} else
{
// 此else表示session無(wú)BUFFER屬性,直接賦值
buf = in;
usingSessionBuffer = false;
}

// 無(wú)限循環(huán)直到break 1.doDecode返回false 2.doDecode返回true且buf已無(wú)數(shù)據(jù) 3.異常

for (;;)
{
int oldPos = buf.position();
boolean decoded = doDecode(session, buf, out);

if (decoded)
{

if (buf.position() == oldPos)
{
throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
}


if (!buf.hasRemaining())
{
break;
}

} else
{
break;
}
}

// 如果經(jīng)過(guò)decode,buffer依然有剩余數(shù)據(jù)則存儲(chǔ)到session->這樣下次decode的時(shí)候就可以從session取出buffer并執(zhí)行追加了

if (buf.hasRemaining())
{

if (usingSessionBuffer && buf.isAutoExpand())
{
// 壓縮
buf.compact();

} else
{
storeRemainingInSession(buf, session);
}

} else
{

if (usingSessionBuffer)
{
removeSessionBuffer(session);
}
}
}
注.
1.doDecode在消息非完整的時(shí)候返回false.
2.如果doDecode成功解碼出一條完整消息則返回true->如果此時(shí)buffer中依然有剩余數(shù)據(jù)則繼續(xù)執(zhí)行for->doDecode->直到buffer中的數(shù)據(jù)不足以解碼出一條成功消息返回false.或者恰恰有n條完整的消息->從for跳出.
3.CumulativeProtocolDecoder example

/** *//**
* 解碼以CRLF(回車(chē)換行)作為結(jié)束符的消息
*/
public class CrLfTerminatedCommandLineDecoder

extends CumulativeProtocolDecoder
{

private Command parseCommand(IoBuffer in)
{
// 實(shí)現(xiàn)將二進(jìn)制byte[]轉(zhuǎn)為業(yè)務(wù)邏輯消息對(duì)象Command
}

// 只需實(shí)現(xiàn)doDecode方法即可
protected boolean doDecode(
IoSession session, IoBuffer in, ProtocolDecoderOutput out)

throws Exception
{

// 初始位置
int start = in.position();
// 查找'\r\n'標(biāo)記
byte previous = 0;

while (in.hasRemaining())
{
byte current = in.get();
// 找到了\r\n

if (previous == '\r' && current == '\n')
{
// Remember the current position and limit.
int position = in.position();
int limit = in.limit();

try
{
in.position(start);
in.limit(position);//設(shè)置當(dāng)前的位置為limit

// position和limit之間是一個(gè)完整的CRLF消息
out.write(parseCommand(in.slice()));//調(diào)用slice方法獲得positon和limit之間的子緩沖區(qū)->調(diào)用write方法加入消息隊(duì)列(因?yàn)榫W(wǎng)絡(luò)層一個(gè)包可能有多個(gè)完整消息)->后經(jīng)調(diào)用flush(遍歷消息隊(duì)列的消息)->nextFilter.messageReceived
filter

} finally
{
// 設(shè)置position為解碼后的position.limit設(shè)置為舊的limit
in.position(position);
in.limit(limit);
}

// 直接返回true.因?yàn)樵诟割?lèi)的decode方法中doDecode是循環(huán)執(zhí)行的直到不再有完整的消息返回false.
return true;
}
previous = current;
}
// 沒(méi)有找到\r\n,則重置position并返回false.使得父類(lèi)decode->for跳出break.
in.position(start);
return false;
}
}
4.DemuxingProtocolDecoder
1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
2.這是一個(gè)復(fù)合的decoder->多路復(fù)用->找到一個(gè)合適的MessageDecoder.(不同的消息協(xié)議)
3.其doDecode實(shí)現(xiàn)為迭代候選的MessageDecoder列表->調(diào)用MessageDecoder#decodable方法->如果解碼結(jié)果為MessageDecoderResult#NOT_OK,則從候選列表移除;如果解碼結(jié)果為MessageDecoderResult#NEED_DATA,則保留該候選decoder并在更多數(shù)據(jù)到達(dá)的時(shí)候會(huì)再次調(diào)用decodable;如果返回結(jié)果為MessageDecoderResult#OK,則表明找到了正確的decoder;如果沒(méi)有剩下任何的候選decoder,則拋出異常.
4.doDecode源碼

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
{
// 從Session中獲取一個(gè)State.State包含一個(gè)MessageDecoder數(shù)組以及一個(gè)當(dāng)前的decoder
State state = getState(session);

// 如果當(dāng)前decoder為空

if (state.currentDecoder == null)
{
MessageDecoder[] decoders = state.decoders;
int undecodables = 0;
// 遍歷decoder候選列表

for (int i = decoders.length - 1; i >= 0; i--)
{
MessageDecoder decoder = decoders[i];
int limit = in.limit();
int pos = in.position();

MessageDecoderResult result;


try
{
// 執(zhí)行decodable方法并返回result(decodable方法是檢查特定的buffer是否可以decoder解碼)
result = decoder.decodable(session, in);

} finally
{
// 一定要重置回舊的position和limit
in.position(pos);
in.limit(limit);
}


if (result == MessageDecoder.OK)
{
// 如果返回結(jié)果為OK,則設(shè)置為state的當(dāng)前decoder并break
state.currentDecoder = decoder;
break;

} else if (result == MessageDecoder.NOT_OK)
{
// 如果返回結(jié)果為NOT_OK,則記錄undecodables數(shù)目++
undecodables++;

} else if (result != MessageDecoder.NEED_DATA)
{
// 如果結(jié)果都不是,即也不是NEED_DATA,則直接拋出異常
throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
}
}

// 如果沒(méi)有找到合適的decoder,則拋出異常

if (undecodables == decoders.length)
{
// Throw an exception if all decoders cannot decode data.
String dump = in.getHexDump();
in.position(in.limit()); // 跳過(guò)這段數(shù)據(jù)
ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
e.setHexdump(dump);
throw e;
}
// 迭代結(jié)束,如果還沒(méi)有找到合適的decoder則表示可能需要更多的數(shù)據(jù)->所以返回false->跳出父類(lèi)的for-dodecode循環(huán)

if (state.currentDecoder == null)
{
// Decoder is not determined yet (i.e. we need more data)
return false;
}
}

// 這里表示已找到合適的decoder,調(diào)用decode方法進(jìn)行解碼二進(jìn)制或者特定的協(xié)議數(shù)據(jù)為更高業(yè)務(wù)層的消息對(duì)象

try
{
MessageDecoderResult result = state.currentDecoder.decode(session, in, out);

if (result == MessageDecoder.OK)
{
// 重置為null
state.currentDecoder = null;
return true;

} else if (result == MessageDecoder.NEED_DATA)
{
return false;

} else if (result == MessageDecoder.NOT_OK)
{
state.currentDecoder = null;
throw new ProtocolDecoderException("Message decoder returned NOT_OK.");

} else
{
state.currentDecoder = null;
throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
}

} catch (Exception e)
{
state.currentDecoder = null;
throw e;
}
}
5.一個(gè)特定消息協(xié)議的編解碼的例子,{@link org.apache.mina.example.sumup}
1.AbstractMessageEncoder

/** *//**
* 1.編碼消息頭,消息體編碼由子類(lèi)實(shí)現(xiàn).
* 2.AbstractMessage中只有一個(gè)sequence字段
*/

public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T>
{
// 類(lèi)型字段
private final int type;


protected AbstractMessageEncoder(int type)
{
this.type = type;
}


public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception
{
IoBuffer buf = IoBuffer.allocate(16);
buf.setAutoExpand(true); // Enable auto-expand for easier encoding

// 編碼消息頭
buf.putShort((short) type);//type字段占2個(gè)字節(jié)(short)
buf.putInt(message.getSequence());// sequence字段占4個(gè)字節(jié)(int)

// 編碼消息體,由子類(lèi)實(shí)現(xiàn)
encodeBody(session, message, buf);
buf.flip();
out.write(buf);
}

// 子類(lèi)實(shí)現(xiàn)編碼消息體
protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
}
2.AbstractMessageDecoder

/** *//**
* 解碼消息頭,消息體由子類(lèi)實(shí)現(xiàn)解碼
*/

public abstract class AbstractMessageDecoder implements MessageDecoder
{
private final int type;

private int sequence;

private boolean readHeader;


protected AbstractMessageDecoder(int type)
{
this.type = type;
}

// 需覆寫(xiě)decodable方法,檢查解碼結(jié)果

public MessageDecoderResult decodable(IoSession session, IoBuffer in)
{
// HEADER_LEN為type+sequence的長(zhǎng)度,共占6個(gè)字節(jié).如果此時(shí)buffer剩余數(shù)據(jù)不足header的長(zhǎng)度,則返回NEED_DATA的result.

if (in.remaining() < Constants.HEADER_LEN)
{
return MessageDecoderResult.NEED_DATA;
}

// 第一個(gè)if判斷ok->讀取2字節(jié)(short),如果和type匹配則返回OK.

if (type == in.getShort())
{
return MessageDecoderResult.OK;
}

// 兩個(gè)if判斷都不ok,則返回NOT_OK
return MessageDecoderResult.NOT_OK;
}
// 終極解碼
public MessageDecoderResult decode(IoSession session, IoBuffer in,

ProtocolDecoderOutput out) throws Exception
{
// 如果header數(shù)據(jù)已ok且消息體數(shù)據(jù)不足則下次直接略過(guò)

if (!readHeader)
{
in.getShort(); // Skip 'type'.
sequence = in.getInt(); // Get 'sequence'.
readHeader = true;
}

// 解碼消息體,如果數(shù)據(jù)不足以解析消息體,則返回null
AbstractMessage m = decodeBody(session, in);
// 消息數(shù)據(jù)體數(shù)據(jù)不足->返回NEED_DATA

if (m == null)
{
return MessageDecoderResult.NEED_DATA;

} else
{
readHeader = false; // 成功解碼出一條完成消息,則重置readHeader->下次繼續(xù)讀取header
}
m.setSequence(sequence);
out.write(m);

return MessageDecoderResult.OK;
}


/** *//**
* 數(shù)據(jù)完整不足以解析整個(gè)消息體則返回null
*/
protected abstract AbstractMessage decodeBody(IoSession session,
IoBuffer in);
}
3.AddMessageEncoder

/** *//**
* 1.AddMessage的encoder.AddMessage繼承自AbstractMessage,又增加了一個(gè)字段value
* 2.該encoder的type為Constants.ADD,值為1
*/

public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T>
{

public AddMessageEncoder()
{
super(Constants.ADD);
}

@Override

protected void encodeBody(IoSession session, T message, IoBuffer out)
{ // 實(shí)現(xiàn)了編碼消息體,向buffer追加了AddMessage的消息體value(4個(gè)字節(jié)-int)
out.putInt(message.getValue());
}


public void dispose() throws Exception
{
}
}
4.AddMessageDecoder

/** *//**
* AddMessage的decoder.type為Constants.ADD(1)
*/

public class AddMessageDecoder extends AbstractMessageDecoder
{


public AddMessageDecoder()
{
super(Constants.ADD);
}

@Override

protected AbstractMessage decodeBody(IoSession session, IoBuffer in)
{ // ADD_BODY_LEN為AddMessage的消息體長(zhǎng)度(value屬性),即為4字節(jié)(int),如果此時(shí)不足4字節(jié),則返回null,表示body數(shù)據(jù)不足

if (in.remaining() < Constants.ADD_BODY_LEN)
{
return null;
}

AddMessage m = new AddMessage();
m.setValue(in.getInt());// 讀取一個(gè)int
return m;
}

public void finishDecode(IoSession session, ProtocolDecoderOutput out)

throws Exception
{
}
}
6.總結(jié):使用CumulativeProtocolDecoder可以方便的進(jìn)行特定消息協(xié)議的消息解碼并完美的解決了'粘包'問(wèn)題.另外DemuxingProtocolDecoder結(jié)合MessageDecoder可更完美實(shí)現(xiàn)解碼方案.
posted on 2013-12-02 18:55
landon 閱讀(3379)
評(píng)論(2) 編輯 收藏 所屬分類(lèi):
Sources