在本章中,將包含下面的內容:
- 連接中間件
- 生產消息
- 消費消息
- 使用JSON序列化消息
- 使用RPC消息
- 廣播消息
- 使用direct交換器來處理消息路由
- 使用topic交換器來處理消息路由
- 保證消息處理
- 分發消息到多個消費者
- 使用消息屬性
- 事務消息
- 處理未路由消息
要運行本章內的示例,你需要首先:
- 安裝Java JDK 1.6+
- 安裝Java RabbitMQ client library
- 正確地配置CLASSPATH 以及你喜歡的開發環境(Eclipse,NetBeans, 等等)
- 在某臺機器上安裝RabbitMQ server (也可以是同一個本地機器)
連接到中間件 每個使用AMQP的應用程序都必須建立一個與AMQP中間件的連接.默認情況下,RabbitMQ (以及任何其它1.0版本之前的AMQP中間件) 通過運行于5672端口之上且相當可靠傳輸協議-TCP來工作的, 即IANA分配的端口.
要創建一個連接RabbitMQ中間件的Java客戶端,你必須執行下面的步驟:
1. 從Java RabbitMQ client library中必須的類:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
2. 創建客戶端ConnectionFactory的實例:
ConnectionFactory factory = new ConnectionFactory();
3. 設置ConnectionFactory 選項:
factory.setHost(rabbitMQhostname);
4. 連接RabbitMQ broker:
Connection connection = factory.newConnection();
5. 從剛創建的連接中創建一個通道:
Channel channel = connection.createChannel();
6. 一旦在RabbitMQ上完成了工作,就需要釋放通道和連接:
channel.close();
connection.close();
How it works…
使用Java client API, 應用程序必須創建一個ConnectionFactory實例,并且使用setHost()方法來設置運行RabbitMQ的主機.在導入相關類后(第1步),我們實例化了工廠對象(第2步).在這個例子中,我們只是用可選的命令行參數來設置主機名稱,但是,在后面的章節中,你可以找到更多關于連接選項的信息.第4步,實際上我們已經創建了連接到RabbitMQ中間件的連接.
在這里,我們使用了默認的連接參數,用戶:guest,密碼:guest,以及虛擬主機:/,后面我們會討論這些參數.
但現在我還沒有準備好與中間件通信,我們必須設置一個通信的channel(第5步).這是AMQP中的一個高級概念,使用此抽象,可以讓多個不同的消息會話使用同一個邏輯connection.
實際上, Java client library 中的所有通信操作都是通過channel實例的方法來執行的.如果你正在開發多線程應用程序,強烈建議在每個線程中使用不同的channel.如果多個線程使用同一個channel,在channel方法調用中會順序執行,從而導致性能損失.最佳實踐是打開一個connection,并將其在多個不同線程之間分享.每個線程負責其獨立channel的創建,使用和銷毀.
可對任何RabbitMQ connection指定多個不同的可選屬性.你可以在在線文檔(http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc)上找到它們. 除了AMQP 虛擬主機外,其它選項都不需要說明.
虛擬主機是一個管理容器,在單個RabbitMQ實例中,允許配置多個邏輯上獨立的中間件主機, 以讓多個不同獨立應用程序能夠共享同一個RabbitMQ server. 每個虛擬主機都能獨立地配置權限,交換器,隊列,并在邏輯上獨立的環境中工作.
也可以連接字符串(連接URI)來指定連接選項,即使用factory.setUri() 方法:
ConnectionFactory factory = new ConnectionFactory();
String uri="amqp://user:pass@hostname:port/vhost";
factory.setUri(uri);
URI必須與 RFC3 (http://www.ietf.org/rfc/rfc3986.txt)的語法規范保持一致.
生產消息
在本配方中, 我們將學習了如何將消息發送到AMQP隊列. 我們將介紹AMQP消息的構建塊:消息,隊列,以及交換器.你可以在Chapter01/Recipe02/src/rmqexample中找到代碼.
w to do it…
在連接到中間件后, 像前面配方中看到的一樣,你可以按下面的步驟來來發送消息:
1. 聲明隊列, 在 com.rabbitmq.client.Channel上調用queueDeclare()方法:
String myQueue = "myFirstQueue";
channel.queueDeclare(myQueue, true, false, false, null); //創建一個名為myFirstQueue,持久化的,非限制的,不自動刪除的隊列,
2. 發送第一個消息到RabbitMQ broker:
String message = "My message to myFirstQueue";
channel.basicPublish("",myQueue, null, message.getBytes());
3. 使用不同的選項發送第二個消息:
channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
注意:隊列名稱是大小寫敏感的: MYFIRSTQUEUE與myFirstQueue是不同的.
如何工作
在第一個基本例子中,我們能夠發送一個消息到RabbitMQ.在信道建立后,第一個步驟可以確保目標隊列存在,這項任務是通過調用queueDeclare()方法來聲明隊列的(步驟1).
如果隊列已經存在的話,此方法不會做任何事情,否則,它會自己創建一個隊列.如果隊列已存在,但使用了不同的參數進行創建,queueDeclare() 方法會拋出異常.
注意,大部分的AMQP操作只是Channel Java接口的方法.
所有與broker交互的操作都需要通過channel來實施.
讓我們來深入探討queueDeclare() 方法. 其模板可以在http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/查看. 文檔看起來像下面這樣:

實際上我們使用了第二個重載的方法:
AMQP.Queue.DeclareOk queueDeclare(java.lang.String queue, boolean durable, boolean exclusive, booleanautoDelete,java.util.Map<java.lang.String,java.lang.Object> arguments)
throws java.io.IOException其參數含義是:- queue: 用來存儲消息的隊列名稱
- durable: 用于指定當服務器重啟時,隊列是否能復活.注意,當服務器重啟時,如果要保證消息持久性,必須將隊列聲明為持久化的
- exclusive: 用于指定此隊列是否只限制于當前連接.
- autoDelete:當隊列不再使用時,用于指示RabbitMQ broker是否要自動刪除隊列.
- arguments: 這是一個可選的隊列構建參數map.
在第二步中,實際上我們才會將消息發送到RabbitMQ broker.
RabbitMQ絕不打開消息體,對它來說,消息是透明的,因此你可以使用任何喜歡的序列化格式.通常我們會使用JSON, 但也可以使用XML, ASN.1, 標準的或自定義的ASCII或二進制格式. 最重要的事情是客戶端程序需要知道如何來解析數據.
現在我們來深度解析 basicPublish()方法:
void basicPublish(java.lang.String exchange,java.lang.String routingKey, AMQP.BasicProperties props, byte[] body) throws java.io.IOException
在我們的例子中,exchange參數被設置成空字符串"", 即默認的交換器, routingKey 參數設置成了隊列的名稱. 在這種情況下,消息將直接發送到routingKey指定的隊列中. body 參數設置成了字符串的字節數組,也就是我們想要發送的消息. props 參數默認設置成了null; 這些是消息屬性,我們將在Using message properties中深入討論.
在步驟3中,我們發送了完全相同的消息,但將消息屬性設置成了MessageProperties.PERSISTENT_TEXT_PLAIN;通過這種方式我們要求RabbitMQ將此消息標記為持久化消息.
兩個消息已經分發到了RabbitMQ broker, 邏輯上已經在myFirstQueue隊列上排隊了. 消息會駐留在緩沖區中,直到有一個客戶端來消費(通常來說,是一個不同的客戶端).
如果隊列和消息都聲明為持久化,消息就會被標記為持久化的,broker會將其存儲在磁盤上.如果兩個條件中的任何一個缺失,消息將會存儲在內存中. 對于后者來說,當服務器重啟時,緩沖消息將不會復活,但消息的投遞和獲取會更快.我們將Chapter 8, Performance Tuning for RabbitMQ來深入探討這個主題.
更多
在本章節中,我們將討論檢查RabbitMQ狀態的方法,以及隊列是否存在的方法.
如何檢查RabbitMQ狀態
要檢查RabbitMQ狀態,你可以使用rabbitmqctl命令行工具.在Linux設置中,它應該在PATH環境變量中.在Windows中,可在programs |
RabbitMQ Server | RabbitMQ Command Prompt (sbin dir). 我們可從命令行提示窗口中運行rabbitmqctl.bat.
我們可以使用rbbitmqclt list_queues來檢查隊列狀態.在下面的截屏中,顯示了運行例子之前和之后的情景.

在上面的截屏中,我們可以看到myfirstqueue隊列,其后跟著數字2, 它表示緩存在我們隊列中的消息數目(待發送消息數目).
現在我們可以嘗試重啟RabbitMQ, 或者重啟主機.成功重啟RabbitMQ依賴于使用的OS:
在Linux, RedHat, Centos, Fedora, Raspbian上:
service rabbitmq-server restart
在Linux, Ubuntu, Debian上:
/etc/init.d/rabbitmq restart
在Windows上:
sc stop rabbitmq / sc start rabbitmq
當我們再次運行rabbitmqclt list_queues 時,能期望有多少個消息呢?
檢查隊列是否已經存在
要確定特定隊列是否已經存在, 用channel.queueDeclarePassive()來代替channel.queueDeclare(). 兩個方法在隊列已經存在的情況下,會表現出相同的行為,否則,channel.queueDeclare()會創建隊列,但channel.queueDeclarePassive()會拋出異常.
消費消息
在本配方中,我們將關閉此回路.我們已經知道了如何將消息發送到RabbitMQ—或者任何AMQP broker—現在,我們要學習如何獲取這些消息.
你可以在Chapter01/Recipe03/src/rmqexample/ nonblocking 找到源碼.
如何做
要消費前面配方中發送的消息,我們需要執行下面的步驟:
1. 聲明我們要從哪里消費消息的隊列:
String myQueue="myFirstQueue";
channel.queueDeclare(myQueue, true, false, false, null);
2. 定義一個繼承自DefaultConsumer的消費類:
public class ActualConsumer extends DefaultConsumer {
public ActualConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
String message = new String(body);
System.out.println("Received: " + message);
}
}
3. 創建一個消費對象實例,再綁定到我們的channel上:
ActualConsumer consumer = new ActualConsumer(channel);
4. 開始消費消息:
String consumerTag = channel.basicConsume(myQueue, true,consumer);
5. 一旦完成,取消消費者(其API含義為:取消當前消費者(不能再收到此隊列上的消息,重新運行消費者可以收到消息)并調用consumer的handleCancelOk方法):
channel.basicCancel(consumerTag);
如何運作
在我們建立了與AMQP broker的connection 和channel后,我們需要確保我們從哪個隊列來消費消息(step 1).事實上,消費者可以在生產者發送消息到隊列之前就已經啟動了,此時有可能隊列還不存在,為了避免隊列上后續操作失敗,我們需要聲明隊列(譯者注:但消費聲明隊列這個動作并不必須的,只要生產者聲明了隊列,消費者不需要調用queueDeclare方法同樣可以消費消息,在這里只能認為是一種保險措施).
TIP:
通過允許生產者和消費者聲明相同的隊列,我們可以解藕其存在性,同時啟動的順序也不重要.
步驟2的核心,我們通過覆蓋handleDelivery()方法定義了我們特定的消費者,以及在步驟3中我們進行實例化。在Java client API中,消費者回調是通過com.rabbitmq.client.Consumer接口定義的.我們從 DefaultConsumer擴展了我們的消費者,DefaultConsumer提供了Consumer 接口所有方法中無具體操作的實現.在步驟3中,通過調用channel.basicConsume(),我們讓消費者開始了消費消息.每個channel的消費者總是同一個線程上執行,而且是獨立于調用者的.
現在我們已經從myQueue中激活了一個消費者,Java client library就會開始從RabbitMQ broker的隊列中獲取消息,并且會對每個消費者都調用handleDelivery().
在channel.basicConsume()方法調用后,我們會坐等主線程結束. 消息正在以非阻塞方式進行消費。
只有當我們按Enter之后, 執行過程會到步驟5,然后消費者退出.在這個時刻,消費者線程會停止調用我們的消費者對象,因此我們可以釋放資源并退出。
更多
在本章節中,我們將了解更多關于消費者線程以及阻塞語義的用法.
更多的消費者線程
在連接定義期間,RabbitMQ Java API 會按消費者線程需要分配一個線程池。所有綁定到同一個channel的消費者都會使用線程池中的單個線程來運行;但是,有可能不同channel的消費者也可通過同一個線程來處理. 這就是為什么要在消費方法避免長時間操作的原因,為了避免阻塞其它消費者,可以在我們的自己定義的線程池中進行處理,就像我們例子中展示的一樣,但這不是必須的。我們已經定義了一個線程池, java.util.concurrent.ExecutorService, 因此可在連接期間將其傳入:
ExecutorService eService = Executors.newFixedThreadPool(10);
Connection connection = factory.newConnection(eService);
這是由我們來進行管理的,因此我們要負責對其終止:
eService.shutdown();
但是,必須要記住的是,如果你沒有定義你自己的ExecutorService線程池,Java client library會在連接創建期間創建一個,并會在銷毀對應連接時,自動銷毀連接池。
使用JSON來序列化消息體(y)tion with JSON
在AMQP中,消息是不透明的實體,AMQP不提供任何標準的方式來編解碼消息.但是,web應用程序經常使用JSON來作為應用程序層格式,JavaSciprt序列化格式已經變成了事實上的標準,在這種情況下,RabbitMQ client Java library 可以包含一些實用函數.另一方面,這也不是唯一的協議,任何程序可以選擇它自己的協議(XML, Google Protocol Buffers, ASN.1, or proprietary).
在這個例子中,我們將展示如何使用JSON協議來編解碼消息 體. 我們會使用Java編寫的發布者(Chapter01/Recipe04/Java_4/src/rmqexample)來發送消息,并用 Python語言編寫的消費者來消費消息 (Chapter01/Recipe04/Python04).
如何做How to do it…
要實現一個Java生產者和一個Python消費者, 你可以執行下面的步驟:
1. Java: 除了導入Connecting to the broker配方中提到的包外,我們還要導入:
import com.rabbitmq.tools.json.JSONWriter;
2. Java: 創建一個非持久化隊列:
String myQueue="myJSONBodyQueue_4";
channel.queueDeclare(MyQueue, false, false, false, null);
3. Java: 創建一個使用樣例數據的Book列表:
List<Book>newBooks = new ArrayList<Book>();
for (inti = 1; i< 11; i++) {
Book book = new Book();
book.setBookID(i);
book.setBookDescription("History VOL: " + i );
book.setAuthor("John Doe");
newBooks.add(book);
}
4. Java: 使用JSONwriter來序列化newBooks實例:
JSONWriter rabbitmqJson = new JSONWriter();
String jsonmessage = rabbitmqJson.write(newBooks);
5. Java: 最后發送jsonmessage:
channel.basicPublish("",MyQueue,null,jsonmessage.getBytes());
6. Python: 要使用Pika library,我們必須要導入下面的包:
import pika;
import json;
Python 有JSON處理的內鍵包.
7. Python: 創建RabbitMQ的連接,使用下面的代碼:
connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
8. Python: 聲明隊列,綁定消費者,然后再注冊回調:
channel = connection.channel()
my_queue = "myJSONBodyQueue_4"
channel.queue_declare(queue=my_queue)
channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
channel.start_consuming()
How it works…
在我們設置環境后(步驟1和步驟2),我們使用了write(newbooks)來序列化newbooks類。此方法返回返回的JSON字符串,就像下面的展示的一樣:
[
{
"author" : "John Doe",
"bookDescription" : "History VOL: 1",
"bookID" : 1
},
{
"author" : "John Doe",
"bookDescription" : "History VOL: 2",
"bookID" : 2
}
]
步驟4中,我們發布了一個jsonmessage到myJSONBodyQueue_4隊列中.現在Python消費者可以從同一個隊列中獲取消息。在Python中我們看如何操作:
connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
channel = connection.channel()
queue_name = "myJSONBodyQueue_4"
channel.queue_declare(queue=my_queue)
..
channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
channel.start_consuming()
正如Java實現中看到的一樣,我們必須創建一個連接,然后再創建一個通道.channel.queue_declare(queue=myQueue)方法,我們聲明了非持久化,不受連接限制,不會自己刪除的隊列。 如果要改變隊列的屬性,我們方法中添加參數,就像下面這樣:
channel.queue_declare(queue=myQueue,durable=True)
當不同AMQP clients聲明了相同隊列時,那么確保有相同的durable, exclusive, 和autodelete 屬性是相當重要的(如果隊列名稱相同,但屬性不同會拋異常),否則, channel.queue_declare()會拋異常。
對于channel.basic_consume()方法, client會從給定的隊列中消費消息,當接收到消息后,會調用consumer_callback()回調方法。
在Java中我們是在消費者接口中定義的回調,但在Python中,它們只是傳遞給basic_consume()方法, 更多的功能,更少的聲明,是Python的典范.
consumer_callback回調如下:
def consumer_callback(ch, method, properties, body):
newBooks=json.loads(body);
print" Count books:",len(newBooks);
for item in newBooks:
print 'ID:',item['bookID'], '-
Description:',item['bookDescription'],' -
Author:',item['author']
回調接收到消息后,使用json.loads()來反序列化消息,然后就可以準備讀取newBooks的結構了。
更多
包含在RabbitMQ client library中的JSON幫助類是非常簡單的,在真實項目中,你可以使用外部JSON library.如:強大的google-gson (https://code.google.com/p/google-gson/) 或 jackson (http://jackson.codehaus.org/).
使用RPC消息
遠程過程調用(RPC)通常用于client-server架構. client提出需要執行服務器上的某些操作請求,然后等待服務器響應.
消息架構試圖使用發后即忘(fire-and-forget)的消息形式來實施一種完全不同的解決方案,但是可以使用設計合理的AMQP隊列和增加型RPC來實施,如下所示:

上面的圖形描述了request queue是與responder相關聯的,reply queues 與callers是相聯的.但是,當我們在使用RabbitMQ的時候,所有的涉及的端點(callers和responders) 都是AMQP clients.現在我們將描述Chapter01/Recipe05/Java_5/src/rmqexample/rpc例子中的操作步驟.
如何做
執行下面的步驟來實現RPC responder:
1. 聲明一個請求隊列, responder會在此處來等候RPC請求:
channel.queueDeclare(requestQueue, false, false, false,null);
2. 通過覆蓋DefaultConsumer.handleDelivery()來定義我們特定的RpcResponderConsumer消費者, 在接收到每個RPC請求的時,消費者將:
? 執行RPC請求中的操作
? 準備回復消息
? 通過下面的代碼在回復屬性中設置correlation ID:
BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
將答案發送到回復隊列中:
getChannel().basicPublish("", properties.getReplyTo(),replyProperties, reply.getBytes());
?發送應答給RPC request:
getChannel().basicAck(envelope.getDeliveryTag(), false);
3. 開始消費消息,直到我們看到了回復消息才停止:
現在讓我們來執行下面的步驟來實現RPC caller:
1. 聲明請求隊列,在這里responder會等待RPC請求:
channel.queueDeclare(requestQueue, false, false, false,null);
2. 創建一個臨時的,私有的,自動刪除的回復隊列:
String replyQueue = channel.queueDeclare().getQueue();
3. 定義我們特定的消費者RpcCallerConsumer, 它用于接收和處理RPC回復. 它將:
? 當收到回復時,通過覆蓋handleDelivery()用于指明要做什么(在我們的例子中,定義了AddAction()):
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws java.io.IOException {
String messageIdentifier =properties.getCorrelationId();
String action = actions.get(messageIdentifier);
actions.remove(messageIdentifier);
String response = new String(body);
OnReply(action, response);
}
4. 調用channel.basicConsume()方法啟動消息消費.
5. 準備和序列化請求(我們例子中是messageRequest).
6. 初始化一個任意的唯一消息標識符(messageIdentifier).
7. 當消費者收到相應回復的時候,定義應該做什么,通過使用messageIdentifier來綁定動作.在我們的例子中,我們通過調用我們自定義的方法 consumer.AddAction()來完成的.
8. 發布消息到請求隊列,設置其屬性:
BasicProperties props = new BasicProperties.Builder().correlationId(messageIdentifier)replyTo(replyQueue).build();
channel.basicPublish("", requestQueue,props,messageRequest.getBytes());
如何工作
在我們的例子中,RPC responder扮演的是RPC server的角色; responder會監聽requestQueue公共隊列(步驟1),這里放置了調用者的請求.
另一方面,每個調用者會在其私有隊列上消費responder的回復信息(步驟5).當caller發送消息時(步驟11),它包含兩個屬性:一個是用于監聽的臨時回復隊列 (replyTo())名稱,另一個是消息標識(correlationId()),當回復消息時,用于標識caller.事實上,在我們的例子中,我們已經實現了一個異步的RPC caller. The action to be performed by the RpcCallerConsumer (step 6) when the reply comes back is recorded by the nonblocking consumer by calling AddAction() (step 10).
回到responder, RPC邏輯全在RpcResponderConsumer中.這不同于特定的非阻塞consumer,就像我們在消費消息配方中看到的一樣,但不同的是下面兩點細節:
回復的隊列名稱是通過消息屬性來獲取的,即properties.getReplyTo().其值已經被caller設成了私有,臨時回復隊列.
回復消息必須包含在correlation ID標識的隊列中(待查)
TIPRPC responde不會使用correlation ID;它只用來讓caller收到對應請求的回復消息
更多
本章節我們會討論阻塞RPC的使用.
使用阻塞RPC
有時,簡單性比可擴展性更重要.在這種情況下,可以使用包含在 Java RabbitMQ client library中實現了阻塞RPC語義的幫助類:
com.rabbitmq.client.RpcClient
com.rabbitmq.client.StringRpcServer
邏輯是相同的,但沒有非阻塞消費者參與, 并且臨時隊列和correlation IDs 的處理對于用戶來說是透明的.
你可以在Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc找到相關的例子.
擴展注意
當有多個callers會發生什么呢?它主要以標準的RPC client/server 架構來工作.但如果運行多個reponders會怎樣呢?
在這種情況下,所有的responders都會從請求隊列中關注消費消息. 此外, responders可位于不同的主機. 這個主題的更多信息請參考配方-分發消息給多個消費者.
廣播消息es
在本例中,我們看到如何將同一個消息發送給有可能很大量的消費者.這是一個典型的廣播消息到大量客戶端的消息應用.舉例來說,在大型多人游戲中更新記分板的時候,或在一個社交網絡應用中發布新聞的時候,都需要將消息廣播給多個消費者.
在本配方中,我們同時探討生產者和消費者實現.因為它是非常典型的消費者可以使用不同的技術和編程語言,在AMQP中,我們將使用Java, Python, 以及Ruby來展示這種互通性.
我們會感謝AMQP中隔離交換器和隊列帶來的好處.在Chapter01/Recipe06/中找到源碼.
如何做
要做這道菜,我們需要四個不同的代碼:
- Java發布者
- Java消費者
- Python消費者
- Ruby消費者
準備Java發布者:
1. 聲明一個fanout類型的交換器:
channel.exchangeDeclare(myExchange, "fanout");
2. 發送一個消息到交換器:
channel.basicPublish(myExchange, "", null,jsonmessage.getBytes());
然后準備Java消費者:
1. 聲明同一個生產者聲明的fanout交換器:
channel.exchangeDeclare(myExchange, "fanout");
2. 自動創建一個新的臨時隊列:
String queueName = channel.queueDeclare().getQueue();
3. 將隊列綁定到交換器上:
channel.queueBind(queueName, myExchange, "");
4. 定義一個自定義,非阻塞消費者,這部分內容已經在消費消息食譜中看到過了.
5. 調用channel.basicConsume()來消費消息
相對于Java消費者來說,Python消費者的源碼是非常簡單的,因此這里沒必要再重復必要的步驟,只需要遵循Java消費者的步驟,可參考Chapter01/Recipe06/Python_6/PyConsumer.py的代碼.
Ruby消費者中,你必須使用"bunny" 然后再使用URI連接.
可查看在Chapter01/Recipe06/Ruby_6/RbConsumer.rb的源碼
現在我們要把這些整合到一起來看食譜:
1. 啟動一個Java生產者的實例; 消息將立即進行發布.
2. 啟動一個或多個Java/Python/Ruby的消費者實例; 消費者只有當它們運行的時候,才能接接收到消息.
3. 停止其中一個消費者,而生產者繼續運行,然后再重啟這個消費者,我們可以看到消費者在停止期間會丟失消息. 如何運作
生產者和消費者都通過單個連接連上了RabbitMQ,消息的邏輯路徑如下圖所示:

在步驟1中,我們已經聲明了交換器,與隊列聲明的邏輯一樣: 如果指定的交換器不存在,將會進行創建;否則,不做任何事情.exchangeDeclare()方法的第二個參數是一個字符串, 它用于指定交換器的類型,在我們這里,交換器類型是fanout.
在步驟2中,生產者向交換器發送了一條消息. 你可以使用下面的命令來查看它以及其它已定義的交換器:
rabbitmqctl list_exchanges
channel.basicPublish() 方法的第二個參數是路由鍵(routing key),在使用fanout交換器時,此參數通常會忽略.第三個設置為null的參數, 此參數代表可選的消息屬性(更多信息可參考使用消息屬性食譜).第四個參數是消息本身.
當我們啟動一個消費者的時候,它創建一個它自己的臨時隊列(步驟9). 使用channel.queueDeclare()空重載,我們會創建一個非持久化,私有的,自動刪除的,隊列名稱自動生成的隊列.
運行一對消費者,并用rabbitmqctl list_queues查看,我們可以兩個隊列,每個消費者一個, 還有奇怪的名字,還有前面食譜中用到的持久化隊列myFirstQueue ,如下圖所示:
在步驟5中,我們將隊列綁定到了myExchange交換器上.可以用下面的命令來監控這些綁定:
rabbitmqctl list_bindings
監控是AMQP非常重要的一面; 消息是通過交換器來路由到綁定隊列的,且會在隊列中緩存.
TIP
交換器不會緩存消息,它只是邏輯元素.
fanout交換器在通過消息拷貝,來將消息路由到每個綁定的隊列中,因此,如果沒有綁定隊列,消息就不會被消費者接收(參考處理未路由消息食譜來了解更多信息).
一旦我們關閉了消費者,我們暗中地銷毀了其私有臨時隊列(這就是為什么隊列是自動刪除的,否則,這些隊列在未使用后會保留下來,broker上的隊列數目會無限地增長), 消息也不會緩存了.
當重啟消費者的時候,它會創建一個新的獨立的隊列,只要我們將其綁定到myExchange上,發布者發送的消息就會緩存到這個隊列上,并被消費者消費.
更多
當RabbitMQ第一次啟動的時候,它創建一些預定的交換器. 執行rabbitmqctl list_exchanges命令,我們可以觀察到許多存在的交換器,也包含了我們在本食譜中定義的交換器:

所有出現在這里的amq.*交換器都是由AMQP brokers預先定義的,它可用來代替你定義你自己的交換器;它們不需要聲明.
我們可以使用amq.fanout來替換myLastnews.fanout_6, 對于簡單應用程序來說,這是很好的選擇. 但一般來說,應用程序來聲明和使用它們自己的交換器.
本食譜使用的重載,交換器是非自動刪除的(won't be deleted as soon as the last client detaches it) 和非持久化的(won't survive server restarts). 你可以在http://www.rabbitmq.com/releases/ rabbitmq-java-client/current-javadoc/找到更多的選項和重載.
使用Direct交換器來路由消息
要本食譜中,我們將看到如何選擇消費消息子集(部分消息), 只路由那些感涂在的AMQP隊列,以及忽略其它隊列.
一個典型的使用場景是實現一個聊天器, 在這里每個隊列代表了一個用戶.我們可以查看下面的目錄找到相關的例子:Chapter01/Recipe07/Java_7/src/rmqexample/direct
我們將展示如何同時實現生產者和消費者.實現生產者,執行下面的步驟:
1. 聲明一個direct交換器:
channel.exchangeDeclare(exchangeName, "direct", false,false, null);
2. 發送一些消息到交換器,使用任意的routingKey 值:
channel.basicPublish(exchangeName, routingKey, null,jsonBook.getBytes());
要實現消費者,執行下面的步驟:
1. 聲明同樣的交換器,步驟與上面步驟相同.
2. 創建一個臨時隊列:
String myQueue = channel.queueDeclare().getQueue();
3. 使用bindingKey將隊列綁定到交換器上. 假如你要使用多個binding key,可按需要多次執行這個操作:
channel.queueBind(myQueue,exchangeName,bindingKey);
4. 在創建了適當的消費對象后,可以參考消費消息食譜來消費消息.
如何工作
在本食譜中,我們使用任意的字符串(也稱為路由鍵)來向direct交換器發布消息(step 2).在fanout交換器中,如果沒有綁定隊列的話,消息是不是存儲的,但在這里,根據在綁定時指定的綁定鍵,消費者可以選擇消息轉發這些隊列(步驟5).
僅當路由鍵與綁定鍵相同的消息才會被投遞到這些隊列.
TIP
過濾操作是由AMQP broker來操作,而不是消費者;路由鍵與綁定鍵不同的消息是不會放置到隊列中的.但是,可允許多個隊列使用相同的綁定鍵,broker會將匹配的消息進行拷貝,并投遞給它們.也允許在同一個隊列/交換綁定上綁定多個不同的綁定鍵,這樣就可以投遞所有相應的消息.
更多
假如我們使用指定的路由鍵來將消息投遞到交換器,但在這個指定鍵上卻沒有綁定隊列,那么消息會默默的銷毀.
然而, 當發生這種情況時,生產者可以檢測這種行為,正如處理未路由消息食譜中描述的一樣.
使用topic交換器來路由消息
Direct 和topic 交換器在概念上有點相似,最大的不同點是direct交換器使用精準匹配來選擇消息的目的地,而topic交換器允許使用通配符來進行模式匹配.
例如, BBC使用使用topic交換器來將新故事路由到恰當的RSS訂閱.
你可以在這里找到topic交換器的例子:Chapter01/Recipe08/Java_8/src/rmqexample/topic
如何做
我們先從生產者開始:
1. 聲明一個topic交換器:
channel.exchangeDeclare(exchangeName, "topic", false,false, null);
2. 使用任意的路由鍵將消息發送到交換器:
channel.basicPublish(exchangeName, routingKey, null,jsonBook.getBytes());
接下來,消費者:
1. 聲明相同的交換,如步驟1做的一樣.
2. 創建一個臨時隊列:
String myQueue = channel.queueDeclare().getQueue();
3. 使用綁定鍵將隊列綁定到交換器上,這里也可以包含通配符:
channel.queueBind(myQueue,exchangeName,bindingKey);
4. 在創建適當的消費者對象后,可以像消息消息食譜中一樣來消費消息.
如何工作
以先前的食譜中,用字符串標記來將消息發送到topic交換器中(步驟2),但對于topic交換器來說,組合多個逗號分隔的單詞也是很重要的;它們會被當作主題消息.例如,在我們的例子中,我們用:
technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook
要消息這些消息,消費者需要將myQueue綁定到交換器上(步驟5)
使用topic交換器, 步驟5中指定的訂閱綁定/綁定鍵可以是一系列逗號分隔的單詞或通配符. AMQP通配符只包括:
- #: 匹配0或多個單詞
- *: 只精確匹配一個單詞
例如:
- #.ebook 和 *.*.ebook 可匹配第一個和第三個發送消息
- sport.# and sport.*.* 可匹配第二個和第三個發送消息
- # 可匹配任何消息
在最后一種情況中,topic交換器的行為類似于fanout交換器, 但性能不同,當使用這種形式時性能更高
更多
再次說明,如果消息不能投遞到任何隊列,它們會被默默地銷毀.當發生此種情況時,生產者可以檢測這種行為,就如處理未路由消息食譜中描述的一樣.
保證消息處理
在這個例子中,我們將展示在消費消息時,我們如何來使用明確的應答.消息在消費者獲取并對broker作出應答前,它會一直存在于隊列中.應答可以是明確的或隱含的.在先前的例子中,我們使用的是隱含應答.為了能實際查看這個例子,你可以運行生產消息食譜中的發布者,然后你運行消費者來獲取消息,可在Chapter01/Recipe09/Java_9/中找到.
如何做
為了能保證消費者處理完消息后能應答消息,你可以執行下面的步驟:
1. 聲明一個隊列:
channel.queueDeclare(myQueue, true, false, false,null);
2. 綁定消費者與隊列,并設置basicConsume()方法的autoAck參數為false:
ActualConsumer consumer = new ActualConsumer(channel);
boolean autoAck = false; // n.b.
channel.basicConsume(MyQueue, autoAck, consumer);
3. 消費消息,并發送應答:
public void handleDelivery(String consumerTag,Envelope envelope, BasicPropertiesproperties,byte[] body) throws java.io.IOException {
String message = new String(body);
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
如何工作
在創建隊列后(步驟1),我們將消費者加入到隊列中,并且定義了應答行為(步驟2).
參數autoack = false表明RabbitMQ client API會自己來發送明確的應答.
在我們從隊列收到消息后,我們必須向RabbitMQ發送應答,以表示我們收到到消息并適當地處理了,因此我們調用了channel.basicAck()(步驟3).
RabbitMQ只有在收到了應答后,才會從隊列中刪除消息.
TIP
如果在消費者不發送應答,消費者會繼續接收后面的消息;但是,當你斷開了消費者后,所有的消息仍會保留在隊列中.消息在RabbitMQ收到應答前,都認為沒有被消費.可以注解basicAck()調用來演示這種行為.
channel.basicAck()方法有兩個參數:
- deliveryTag
- multiple
deliveryTag參數是由服務器為消息指定的值,你可以通過使用delivery.getEnvelope().getDeliveryTag()來獲取.
如果multiple設置為false,client只會應答deliveryTag參數的消息, 否則,client會應答此消息之前的所有消息. 通過向RabbitMQ應答一組消息而不是單個消息,此標志允許我們優化消費消息. TIP
消息只能應答一次,如果對同一個消息應答了多次,方法會拋出preconditionfailed 異常.
調用channel.basicAck(0,true),則所有未應答的消息都會得到應答,0 代表所有消息.此外,調用channel.basicAck(0,false) 會引發異常.
更多
下面的章節,我們還會討論basicReject()方法,此方法是RabbitMQ擴展,它允許更好的靈活性.
也可參考
分發消息到多個消費者食譜是一個更好解釋明確應答真實例子.
分發消息到多個消費者
在這個例子中,我們將展示如何來創建一個動態負責均衡器,以及如何將消息分發到多個消費者.我們將創建一個文件下載器.
你可在Chapter01/Recipe10/Java_10/找到源碼.
如何做
為了能讓兩個以上的RabbitMQ clients能盡可能的負載均衡來消費消息,你必須遵循下面的步驟:
1. 聲明一個命令隊列, 并按下面這樣指定basicQos:
channel.queueDeclare(myQueue, false, false, false,null);
channel.basicQos(1);
2. 使用明確應答來綁定一個消費者:
channel.basicConsume(myQueue, false, consumer);
3. 使用channel.basicPublish()來發送一個或多個消息.
4. 運行兩個或多個消費者.
如何工作
發布者發送了一條帶下載地址的消息:
String messageUrlToDownload="http://www.rabbitmq.com/releases/rabbitmq-dotnetclient/v3.0.2/rabbitmq-dotnet-client-3.0.2-user-guide.pdf";
channel.basicPublish("",MyQueue,null,messageUrlToDownload.getBytes());
消費者獲取到了這個消息:
System.out.println("Url to download:" + messageURL);
downloadUrl(messageURL);
一旦下載完成,消費者將向broker發送應答,并開始準備下載下一個:
getChannel().basicAck(envelope.getDeliveryTag(),false);
System.out.println("Ack sent!");
System.out.println("Wait for the next download...");
消費者按塊的方式能獲取消息,但實際上,當消費者發送應答時,消息就會從隊列中刪除,在先前的食譜中,我們已經看過這種情況了.
另一個方面,在本食譜中使用了多個消費才,第一個會預先提取消息,其它后啟動的消費者在隊列中找不到任何可用的消息.為了平等地發分發消息,我們需要使用channel.basicQos(1)來指定一次只預先提取一個消息.
也可參考
在Chapter 8, Performance Tuning for RabbitMQ中可以找到更多負載均衡的信息.
使用消息屬性
在這個例子中,我們將展示如何AMQP消息是如何分解的,以及如何使用消息屬性.
你可在Chapter01/Recipe11/Java_11/找到源碼.
如何做
要訪問消息屬性,你必須執行下面的步驟:
1. 聲明一個隊列:
channel.queueDeclare(MyQueue, false, false, false,null);
2. 創建一個BasicProperties類:
Map<String,Object>headerMap = new HashMap<String,Object>();
headerMap.put("key1", "value1");
headerMap.put("key2", new Integer(50) );
headerMap.put("key3", new Boolean(false));
headerMap.put("key4", "value4");
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.contentType("text/plain")
.userId("guest")
.appId("app id: 20")
.deliveryMode(1)
.priority(1)
.headers(headerMap)
.clusterId("cluster id: 1")
.build();
3. 使用消息屬性來發布消息:
channel.basicPublish("",myQueue,messageProperties,message.getBytes())
4. 消費消息并打印屬性:
System.out.println("Property:" + properties.toString());
如何工作
AMQP 消息(也稱為內容)被分成了兩部分:
- 內容頭
- 內容體(先前例子我們已經看到過了)
在步驟2中,我們使用BasicProperties創建一個內容頭:
Map<String,Object>headerMap = new HashMap<String, Object>();
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest")
.deliveryMode(1)
.priority(1)
.headers(headerMap)
.build();
在這個對象中,我們設置了下面的屬性:
- timestamp: 消息時間戳.
- userId: 哪個用戶發送的消息(默認是"guest"). 在下面的章節中,我們將了解用戶管理.
- deliveryMode: 如果設置為1,則消息是非持久化的, 如果設置為2,則消息是持久化的(你可以參考食譜連接broker).
- priority: 用于定義消息的優先級,其值可以是0到9.
- headers: 一個HashMap<String, Object>頭,你可以在其中自由地定義字段.
TIP RabbitMQ BasicProperties 類是一個AMQP內容頭實現.BasicProperties的屬性可通過BasicProperties.Builder()構建.頭準備好了,我們可使用
channel.basicPublish("",myQueue, messageProperties,message.getBytes())來發送消息,在這里,messageProperties是消息頭,message是消息體.
在步驟中,消費者獲得了一個消息:
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
System.out.println("***********message header****************");
System.out.println("Message sent at:"+ properties.getTimestamp());
System.out.println("Message sent by user:"+ properties.getUserId());
System.out.println("Message sent by App:"+properties.getAppId());
System.out.println("all properties :" + properties.toString());
System.out.println("**********message body**************");
String message = new String(body);
System.out.println("Message Body:"+message);
}
參數properties包含了消息頭,body包含了消息體.
更多
使用消息屬性可以優化性能.將審計信息或日志信息寫入body,通常是一種典型的錯誤,因為消費者需要解析body來獲取它們.
body 消息只可以包含應用程序數據(如,一個Book class),而消息屬性可以持有消息機制相關或其它實現細節的相關信息.
例如 ,如果消費者想知道消息是何時發送的,那么你可以使用timestamp屬性, 或者消費者需要根據一個定制標記來區分消息,你可以將它們放入header HashMap屬性中.
也可參考
MessageProperties類對于標準情況,包含了一些預先構建的BasicProperties類. 可查看http://www.rabbitmq.com/releases//rabbitmq-java-client/current-javadoc/com/rabbitmq/client/
MessageProperties.html
在這個例子中,我們只是使用了一些屬性,你可在http://www.rabbitmq.com/releases//rabbitmq-java-client/currentjavadoc/com/rabbitmq/client/AMQP.BasicProperties.html獲取更多信息.
消息事務
在本例中,我們將討論如何使用channel事務. 在生產消息食譜中,我們已經了解了如何來使用持久化消息,但如果broker不能將消息寫入磁盤的話,那么你就會丟失消息.使用AQMP事務,你可以確保消息不會丟失.
你可在Chapter01/Recipe12/Java_12/找到相關源碼.
如何做
通過下面的步驟,你可以使用事務性消息:
1. 創建持久化隊列
channel.queueDeclare(myQueue, true, false, false, null);
2. 設置channel為事務模式:
channel.txSelect();
3. 發送消息到隊列,然后提交操作:
channel.basicPublish("", myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
channel.txCommit();
如何工作
在創建了持久化隊列后(step 1),我們將channel設置成了事務模式,使用的方法是txSelect() (step 2). 使用 txCommit()確保消息存儲在隊列并寫入磁盤,然后消息將投遞給消費者.在txCommit() 或txRollback()之前,必須至少調用一次txSelect().
在一個DBMS中,你可以使用回滾方法.在下面的情況下,消息不會被存儲或投遞:
channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN ,message.getBytes());
channel.txRollback();
更多
事務會降低應用程序的性能,因為broker不會緩存消息,且tx操作是同步的.
也可參考
在后面的章節中,我們會討論發布確認插件,這是一種較快確認操作的方式.
處理未路由消息
在這個例子中,我們將展示如何管理未路由的消息. 未路由消息指的是沒有目的地的消息.如,一個消息發送到了無任何綁定隊列的交換器上.
未路由消息不同于死消息 ,前者是發送到無任何隊列目的地的交換器上,而后者指的是消息到達了隊列,但由于消費者的決策,過期TTL,或者超過隊列長度限制而被拒絕的消息 你可以在Chapter01/Recipe13/Java_13/找到源碼.
如何做
為了處理未路由的消息,你需要執行下面的操作:
1. 第一步實現ReturnListener接口:
public class HandlingReturnListener implements ReturnListener
@Override
public void handleReturn…
2. 將HandlingReturnListener類添加到channel.addReturnListener():
channel.addReturnListener(new HandlingReturnListener());
3. 然后創建一個交換機:
channel.exchangeDeclare(myExchange, "direct", false, false,null);
4. 最后發布一個強制消息到交換器:
boolean isMandatory = true;
channel.basicPublish(myExchange, "",isMandatory, null,message.getBytes());
如何工作
當我們運行發布者的時候,發送到myExchange的消息因為沒有綁定任何隊列不會到達任何目的地.但這些消息不會,它們會被重定向到一個內部隊列. .HandlingReturnListener類會使用handleReturn()來處理這些消息.ReturnListener類綁定到了一個發布者channel上, 且它會獵捕那些不能路由的消息.
在源碼示例中,你可以找到消費者,你也可以一起運行生產者和消費者,然后再停止消費者.
更多
如果沒有設置channel ReturnListener, 未路由的消息只是被broker默默的拋棄.在這種情況下,你必須注意未路由消息,將mandatory 標記設置為true是相當重要的,如果為false,未路由的消息也會被拋棄.
posted on 2016-06-03 23:22
胡小軍 閱讀(2601)
評論(0) 編輯 收藏 所屬分類:
RabbitMQ