這是1.6版本中加入的新屬性.當容器啟動時,如果此屬性為true (默認為false), 容器會檢查上下文中聲明的隊列是否中間件中存在的隊列是否一致.
如果屬性不匹配(如. auto-delete
) 或參數 (e.g. x-message-ttl
) 存在, 容器 (和應用程序上下文) 會拋出致命異常而導致啟動失敗.如果是在恢復期間檢測到的問題,容器會停止.
必須在上下文中存在單個RabbitAdmin
(或使用rabbitAdmin
屬性在容器上特別配置);否則此屬性必須為false
.
如果在初始啟動期間,中間件還不可用,容器啟動后,當建立連接時會檢查條件.
重要
該檢查針對的是上下文的所有隊列,而不僅僅是特定監聽器配置使用的隊列.如果你希望只檢查容器使用的隊列,你需要為這個容器配置單獨的RabbitAdmin
, 并使用rabbitAdmin
屬性為其提供一個引用.
參考“Conditional Declaration”章節來了解更多信息.
屬性
autoDeclare(auto-declare)
描述
從1.4版本開始, SimpleMessageListenerContainer
引入了這個新屬性.
當設置為true
時(默認值),容器會使用RabbitAdmin
來重新聲明所有 AMQP 對象(Queues, Exchanges, Bindings).
如果在啟動期間探測到至少有一個隊列缺失了,可能因為它是自動刪除隊列或過期隊列,但不管隊列缺失是基于什么原因,重新聲明仍會進行處理(譯者注:太浪費了).
要禁用這種行為, 可設置其屬性為false
. 但需要注意的是,如果所有隊列都缺失了(譯者注:全部還是部分),容器會啟動失敗.
在1.6版本之前,如果在上下文中存在多個admin,容器會隨機選擇一個.反之,如果沒有admin,它會從內部創建一個.
無論是哪種情況,這都將導致非預期結果出現. 從1.6版本開始,為了能使autoDeclare
工作,必須要上下文中明確存在一個RabbitAdmin
,或者特定實例的引用必須要在容器中使用rabbitAdmin屬性中配置
.屬性
declarationRetries(declaration-retries)
描述
從1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer
有了這個新屬性. 命名空間屬性在1.5.x中可用.
用于設置被動聲明失敗時,重新嘗試的次數.被動聲明發生在當消費者啟動了或從多個隊列中消費時,初始化期間部分隊列還不可用的情況下.
當重試次數用完后,如果還是不能被動聲明配置隊列,那么上面的missingQueuesFatal屬性將控制容器行為. 默認: 3次重試 (4 次嘗試).
屬性
failedDeclarationRetryInterval(failed-declaration-retry-interval)
描述
從1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer
有了這個新屬性. 命名空間屬性在1.5.x中可用.
重新嘗試被動聲明的時間間隔. 被動聲明發生在當消費者啟動了或從多個隊列中消費時,初始化期間部分隊列還不可用的情況下. 默認: 5000 (5秒).
屬性
retryDeclarationInterval(missing-queue-retry-interval)
描述
從1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer
有了這個新屬性. 命名空間屬性在1.5.x中可用.
如果配置隊列的一個子集在消費者初始化過程中可用,則消費者將從這些隊列中開始消費。消費者將被動地使用此間隔聲明丟失的隊列。
當這個間隔過去后,會再次使用declarationRetries 和 failedDeclarationRetryInterval.
如果還有缺失隊列,消費者在重新嘗試之前會等待此時間間隔.
這個過程會不停地進行到所有隊列可用. 默認: 60000 (1分鐘).
屬性
consumerTagStrategy(consumer-tag-strategy)
描述
從1.4.5版本開始,SimpleMessageListenerContainer
有了這個新屬性. 命名空間屬性在1.5.x中可用.
之間,只能使用中間件生成的consumer tags;盡管現在這仍是默認的配置,但現在你可以提供一個ConsumerTagStrategy的實現, 這樣就可為每個消費者創建獨特的tag.
屬性
idleEventInterval(idle-event-integer)
描述
從1.6版本開始,SimpleMessageListenerContainer
有了這個新屬性.
參考"Detecting Idle Asynchronous Consumers"章節.
3.1.16 監聽器并發
默認情況下,監聽器容器會啟動單個消費者來接收隊列中的消息.
當檢查前面章節中的表格時,你會發現有許多屬性可控制并發.最簡單的是concurrentConsumers
, 它會創建固定數量的消費者來并發處理消息.
在1.3.0版本之前,這只能在容器停止時才可設置.
從1.3.0版本開始,你可以動態調整 concurrentConsumers
屬性.如果容器運行時修改了,會根據新設置來調需要的消費者(添加或刪除).
此外,在容器中添加了一個新屬性 maxConcurrentConsumers
來基于工作負載來動態調整并發數.
它可與其它四個屬性一起工作: consecutiveActiveTrigger
, startConsumerMinInterval
, consecutiveIdleTrigger
, stopConsumerMinInterval
.
在默認設置的情況下,加大消費者的算法如下:
如果還沒有達到maxConcurrentConsumers
,如果現有消費者活動了10個連續周期且離最后消費者啟動至少消逝了10秒鐘,那么將啟動新的消費者. 如果消費者在txSize
* receiveTimeout
毫秒內至少收到一個消息,那么就認為此消費者是活動的.
在默認設置的情況下,減少消費者的算法如下:
如果有超過concurrentConsumers
數量的消費者在運行,且檢測到消費者連續超時(空閑)了10個周期,且最后一個消費者至少停止了60秒,那么消費者將停止.
超時依賴于receiveTimeout
和 txSize
屬性.當在txSize
* receiveTimeout
毫秒內未收到消息,則認為消費者是空閑的.
因此,當有默認超時(1秒)和 txSize為
4,那么在空閑40秒后,會認為消費者是空閑的并會停止(4超時對應1個空閑檢測).
實際上,如果整個容器空閑一段時間,消費者將只會被停止。這是因為broker將分享其在所有活躍的消費者的工作。
3.1.17 專用消費者
也是從1.3版本開始,監聽器容器可配置單個專用消費者; 這可以阻其它容器來消費隊列直到當前消費者退出.
這樣的容器的并發性必須是1。
當使用專用消費者時,其它容器會根據recoveryInterval
屬性來消費隊列, 如果嘗試失敗,會記錄一個 WARNing 信息.
3.1.18 監聽器容器隊列
1.3版本在監聽器容器中引入許多處理多個隊列的改善措施.
容器配置必須監聽至少一個隊列以上; 以前也是這樣的情況,但現在可以在運行時添加和刪除隊列了。當任何預先獲取的消息被處理后,容器將回收(取消和重新創建)。
參考方法addQueues
, addQueueNames
, removeQueues
and removeQueueNames
.當刪除隊列時,至少要保留一個隊列.
現在,只要有可用隊列消費者就會啟動 -先前如果沒有可用隊列,容器會停止.現在,唯一的問題是是否有可用隊列.如果只是部分隊列可用,容器會每60秒嘗試被動聲明(和消費)缺失隊列.
此外,如果消費才從broker中收到了通道(例如,隊列被刪除)消費者會嘗試重新恢復,重新恢復的消費會繼續處理來自其它配置隊列中的消息. 之前是隊列上的取消會取消整個消費者,最終容器會因缺失隊列而停止.
如果你想永久刪除隊列,你應該在刪除隊列的之前或之后更新容器,以避免消費.
3.1.19 恢復:從錯誤和代理失敗中恢復
介紹
Spring提供了一些關鍵的 (最流行的)高級特性來處理協議錯誤或中間件失敗時的恢復與自動重連接.
主要的重連接特性可通過CachingConnectionFactory
自身來開啟. 它也常有利于使用rabbitadmin自動聲明的特點.
除此之外, 如果你關心保證投遞,你也許需要在RabbitTemplate中使用channelTransacted
標記以及在SimpleMessageListenerContainer中使用AcknowledgeMode.AUTO
(或者自己來手動應答) .
RabbitAdmin
組件在啟動時可聲明交換器,隊列,綁定.它是通過ConnectionListener懶執行的
,因此如果啟動時broker不存在,也沒有關系.
Connection
第一次使用時(如.發送消息) ,監聽器會被觸發,admin功能也會應用.這種在監聽器中自動聲明的好處是,如果連接出于任何原因斷開了,(如. broker死了,網絡中斷問題.),它們會在下次有需要的時候重新應用.
這種方式的隊列聲明必須要有固定的名稱;要么是明確聲明,要么是由框架生成AnonymousQueue
.匿名隊列是非持久化的,專用的,且自動刪除的.
重要
自動聲明只在cachingConnectionFactory
緩存模式是CHANNEL
(默認)才可用. 這種限制的存在是因為專用和自動刪除隊列是綁定到connection上的.
如果你在同步序列中使用RabbitTemplate時丟失了broker的連接,那么Spring AMQP會拋出一個AmqpException
(通常但并不總是AmqpIOException
).
我們不想隱藏存在問題的事實,因此你可以捕獲并對異常進行處理.如果你懷疑連接丟失了,而且這不是你的錯,那么最簡單的事情就是執行再次嘗試操作. 重試操作可以手動進行,也可以使用Spring Retry來處理重試(強制或聲明).
Spring Retry 提供了兩個AOP攔截器并提供非常靈活的方式來指定retry的參數(嘗試的次數,異常類型, 補償算法等等.). Spring AMQP同時也提供了一些方便的工廠bean來創建Spring Retry攔截器, 你可以使用強類型回調接口來實現恢復邏輯.參考Javadocs和 StatefulRetryOperationsInterceptor
和StatelessRetryOperationsInterceptor
的屬性來了解更多詳情.
如果沒有事務,或者如果一個事務是在重試回調中啟動的話,則無狀態重試是適當的。注意,相對于有狀態重試,無狀態重試只是簡單配置和分析,如果存在一個正在進行的事務必須回滾或肯定會回滾的話, 這種無狀態重試則是不合適的.
在事務中間掉下來的連接與回退有同樣的效果, 所以對于事務開始于堆棧上的重連接來說,有狀態重試通常是最佳選擇(so for reconnection where the transaction is started higher up the stack, stateful retry is usually the best choice).
從1.3版本開始,提供了builder API來幫助在Java中使用這些攔截器(或者在 @Configuration
類中),例如:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只有部分retry特性能通過這種方式,更加高級的特性需要在RetryTemplate
中配置.
參考Spring Retry Javadocs 來了解可用策略,配置的完整信息.
消息監聽器和異步情況
如果 MessageListener
因業務異常而失敗,異常可由消息監聽器容器來處理,然后它會繼續回去監聽其它信息.如果失敗是由于掉下的連接引起的(非業務異常),那么監聽此消費者的監聽器將退出和重啟.
SimpleMessageListenerContainer
可以無逢地進行處理,并且它會在日志中記錄監聽器即將重啟.
事實上,它會循環不斷地嘗試重新啟動消費者,只有當消費者有非常糟糕的行為時,才會放棄。一個副作用是,如果broker在容器啟動時關閉,它將會繼續嘗試直到建立一個連接。
業務異常處理, 相對于協議錯誤和連接丟失,它可能需要更多考慮和一些自定義配置,特別是處于事務或 容器應答時.
在2.8.x版本之前, RabbitMQ對于死信行為沒有定義,因此默認情況下,一個因拒絕或因業務異常導致回退的消息可循環往復地重新分發.
要限制客戶端的重新分發的次數,一個選擇是在監聽器的通知鏈中添加一個StatefulRetryOperationsInterceptor
. 攔截器有一個實現了自定義死信動作的恢復回調:
什么是適合你的特定的環境。
另一個選擇是設置容器的rejectRequeued屬性為false. 這會導致丟棄所有失敗的消息.當使用RabbitMQ 2.8.x+時,這也有利于傳遞消息到一個死的信件交換。
或者,你可以拋出一個AmqpRejectAndDontRequeueException
;這會阻止消息重新入列,不管defaultRequeueRejected
屬性設置的是什么.
通常情況下,可以組合使用這兩種技術 在通知鏈中使用StatefulRetryOperationsInterceptor
, 在此處是MessageRecover
拋出AmqpRejectAndDontRequeueException
. MessageRecover
會一直調用,直到耗盡了所有重試.
默認MessageRecoverer
只是簡單的消費錯誤消息,并發出WARN消息.在這種情況下,消息是通過應答的,且不會發送到死信交換器中.
從1.3版本開始,提供了一個新的RepublishMessageRecoverer
,它允許在重試次數耗盡后,發布失敗消息:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
.build();
}
RepublishMessageRecoverer
會使用消息頭的額外信息來發布,這些信息包括異常信息,棧軌跡,原始交換器和路由鍵.額外的頭可通過創建其子類和覆蓋additionalHeaders()
方法來添加.
Spring Retry 可以非常靈活地決定哪些異常可調用重試. 默認配置是對所有異常都進行重試.用戶異常可以包裝在ListenerExecutionFailedException
中,我們需要確保分類檢查異常原因. 默認的分類只是看頂部級別的異常。
從 Spring Retry 1.0.3開始, BinaryExceptionClassifier
有一個屬性traverseCauses
(默認為false
). 當當為true時,它將遍歷異常的原因,直到它找到一個匹配或沒有原因。
要使用分類重試,需要使用一個SimpleRetryPolicy
,其構造函數將接受最大嘗試次數,Exception的Map,以及一個boolean值(traverseCauses),且還需要將此策略注入給RetryTemplate
.
3.1.20 調試
Spring AMQP 提供廣泛的日志記錄,尤其是在DEBUG級別.
如果你想在應用程序和broker之間監控AMQP協議,你可以使用像WireShark的工具, 它有一個插件可用于解碼協議.
另一個選擇是, RabbitMQ java client自身攜帶了一個非常有用的工具類:Tracer
.當以main方式運行時,默認情況下,它監聽于5673 ,并連接本地的5672端口.
只需要簡單的運行它,并修改你的連接工廠配置,將其連接到本地的5673端口. 它就會在控制臺中顯示解碼的協議信息.參考Tracer
javadocs 來了解詳細信息.
3.2 Logging Subsystem AMQP Appenders
框架為多個流行的日志系統提供了日志appenders:
- log4j (從Spring AMQP1.1版本開始)
- logback (從Spring AMQP1.4版本開始)
- log4j2 (從Spring AMQP1.6版本開始)
appenders使用正常機制為為子系統配置,可用屬性參照下面的規定。
3.2.1 共同屬性
下面的屬性對于所有appenders都可用:
Table 3.4. 共同Appender屬性
Property | Default | Description |
---|
exchangeName | logs | 用于發布日志事件的交換器名稱. |
exchangeType | topic | 發布日志事件的交換器類型- 只在appender聲明了交換器的情況下才需要. 參考declareExchange . |
routingKeyPattern | %c.%p | 日志子系統生成路由鍵的模式格式. |
applicationId |
| Application ID - 如果模式包含 %X{applicationId},則將其添加到路由鍵 . |
senderPoolSize | 2 | 用于發布日志事件的線程數目. |
maxSenderRetries | 30 | 當broker不可用時或有某些錯誤時,重試的次數. 延時重試像: N ^ log(N) , N 表示重試次數. |
addresses |
| 一個逗號分隔的broker地址列表: host:port[,host:port]* -覆蓋host 和 port . |
host | localhost | 要連接RabbitMQ的主機. |
port | 5672 | |
virtualHost | / | 要連接的RabbitMQ虛擬主機. |
username | guest | 要連接RabbitMQ的用戶. |
password | guest | 要連接RabbitMQ的用戶密碼. |
contentType | text/plain | 日志消息的content-type屬性 .
|
contentEncoding |
| 日志消息的content-encoding屬性. |
declareExchange | false | 當appender啟動時,是否需要聲明配置的交換器.也可參考 durable 和autoDelete . |
durable | true | 當declareExchange 為 true ,durable 標志才會設置此值. |
autoDelete | false | 當 declareExchange 為true , auto delete 標志才會設置此值. |
charset | null | 當將字符串轉成byte[]時要使用的編碼,默認為null (使用系統默認字符集).如果當前平臺上不支持此字符集,將回退到使用系統字符集. |
deliveryMode | PERSISTENT | PERSISTENT 或 NON_PERSISTENT 用于決定RabbitMQ是否應該持久化消息. |
generateId | false | 用于確定messageId 屬性是否需要設置成唯一值. |
clientConnectionProperties | null | 一個逗號分隔的key:value 對,它是連接RabbitMQ時設置的自定義客戶端屬性 |
3.2.2 Log4j Appender
樣例log4j.properties片斷.
log4j.appender.amqp.addresses=foo:5672,bar:5672
log4j.appender.amqp=org.springframework.amqp.rabbit.log4j.AmqpAppender
log4j.appender.amqp.applicationId=myApplication
log4j.appender.amqp.routingKeyPattern=%X{applicationId}.%c.%p
log4j.appender.amqp.layout=org.apache.log4j.PatternLayout
log4j.appender.amqp.layout.ConversionPattern=%d %p %t [%c] - <%m>%n
log4j.appender.amqp.generateId=true
log4j.appender.amqp.charset=UTF-8
log4j.appender.amqp.durable=false
log4j.appender.amqp.deliveryMode=NON_PERSISTENT
log4j.appender.amqp.declareExchange=true
3.2.3 Log4j2 Appender
樣例 log4j2.xml 片斷.
<Appenders>
...
<RabbitMQ name="rabbitmq"
addresses="foo:5672,bar:5672" user="guest" password="guest" virtualHost="/"
exchange="log4j2" exchangeType="topic" declareExchange="true" durable="true" autoDelete="false"
applicationId="myAppId" routingKeyPattern="%X{applicationId}.%c.%p"
contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
charset="UTF-8"
senderPoolSize="3" maxSenderRetries="5">
</RabbitMQ>
</Appenders>
3.2.4 Logback Appender
樣例 logback.xml 片斷.
<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern><![CDATA[ %d %p %t [%c] - <%m>%n ]]></pattern>
</layout>
<addresses>foo:5672,bar:5672</addresses>
<abbreviation>36</abbreviation>
<applicationId>myApplication</applicationId>
<routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>false</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<declareExchange>true</declareExchange>
</appender>
3.2.5 定制Messages
每個appenders都可以子類化,以允許你在發布前修改消息.
Customizing the Log Messages.
public class MyEnhancedAppender extends AmqpAppender {
@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}
}
3.2.6 定制客戶端屬性
簡化 String 屬性
每個appender都支持在RabbitMQ連接中添加客戶端屬性.
log4j.
log4j.appender.amqp.clientConnectionProperties=foo:bar,baz:qux
logback.
<appender name="AMQP"...>...
<clientConnectionProperties>foo:bar,baz:qux</clientConnectionProperties>
...</appender>
log4j2.
<Appenders>
...
<RabbitMQname="rabbitmq"...clientConnectionProperties="foo:bar,baz:qux"...</RabbitMQ></Appenders>
這些屬性是逗號分隔的key:value
隊列表; 鍵和值不能包含逗號或 冒號.
當RabbitMQ Admin UI中查看連接上,你會看到這些屬性.
使用 log4j 和 logback appenders, appenders 可以是子類化的, 允許你在連接建立前,修改客戶連接屬性:
定制客戶端連接屬性.
public class MyEnhancedAppender extends AmqpAppender {
private String foo;
@Override
protected void updateConnectionClientProperties(Map<String, Object> clientProperties) {
clientProperties.put("foo", this.foo);
}
public void setFoo(String foo) {
this.foo = foo;
}
}
對于 log4j2, 添加 log4j.appender.amqp.foo=bar
到log4j.properties 來設置發展.
對于logback, 在logback.xml中添加 <foo>bar</foo>
.
當然,對于像這個例子中簡單的String 屬性,可以使用先前的技術;
子類允許更豐富的屬性(如添加 Map
的numeric 屬性).
使用log4j2, 子類是不被支持的,因為 log4j2 使用靜態工廠方法.
3.3 樣例應用程序
3.3.1 介紹
Spring AMQP Samples 項目包含了兩個樣例應用程序. 第一個簡單的"Hello World" 示例演示了同步和異步消息的處理. 它為理解基礎部分提供了一個很好的開端.
第二個基于股票交易的例子演示了真實應用程序中的交互場景.在本章中,我們會每個示例進行快速瀏覽,使您可以專注于最重要的組成部分.
這兩個例子都是基于Maven的,因此你可以直接將它們導入任何支持Maven的IDE中(如. SpringSource Tool Suite).
3.3.2 Hello World
介紹
Hello World示例演示了同步和異步消息處理.你可以導入spring-rabbit-helloworld 示例到IDE中并跟隨下面的討論.
同步例子
在src/main/java 目錄中,導航到org.springframework.amqp.helloworld 包中.
打開HelloWorldConfiguration 類,你可以注意到它包含了@Configuration 類級注解和一些@Bean 方法級注解.
這是Spring 的基于Java的配置.你可進一步的了解here.
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =null;
connectionFactory =new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
配置中同樣也包含了RabbitAdmin的實例
, 它會默認查找類型為Exchange, Queue, 或 Binding的bean并在broker中進行聲明.
事實上,"helloWorldQueue" bean是在HelloWorldConfiguration 中生成的,因為它是 Queue的實例.
@Bean
public Queue helloWorldQueue() {
returnnew Queue(this.helloWorldQueueName);
}
重看"rabbitTemplate"bean配置,你會看到它將helloWorldQueue的名稱設成了"queue"屬性(用于接收消息) 以及"routingKey" 屬性(用于發送消息).
現在,我們已經探索了配置,讓我們看看實際上使用這些組件的代碼。
首先,從同一個包內打開Producer類。它包含一個用于創建Spring ApplicationContext的main()方法.
publicstaticvoid main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在上面的例子中你可以看到, 取回的AmqpTemplate用來發送消息.因為客戶端代碼應該盡可能地依賴于接口,因此類型是AmqpTemplate而不是RabbitTemplate.
即使在HelloWorldConfiguration中創建的bean是RabbitTemplate的實例,依賴于接口則意味著這端代碼更具有便攜性(portable) (配置可以獨立于代碼進行修改).
因為convertAndSend() 方法是通過模板來調用的,因此模板會將調用委派給它的MessageConverter實例.在這種情況下,它默認使用的是SimpleMessageConverter,但也可以在HelloWorldConfiguration中為"rabbitTemplate"指定其它的實現.
現在打開Consumer類. 它實際上共享了同一個配置基類,這意味著它將共享"rabbitTemplate" bean. 這就是為什么我們要使用"routingKey" (發送) 和"queue" (接收)來配置模板的原因.
正如你在Section 3.1.4, “AmqpTemplate”中看到的,你可以代替在發送方法中傳遞routingKey參數,代替在接收方法中傳遞queue 參數. Consumer 代碼基本上是Producer的鏡子,只不過調用的是receiveAndConvert()而非convertAndSend()方法.
publicstaticvoid main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
你如果運行Producer,然后再運行Consumer, 在控制臺輸出中,你應該能看到消息"Received: Hello World"
異步示例
我們已經講解了同步Hello World樣例, 是時候移動到一個稍微先進,但更強大的選擇上了.稍微修改一下代碼,Hello World 樣例就可以可以提供異步接收的示例了,又名 Message-driven POJOs. 事實上,有一個子包明確地提供了這種功能: org.springframework.amqp.samples.helloworld.async.
再一次地我們將從發送端開始. 打開ProducerConfiguration類可注意到它創建了一個"connectionFactory"和"rabbitTemplate" bean.
這次,由于配置是專用于消息發送端,因此我們不需要任何隊列定義,RabbitTemplate只須設置routingKey屬性.
回想一下,消息是發送到交換器上的而不是直接發到隊列上的. AMQP默認交換器是無名稱的direct類型交換器.
所有隊列都是通過使用它們的名稱作為路由鍵綁定到默認交換器上的.這就是為什么在這里我們只提供路由鍵的原因.
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于這個示例展示的是異步消息處理,生產方設計為連續發送消息(盡管類似于同步版本中的 message-per-execution模型,但不太明顯,實際上它是消息驅動消費者)負責連續發送消息的組件是作為ProducerConfiguration類中的內部類來定義的,每3秒執行一次.
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
你不必要完全了解這些細節,因為真正的關注點是接收方(我們馬上就會講解).然而,如果你還熟悉Spring 3.0 任務調度支持,你可從here這里來了解.
簡短故事是:在 ProducerConfiguration 中的"postProcessor" bean使用調度器來注冊了任務.
現在,讓我們轉向接收方. 為強調 Message-driven POJO 行為,將從對消息起反應的組件開始.
此類被稱為HelloWorldHandler.
publicclass HelloWorldHandler {
publicvoid handleMessage(String text) {
System.out.println("Received: " + text);
}
}
相當明顯的, 這是一個POJO. 它沒有繼承任何基類,它沒有實現任何接口,它甚至不包含任何導入. 它將通過Spring AMQP MessageListenerAdapter來適配MessageListener接口.然后適配器可配置在SimpleMessageListenerContainer上.
在這個例子中,容器是在ConsumerConfiguration類中創建的.你可以看到POJO是包裝在適配器中的.
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer是一個Spring生命周期組件,默認會自動啟動.如果你看了Consumer類的內部,你會看到main()方法中除了一行啟動創建ApplicationContext的代碼外,其它什么都沒有.
Producer的main()方法也只有一行啟動,因為以 @Scheduled注解的組件會自動開始執行.你可以任何順序來啟動Producer 和Consumer,你會看每秒就會發送消息和接收到消息.
3.3.3 股票交易(Stock Trading)
Stock Trading 示例演示了比Hello World示例更高級的消息場景.然而,配置卻是很相似的 - 只是有一點復雜.
由于我們已經詳細講解了Hello World配置,因此在這里我們將重點關注不一樣的東西. 有一個服務器發送市場數據(股票報價)到Topic交換器中.
然后,客戶端可訂閱市場數據,即通過使用路由模式(如. "app.stock.quotes.nasdaq.*")來綁定隊列(e.g. "app.stock.quotes.nasdaq.*").
這個例子的其它主要功能是 有一個請求回復“股票交易”的互動,它是由客戶發起并由服務器來處理的. 這涉及到一個私有的“回復(replyTo)”隊列,發送客戶端的信息在請求消息中。
服務器的核心配置在RabbitServerConfiguration類中(位于 org.springframework.amqp.rabbit.stocks.config.server 包中).
它繼承了 AbstractStockAppRabbitConfiguration. 這是服務器和客戶端定義常用資源的地方,包括市場數據Topic交換器(其名稱為app.stock.marketdata) 以及服務器公開股票交易的隊列(其名稱為app.stock.request).
在那個公共配置文件中,你會看到在RabbitTemplate上配置了一個JsonMessageConverter.
服務器特有配置由2部分組成.首先,它在RabbitTemplate上配置了市場數據交換器,這樣在發送消息時,就不必提供交換器名稱.它是通過基礎配置類中的抽象回調方法中定義做到這一點的.
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次, 聲明了股票請求隊列.在這里,它不需要任何明確的綁定,因為它將以它自己的名稱作為路由鍵來綁定到無名稱的默認交換器上.正如先前提到的,AMQP規范定義了此種行為.
@Beanpublic Queue stockRequestQueue() {
returnnew Queue(STOCK_REQUEST_QUEUE_NAME);
}
現在你已經看過了服務器的AMQP資源配置,導航到src/test/java目錄下的org.springframework.amqp.rabbit.stocks包.在那里你會實際的 提供了main()方法的Server類.
它基于server-bootstrap.xml 創建了一個ApplicationContext.在那里,你會看到發布虛假市場數據的調度任務.
那個配置依賴于Spring 3.0的"task"命名空間支持.bootstrap配置文件也導入了其它一些文件.最令人關注的是位于src/main/resources目錄下的server-messaging.xml.在那里,你會看到"messageListenerContainer" bean,它負責處理股票交易請求.
最后在看一下定義在src/main/resources目錄下的server-handlers.xml,其中定義了一個 "serverHandler" bean.這個bean是ServerHandler類的實例,它是Message-driven POJO 的好例子,它也有發送回復消息的能力.
注意,它自身并沒有與框架或任何AMQP概念耦合.它只是簡單地接受TradeRequest并返回一個TradeResponse.
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
現在我們已經看了服務端的重要配置和代碼,讓我們轉向客戶端.最佳起點是從 org.springframework.amqp.rabbit.stocks.config.client 包下的RabbitClientConfiguration開始.
注意,它聲明了兩個不帶明確參數的隊列.
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
那些是私有隊列, 唯一名稱會自動自成.客戶端會用第一個生成的隊列來綁定由服務端公開的市場交換器.
記住在AMQP中,消費者與隊列交互,而生產者與交換器交互. 隊列和交換器之間的綁定指示broker從給定的交換器中投遞或路由什么消息給隊列.
由于市場交換器是一個Topic交換器,綁定可通過路由正則表達式來表達.
RabbitClientConfiguration聲明了一個Binding對象,其對象是通過BindingBuilder的便利API來生成的.
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
注意,實際值已經在屬性文件(src/main/resources目錄下的"client.properties")中外部化了,因此我們使用Spring的@Value 注解來注入值.這通常是一個好主意,否則值就會硬編碼在類中,沒有修改就沒有重新編譯.
在這種情況下,通過修改綁定中的路由正則表達式,可很容易地運行多個版本的Client.讓我們立即嘗試.
啟動運行org.springframework.amqp.rabbit.stocks.Server然后再運行 org.springframework.amqp.rabbit.stocks.Client.你將會看到NASDAQ股票的交易報價,因為關聯stocks.quote.pattern 鍵的值在client.properties中是app.stock.quotes.nasdaq.
現在,保持現有Server 和Client 運行,將其屬性值修改為app.stock.quotes.nyse.再啟動第二個Client實例.你會看到第一個client仍然接收NASDAQ 報價,而第二個client接收的NYSE報價. 你可以改變模式,獲取所有的股票報價或個別股票的報價。
最后一個我們將暴露的特性是從客戶端的角度來看待請求-回復交互.記住我們已經看了ServerHandler,它會接受TradeRequest對象并返回TradeResponse對象. 客戶端相應的代碼是 RabbitStockServiceGateway(位于org.springframework.amqp.rabbit.stocks.gateway 包).為發送消息,它會委派給RabbitTemplate.
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
thrownew AmqpException(e);
}
return message;
}
});
}
注意,在發送消息前,它設置了"replyTo"地址. 這提供了隊列,此隊列是由上面的"traderJoeQueue" bean 定義生成的. 以下是StockServiceGateway類的@Bean定義.
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果你沒有運行服務器和客戶端,現在就啟動它們. 嘗試使用100 TCKR的格式來發送請求.經過一個簡短的人工延遲來模擬“處理”請求,你應該看到一個確認消息出現在客戶端上。
3.4 測試支持
3.4.1 介紹
為異步程序寫集成測試比測試簡單程序更復雜. 當引入了@RabbitListener這樣的注解時,這尤其更加復雜.
現在的問題是發送消息后,如何來驗證, 監聽器按預期收到了消息.
框架自身帶有許多單元測試和集成測試;有些使用mocks, 另外一些使用真實的RabbitMQ broker來集成測試. 您可以參照測試場景的一些想法進行測試。
Spring AMQP 1.6版本引入了sring-rabbit-test
jar ,它提供一些測試復雜場景的測試. 預計這一項目將隨著時間的推移進行擴展,但我們需要社會反饋以幫助測試。請使用JIRA問題或GitHub提供這樣的反饋。
3.4.2 Mockito Answer<?> 實現
當前有兩個Answer<?>
實現可幫助測試:
第一個, LatchCountDownAndCallRealMethodAnswer
提供了返回null和計數下一個鎖存器的Answer<Void>
.
LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer)
.when(listener).foo(anyString(), anyString());
...
assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
第二個, LambdaAnswer<T>
提供了一種調用真正方法的機制,并提供機會來返回定制結果(基于InvocationOnMock和結果
).
public class Foo {
public String foo(String foo) {
return foo.toUpperCase();
}
}
Foo foo = spy(new Foo());
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
.when(foo).foo(anyString());
assertEquals("FOOFOO", foo.foo("foo"));
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
.when(foo).foo(anyString());
assertEquals("FOOfoo", foo.foo("foo"));
doAnswer(new LambdaAnswer<String>(false, (i, r) ->
"" + i.getArguments()[0] + i.getArguments()[0])).when(foo).foo(anyString());
assertEquals("foofoo", foo.foo("foo"));
When using Java 7 or earlier:
doAnswer(new LambdaAnswer<String>(true, new ValueToReturn<String>() {
@Overridepublic String apply(InvocationOnMock i, String r) {
return r + r;
}
})).when(foo).foo(anyString());
3.4.3 @RabbitListenerTest and RabbitListenerTestHarness
在你的@Configuration
類中使用 @RabbitListenerTest
(它也會通過@EnableRabbit來啟用@RabbitListener
探測).注解會導致框架使用子類RabbitListenerTestHarness來代替標準RabbitListenerAnnotationBeanPostProcessor.
RabbitListenerTestHarness
通過兩種方式來增強監聽器 - 將其包裝進Mockito Spy
, 啟用了Mockito
存根和驗證操作.也可在監聽器添加Advice
來啟用對參數,結果或異常的訪問.
你可以控制哪一個(或兩個)來在@RabbitListenerTest上啟用屬性. 后者用于訪問調用中更為低級數據- 它也支持測試線程阻塞,直到異步監聽器被調用.
重要
final
@RabbitListener
不能被發現或通知 ,同時,只有帶id屬性的監聽器才能發現或通知.
讓我們看一些例子.
使用spy:
@Configuration
@RabbitListenerTest
public class Config {
@Bean
public Listener listener() {
returnnew Listener();
}
...
}
public class Listener {
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness;
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo" ));
Listener listener = this.harness.getSpy("foo");
assertNotNull(listener);
verify(listener).foo("foo");
}
@Test
public void testOneWay() throws Exception {
Listener listener = this.harness.getSpy("bar");
assertNotNull(listener);
LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
doAnswer(answer).when(listener).foo(anyString(), anyString());
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
verify(listener).foo("bar", this.queue2.getName());
verify(listener).foo("baz", this.queue2.getName());
}
}

