<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    聶永的博客

    記錄工作/學習的點點滴滴。

    MQTT協議筆記之頭部信息

    前言

    MQTT(Message Queue Telemetry Transport),遙測傳輸協議,提供訂閱/發布模式,更為簡約、輕量,易于使用,針對受限環境(帶寬低、網絡延遲高、網絡通信不穩定),可以簡單概括為物聯網打造,官方總結特點如下:

    1.使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。
    2. 對負載內容屏蔽的消息傳輸。
    3. 使用 TCP/IP 提供網絡連接。
    4. 有三種消息發布服務質量:
        “至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用于如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。
        “至少一次”,確保消息到達,但消息重復可能會發生。
        “只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。
    5. 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以降低網絡流量。
    6. 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
    

    MQTT 3.1協議在線版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

    官方下載地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf

    PDF版本,42頁,不算多。

    另外,目前MQTT大家都用在了手機推送,可能還有很多的使用方式,有待進一步的探索。

    協議方面,以前曾簡單實現過一點HTTP協議,基于HTTP上構建若干種通信管道的socket.io協議,不過socket.io 0.9版本的協議才兩三頁而已。面對領域不同,自然解決的方式也不一樣。

    閱讀完畢MQTT協議,有一個想法,其實可以基于MQTT協議,打造更加私有、精簡(協議一些地方,略顯多余)的傳輸協議,比如一個字節的傳輸開銷。有時間,會詳細說一下。

    固定頭部

    固定頭部,使用兩個字節,共16位:

    bit 7 6 5 4 3 2 1 0
    byte 1 Message Type DUP flag QoS level RETAIN
    byte 2 Remaining Length

    第一個字節(byte 1)

    消息類型(4-7),使用4位二進制表示,可代表16種消息類型:

    Mnemonic Enumeration Description
    Reserved 0 Reserved
    CONNECT 1 Client request to connect to Server
    CONNACK 2 Connect Acknowledgment
    PUBLISH 3 Publish message
    PUBACK 4 Publish Acknowledgment
    PUBREC 5 Publish Received (assured delivery part 1)
    PUBREL 6 Publish Release (assured delivery part 2)
    PUBCOMP 7 Publish Complete (assured delivery part 3)
    SUBSCRIBE 8 Client Subscribe request
    SUBACK 9 Subscribe Acknowledgment
    UNSUBSCRIBE 10 Client Unsubscribe request
    UNSUBACK 11 Unsubscribe Acknowledgment
    PINGREQ 12 PING Request
    PINGRESP 13 PING Response
    DISCONNECT 14 Client is Disconnecting
    Reserved 15 Reserved

    除去0和15位置屬于保留待用,共14種消息事件類型。

    DUP flag(打開標志)

    保證消息可靠傳輸,默認為0,只占用一個字節,表示第一次發送。不能用于檢測消息重復發送等。只適用于客戶端或服務器端嘗試重發PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要滿足以下條件:

     當QoS > 0
     消息需要回復確認
    

    此時,在可變頭部需要包含消息ID。當值為1時,表示當前消息先前已經被傳送過。

    QoS(Quality of Service,服務質量)

    使用兩個二進制表示PUBLISH類型消息:

    QoS value bit 2 bit 1 Description
    0 0 0 至多一次 發完即丟棄 <=1
    1 0 1 至少一次 需要確認回復 >=1
    2 1 0 只有一次 需要確認回復 =1
    3 1 1 待用,保留位置

    RETAIN(保持)

    僅針對PUBLISH消息。不同值,不同含義:

    1:表示發送的消息需要一直持久保存(不受服務器重啟影響),不但要發送給當前的訂閱者,并且以后新來的訂閱了此Topic name的訂閱者會馬上得到推送。

    備注:新來乍到的訂閱者,只會取出最新的一個RETAIN flag = 1的消息推送。

    0:僅僅為當前訂閱者推送此消息。

    假如服務器收到一個空消息體(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服務器可以刪除掉對應的已被持久化的PUBLISH消息。

    如何解析

    因為java使用有符號(最高位為符號位)數據表示,byte范圍:-128-127。該字節的最高位(左邊第一位),可能為1。若直接轉換為byte類型,會出現負數,這是一個雷區。DataInputStream提供了int readUnsignedByte()讀取方式,請注意。下面演示了,如何從一個字節中,獲取到所有定義的信息,同時繞過雷區:

    public static void main(String[] args) {
        byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0
    
        doGetBit(publishFixHeader);
        int ori = 224;//1110000,DISCONNECT ,Message Type (14)
        byte flag = (byte) ori; //有符號byte       
        doGetBit(flag);
        doGetBit_v2(ori);
    }
    
    
    public static void doGetBit(byte flags) {
        boolean retain = (flags & 1) > 0;
        int qosLevel = (flags & 0x06) >> 1;
        boolean dupFlag = (flags & 8) > 0;
        int messageType = (flags >> 4) & 0x0f;
    
        System.out.format(
                "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
                messageType, dupFlag, qosLevel, retain);
    }
    
    public static void doGetBit_v2(int flags) {
        boolean retain = (flags & 1) > 0;
        int qosLevel = (flags & 0x06) >> 1;
        boolean dupFlag = (flags & 8) > 0;
        int messageType = flags >> 4;
    
        System.out.format(
                "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
                messageType, dupFlag, qosLevel, retain);
    }
    

    處理Remaining Length(剩余長度)

    在當前消息中剩余的byte(字節)數,包含可變頭部和負荷(稱之為內容/body,更為合適)。單個字節最大值:01111111,16進制:0x7F,10進制為127。單個字節為什么不能是11111111(0xFF)呢?因為MQTT協議規定,第八位(最高位)若為1,則表示還有后續字節存在。同時MQTT協議最多允許4個字節表示剩余長度。那么最大長度為:0xFF,0xFF,0xFF,0x7F,二進制表示為:11111111,11111111,11111111,01111111,十進制:268435455 byte=261120KB=256MB=0.25GB 四個字節之間值的范圍:

    Digits From To
    1 0 (0x00) 127 (0x7F)
    2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
    3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
    4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

    如何換算成十進制呢 ? 使用java語言表示如下:

    public static void main(String[] args) throws IOException {
        // 模擬客戶端寫入
       ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
       dataOutputStream.write(0xff);
       dataOutputStream.write(0xff);
       dataOutputStream.write(0xff);
       dataOutputStream.write(0x7f);
    
       InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
    
        // 模擬服務器/客戶端解析
       System. out.println( "result is " + bytes2Length(arrayInputStream));
    }
    
    /**
    * 轉化字節為 int類型長度
    * @param in
    * @return
    * @throws IOException
    */
    private static int bytes2Length(InputStream in) throws IOException {
        int multiplier = 1;
        int length = 0;
        int digit = 0;
        do {
            digit = in.read(); //一個字節的有符號或者無符號,轉換轉換為四個字節有符號 int類型
            length += (digit & 0x7f) * multiplier;
            multiplier *= 128;
       } while ((digit & 0x80) != 0);
    
        return length;
    }
    

    一般最后一個字節小于127(01111111),和0x80(10000000)進行&操作,最終結果都為0,因此計算會終止。代理中間件和請求者,中間傳遞的是字節流Stream,自然要從流中讀取,逐一解析出來。

    那么如何將int類型長度解析為不確定的字節值呢?

    public static void main(String[] args) throws IOException {
        // 模擬服務器/客戶端寫入
       ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(
                 arrayOutputStream);
    
        // 模擬服務器/客戶端解析
        length2Bytes(dataOutputStream, 128);
    }
    
    /**
    * int類型長度解析為1-4個字節
    * @param out
    * @param length
    * @throws IOException
    */
    private static void length2Bytes(OutputStream out, int length)
             throws IOException {
        int val = length;
        do {
             int digit = val % 128;
            val = val / 128;
             if (val > 0)
                 digit = digit | 0x80;
    
            out.write(digit);
       } while (val > 0);
    }
    

    digit對val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 請注意:剩余長度,只在固定頭部中,無論是一個字節,還是四個字節,不能被算作可變頭部中。

    可變頭部

    固定頭部僅定義了消息類型和一些標志位,一些消息的元數據,需要放入可變頭部中。可變頭部內容字節長度 + Playload/負荷字節長度 = 剩余長度,這個是需要牢記的。可變頭部,包含了協議名稱,版本號,連接標志,用戶授權,心跳時間等內容,這部分和后面要講到的CONNECT消息類型,有重復,暫時略過。

    Playload/消息體/負荷

    消息體主要是為配合固定/可變頭部命令(比如CONNECT可變頭部User name標記若為1則需要在消息體中附加用戶名稱字符串)而存在。

    CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息體。PUBLISH的消息體以二進制形式對待。

    請記住,MQTT協議只允許在PUBLISH類型消息體中使用自定義特性,在固定/可變頭部想加入自定義私有特性,就免了吧。這也是為了協議免于流于形式,變得很分裂也為了兼顧現有客戶端等。比如支持壓縮等,那就可以在Playload中定義數據支持,在應用中進行讀取處理。

    這部分會在后面詳細論述。

    消息標識符/消息ID

    固定頭中的QoS level標志值為1或2時才會在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可變頭中出現。

    一個16位無符號位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個特定方向(服務器發往客戶端為一個方向,客戶端發送到服務器端為另一個方向)的通信消息中必須唯一。比如客戶端發往服務器,有可能存在服務器發往客戶端會同時存在重復,但不礙事。

    可變頭部中,需要兩個字節的順序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻譯成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左邊/上面,表示這是一個大端字節/網絡字節序,符合人的閱讀習慣,高位在最左邊。

    bit 7 6 5 4 3 2 1 0
      Message Identifier MSB
      Message Identifier LSB

    但凡如此表示的,都可以視為一個16位無符號short類型整數,兩個字節表示。在JAVA中處理比較簡單:

    DataInputStream.readUnsignedShort
    

    或者

    in.read() * 0xFF + in.read();
    

    最大長度可為: 65535

    UTF-8編碼

    有關字符串,MQTT采用的是修改版的UTF-8編碼,一般形式為如下,需要牢記:

    bit 7 6 5 4 3 2 1 0
    byte 1 String Length MSB
    byte 2 String Length LSB
    bytes 3 ... Encoded Character Data

    比如AVA,使用writeUTF()方法寫入一串文字“OTWP”,頭兩個字節為一個完整的無符號數字,代表字符串字節長度,后面四個字節才是字符串真正的長度,共六個字節:

    bit 7 6 5 4 3 2 1 0
    byte 1 Message Length MSB (0x00)
      0 0 0 0 0 0 0 0
    byte 2 Message Length LSB (0x04)
      0 0 0 0 0 1 0 0
    byte 3 'O' (0x4F)
      0 1 0 0 1 1 1 1
    byte 4 'T' (0x54)
      0 1 0 1 0 1 0 0
    byte 5 'W' (0x57)
      0 1 0 1 0 1 1 1
    byte 6 'P' (0x50)
      0 1 0 1 0 0 0 0

    這點,在程序中,可不用單獨處理默認,直接使用readUTF()方法,可自動省去了處理字符串長度的麻煩。當然,可以手動讀取字符串:

    // 模擬寫入
    dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
    ......
    // 模擬讀取 
    int decodedLength = dataInputStream.readUnsignedShort();//2 byte
    byte[] decodedString = new byte[decodedLength]; // 4 bytes
    dataInputStream.read(decodedString);
    String target = new String(decodedString, "UTF-8");
    

    等同于:

    String target = dataInputStream.readUTF();
    

    MQTT無論是可變頭部還是消息體中,只要是字符串部分,都是采用了修改版的UTF-8編碼,讀取和寫入,借助DataInputStream/DataOutputStream的幫助,一行語句,略去了手動處理的麻煩。

    小結

    總之,掌握固定頭部的QoS level、RETAIN標記、可變頭部的Connect flags作用和意義,對總體理解MQTT作用很大。

    posted on 2014-02-07 17:35 nieyong 閱讀(39593) 評論(2)  編輯  收藏 所屬分類: MQTT

    評論

    # re: MQTT協議筆記之頭部信息 2014-11-17 20:59 金敏通

    可以的  回復  更多評論   

    # re: MQTT協議筆記之頭部信息 2014-12-26 15:37 光輝

    【一個16位無符號位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個特定方向(服務器發往客戶端為一個方向,客戶端發送到服務器端為另一個方向)的通信消息中必須唯一。比如客戶端發往服務器,有可能存在服務器發往客戶端會同時存在重復,但不礙事。】
    ===========================
    剛才看了一下Paho.MQTT.Client(WebSocket)中發送messae時的messageIdentifier的實現,這個id當qos大于0時,
    并不是唯一的,它只是確保在已發送但未收到PUBACK 的in flight message中是唯一的,例如,當發送了
    消息A時使用id(3),并且收到A的PUBACK了,那么下次發送的message的id仍然是3。另外,當Id到一個最大
    值時,這個Id會被reset
    /* The largest message identifier allowed, may not be larger than 2**16 but
    * if set smaller reduces the maximum number of outbound messages allowed.
    */
    ClientImpl.prototype.maxMessageIdentifier = 65536;

    if (this._message_identifier === this.maxMessageIdentifier) {
    this._message_identifier = 1;
    }  回復  更多評論   

    公告

    所有文章皆為原創,若轉載請標明出處,謝謝~

    新浪微博,歡迎關注:

    導航

    <2014年2月>
    2627282930311
    2345678
    9101112131415
    16171819202122
    2324252627281
    2345678

    統計

    常用鏈接

    留言簿(58)

    隨筆分類(130)

    隨筆檔案(151)

    個人收藏

    最新隨筆

    搜索

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 大学生a级毛片免费观看 | jlzzjlzz亚洲jzjzjz| 亚洲国产成人精品女人久久久 | 亚洲午夜成激人情在线影院 | 两个人看的www免费视频| 色欲aⅴ亚洲情无码AV| ww亚洲ww在线观看国产| 久久久久亚洲Av无码专| 亚洲爆乳无码专区| 亚洲尤码不卡AV麻豆| 青青青国产色视频在线观看国产亚洲欧洲国产综合 | 免费手机在线看片| 亚洲av无码偷拍在线观看| 国产亚洲sss在线播放| 亚洲精品视频在线播放| 久久91亚洲精品中文字幕| 亚洲午夜无码AV毛片久久| 免费又黄又爽又猛的毛片| 成人免费无码大片A毛片抽搐色欲| 2021精品国产品免费观看| 日本黄页网址在线看免费不卡| 亚洲视屏在线观看| 亚洲bt加勒比一区二区| 国产亚洲免费的视频看| 国产亚洲A∨片在线观看| 中文字幕亚洲一区二区三区| 亚洲精品老司机在线观看| 久久精品亚洲男人的天堂| 亚洲午夜精品久久久久久浪潮| 亚洲午夜激情视频| 久久亚洲AV无码西西人体| 中文字幕不卡亚洲 | 久久不见久久见免费影院www日本| 深夜福利在线视频免费| a高清免费毛片久久| 成人自慰女黄网站免费大全| 99免费在线视频| 少妇太爽了在线观看免费视频| 最近免费中文字幕mv在线电影| 19禁啪啪无遮挡免费网站| 欧美三级在线电影免费|