<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0
     1.CumulativeProtocolDecoder
          A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
         即解決'
    粘包'->即一次接收數(shù)據(jù)不能完全體現(xiàn)一個(gè)完整的消息數(shù)據(jù)->通過(guò)應(yīng)用層數(shù)據(jù)協(xié)議,如協(xié)議中通過(guò)4字節(jié)描述消息大小或以結(jié)束符.

    2.CumulativeProtocolDecoder#decode實(shí)現(xiàn)
    /**
        * 1.緩存decode中的IoBuffer in至session的attribute
        * 2.循環(huán)調(diào)用doDecode方法直到其返回false
        * 3.解碼結(jié)束后緩存的buffer->壓縮
       
    */

       
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
          
    // 判斷傳輸層是否存在消息分片,如果不分片則直接doDecode.(可參考TCP/IP詳解)
            if (!session.getTransportMetadata().hasFragmentation()) {
               
    while (in.hasRemaining()) {
                   
    if (!doDecode(session, in, out)) {
                       
    break;
                    }

                }


               
    return;
            }


           
    boolean usingSessionBuffer = true;
            IoBuffer buf
    = (IoBuffer) session.getAttribute(BUFFER);
           
    // 如果session中有BUFFER這個(gè)attribute則直接執(zhí)行追加,否則直接用網(wǎng)絡(luò)層讀到的buffer
            if (buf != null) {
               
    boolean appended = false;
               
    // Make sure that the buffer is auto-expanded.
                if (buf.isAutoExpand()) {
                   
    try {
                        buf.put(in);
                        appended
    = true;
                    }
    catch (IllegalStateException e) {
                       
    // 可能調(diào)用了類(lèi)似slice的方法,會(huì)使父緩沖區(qū)的自動(dòng)擴(kuò)展屬性失效(1.可參考AbstractIoBuffer#recapacityAllowed 2.可參考IoBuffer的實(shí)現(xiàn))
                    }
    catch (IndexOutOfBoundsException e) {
                       
    // 取消了自動(dòng)擴(kuò)展屬性(可參考IoBuffer實(shí)現(xiàn))
                    }

                }


               
    if (appended) {
       
    // 追加成功的話(huà),直接flip
                    buf.flip();
                }
    else {
        
    // 因?yàn)橛昧伺缮姆椒?父子緩沖區(qū))如slice或取消了自動(dòng)擴(kuò)展而導(dǎo)致追加失敗->重新分配一個(gè)Buffer
                    buf.flip();
                    IoBuffer newBuf
    = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
                    newBuf.order(buf.order());
                    newBuf.put(buf);
                    newBuf.put(in);
                    newBuf.flip();
                    buf
    = newBuf;

                   
    // 更新session屬性
                    session.setAttribute(BUFFER, buf);
                }

            }
    else {
       
    // 此else表示session無(wú)BUFFER屬性,直接賦值
                buf = in;
                usingSessionBuffer
    = false;
            }


           
    // 無(wú)限循環(huán)直到break 1.doDecode返回false 2.doDecode返回true且buf已無(wú)數(shù)據(jù) 3.異常
            for (;;) {
               
    int oldPos = buf.position();
               
    boolean decoded = doDecode(session, buf, out);
               
    if (decoded) {
                   
    if (buf.position() == oldPos) {
                       
    throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
                    }


                   
    if (!buf.hasRemaining()) {
                       
    break;
                    }

                }
    else {
                   
    break;
                }

            }


           
    // 如果經(jīng)過(guò)decode,buffer依然有剩余數(shù)據(jù)則存儲(chǔ)到session->這樣下次decode的時(shí)候就可以從session取出buffer并執(zhí)行追加了
            if (buf.hasRemaining()) {
               
    if (usingSessionBuffer && buf.isAutoExpand()) {
          
    // 壓縮
                    buf.compact();
                }
    else {
                    storeRemainingInSession(buf, session);
                }

            }
    else {
               
    if (usingSessionBuffer) {
                    removeSessionBuffer(session);
                }

            }

        }

                注.
                        1.doDecode在消息非完整的時(shí)候返回false. 
                        2.如果doDecode成功解碼出一條完整消息則返回true->如果此時(shí)buffer中依然有剩余數(shù)據(jù)則繼續(xù)執(zhí)行for->doDecode->直到buffer中的數(shù)據(jù)不足以解碼出一條成功消息返回false.或者恰恰有n條完整的消息->從for跳出.

    3.CumulativeProtocolDecoder example
        /**
          * 解碼以CRLF(回車(chē)換行)作為結(jié)束符的消息
                 
    */

       
    public class CrLfTerminatedCommandLineDecoder
             
    extends CumulativeProtocolDecoder {

        
    private Command parseCommand(IoBuffer in) {
       
    // 實(shí)現(xiàn)將二進(jìn)制byte[]轉(zhuǎn)為業(yè)務(wù)邏輯消息對(duì)象Command
          }


      
    // 只需實(shí)現(xiàn)doDecode方法即可
        protected boolean doDecode(
                 IoSession session, IoBuffer in, ProtocolDecoderOutput out)
                 
    throws Exception {

           
    // 初始位置
              int start = in.position();

             
    // 查找'\r\n'標(biāo)記
              byte previous = 0;
            
    while (in.hasRemaining()) {
                
    byte current = in.get();

                 
    // 找到了\r\n
                  if (previous == '\r' && current == '\n') {
                    
    // Remember the current position and limit.
                      int position = in.position();
                   
    int limit = in.limit();
                   
    try {
                          in.position(start);
                        in.limit(position);
    //設(shè)置當(dāng)前的位置為limit

              
    // position和limit之間是一個(gè)完整的CRLF消息
                        out.write(parseCommand(in.slice()));//調(diào)用slice方法獲得positon和limit之間的子緩沖區(qū)->調(diào)用write方法加入消息隊(duì)列(因?yàn)榫W(wǎng)絡(luò)層一個(gè)包可能有多個(gè)完整消息)->后經(jīng)調(diào)用flush(遍歷消息隊(duì)列的消息)->nextFilter.messageReceived
    filter
                      }
    finally {
                        
    // 設(shè)置position為解碼后的position.limit設(shè)置為舊的limit
                         in.position(position);
                        in.limit(limit);
                      }


       
    // 直接返回true.因?yàn)樵诟割?lèi)的decode方法中doDecode是循環(huán)執(zhí)行的直到不再有完整的消息返回false.
                   return true;
               }


                previous
    = current;
             }


             
    // 沒(méi)有找到\r\n,則重置position并返回false.使得父類(lèi)decode->for跳出break.
              in.position(start);

             
    return false;
          }

      }

     4.
    DemuxingProtocolDecoder
        
     1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
         2.這是一個(gè)復(fù)合的decoder->多路復(fù)用->找到一個(gè)合適的MessageDecoder.(不同的消息協(xié)議)

         3.其doDecode實(shí)現(xiàn)為迭代候選的MessageDecoder列表->調(diào)用MessageDecoder#decodable方法->如果解碼結(jié)果為MessageDecoderResult#NOT_OK,則從候選列表移除;如果解碼結(jié)果為MessageDecoderResult#NEED_DATA,則保留該候選decoder并在更多數(shù)據(jù)到達(dá)的時(shí)候會(huì)再次調(diào)用decodable;如果返回結(jié)果為MessageDecoderResult#OK,則表明找到了正確的decoder;如果沒(méi)有剩下任何的候選decoder,則拋出異常.

        4.doDecode源碼
          protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
                 
    // 從Session中獲取一個(gè)State.State包含一個(gè)MessageDecoder數(shù)組以及一個(gè)當(dāng)前的decoder
                State state = getState(session);

              
    // 如果當(dāng)前decoder為空
                if (state.currentDecoder == null) {
                    MessageDecoder[] decoders
    = state.decoders;
                   
    int undecodables = 0;
           
           
    // 遍歷decoder候選列表
                    for (int i = decoders.length - 1; i >= 0; i--) {
                        MessageDecoder decoder
    = decoders[i];
                       
    int limit = in.limit();
                       
    int pos = in.position();

                        MessageDecoderResult result;

                       
    try {
                    
    // 執(zhí)行decodable方法并返回result(decodable方法是檢查特定的buffer是否可以decoder解碼)
                            result = decoder.decodable(session, in);
                        }
    finally {
                 
    // 一定要重置回舊的position和limit
                            in.position(pos);
                            in.limit(limit);
                        }


                       
    if (result == MessageDecoder.OK) {
                 
    // 如果返回結(jié)果為OK,則設(shè)置為state的當(dāng)前decoder并break
                            state.currentDecoder = decoder;
                           
    break;
                        }
    else if (result == MessageDecoder.NOT_OK) {
                 
    // 如果返回結(jié)果為NOT_OK,則記錄undecodables數(shù)目++
                            undecodables++;
                        }
    else if (result != MessageDecoder.NEED_DATA) {
                  
    // 如果結(jié)果都不是,即也不是NEED_DATA,則直接拋出異常
                            throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
                        }

                    }


           
    // 如果沒(méi)有找到合適的decoder,則拋出異常
                    if (undecodables == decoders.length) {
                       
    // Throw an exception if all decoders cannot decode data.
                        String dump = in.getHexDump();
                        in.position(in.limit());
    // 跳過(guò)這段數(shù)據(jù)
                        ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
                        e.setHexdump(dump);
                       
    throw e;
                    }

           
           
    // 迭代結(jié)束,如果還沒(méi)有找到合適的decoder則表示可能需要更多的數(shù)據(jù)->所以返回false->跳出父類(lèi)的for-dodecode循環(huán)
                    if (state.currentDecoder == null) {
                       
    // Decoder is not determined yet (i.e. we need more data)
                        return false;
                    }

                }


              
    // 這里表示已找到合適的decoder,調(diào)用decode方法進(jìn)行解碼二進(jìn)制或者特定的協(xié)議數(shù)據(jù)為更高業(yè)務(wù)層的消息對(duì)象
                try {
                    MessageDecoderResult result
    = state.currentDecoder.decode(session, in, out);
                   
    if (result == MessageDecoder.OK) {
              
    // 重置為null
                        state.currentDecoder = null;
                       
    return true;
                    }
    else if (result == MessageDecoder.NEED_DATA) {
                       
    return false;
                    }
    else if (result == MessageDecoder.NOT_OK) {
                        state.currentDecoder
    = null;
                       
    throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
                    }
    else {
                        state.currentDecoder
    = null;
                       
    throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
                    }

                }
    catch (Exception e) {
                    state.currentDecoder
    = null;
                   
    throw e;
                }

            }

    5.一個(gè)特定消息協(xié)議的編解碼的例子,{@link org.apache.mina.example.sumup}
        1.AbstractMessageEncoder
        /**
         * 1.編碼消息頭,消息體編碼由子類(lèi)實(shí)現(xiàn).
         * 2.AbstractMessage中只有一個(gè)sequence字段
        
    */

       
    public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T> {
          
    // 類(lèi)型字段
            private final int type;

           
    protected AbstractMessageEncoder(int type) {
               
    this.type = type;
            }


           
    public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception {
                IoBuffer buf
    = IoBuffer.allocate(16);
                buf.setAutoExpand(
    true); // Enable auto-expand for easier encoding

               
    // 編碼消息頭
                buf.putShort((short) type);//type字段占2個(gè)字節(jié)(short)
                buf.putInt(message.getSequence());// sequence字段占4個(gè)字節(jié)(int)

               
    // 編碼消息體,由子類(lèi)實(shí)現(xiàn)
                encodeBody(session, message, buf);
                buf.flip();
                out.write(buf);
            }


           
    // 子類(lèi)實(shí)現(xiàn)編碼消息體
            protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
        }

        2.AbstractMessageDecoder
        /**
            * 解碼消息頭,消息體由子類(lèi)實(shí)現(xiàn)解碼
           
    */

           
    public abstract class AbstractMessageDecoder implements MessageDecoder {
           
    private final int type;

           
    private int sequence;

           
    private boolean readHeader;

           
    protected AbstractMessageDecoder(int type) {
               
    this.type = type;
            }


           
    // 需覆寫(xiě)decodable方法,檢查解碼結(jié)果
            public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
               
    // HEADER_LEN為type+sequence的長(zhǎng)度,共占6個(gè)字節(jié).如果此時(shí)buffer剩余數(shù)據(jù)不足header的長(zhǎng)度,則返回NEED_DATA的result.
                if (in.remaining() < Constants.HEADER_LEN) {
                   
    return MessageDecoderResult.NEED_DATA;
                }


               
    // 第一個(gè)if判斷ok->讀取2字節(jié)(short),如果和type匹配則返回OK.
                if (type == in.getShort()) {
                   
    return MessageDecoderResult.OK;
                }


               
    // 兩個(gè)if判斷都不ok,則返回NOT_OK
                return MessageDecoderResult.NOT_OK;
            }

           
                    
    // 終極解碼
            public MessageDecoderResult decode(IoSession session, IoBuffer in,
                    ProtocolDecoderOutput out)
    throws Exception {
               
    // 如果header數(shù)據(jù)已ok且消息體數(shù)據(jù)不足則下次直接略過(guò)
                if (!readHeader) {
                    in.getShort();
    // Skip 'type'.
                    sequence = in.getInt(); // Get 'sequence'.
                    readHeader = true;
                }


               
    // 解碼消息體,如果數(shù)據(jù)不足以解析消息體,則返回null
                AbstractMessage m = decodeBody(session, in);
               
    // 消息數(shù)據(jù)體數(shù)據(jù)不足->返回NEED_DATA
                if (m == null) {
                   
    return MessageDecoderResult.NEED_DATA;
                }
    else {
                    readHeader
    = false; // 成功解碼出一條完成消息,則重置readHeader->下次繼續(xù)讀取header
                }

                m.setSequence(sequence);
                out.write(m);

               
    return MessageDecoderResult.OK;
            }


           
    /**
             * 數(shù)據(jù)完整不足以解析整個(gè)消息體則返回null
            
    */

           
    protected abstract AbstractMessage decodeBody(IoSession session,
                    IoBuffer in);
        }

        3.AddMessageEncoder
        /**
                               * 1.AddMessage的encoder.AddMessage繼承自AbstractMessage,又增加了一個(gè)字段value
                               * 2.該encoder的type為Constants.ADD,值為1
            
    */

           
    public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T> {
           
    public AddMessageEncoder() {
               
    super(Constants.ADD);
            }


            @Override
           
    protected void encodeBody(IoSession session, T message, IoBuffer out) {                 // 實(shí)現(xiàn)了編碼消息體,向buffer追加了AddMessage的消息體value(4個(gè)字節(jié)-int)
                out.putInt(message.getValue());
            }


           
    public void dispose() throws Exception {
            }

        }


           4.AddMessageDecoder
        /**
            *  AddMessage的decoder.type為Constants.ADD(1)
           
    */

           
    public class AddMessageDecoder extends AbstractMessageDecoder {

           
    public AddMessageDecoder() {
               
    super(Constants.ADD);
            }


            @Override
           
    protected AbstractMessage decodeBody(IoSession session, IoBuffer in) {                  // ADD_BODY_LEN為AddMessage的消息體長(zhǎng)度(value屬性),即為4字節(jié)(int),如果此時(shí)不足4字節(jié),則返回null,表示body數(shù)據(jù)不足
                if (in.remaining() < Constants.ADD_BODY_LEN) {
                   
    return null;
                }


                AddMessage m
    = new AddMessage();
                m.setValue(in.getInt());
    // 讀取一個(gè)int
                return m;
            }


           
    public void finishDecode(IoSession session, ProtocolDecoderOutput out)
                   
    throws Exception {
            }

        }

    6.總結(jié):使用CumulativeProtocolDecoder可以方便的進(jìn)行特定消息協(xié)議的消息解碼并完美的解決了'粘包'問(wèn)題.另外DemuxingProtocolDecoder結(jié)合MessageDecoder可更完美實(shí)現(xiàn)解碼方案.
    posted on 2013-12-02 18:55 landon 閱讀(3379) 評(píng)論(2)  編輯  收藏 所屬分類(lèi): Sources

    FeedBack:
    # re: apache-mina-2.07源碼筆記4-codec
    2013-12-03 09:38 | 鵬達(dá)鎖業(yè)
    謝謝博主分享。。。。。。。。。。。  回復(fù)  更多評(píng)論
      
    # re: apache-mina-2.07源碼筆記4-codec
    2013-12-05 17:26 | 左岸
    好東西啊,謝謝分享  回復(fù)  更多評(píng)論
      
    主站蜘蛛池模板: 久久国产精品免费一区二区三区| 国产亚洲大尺度无码无码专线 | 好爽又高潮了毛片免费下载 | 免费人妻精品一区二区三区| 日本免费网站观看| 亚洲久热无码av中文字幕| 免费无码A片一区二三区 | 亚洲AV无码乱码在线观看富二代 | 亚洲精品乱码久久久久久久久久久久 | 精品久久久久成人码免费动漫| 亚洲校园春色小说| 我们的2018在线观看免费高清| 亚洲成年人免费网站| 中文毛片无遮挡高潮免费| 狠狠色香婷婷久久亚洲精品| 成人免费午夜在线观看| 亚洲国产精品无码第一区二区三区 | 爱情岛论坛免费视频| 亚洲国产精品自在拍在线播放 | 亚洲色欲久久久综合网| 成人网站免费看黄A站视频| 亚洲国产a∨无码中文777| 久久免费福利视频| 亚洲精品欧洲精品| 成人片黄网站A毛片免费| 亚洲国产成人综合精品| 日韩精品亚洲专区在线观看| 人妻仑乱A级毛片免费看| 国产AV无码专区亚洲AV漫画| 很黄很污的网站免费| 亚洲男女性高爱潮网站| 成年轻人网站色免费看 | 日韩免费高清大片在线| 亚洲福利电影一区二区?| 成年人在线免费观看| 国产在亚洲线视频观看| 国产亚洲精品影视在线产品| 久久成人免费电影| 亚洲香蕉久久一区二区| 国产免费看插插插视频| 国产免费一区二区三区免费视频 |