| 將harness 注入進測試用于,這樣我們可訪問spy. |

| 獲取spy引用,這樣我們可以驗證是否按預期在調用. 由于這是一個發送和接收操作,因此不必暫停測試線程,因為RabbitTemplate 在等待回復時已經暫停過了. |

| 在這種情況下,我們只使用了發送操作,因為我們需要一個門閂來等待對容器線程中監聽器的異步調用. 我們使用了Answer<?> 一個實現來幫助完成. |

| 配置spy來調用Answer . |
使用捕獲建議:
@Configuration
@ComponentScan
@RabbitListenerTest(spy = false, capture = true)
public class Config {
}
@Service
public class Listener {
private boolean failed;
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
if (!failed && foo.equals("ex")) {
failed = true;
thrownew RuntimeException(foo);
}
failed = false;
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness; 
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo" ));
InvocationData invocationData =
this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS);
assertThat(invocationData.getArguments()[0], equalTo("foo"));
assertThat((String) invocationData.getResult(), equalTo("FOO"));
}
@Test
public void testOneWay() throws Exception {
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");
InvocationData invocationData =
this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
Object[] args = invocationData.getArguments();
assertThat((String) args[0], equalTo("bar"));
assertThat((String) args[1], equalTo(queue2.getName()));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("baz"));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("ex"));
assertEquals("ex", invocationData.getThrowable().getMessage());
}
}

