前言
MQTT(Message Queue Telemetry Transport),遙測傳輸協議,提供訂閱/發布模式,更為簡約、輕量,易于使用,針對受限環境(帶寬低、網絡延遲高、網絡通信不穩定),可以簡單概括為物聯網打造,官方總結特點如下:
1.使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。
2. 對負載內容屏蔽的消息傳輸。
3. 使用 TCP/IP 提供網絡連接。
4. 有三種消息發布服務質量:
“至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用于如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。
“至少一次”,確保消息到達,但消息重復可能會發生。
“只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。
5. 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以降低網絡流量。
6. 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
MQTT 3.1協議在線版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
官方下載地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf
PDF版本,42頁,不算多。
另外,目前MQTT大家都用在了手機推送,可能還有很多的使用方式,有待進一步的探索。
協議方面,以前曾簡單實現過一點HTTP協議,基于HTTP上構建若干種通信管道的socket.io協議,不過socket.io 0.9版本的協議才兩三頁而已。面對領域不同,自然解決的方式也不一樣。
閱讀完畢MQTT協議,有一個想法,其實可以基于MQTT協議,打造更加私有、精簡(協議一些地方,略顯多余)的傳輸協議,比如一個字節的傳輸開銷。有時間,會詳細說一下。
固定頭部
固定頭部,使用兩個字節,共16位:
bit |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
byte 1 |
Message Type |
DUP flag |
QoS level |
RETAIN |
byte 2 |
Remaining Length |
第一個字節(byte 1)
消息類型(4-7),使用4位二進制表示,可代表16種消息類型:
Mnemonic |
Enumeration |
Description |
Reserved |
0 |
Reserved |
CONNECT |
1 |
Client request to connect to Server |
CONNACK |
2 |
Connect Acknowledgment |
PUBLISH |
3 |
Publish message |
PUBACK |
4 |
Publish Acknowledgment |
PUBREC |
5 |
Publish Received (assured delivery part 1) |
PUBREL |
6 |
Publish Release (assured delivery part 2) |
PUBCOMP |
7 |
Publish Complete (assured delivery part 3) |
SUBSCRIBE |
8 |
Client Subscribe request |
SUBACK |
9 |
Subscribe Acknowledgment |
UNSUBSCRIBE |
10 |
Client Unsubscribe request |
UNSUBACK |
11 |
Unsubscribe Acknowledgment |
PINGREQ |
12 |
PING Request |
PINGRESP |
13 |
PING Response |
DISCONNECT |
14 |
Client is Disconnecting |
Reserved |
15 |
Reserved |
除去0和15位置屬于保留待用,共14種消息事件類型。
DUP flag(打開標志)
保證消息可靠傳輸,默認為0,只占用一個字節,表示第一次發送。不能用于檢測消息重復發送等。只適用于客戶端或服務器端嘗試重發PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要滿足以下條件:
當QoS > 0
消息需要回復確認
此時,在可變頭部需要包含消息ID。當值為1時,表示當前消息先前已經被傳送過。
QoS(Quality of Service,服務質量)
使用兩個二進制表示PUBLISH類型消息:
QoS value |
bit 2 |
bit 1 |
Description |
0 |
0 |
0 |
至多一次 |
發完即丟棄 |
<=1 |
1 |
0 |
1 |
至少一次 |
需要確認回復 |
>=1 |
2 |
1 |
0 |
只有一次 |
需要確認回復 |
=1 |
3 |
1 |
1 |
待用,保留位置 |
RETAIN(保持)
僅針對PUBLISH消息。不同值,不同含義:
1:表示發送的消息需要一直持久保存(不受服務器重啟影響),不但要發送給當前的訂閱者,并且以后新來的訂閱了此Topic name的訂閱者會馬上得到推送。
備注:新來乍到的訂閱者,只會取出最新的一個RETAIN flag = 1的消息推送。
0:僅僅為當前訂閱者推送此消息。
假如服務器收到一個空消息體(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服務器可以刪除掉對應的已被持久化的PUBLISH消息。
如何解析
因為java使用有符號(最高位為符號位)數據表示,byte范圍:-128-127。該字節的最高位(左邊第一位),可能為1。若直接轉換為byte類型,會出現負數,這是一個雷區。DataInputStream提供了int readUnsignedByte()讀取方式,請注意。下面演示了,如何從一個字節中,獲取到所有定義的信息,同時繞過雷區:
public static void main(String[] args) {
byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0
doGetBit(publishFixHeader);
int ori = 224;//1110000,DISCONNECT ,Message Type (14)
byte flag = (byte) ori; //有符號byte
doGetBit(flag);
doGetBit_v2(ori);
}
public static void doGetBit(byte flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = (flags >> 4) & 0x0f;
System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
messageType, dupFlag, qosLevel, retain);
}
public static void doGetBit_v2(int flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = flags >> 4;
System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
messageType, dupFlag, qosLevel, retain);
}
處理Remaining Length(剩余長度)
在當前消息中剩余的byte(字節)數,包含可變頭部和負荷(稱之為內容/body,更為合適)。單個字節最大值:01111111,16進制:0x7F,10進制為127。單個字節為什么不能是11111111(0xFF)呢?因為MQTT協議規定,第八位(最高位)若為1,則表示還有后續字節存在。同時MQTT協議最多允許4個字節表示剩余長度。那么最大長度為:0xFF,0xFF,0xFF,0x7F,二進制表示為:11111111,11111111,11111111,01111111,十進制:268435455 byte=261120KB=256MB=0.25GB 四個字節之間值的范圍:
Digits |
From |
To |
1 |
0 (0x00) |
127 (0x7F) |
2 |
128 (0x80, 0x01) |
16 383 (0xFF, 0x7F) |
3 |
16 384 (0x80, 0x80, 0x01) |
2 097 151 (0xFF, 0xFF, 0x7F) |
4 |
2 097 152 (0x80, 0x80, 0x80, 0x01) |
268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
如何換算成十進制呢 ? 使用java語言表示如下:
public static void main(String[] args) throws IOException {
// 模擬客戶端寫入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0x7f);
InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
// 模擬服務器/客戶端解析
System. out.println( "result is " + bytes2Length(arrayInputStream));
}
/**
* 轉化字節為 int類型長度
* @param in
* @return
* @throws IOException
*/
private static int bytes2Length(InputStream in) throws IOException {
int multiplier = 1;
int length = 0;
int digit = 0;
do {
digit = in.read(); //一個字節的有符號或者無符號,轉換轉換為四個字節有符號 int類型
length += (digit & 0x7f) * multiplier;
multiplier *= 128;
} while ((digit & 0x80) != 0);
return length;
}
一般最后一個字節小于127(01111111),和0x80(10000000)進行&操作,最終結果都為0,因此計算會終止。代理中間件和請求者,中間傳遞的是字節流Stream,自然要從流中讀取,逐一解析出來。
那么如何將int類型長度解析為不確定的字節值呢?
public static void main(String[] args) throws IOException {
// 模擬服務器/客戶端寫入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(
arrayOutputStream);
// 模擬服務器/客戶端解析
length2Bytes(dataOutputStream, 128);
}
/**
* int類型長度解析為1-4個字節
* @param out
* @param length
* @throws IOException
*/
private static void length2Bytes(OutputStream out, int length)
throws IOException {
int val = length;
do {
int digit = val % 128;
val = val / 128;
if (val > 0)
digit = digit | 0x80;
out.write(digit);
} while (val > 0);
}
digit對val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 請注意:剩余長度,只在固定頭部中,無論是一個字節,還是四個字節,不能被算作可變頭部中。
可變頭部
固定頭部僅定義了消息類型和一些標志位,一些消息的元數據,需要放入可變頭部中。可變頭部內容字節長度 + Playload/負荷字節長度 = 剩余長度,這個是需要牢記的。可變頭部,包含了協議名稱,版本號,連接標志,用戶授權,心跳時間等內容,這部分和后面要講到的CONNECT消息類型,有重復,暫時略過。
Playload/消息體/負荷
消息體主要是為配合固定/可變頭部命令(比如CONNECT可變頭部User name標記若為1則需要在消息體中附加用戶名稱字符串)而存在。
CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息體。PUBLISH的消息體以二進制形式對待。
請記住,MQTT協議只允許在PUBLISH類型消息體中使用自定義特性,在固定/可變頭部想加入自定義私有特性,就免了吧。這也是為了協議免于流于形式,變得很分裂也為了兼顧現有客戶端等。比如支持壓縮等,那就可以在Playload中定義數據支持,在應用中進行讀取處理。
這部分會在后面詳細論述。
消息標識符/消息ID
固定頭中的QoS level標志值為1或2時才會在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可變頭中出現。
一個16位無符號位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個特定方向(服務器發往客戶端為一個方向,客戶端發送到服務器端為另一個方向)的通信消息中必須唯一。比如客戶端發往服務器,有可能存在服務器發往客戶端會同時存在重復,但不礙事。
可變頭部中,需要兩個字節的順序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻譯成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左邊/上面,表示這是一個大端字節/網絡字節序,符合人的閱讀習慣,高位在最左邊。
bit |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
|
Message Identifier MSB |
|
Message Identifier LSB |
但凡如此表示的,都可以視為一個16位無符號short類型整數,兩個字節表示。在JAVA中處理比較簡單:
DataInputStream.readUnsignedShort
或者
in.read() * 0xFF + in.read();
最大長度可為: 65535
UTF-8編碼
有關字符串,MQTT采用的是修改版的UTF-8編碼,一般形式為如下,需要牢記:
bit |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
byte 1 |
String Length MSB |
byte 2 |
String Length LSB |
bytes 3 ... |
Encoded Character Data
|
比如AVA,使用writeUTF()方法寫入一串文字“OTWP”,頭兩個字節為一個完整的無符號數字,代表字符串字節長度,后面四個字節才是字符串真正的長度,共六個字節:
bit |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
byte 1 |
Message Length MSB (0x00) |
|
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
byte 2 |
Message Length LSB (0x04) |
|
0 |
0 |
0 |
0 |
0 |
1 |
0 |
0 |
byte 3 |
'O' (0x4F) |
|
0 |
1 |
0 |
0 |
1 |
1 |
1 |
1 |
byte 4 |
'T' (0x54) |
|
0 |
1 |
0 |
1 |
0 |
1 |
0 |
0 |
byte 5 |
'W' (0x57) |
|
0 |
1 |
0 |
1 |
0 |
1 |
1 |
1 |
byte 6 |
'P' (0x50) |
|
0 |
1 |
0 |
1 |
0 |
0 |
0 |
0 |
這點,在程序中,可不用單獨處理默認,直接使用readUTF()方法,可自動省去了處理字符串長度的麻煩。當然,可以手動讀取字符串:
// 模擬寫入
dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
......
// 模擬讀取
int decodedLength = dataInputStream.readUnsignedShort();//2 byte
byte[] decodedString = new byte[decodedLength]; // 4 bytes
dataInputStream.read(decodedString);
String target = new String(decodedString, "UTF-8");
等同于:
String target = dataInputStream.readUTF();
MQTT無論是可變頭部還是消息體中,只要是字符串部分,都是采用了修改版的UTF-8編碼,讀取和寫入,借助DataInputStream/DataOutputStream的幫助,一行語句,略去了手動處理的麻煩。
小結
總之,掌握固定頭部的QoS level、RETAIN標記、可變頭部的Connect flags作用和意義,對總體理解MQTT作用很大。