3.1.9 Request/Reply 消息
介紹
AmqpTemplate
也提供了各種各樣的sendAndReceive
方法,它們接受同樣的參數選項(exchange, routingKey, and Message)來執行單向發送操作.
這些方法對于request/reply 場景也是有用的,因為它們在發送前處理了必要的"reply-to"屬性配置,并能通過它在專用隊列(基于回復功能臨時創建的隊列)上監聽回復消息.
類似的request/reply方法也是可用的, MessageConverter
可應用于請求和回復上.這些方法被稱為convertSendAndReceive
.參考 AmqpTemplate
的JavaDoc來了解詳情.
從1.5.0版本開始,每個sendAndReceive
方法變種都有一個接受CorrelationData的重載版本
. 連同正確配置的連接工廠,這使得發布者可以確認發送方的操作.
參考 the section called “Publisher Confirms and Returns” 來了解詳情.
Reply 超時
默認情況下,這些 send和receive方法會在5秒后超時并返回null. 這可以通過設置replyTimeout屬性來修改.
從1.5版本開始,如果你設置了 mandatory
屬性為true (或特定消息上的 mandatory-expression
評估為true
),如果消息不能投遞到隊列中,將拋出AmqpMessageReturnedException
.
這個 exception 有 returnedMessage
, replyCode
, replyText
屬性, 如同用于發送的exchange
和
routingKey
.
這個功能使用了發布者返回特性,可通過在CachingConnectionFactory上設置publisherReturns
為true來啟用(參考the section called “Publisher Confirms and Returns”).
此外,你不必在RabbitTemplate上注冊你自己的ReturnCallback.
RabbitMQ Direct reply-to
從Spring AMQP 1.4.1 版本開始,Direct reply-to 就已經做為了默認使用(如果服務器支持的話),而不再創建臨時隊列.
當沒有提供
replyQueue
(或設置名稱為amq.rabbitmq.reply-to
), RabbitTemplate
會自動探測是否支持Direct reply-to, 要么使用它或使用臨時回復隊列來回退. 當使用Direct reply-to, reply-listener
不是必需的,不應該被配置。Reply 監聽器仍然運行命名隊列(不是amq.rabbitmq.reply-to
),允許控制并發回復.
從.16版本開始,出于某些原因,你想為每個回復使用臨時的,專用的,自動刪除的隊列,你可以設置useTemporaryReplyQueues
屬性為true
. 如果你設置了replyAddress,此屬性會被忽略
.
決定是否使用 direct reply-to,可以通過繼承RabbitTemplate
并覆蓋useDirectReplyTo()來修改.
此方法只在發出第一個請求時,調用一次.
應答隊列的消息相關性
當使用固定回復隊列時(不是amq.rabbitmq.reply-to
), 必須要提供 correlation data,這樣回復才能關聯請求.參考RabbitMQ Remote Procedure Call (RPC).
默認情況下,標準correlationId
屬性會用來持有correlation data. 然而,如果你想使用自定義屬性來持有correlation data, 你可在 <rabbit-template/>中設置 correlation-key
屬性.
顯示設置屬性為correlationId
將與缺省屬性相同. 當然,客戶端和服務器對于correlation data必須要有相同的頭.
Spring AMQP 1.1版本為這個data使用自定義屬性spring_reply_correlation
.如果你想在當前版本中恢復這種行為,也許是為了保持1.1中的另一個應用程序的兼容性,你必須設置屬性以spring_reply_correlation。
回復監聽器容器
當使用3.4.0之前的Rabbit版本,每個回復都會使用一個新臨時隊列. 然而,可在模板上配置單個回復隊列, 這將更加高效,同時也允許你在隊列上設置參數.然而,在這種情況下,你必須提供<reply-listener/>子元素.
這個元素為回復隊列提供了監聽器容器, 以模板為監聽器.
所有 Section 3.1.15, “Message Listener Container Configuration” 中的屬性都可以配置在<listener-container/> 元素中,除了connection-factory 和 message-converter(它們是模塊配置中繼承下來的).
重要
如果運行了多個應用程序實例或者使用了多個RabbitTemplate
,那么你必須為每個都使用唯一的回復隊列- RabbitMQ 沒有在隊列中選擇消息的能力,如果它們都使用相同隊列,每個實例都將競爭的答復,而不一定是收到他們自己的。
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
reply-queue="replies" reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
由于容器和模板可共享一個連接工廠,它們不會共享一個通道,因此請求和回復不是在同一個事務中執行的(如果是事務的).
重要
在1.5.0版本之前, reply-address
屬性不可用,回復總是通過默認交換器和reply-queue作路由鍵來進行的
. 現在這依然是默認的,但現在你可以指定新的reply-address
屬性. reply-address
可以包含<exchange>/<routingKey>
形式的地址,回復將會路由到設定的exchange和路由到routing key綁定的隊列上. reply-address
優先于 reply-queue
. <reply-listener>
必須配置為一個單獨的<listener-container>
組件, 當只使用reply-address
時,無論是reply-address
還是 reply-queue
(在<listener-container>中的
) 必須指的是同一個隊列.queue屬性
在這個配置中,SimpleListenerContainer
用于接收回復; 而RabbitTemplate
將成為MessageListener
. 當使用<rabbit:template/>
命名空間元素定義模板時, 正如上面所展示的, 分析器會定義容器,并將模板作為監聽器進行包裝.
重要
當模板不使用固定 replyQueue
(或使用Direct reply-to - 參考 the section called “RabbitMQ Direct reply-to”) ,則不需要監聽器容器. 當在RabbitMQ3.4.0+使用時,Direct reply-to
是更好的機制.
如果你將 RabbitTemplate
定義為 <bean/>
, 或使用 @Configuration
類將其定義為@Bean
,或者通過編程來創建模板,你需要自己定義和包裝回復監聽器容器.
如果這樣做失敗了,模板將不會收到回復,并最終會超時并返回null作為對sendAndReceive
方法調用者的回復.
從1.5版本開始, RabbitTemplate
會探測是否配置了MessageListener
來接收回復.如果沒有,它會嘗試發送并使用回復地地址來接收消息,如果失敗了,則會拋出 IllegalStateException
(因為不會收到回復).
此外,如果使用了簡單的replyAddress
(隊列名稱),回復監聽器容器會驗證與監聽的隊列是否是一樣的名稱.但如果這個地址是交換器和路由鍵,這種檢查不會被執行,會輸出調試日志信息.
重要
當在編寫回復監聽器和模板時,重要的一點是要保證模板的replyQueue
與容器的queues
(或queueNames
) 屬性指的是相同的隊列. 模板會將回復隊列插入到出站消息的replyTo屬性中
.
下面的例子展示了如何來包裝這些beans.
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyQueue(replyQueue());
rabbitTemplate.setReplyTimeout(60000);
return rabbitTemplate; }
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
完整的
RabbitTemplate
包裝固定回復隊列,與遠程監聽器容器的請求回復處理展示在 this test case.
重要
當回復超時時(replyTimeout
), sendAndReceive()
方法會返回null.
在1.3.6版本之前, 消息超時回復只是簡單地記錄下來.現在,如果收到了遲到回復,將會拒絕(模板會拋出AmqpRejectAndDontRequeueException
).
如果回復隊列配置了將拒絕消息到死信交換器中, 可獲取回復來作后面的分析. 只須將隊列以隊列名稱作為路由鍵綁定到死信交換器中.
參考RabbitMQ Dead Letter Documentation 來了解更多關于死信的配置信息.
你也可以看示例中關于FixedReplyQueueDeadLetterTests
測試用例.
AsyncRabbitTemplate
1.6版本引入了 AsyncRabbitTemplate
.
它有與 AmqpTemplate 上類似的sendAndReceive
(和 convertSendAndReceive
) 方法,但不是阻塞的,它們會返回一個 ListenableFuture
.
sendAndReceive
方法返回一個RabbitMessageFuture
; convertSendAndReceive
方法會返回一個RabbitConverterFuture
.
你可以同步稍后在future上調用get()方法來獲取結果,也可以注冊一個回調異步來獲取結果.
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
ListenableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get();
}
catch (ExecutionException e) {
...
} ...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.addCallback(new ListenableFutureCallback<String>() {
@Override
publicvoid onSuccess(String result) {
...
}
@Override
publicvoid onFailure(Throwable ex) {
...
}
});
...
}
如果設置了mandatory
,且消息不能投遞,future 會拋出一個ExecutionException
,并帶有AmqpMessageReturnedException
原因,它封裝了返回的消息和以及關于返回的信息.
如果設置了enableConfirms
,future會包含一個屬性confirm
,它是 ListenableFuture<Boolean>
, true
表示成功的發布.
如果confirm future是false,RabbitFuture
會有一個屬性nackCause
- 如果可用的話,則代表的是失敗的原因.
重要
發布者確認已被廢棄了(如果在回復之后收到),-因為回復已經暗示了成功發布.
在模板上設置receiveTimeout
屬性來表示回復超時時間(它默認為 30 秒).如果發生了超時,future會以AmqpReplyTimeoutException結束
.
模板可實現SmartLifecycle
; 這樣可阻止模板在等待回復時Future
退出.
Spring 遠程調用 AMQP
Spring Framework 有一個普遍的遠程處理能力, 允許 Remote Procedure Calls (RPC) 使用多種傳輸協議. Spring-AMQP 通過在客戶端使用AmqpProxyFactoryBean ,在服務端使用AmqpInvokerServiceExporter也可以提供類似的機制.
它提供了基于AMQP的RPC. 在客戶端,RabbitTemplate
可以按照上面一樣來使用,在服務器端, invoker (配置為MessageListener
) 會收到消息, 調用配置的服務,使用入站消息的replyTo
信息來返回回復.
客戶端工廠可注入任何bean (使用它的serviceInterface
);客戶端然后可以調用代理上的方法,導致在AMQP上遠程執行.
重要
使用默認 MessageConverter
器,方法參數和返回值必須是Serializable的實例
.
在服務器端,AmqpInvokerServiceExporter
包含AmqpTemplate
和 MessageConverter
屬性.
目前,未使用模板的MessageConverter
.如果你需要提供定制的消息轉換器,那么你需要使用messageConverter
屬性進行提供.在客戶端,可在AmqpTemplate
中添加定制消息轉換器,它是使用其amqpTemplate
屬性提供給 AmqpProxyFactoryBean
的.
樣例 client 和server 配置如下所示.
<bean id="client" class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<propertyname="amqpTemplate "ref="template" />
<propertyname="serviceInterface"value="foo.ServiceInterface" />
</bean>
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"routing-key="remoting.binding" exchange="remoting.exchange" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:direct-exchange name="remoting.exchange">
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="listener" class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
<property name="serviceInterface" value="foo.ServiceInterface" />
<property name="service" ref="service" />
<property name="amqpTemplate" ref="template" />
</bean><bean id="service" class="foo.ServiceImpl" />
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="template" connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>
重要
AmqpInvokerServiceExporter
只能處理適當格式的消息,如果從AmqpProxyFactoryBean
中發出的消息. 如果它接收到一個不能解釋的消息,那么將發送一個序列化的RuntimeException
作為回復.
如果這些消息無replyToAddress
屬性,消息會被拒絕且在沒有配置死信交換器時會永久丟失.
默認情況下,如果請求消息不能投遞,調用線程最終會超時,并會拋出RemoteProxyFailureException
. 超時時間是5秒,可在RabbitTemplate通過設置replyTimeout
屬性來修改.
從1.5版本開始,如果設置 mandatory
屬性為true, 并在連接工廠中啟用了返回(參考 the section called “Publisher Confirms and Returns”), 調用線程會拋出一個AmqpMessageReturnedException
.
參考 the section called “Reply Timeout” 來了解更多信息.