| 將harness注入進測試用例,以便我們能獲取spy的訪問. |

| 使用 harness.getNextInvocationDataFor() 來獲取調用數據 - 在這種情況下,由于它處于request/reply 場景,因為沒有必要等待,因為測試線程在RabbitTemplate 中等待結果的時候,已經暫停過了. |

| 我們可以驗證參數和結果是否與預期一致 |

| 這次,我們需要時間來等待數據,因為它在容器線程上是異步操作,我們需要暫停測試線程. |

| 當監聽器拋出異常時,可用調用數據中的throwable 屬性 |
4. Spring 整合- 參考
這部分參考文檔提供了在Spring集成項目中提供AMQP支持的快速介紹.
4.1 Spring 整合AMQP支持4.1.1 介紹
Spring Integration 項目包含了構建于Spring AMQP項目之上的AMQP 通道適配器(Channel Adapters)和網關(Gateways). 那些適配器是在Spring集成項目中開發和發布的.在Spring 集成中, "通道適配器" 是單向的,而網關是雙向的(請求-響應).
我們提供了入站通道適配器(inbound-channel-adapter),出站通道適配器( outbound-channel-adapter), 入站網關(inbound-gateway),以及出站網關(outbound-gateway).
由于AMQP 適配器只是Spring集成版本的一部分,因為文檔也只針對Spring集成發行版本部分可用.
作為一個品酒師,我們只快速了解這里的主要特征。
4.1.2 入站通道適配器
為了從隊列中接收AMQP消息,需要配置一個個<inbound-channel-adapter>
<amqp:inbound-channel-adapter channel="fromAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>
4.1.3 出站通道適配器
為了向交換器發送AMQP消息,需要配置一個<outbound-channel-adapter>. 除了交換名稱外,還可選擇提供路由鍵。
<amqp:outbound-channel-adapter channel="toAMQP" exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>
4.1.4 入站網關
為了從隊列中接收AMQP消息,并回復到它的reply-to地址,需要配置一個<inbound-gateway>.
<amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>
4.1.5 出站網關
為了向交換器發送AMQP消息并接收來自遠程客戶端的響應,需要配置一個<outbound-gateway>.
除了交換名稱外,還可選擇提供路由鍵。
<amqp:outbound-gateway request-channel="toAMQP" reply-channel="fromAMQP" exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>
除了這份參考文檔,還有其它資源可幫助你了解AMQP.
5.1 進階閱讀
對于那些不熟悉AMQP的人來說, 規范 實際上具有相當的可讀性.
這當然是信息的權威來源,對于熟悉規范的人來說,Spring AMQP代碼應該很容易理解。
目前RabbitMQ實現基于2.8.x版本,并正式支持AMQP 0.8和9.1。我們推薦閱讀9.1文檔。
在RabbitMQ Getting Started 頁面上,還有許多精彩的文章,演示, 博客. 因為當前只有Spring AMQP實現, 但我們仍建議將其作為了解所有中間件相關概念的起點.