<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    3.1.15 消息監聽器容器配置

    有相當多的配置SimpleMessageListenerContainer 相關事務和服務質量的選項,它們之間可以互相交互.當使用命名空間來配置<rabbit:listener-container/>時,

    下表顯示了容器屬性名稱和它們等價的屬性名稱(在括號中).

    未被命名空間暴露的屬性,以`N/A`表示.
    Table 3.3. 消息監聽器容器的配置選項
    屬性
    (group)
    描述

    只在使用命名空間時可用. 當經過指定時,類型為Collection<MessageListenerContainer> 的bean會使用它這個名稱進行注冊,容器會將每個<listener/> 元素添加到集合中.這就允許,如,通過迭代集合來啟動/停止該組容器.如果多個<listener-container/> 元素有相同的group值, 那么集合中的容器是所有指定容器的總和.
    屬性

    channelTransacted(channel-transacted)

    描述

    Boolean標志,用于表示在事務中的所有消息都應該應答(手動地或自動地)

    屬性

    acknowledgeMode(acknowledge)

    描述

    • NONE: 不發送應答(與channelTransacted=true不兼容). RabbitMQ 稱此為 "autoack",因為broker假設消費者沒有采取任何動作應答了所有消息.
    • MANUAL:監聽器必須調用Channel.basicAck()來手動應答所有消息.
    • AUTO :容器會自動應答所有消息, 除非MessageListener 拋出了異常. 注意acknowledgeMode 與channelTransacted 是互補的- 如果通道是事務的,那么broker除了ack外,還需要提交通知. 這是默認模式. 也可參考txSize.

    屬性

    transactionManager(transaction-manager)

    描述

    監聽器操作的外部事務管理器. 也是與channelTransacted互補的 -如果通道是事務的,那么其事務會用外部事務來同步.

    屬性

    prefetchCount(prefetch)

    描述

    可接受來自broker一個socket幀中的消息數目. 數值越大,消息分發速度就越快, 但無序處理的風險更高. 
    如果acknowledgeMode為NONE它會忽略. 它會增長,如果有必要,須匹配txSize.

    屬性

    shutdownTimeout(N/A)

    描述
    當容器關閉時(例如. 關閉ApplicationContext),用于等待正在傳輸消息的上限時間.默認是5秒. 當達到上限時,如果通道是非事務的,消息將被丟棄.

    屬性

    txSize(transaction-size)

    描述
    acknowledgeMode 為AUTO時,在發送ack前(等待每一個消息達到接收超時設置),容器將試圖處理這個數目的消息 . 當事務通道提交后也是一樣的.如果prefetchCount 小于txSize,prefetchCount 會增長以匹配txSize.

    屬性
    receiveTimeout(receive-timeout)
    描述
    等待消息的最大時間.如果acknowledgeMode=NONE 這只有很小的效果 - 容器只旋轉一輪,并要求另一個消息. 當在txSize>1的事務通道中有最大效果,因為它能導致已經消費但沒有應答的消息直接超時過期.

    屬性

    autoStartup(auto-startup)

    描述

    用于當ApplicationContext啟動時(作為SmartLifecycle 回調的一部分,發生在所有bean初始化之后)是否同時啟動容器的標志.默認為true,如果在容器啟動時,中間件暫不可用,那么可將其設為false,隨后在確認中間件已啟動后,手動調用start() 方法來啟動.

    屬性

    phase(phase)

    描述

    當autoStartup為true時,容器中的生命周期階段應該啟動和停止.值越小,容器就會越早啟動,并越晚停止.默認值Integer.MAX_VALUE,這意味著容器會越晚啟動并盡快停止.

    屬性

    adviceChain(advice-chain)

    描述

    應用于監聽器執行路徑上的AOP Advice數組. 它可用于額外的橫切關注點,如broker死亡事件中的自動重試. 
    注意,只要broker還活著,出現AMQP錯誤后的重新連接是由CachingConnectionFactory來處理的.

    屬性

    taskExecutor(task-executor)

    描述

    執行監聽器調用程序的Spring TaskExecutor引用(或標準JDK 1.5+ Executor). 默認是 SimpleAsyncTaskExecutor, 用于內部管理線程.

    屬性

    errorHandler(error-handler)

    描述

    在MessageListener執行期間,用于處理未捕獲異常的ErrorHandler策略的引用. 默認是 ConditionalRejectingErrorHandler.

    屬性

    concurrentConsumers(concurrency)

    描述
    每個監聽器上初始啟動的并發消費者數目. 參考Section 3.1.16, “Listener Concurrency”.

    屬性

    axConcurrentConsumers(max-concurrency)

    描述

    啟動并發消費者的最大數目,如果有必要,可以按需設置.必須要大于或等于concurrentConsumers

    參考Section 3.1.16, “Listener Concurrency”.

    屬性

    startConsumerMinInterval(min-start-interval)

    描述
    啟動新消費者之間的時間間隔,單位為毫秒. 
    參考 Section 3.1.16, “Listener Concurrency”. 默認 10000 (10 秒).

    屬性

    stopConsumerMinInterval(min-stop-interval)

    描述

    停止消費者的時間間隔, 由于最后一個消費者已經停止了,這時可以檢測到空閑消費者.

    參考Section 3.1.16, “Listener Concurrency”. 默認 60000 (1 分鐘).

    屬性

    consecutiveActiveTrigger(min-consecutive-active)

    描述

    消費者收到連續消息的最小數量,當考慮啟動一個新的消費者,接收不會發生超時。也會受txsize影響參考 Section 3.1.16, “Listener Concurrency”. 默認為10.

    屬性

    consecutiveIdleTrigger(min-consecutive-idle)

    描述

    在考慮停止一個消費者,消費者必須經歷的最小接收超時時間,也會受txsize影響

    參考 Section 3.1.16, “Listener Concurrency”. 默認為10.

    屬性

    connectionFactory(connection-factory)

    描述
    connectionFactory的引用; 當使用XML命名空間配置時,默認引用bean名稱是"rabbitConnectionFactory".

    屬性

    defaultRequeueRejected(requeue-rejected)

    描述

    用以確定因監聽器拋出異常而遭受拒絕的消息是否需要重新入列. 默認為true.
    屬性
    recoveryInterval(recovery-interval)
    描述
    如果消費者不是因致命原因而導致啟動失敗,則用于設置重啟消費者的時間間隔,單位毫秒. 默認為5000.與recoveryBackOff互斥.
    屬性
    recoveryBackOff(recovery-back-off)
    描述
    果消費者不是因致命原因而導致啟動失敗,則用于指定 BackOff 啟動消費者的時間間隔. 默認是每5秒無限重試的FixedBackOff. 與recoveryInterval互斥.
    屬性
    exclusive(exclusive)
    描述
    用于確定容器中的單個消費者是否具有獨占訪問隊列的權限。當其值為1時,容器的concurrency必須為1時。如果另一個消費者有獨占訪問權,容器將根據恢復時間間隔或恢復后退試圖恢復消費者。
    當使用命名空間時,此屬性會隨著隊列名稱出現在<rabbit:listener/>元素中。默認為false。
    屬性
    rabbitAdmin(admin)
    描述
    監聽器監聽了多個自動刪除隊列時,當其發現在啟動時隊列消失了,容器會使用RabbitAdmin 來聲明消失的隊列,并進行交換器的相關綁定.
    如果此元素配置成使用條件聲明(參考
    the section called “Conditional Declaration”), 容器必須使用配置的admin來聲明那些元素.
    這里指定的admin;只在使用帶有條件聲明的自動刪除隊列時才需要. 如果你不想在容器啟動前聲明自動刪除隊列,可在amdin中將 
    auto-startup 設為false. 默認情況下,RabbitAdmin 會聲明所有非條件元素.
    屬性
    missingQueuesFatal(missing-queues-fatal)
    描述

    從1.3.5版本開始,SimpleMessageListenerContainer 就有了這個新屬性.

    當設為true (默認值)時,如果配置隊列在中間件都不可用, 這會視為是致命的.這會導致應用程序上下文初始化失敗; 同時, 當容器還在運行時刪除了隊列,也會發生這樣的情況.
    默認情況下,消費者進行3次重試來連接隊列(5秒時間間隔),如果所有嘗試都失敗了則會停止容器.

    在以前版本中,此選項是不可配置的.

    當設置為false, 再做了三次重試后,容器將進入恢復模式, 這也伴隨其它問題,如中間件已經發生了故障.

    容器會根據recoveryInterval 屬性來嘗試恢復. 在每次恢復嘗試期間,每個消費者會以5秒的時間間隔來嘗試4次被動聲明. 這個過程將無限期地繼續下去(譯者注:有點沖突)。

    你也可以使用properties bean來為所有的容器全局設置屬性,如下所示:

    <util:properties id="spring.amqp.global.properties">
    <prop key="smlc.missing.queues.fatal">false</prop>
    </util:properties>

    如果容器明確的設置了 missingQueuesFatal 屬性,全局屬性的值對此容器將無效.

    默認的retry屬性(5秒間隔3次重試)可通過下面的屬性值來覆蓋.

    屬性
    mismatchedQueuesFatal(mismatched-queues-fatal)
    描述

    這是1.6版本中加入的新屬性.當容器啟動時,如果此屬性為true (默認為false), 容器會檢查上下文中聲明的隊列是否中間件中存在的隊列是否一致.

    如果屬性不匹配(如. auto-delete) 或參數 (e.g. x-message-ttl) 存在, 容器 (和應用程序上下文) 會拋出致命異常而導致啟動失敗.如果是在恢復期間檢測到的問題,容器會停止.

    必須在上下文中存在單個RabbitAdmin (或使用rabbitAdmin屬性在容器上特別配置);否則此屬性必須為false.

    如果在初始啟動期間,中間件還不可用,容器啟動后,當建立連接時會檢查條件.

    重要

    該檢查針對的是上下文的所有隊列,而不僅僅是特定監聽器配置使用的隊列.如果你希望只檢查容器使用的隊列,你需要為這個容器配置單獨的RabbitAdmin , 并使用rabbitAdmin 屬性為其提供一個引用. 

    參考“Conditional Declaration”章節來了解更多信息.

    屬性

    autoDeclare(auto-declare)
    描述

    從1.4版本開始, SimpleMessageListenerContainer 引入了這個新屬性.

    當設置為true 時(默認值),容器會使用RabbitAdmin 來重新聲明所有 AMQP 對象(Queues, Exchanges, Bindings).
    如果在啟動期間探測到至少有一個隊列缺失了,可能因為它是自動刪除隊列或過期隊列,但不管隊列缺失是基于什么原因,重新聲明仍會進行處理(譯者注:太浪費了).
    要禁用這種行為, 可設置其屬性為false. 但需要注意的是,如果所有隊列都缺失了(譯者注:全部還是部分),容器會啟動失敗.

    在1.6版本之前,如果在上下文中存在多個admin,容器會隨機選擇一個.反之,如果沒有admin,它會從內部創建一個.
    無論是哪種情況,這都將導致非預期結果出現
    . 從1.6版本開始,為了能使autoDeclare 工作,必須要上下文中明確存在一個RabbitAdmin,或者特定實例的引用必須要在容器中使用rabbitAdmin屬性中配置.

    屬性

    declarationRetries(declaration-retries)
    描述

    1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer 有了這個新屬性. 命名空間屬性在1.5.x中可用.

    用于設置被動聲明失敗時,重新嘗試的次數.被動聲明發生在當消費者啟動了或從多個隊列中消費時,初始化期間部分隊列還不可用的情況下.
    當重試次數用完后,如果還是不能被動聲明配置隊列,那么上面的missingQueuesFatal屬性將控制容器行為. 默認: 3次重試 (4 次嘗試).

    屬性

    failedDeclarationRetryInterval(failed-declaration-retry-interval)
    描述

    1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer 有了這個新屬性. 命名空間屬性在1.5.x中可用.

    重新嘗試被動聲明的時間間隔. 被動聲明發生在當消費者啟動了或從多個隊列中消費時,初始化期間部分隊列還不可用的情況下. 默認: 5000 (5秒).

    屬性

    retryDeclarationInterval(missing-queue-retry-interval)
    描述

    1.4.3, 1.3.9版本開始,SimpleMessageListenerContainer 有了這個新屬性. 命名空間屬性在1.5.x中可用.

    如果配置隊列的一個子集在消費者初始化過程中可用,則消費者將從這些隊列中開始消費。消費者將被動地使用此間隔聲明丟失的隊列。

    當這個間隔過去后,會再次使用declarationRetries 和 failedDeclarationRetryInterval

    如果還有缺失隊列,消費者在重新嘗試之前會等待此時間間隔. 

    這個過程會不停地進行到所有隊列可用. 默認: 60000 (1分鐘).

    屬性

    consumerTagStrategy(consumer-tag-strategy)
    描述

    從1.4.5版本開始,SimpleMessageListenerContainer 有了這個新屬性. 命名空間屬性在1.5.x中可用.

    之間,只能使用中間件生成的consumer tags;盡管現在這仍是默認的配置,但現在你可以提供一個ConsumerTagStrategy的實現, 這樣就可為每個消費者創建獨特的tag.

    屬性

    idleEventInterval(idle-event-integer)
    描述

    從1.6版本開始,SimpleMessageListenerContainer 有了這個新屬性. 

    參考"Detecting Idle Asynchronous Consumers"章節.

    3.1.16 監聽器并發

    默認情況下,監聽器容器會啟動單個消費者來接收隊列中的消息.

    當檢查前面章節中的表格時,你會發現有許多屬性可控制并發.最簡單的是concurrentConsumers, 它會創建固定數量的消費者來并發處理消息.

    在1.3.0版本之前,這只能在容器停止時才可設置.

    從1.3.0版本開始,你可以動態調整 concurrentConsumers 屬性.如果容器運行時修改了,會根據新設置來調需要的消費者(添加或刪除).

    此外,在容器中添加了一個新屬性 maxConcurrentConsumers 來基于工作負載來動態調整并發數. 

    它可與其它四個屬性一起工作: consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval.
    在默認設置的情況下,加大消費者的算法如下:

    如果還沒有達到maxConcurrentConsumers ,如果現有消費者活動了10個連續周期且離最后消費者啟動至少消逝了10秒鐘,那么將啟動新的消費者. 如果消費者在txSize * receiveTimeout 毫秒內至少收到一個消息,那么就認為此消費者是活動的.

    在默認設置的情況下,減少消費者的算法如下:

    如果有超過concurrentConsumers 數量的消費者在運行,且檢測到消費者連續超時(空閑)了10個周期,且最后一個消費者至少停止了60秒,那么消費者將停止.
    超時依賴于receiveTimeout 和 txSize 屬性.當在txSize * receiveTimeout 毫秒內未收到消息,則認為消費者是空閑的.
    因此,當有默認超時(1秒)和 txSize為4,那么在空閑40秒后,會認為消費者是空閑的并會停止(4超時對應1個空閑檢測).


    實際上,如果整個容器空閑一段時間,消費者將只會被停止。這是因為broker將分享其在所有活躍的消費者的工作。

    3.1.17 專用消費者

    也是從1.3版本開始,監聽器容器可配置單個專用消費者; 這可以阻其它容器來消費隊列直到當前消費者退出. 

    這樣的容器的并發性必須是1。

    當使用專用消費者時,其它容器會根據recoveryInterval 屬性來消費隊列, 如果嘗試失敗,會記錄一個 WARNing 信息.

    3.1.18 監聽器容器隊列

    1.3版本在監聽器容器中引入許多處理多個隊列的改善措施.

    容器配置必須監聽至少一個隊列以上以前也是這樣的情況,但現在可以在運行時添加和刪除隊列了。當任何預先獲取的消息被處理后,容器將回收(取消和重新創建)。
    參考方法
    addQueuesaddQueueNamesremoveQueuesand removeQueueNames.當刪除隊列時,至少要保留一個隊列.

    現在,只要有可用隊列消費者就會啟動 -先前如果沒有可用隊列,容器會停止.現在,唯一的問題是是否有可用隊列.如果只是部分隊列可用,容器會每60秒嘗試被動聲明(和消費)缺失隊列.

    此外,如果消費才從broker中收到了通道(例如,隊列被刪除)消費者會嘗試重新恢復,重新恢復的消費會繼續處理來自其它配置隊列中的消息. 之前是隊列上的取消會取消整個消費者,最終容器會因缺失隊列而停止.

    如果你想永久刪除隊列,你應該在刪除隊列的之前或之后更新容器,以避免消費.

    3.1.19 恢復:從錯誤和代理失敗中恢復

    介紹

    Spring提供了一些關鍵的 (最流行的)高級特性來處理協議錯誤或中間件失敗時的恢復與自動重連接. 

    主要的重連接特性可通過CachingConnectionFactory 自身來開啟. 它也常有利于使用rabbitadmin自動聲明的特點.
    除此之外, 如果你關心保證投遞,你也許需要在RabbitTemplate中使用
    channelTransacted 標記以及在SimpleMessageListenerContainer中使用AcknowledgeMode.AUTO (或者自己來手動應答) .

    交換器、隊列和綁定的自動聲明

    RabbitAdmin 組件在啟動時可聲明交換器,隊列,綁定.它是通過ConnectionListener懶執行的,因此如果啟動時broker不存在,也沒有關系. 
    Connection 第一次使用時(如.發送消息) ,監聽器會被觸發,admin功能也會應用.這種在監聽器中自動聲明的好處是,如果連接出于任何原因斷開了,(如. broker死了,網絡中斷問題.),它們會在下次有需要的時候重新應用.

    這種方式的隊列聲明必須要有固定的名稱;要么是明確聲明,要么是由框架生成AnonymousQueue.匿名隊列是非持久化的,專用的,且自動刪除的.

    重要

    自動聲明只在cachingConnectionFactory 緩存模式是CHANNEL (默認)才可用. 這種限制的存在是因為專用和自動刪除隊列是綁定到connection上的.

    同步操作中的故障和重試選項

    如果你在同步序列中使用RabbitTemplate時丟失了broker的連接,那么Spring AMQP會拋出一個AmqpException (通常但并不總是AmqpIOException).
    我們不想隱藏存在問題的事實,因此你可以捕獲并對異常進行處理.
    如果你懷疑連接丟失了,而且這不是你的錯,那么最簡單的事情就是執行再次嘗試操作. 重試操作可以手動進行,也可以使用Spring Retry來處理重試(強制或聲明).

    Spring Retry 提供了兩個AOP攔截器并提供非常靈活的方式來指定retry的參數(嘗試的次數,異常類型, 補償算法等等.). Spring AMQP同時也提供了一些方便的工廠bean來創建Spring Retry攔截器, 你可以使用強類型回調接口來實現恢復邏輯.參考Javadocs和 StatefulRetryOperationsInterceptor 和StatelessRetryOperationsInterceptor 的屬性來了解更多詳情. 

    如果沒有事務,或者如果一個事務是在重試回調中啟動的話,則無狀態重試是適當的。注意,相對于有狀態重試,無狀態重試只是簡單配置和分析,如果存在一個正在進行的事務必須回滾或肯定會回滾的話, 這種無狀態重試則是不合適的.
    在事務中間掉下來的連接與回退有同樣的效果, 所以對于事務開始于堆棧上的重連接來說,有狀態重試通常是最佳選擇(so for reconnection where the transaction is started higher up the stack, stateful retry is usually the best choice).

    從1.3版本開始,提供了builder API來幫助在Java中使用這些攔截器(或者在 @Configuration 類中),例如:

    @Bean
    public StatefulRetryOperationsInterceptor interceptor() {
    	return RetryInterceptorBuilder.stateful()
    			.maxAttempts(5)
    			.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
    			.build();
    }

    只有部分retry特性能通過這種方式,更加高級的特性需要在RetryTemplate 中配置. 

    參考Spring Retry Javadocs 來了解可用策略,配置的完整信息.

    消息監聽器和異步情況

    如果 MessageListener 因業務異常而失敗,異常可由消息監聽器容器來處理,然后它會繼續回去監聽其它信息.如果失敗是由于掉下的連接引起的(非業務異常),那么監聽此消費者的監聽器將退出和重啟.

    SimpleMessageListenerContainer 可以無逢地進行處理,并且它會在日志中記錄監聽器即將重啟. 

    事實上,它會循環不斷地嘗試重新啟動消費者,只有當消費者有非常糟糕的行為時,才會放棄。一個副作用是,如果broker在容器啟動時關閉,它將會繼續嘗試直到建立一個連接。

    業務異常處理, 相對于協議錯誤和連接丟失,它可能需要更多考慮和一些自定義配置,特別是處于事務或 容器應答時.
    在2.8.x版本之前, RabbitMQ對于死信行為沒有定義,因此默認情況下,一個因拒絕或因業務異常導致回退的消息可循環往
    復地重新分發.
    要限制客戶端的重新分發的次數,一個選擇是在監聽器的通知鏈中添加一個
    StatefulRetryOperationsInterceptor. 攔截器有一個實現了自定義死信動作的恢復回調: 

    什么是適合你的特定的環境。

    另一個選擇是設置容器的rejectRequeued屬性為false. 這會導致丟棄所有失敗的消息.當使用RabbitMQ 2.8.x+時,這也有利于傳遞消息到一個死的信件交換。

    或者,你可以拋出一個AmqpRejectAndDontRequeueException;這會阻止消息重新入列,不管defaultRequeueRejected 屬性設置的是什么.

    通常情況下,可以組合使用這兩種技術 在通知鏈中使用StatefulRetryOperationsInterceptor, 在此處是MessageRecover 拋出AmqpRejectAndDontRequeueExceptionMessageRecover 會一直調用,直到耗盡了所有重試.
    默認MessageRecoverer 只是簡單的消費錯誤消息,并發出WARN消息.在這種情況下,消息是通過應答的,且不會發送到死信交換器中.

    從1.3版本開始,提供了一個新的RepublishMessageRecoverer,它允許在重試次數耗盡后,發布失敗消息:

    @Bean
    RetryOperationsInterceptor interceptor() {
    	return RetryInterceptorBuilder.stateless()
    			.maxAttempts(5)
    			.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
    			.build();
    }

    RepublishMessageRecoverer 會使用消息頭的額外信息來發布,這些信息包括異常信息,棧軌跡,原始交換器和路由鍵.額外的頭可通過創建其子類和覆蓋additionalHeaders()方法來添加.

    重試的異常分類

    Spring Retry 可以非常靈活地決定哪些異常可調用重試. 默認配置是對所有異常都進行重試.用戶異常可以包裝在ListenerExecutionFailedException 中,我們需要確保分類檢查異常原因默認的分類只是看頂部級別的異常。

    從 Spring Retry 1.0.3開始BinaryExceptionClassifier 有一個屬性traverseCauses (默認為false). 當當為true時,它將遍歷異常的原因,直到它找到一個匹配或沒有原因。

    要使用分類重試,需要使用一個SimpleRetryPolicy ,其構造函數將接受最大嘗試次數,Exception的Map,以及一個boolean值(traverseCauses),且還需要將此策略注入給RetryTemplate.

    3.1.20 調試

    Spring AMQP 提供廣泛的日志記錄,尤其是在DEBUG級別.

    如果你想在應用程序和broker之間監控AMQP協議,你可以使用像WireShark的工具, 它有一個插件可用于解碼協議.
    另一個選擇是, RabbitMQ java client自身攜帶了一個非常有用的工具類:Tracer.當以main方式運行時,默認情況下,它監聽于5673 ,并連接本地的5672端口.
    只需要簡單的運行它,并修改你的連接工廠配置,將其連接到本地的5673端口. 它就會在控制臺中顯示解碼的協議信息.參考Tracer javadocs 來了解詳細信息.


    3.2 Logging Subsystem AMQP Appenders

    框架為多個流行的日志系統提供了日志appenders:

    • log4j (從Spring AMQP1.1版本開始)
    • logback (從Spring AMQP1.4版本開始)
    • log4j2 (從Spring AMQP1.6版本開始)

    appenders使用正常機制為為子系統配置,可用屬性參照下面的規定。

    3.2.1 共同屬性

    下面的屬性對于所有appenders都可用:

    Table 3.4. 共同Appender屬性


    PropertyDefaultDescription
    exchangeName
    logs

    用于發布日志事件的交換器名稱.

    exchangeType
    topic

    發布日志事件的交換器類型- 只在appender聲明了交換器的情況下才需要. 參考declareExchange.

    routingKeyPattern
    %c.%p

    日志子系統生成路由鍵的模式格式.

    applicationId

    Application ID - 如果模式包含 %X{applicationId},則將其添加到路由鍵.

    senderPoolSize
    2

    用于發布日志事件的線程數目.

    maxSenderRetries
    30

    當broker不可用時或有某些錯誤時,重試的次數. 延時重試像: N ^ log(N)N 表示重試次數.

    addresses

    一個逗號分隔的broker地址列表: host:port[,host:port]* -覆蓋host 和 port.

    host
    localhost

    要連接RabbitMQ的主機.

    port
    5672
    virtualHost
    /

    要連接的RabbitMQ虛擬主機.

    username
    guest

    要連接RabbitMQ的用戶.

    password
    guest

    要連接RabbitMQ的用戶密碼.

    contentType
    text/plain

    日志消息的content-type屬性

    contentEncoding

     日志消息的content-encoding屬性.

    declareExchange
    false

    當appender啟動時,是否需要聲明配置的交換器.也可參考 durable 和autoDelete.

    durable
    true

    declareExchange 為 true ,durable 標志才會設置此值.

    autoDelete
    false

    當 declareExchange 為true , auto delete 標志才會設置此值.

    charset
    null

    當將字符串轉成byte[]時要使用的編碼,默認為null (使用系統默認字符集).如果當前平臺上不支持此字符集,將回退到使用系統字符集.

    deliveryMode
    PERSISTENT

    PERSISTENT 或 NON_PERSISTENT 用于決定RabbitMQ是否應該持久化消息.

    generateId
    false

    用于確定messageId 屬性是否需要設置成唯一值.

    clientConnectionProperties
    null

    一個逗號分隔的key:value 對,它是連接RabbitMQ時設置的自定義客戶端屬性

    3.2.2 Log4j Appender

    樣例log4j.properties片斷. 

    log4j.appender.amqp.addresses=foo:5672,bar:5672
    log4j.appender.amqp=org.springframework.amqp.rabbit.log4j.AmqpAppender
    log4j.appender.amqp.applicationId=myApplication
    log4j.appender.amqp.routingKeyPattern=%X{applicationId}.%c.%p
    log4j.appender.amqp.layout=org.apache.log4j.PatternLayout
    log4j.appender.amqp.layout.ConversionPattern=%d %p %t [%c] - <%m>%n
    log4j.appender.amqp.generateId=true
    log4j.appender.amqp.charset=UTF-8
    log4j.appender.amqp.durable=false
    log4j.appender.amqp.deliveryMode=NON_PERSISTENT
    log4j.appender.amqp.declareExchange=true

    3.2.3 Log4j2 Appender

    樣例 log4j2.xml 片斷

    <Appenders>
        ...
        <RabbitMQ name="rabbitmq"
            addresses="foo:5672,bar:5672" user="guest" password="guest" virtualHost="/"
            exchange="log4j2" exchangeType="topic" declareExchange="true" durable="true" autoDelete="false"
            applicationId="myAppId" routingKeyPattern="%X{applicationId}.%c.%p"
            contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
            charset="UTF-8"
            senderPoolSize="3" maxSenderRetries="5">
        </RabbitMQ>
    </Appenders>

    3.2.4 Logback Appender

    樣例 logback.xml 片斷

    <appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
        <layout>
            <pattern><![CDATA[ %d %p %t [%c] - <%m>%n ]]></pattern>
        </layout>
        <addresses>foo:5672,bar:5672</addresses>
        <abbreviation>36</abbreviation>
        <applicationId>myApplication</applicationId>
        <routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
        <generateId>true</generateId>
        <charset>UTF-8</charset>
        <durable>false</durable>
        <deliveryMode>NON_PERSISTENT</deliveryMode>
        <declareExchange>true</declareExchange>
    </appender>

    3.2.5 定制Messages

    每個appenders都可以子類化,以允許你在發布前修改消息.

    Customizing the Log Messages. 

    public class MyEnhancedAppender extends AmqpAppender {
    
        @Override
     public Message postProcessMessageBeforeSend(Message message, Event event) {
            message.getMessageProperties().setHeader("foo", "bar");
            return message;
        }
    
    }

    3.2.6 定制客戶端屬性

    簡化 String 屬性

    每個appender都支持在RabbitMQ連接中添加客戶端屬性.

    log4j. 

    log4j.appender.amqp.clientConnectionProperties=foo:bar,baz:qux

    logback. 

    <appender name="AMQP"...>...
    <clientConnectionProperties>foo:bar,baz:qux</clientConnectionProperties>
    ...</appender>

    log4j2. 

    <Appenders>
        ...
        <RabbitMQname="rabbitmq"...clientConnectionProperties="foo:bar,baz:qux"...</RabbitMQ></Appenders>

    這些屬性是逗號分隔的key:value 隊列表; 鍵和值不能包含逗號或 冒號.

    當RabbitMQ Admin UI中查看連接上,你會看到這些屬性.

    Log4j和logback先進技術

    使用 log4j 和 logback appenders, appenders 可以是子類化的, 允許你在連接建立前,修改客戶連接屬性:

    定制客戶端連接屬性. 

    public class MyEnhancedAppender extends AmqpAppender {
    
        private String foo;
    
        @Override
    protected void updateConnectionClientProperties(Map<String, Object> clientProperties) {
            clientProperties.put("foo", this.foo);
        }
    
        public void setFoo(String foo) {
            this.foo = foo;
        }
    
    }

    對于 log4j2, 添加 log4j.appender.amqp.foo=bar 到log4j.properties 來設置發展.

    對于logback, 在logback.xml中添加 <foo>bar</foo> .

    當然,對于像這個例子中簡單的String 屬性,可以使用先前的技術;

    子類允許更豐富的屬性(如添加 Map 的numeric 屬性).

    使用log4j2, 子類是不被支持的,因為 log4j2 使用靜態工廠方法.

    3.3 樣例應用程序

    3.3.1 介紹

     Spring AMQP Samples 項目包含了兩個樣例應用程序. 第一個簡單的"Hello World" 示例演示了同步和異步消息的處理. 它為理解基礎部分提供了一個很好的開端.
    第二個基于股票交易的例子演示了真實應用程序中的交互場景.在本章中,我們會每個示例進行快速瀏覽,
    使您可以專注于最重要的組成部分.
    這兩個例子都是基于Maven的,因此你可以直接將它們導入任何支持Maven的IDE中(如. 
    SpringSource Tool Suite).

    3.3.2 Hello World

    介紹

    Hello World示例演示了同步和異步消息處理.你可以導入spring-rabbit-helloworld 示例到IDE中并跟隨下面的討論.

    同步例子

    src/main/java 目錄中,導航到org.springframework.amqp.helloworld 包中.

    打開HelloWorldConfiguration 類,你可以注意到它包含了@Configuration 類級注解和一些@Bean 方法級注解. 

    這是Spring 的基于Java的配置.你可進一步的了解here.

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =null;
        connectionFactory =new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; }

    配置中同樣也包含了RabbitAdmin的實例, 它會默認查找類型為Exchange, Queue, 或 Binding的bean并在broker中進行聲明.
    事實上,"helloWorldQueue" bean是在HelloWorldConfiguration 中生成的,因為它是 Queue的實例.

    @Bean
    public Queue helloWorldQueue() {
        returnnew Queue(this.helloWorldQueueName);
    }

    重看"rabbitTemplate"bean配置,你會看到它將helloWorldQueue的名稱設成了"queue"屬性(用于接收消息) 以及"routingKey" 屬性(用于發送消息).

    現在,我們已經探索了配置,讓我們看看實際上使用這些組件的代碼。
    首先,從同一個包內打開Producer類。它包含一個用于創建Spring ApplicationContext的main()方法.


    publicstaticvoid main(String[] args) {

        ApplicationContext context =
            new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class); amqpTemplate.convertAndSend("Hello World"); System.out.println("Sent: Hello World"); }

    在上面的例子中你可以看到, 取回的AmqpTemplate用來發送消息.因為客戶端代碼應該盡可能地依賴于接口,因此類型是AmqpTemplate而不是RabbitTemplate.
    即使在HelloWorldConfiguration中創建的bean是RabbitTemplate的實例,依賴于接口則意味著這端代碼更具有便攜性(portable) (配置可以獨立于代碼進行修改).
    因為convertAndSend() 方法是通過模板來調用的,因此模板會將調用委派給它的MessageConverter實例.在這種情況下,它默認使用的是SimpleMessageConverter,但也可以在HelloWorldConfiguration中為"rabbitTemplate"指定其它的實現.

    現在打開Consumer類. 它實際上共享了同一個配置基類,這意味著它將共享"rabbitTemplate" bean. 這就是為什么我們要使用"routingKey" (發送) 和"queue" (接收)來配置模板的原因.
    正如你在Section 3.1.4, “AmqpTemplate”中看到的,你可以代替在發送方法中傳遞routingKey參數,代替在接收方法中傳遞queue 參數.  Consumer 代碼基本上是Producer的鏡子,只不過調用的是receiveAndConvert()而非convertAndSend()方法.

    publicstaticvoid main(String[] args) {
        ApplicationContext context =
            new AnnotationConfigApplicationContext(RabbitConfiguration.class);
        AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
        System.out.println("Received: " + amqpTemplate.receiveAndConvert());
    }

    你如果運行Producer,然后再運行Consumer, 在控制臺輸出中,你應該能看到消息"Received: Hello World" 

    異步示例

    我們已經講解了同步Hello World樣例, 是時候移動到一個稍微先進,但更強大的選擇上了.稍微修改一下代碼,Hello World 樣例就可以可以提供異步接收的示例了,又名 Message-driven POJOs. 事實上,有一個子包明確地提供了這種功能: org.springframework.amqp.samples.helloworld.async.

    再一次地我們將從發送端開始. 打開ProducerConfiguration類可注意到它創建了一個"connectionFactory"和"rabbitTemplate" bean.
    這次,由于配置是專用于消息發送端,因此我們不需要任何隊列定義,RabbitTemplate只須設置routingKey屬性.
    回想一下,消息是發送到交換器上的而不是直接發到隊列上的. AMQP默認交換器是無名稱的direct類型交換器.
    所有隊列都是通過使用它們的名稱作為路由鍵綁定到默認交換器上的.這就是為什么在這里我們只提供路由鍵的原因.

    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.helloWorldQueueName);
        return template;
    }

    由于這個示例展示的是異步消息處理,生產方設計為連續發送消息(盡管類似于同步版本中的 message-per-execution模型,但不太明顯,實際上它是消息驅動消費者)負責連續發送消息的組件是作為ProducerConfiguration類中的內部類來定義的,每3秒執行一次

    static class ScheduledProducer {
    
       @Autowired
     private volatile RabbitTemplate rabbitTemplate;
    
        private final AtomicInteger counter = new AtomicInteger();
    
        @Scheduled(fixedRate = 3000)
     public void sendMessage() {
            rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
        }
    }

    你不必要完全了解這些細節,因為真正的關注點是接收方(我們馬上就會講解).然而,如果你還熟悉Spring 3.0 任務調度支持,你可從here這里來了解.
    簡短故事是:在 ProducerConfiguration 中的"postProcessor" bean使用調度器來注冊了任務.

    現在,讓我們轉向接收方. 為強調 Message-driven POJO 行為,將從對消息起反應的組件開始. 

    此類被稱為HelloWorldHandler.

    publicclass HelloWorldHandler {
    
        publicvoid handleMessage(String text) {
            System.out.println("Received: " + text);
        }
    
    }

    相當明顯的, 這是一個POJO. 它沒有繼承任何基類,它沒有實現任何接口,它甚至不包含任何導入. 它將通過Spring AMQP MessageListenerAdapter來適配MessageListener接口.然后適配器可配置在SimpleMessageListenerContainer上.
    在這個例子中,容器是在ConsumerConfiguration類中創建的.你可以看到POJO是包裝在適配器中的.

    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueName(this.helloWorldQueueName);
        container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
        return container;
    }

    SimpleMessageListenerContainer是一個Spring生命周期組件,默認會自動啟動.如果你看了Consumer類的內部,你會看到main()方法中除了一行啟動創建ApplicationContext的代碼外,其它什么都沒有.
    Producer的main()方法也只有一行啟動,因為以 @Scheduled注解的組件會自動開始執行.你可以任何順序來啟動Producer 和Consumer,你會看每秒就會發送消息和接收到消息.

    3.3.3 股票交易(Stock Trading)

    Stock Trading 示例演示了比Hello World示例更高級的消息場景.然而,配置卻是很相似的 - 只是有一點復雜.
    由于我們已經詳細講解了Hello World配置,因此在這里我們將重點關注不一樣的東西. 有一個服務器發送市場數據(
    股票報價)到Topic交換器中.
    然后,客戶端可訂閱市場數據,即通過使用路由模式(如. "app.stock.quotes.nasdaq.*")來綁定隊列(e.g. "app.stock.quotes.nasdaq.*").
    這個例子的其它主要功能是 有一個
    請求回復“股票交易”的互動,它是由客戶發起并由服務器來處理的這涉及到一個私有的“回復(replyTo)”隊列,發送客戶端的信息在請求消息中。

    服務器的核心配置在RabbitServerConfiguration類中(位于 org.springframework.amqp.rabbit.stocks.config.server 包中).
    它繼承了 AbstractStockAppRabbitConfiguration. 這是服務器和客戶端定義常用資源的地方,包括市場數據Topic交換器(其名稱為app.stock.marketdata) 以及服務器公開股票交易的隊列(其名稱為app.stock.request).
    在那個公共配置文件中,你會看到在RabbitTemplate上配置了一個JsonMessageConverter.

    服務器特有配置由2部分組成.首先,它在RabbitTemplate上配置了市場數據交換器,這樣在發送消息時,就不必提供交換器名稱.它是通過基礎配置類中的抽象回調方法中定義做到這一點的.

    public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
    }

    其次, 聲明了股票請求隊列.在這里,它不需要任何明確的綁定,因為它將以它自己的名稱作為路由鍵來綁定到無名稱的默認交換器上.正如先前提到的,AMQP規范定義了此種行為.

    @Beanpublic Queue stockRequestQueue() {
        returnnew Queue(STOCK_REQUEST_QUEUE_NAME);
    }

    現在你已經看過了服務器的AMQP資源配置,導航到src/test/java目錄下的org.springframework.amqp.rabbit.stocks包.在那里你會實際的 提供了main()方法的Server類.
    它基于server-bootstrap.xml 創建了一個ApplicationContext.在那里,你會看到發布虛假市場數據的調度任務.

    那個配置依賴于Spring 3.0的"task"命名空間支持.bootstrap配置文件也導入了其它一些文件.最令人關注的是位于src/main/resources目錄下的server-messaging.xml.在那里,你會看到"messageListenerContainer" bean,它負責處理股票交易請求.
    最后在看一下定義在src/main/resources目錄下的server-handlers.xml,其中定義了一個 "serverHandler" bean
    .這個bean是ServerHandler類的實例,它是Message-driven POJO 的好例子,它也有發送回復消息的能力.
    注意,它自身并沒有與框架或任何AMQP概念耦合.它只是簡單地接受TradeRequest并返回一個TradeResponse.

    public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
    }

    現在我們已經看了服務端的重要配置和代碼,讓我們轉向客戶端.最佳起點是從 org.springframework.amqp.rabbit.stocks.config.client 包下的RabbitClientConfiguration開始. 

    注意,它聲明了兩個不帶明確參數的隊列.

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }
    
    @Bean
    public Queue traderJoeQueue() {
        return amqpAdmin().declareQueue();
    }

    那些是私有隊列, 唯一名稱會自動自成.客戶端會用第一個生成的隊列來綁定由服務端公開的市場交換器.
    記住在AMQP中,消費者與隊列交互,而生產者與交換器交互. 隊列和交換器之間的綁定指示broker從給定的交換器中投遞或路由什么消息給隊列.
    由于市場交換器是一個Topic交換器,綁定可通過路由正則表達式來表達. 

    RabbitClientConfiguration聲明了一個Binding對象,其對象是通過BindingBuilder的便利API來生成的.

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;
    
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
            marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    注意,實際值已經在屬性文件(src/main/resources目錄下的"client.properties")中外部化了,因此我們使用Spring的@Value 注解來注入值.這通常是一個好主意,否則值就會硬編碼在類中,沒有修改就沒有重新編譯.

    在這種情況下,通過修改綁定中的路由正則表達式,可很容易地運行多個版本的Client.讓我們立即嘗試.

    啟動運行org.springframework.amqp.rabbit.stocks.Server然后再運行 org.springframework.amqp.rabbit.stocks.Client.你將會看到NASDAQ股票的交易報價,因為關聯stocks.quote.pattern 鍵的值在client.properties中是app.stock.quotes.nasdaq.
    現在,保持現有Server 和Client 運行,將其屬性值修改為app.stock.quotes.nyse.再啟動第二個
    Client實例.你會看到第一個client仍然接收NASDAQ 報價,而第二個client接收的NYSE報價. 
    你可以改變模式,獲取所有的股票報價或個別股票的報價。

    最后一個我們將暴露的特性是從客戶端的角度來看待請求-回復交互.記住我們已經看了ServerHandler,它會接受TradeRequest對象并返回TradeResponse對象. 客戶端相應的代碼是 RabbitStockServiceGateway(位于org.springframework.amqp.rabbit.stocks.gateway 包).為發送消息,它會委派給RabbitTemplate.

    public void send(TradeRequest tradeRequest) {
        getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
                try {
                    message.getMessageProperties().setCorrelationId(
                        UUID.randomUUID().toString().getBytes("UTF-8"));
                }
                catch (UnsupportedEncodingException e) {
                    thrownew AmqpException(e);
                }
                return message;
            }
        });
    }

    注意,在發送消息前,它設置了"replyTo"地址. 這提供了隊列,此隊列是由上面的"traderJoeQueue" bean 定義生成的. 以下是StockServiceGateway類的@Bean定義.

    @Bean
    public StockServiceGateway stockServiceGateway() {
        RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
        gateway.setRabbitTemplate(rabbitTemplate());
        gateway.setDefaultReplyToQueue(traderJoeQueue());
        return gateway;
    }

    如果你沒有運行服務器和客戶端,現在就啟動它們. 嘗試使用100 TCKR的格式來發送請求.經過一個簡短的人工延遲來模擬“處理”請求,你應該看到一個確認消息出現在客戶端上。

    3.4 測試支持

    3.4.1 介紹

    為異步程序寫集成測試比測試簡單程序更復雜. 當引入了@RabbitListener這樣的注解時,這尤其更加復雜.

    現在的問題是發送消息后,如何來驗證, 監聽器按預期收到了消息.

    框架自身帶有許多單元測試和集成測試;有些使用mocks, 另外一些使用真實的RabbitMQ broker來集成測試. 您可以參照測試場景的一些想法進行測試。

    Spring AMQP 1.6版本引入了sring-rabbit-test jar ,它提供一些測試復雜場景的測試. 預計這一項目將隨著時間的推移進行擴展,但我們需要社會反饋以幫助測試。請使用JIRA問題或GitHub提供這樣的反饋。

    3.4.2 Mockito Answer<?> 實現

    當前有兩個Answer<?> 實現可幫助測試:

    第一個, LatchCountDownAndCallRealMethodAnswer 提供了返回null和計數下一個鎖存器的Answer<Void>.

    LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2);
    doAnswer(answer)
        .when(listener).foo(anyString(), anyString());
    
    ...
    
    assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));

    第二個, LambdaAnswer<T> 提供了一種調用真正方法的機制,并提供機會來返回定制結果(基于InvocationOnMock和結果).

    public class Foo {
    
        public String foo(String foo) {
            return foo.toUpperCase();
        }
    
    }
    Foo foo = spy(new Foo());
    
    doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
        .when(foo).foo(anyString());
    assertEquals("FOOFOO", foo.foo("foo"));
    
    doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
        .when(foo).foo(anyString());
    assertEquals("FOOfoo", foo.foo("foo"));
    
    doAnswer(new LambdaAnswer<String>(false, (i, r) ->
        "" + i.getArguments()[0] + i.getArguments()[0])).when(foo).foo(anyString());
    assertEquals("foofoo", foo.foo("foo"));

    When using Java 7 or earlier:

    doAnswer(new LambdaAnswer<String>(true, new ValueToReturn<String>() {
        @Overridepublic String apply(InvocationOnMock i, String r) {
            return r + r;
        }
    })).when(foo).foo(anyString());

    3.4.3 @RabbitListenerTest and RabbitListenerTestHarness

    在你的@Configuration 類中使用 @RabbitListenerTest (它也會通過@EnableRabbit來啟用@RabbitListener 探測).注解會導致框架使用子類RabbitListenerTestHarness來代替標準RabbitListenerAnnotationBeanPostProcessor.

    RabbitListenerTestHarness 通過兩種方式來增強監聽器 - 將其包裝進Mockito Spy, 啟用了Mockito 存根和驗證操作.也可在監聽器添加Advice 來啟用對參數,結果或異常的訪問.
    你可以控制哪一個(或兩個)來在
    @RabbitListenerTest啟用屬性
    . 后者用于訪問調用中更為低級數據- 它也支持測試線程阻塞,直到異步監聽器被調用.

    重要

    final @RabbitListener 不能被發現或通知 ,同時,只有帶id屬性的監聽器才能發現或通知.

    讓我們看一些例子.

    使用spy:

    @Configuration
    @RabbitListenerTest
    public class Config {
    
        @Bean
     public Listener listener() {
            returnnew Listener();
        }
    
        ...
    
    }
    
    public class Listener {
    
        @RabbitListener(id="foo", queues="#{queue1.name}")
     public String foo(String foo) {
            return foo.toUpperCase();
        }
    
        @RabbitListener(id="bar", queues="#{queue2.name}")
     public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
            ...
        }
    
    }
    
    public class MyTests {
    
        @Autowired
        private RabbitListenerTestHarness harness;
    1@Test
     public void testTwoWay() throws Exception {
            assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"    ));
    
            Listener listener = this.harness.getSpy("foo"); 2
            assertNotNull(listener);
            verify(listener).foo("foo");
        }
    
        @Test
    public void testOneWay() throws Exception {
            Listener listener = this.harness.getSpy("bar");
            assertNotNull(listener);
    
            LatchCountDownAndCallRealMethodAnswer answer = new LatchCountDownAndCallRealMethodAnswer(2); 3
            doAnswer(answer).when(listener).foo(anyString(), anyString());4
      this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
      this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
    
            assertTrue(answer.getLatch().await(10, TimeUnit.SECONDS));
            verify(listener).foo("bar", this.queue2.getName());
            verify(listener).foo("baz", this.queue2.getName());
        }
    
    }

    1

    將harness 注入進測試用于,這樣我們可訪問spy.

    2

    獲取spy引用,這樣我們可以驗證是否按預期在調用. 由于這是一個發送和接收操作,因此不必暫停測試線程,因為RabbitTemplate 在等待回復時已經暫停過了.

    3

    在這種情況下,我們只使用了發送操作,因為我們需要一個門閂來等待對容器線程中監聽器的異步調用. 我們使用了Answer<?> 一個實現來幫助完成.

    4

    配置spy來調用Answer.

    使用捕獲建議:
    @Configuration
    @ComponentScan
    @RabbitListenerTest(spy = false, capture = true)
    public class Config {
    
    }
    
    @Service
    public class Listener {
    
        private boolean failed;
    
        @RabbitListener(id="foo", queues="#{queue1.name}")
     public String foo(String foo) {
            return foo.toUpperCase();
        }
    
        @RabbitListener(id="bar", queues="#{queue2.name}")
    public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
            if (!failed && foo.equals("ex")) {
                failed = true;
                thrownew RuntimeException(foo);
            }
            failed = false;
        }
    
    }
    
    public class MyTests {
    
         @Autowired
      private RabbitListenerTestHarness harness; 1
      @Test
      public void testTwoWay() throws Exception {
            assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"  ));
    
            InvocationData invocationData =
                this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS); 2
            assertThat(invocationData.getArguments()[0], equalTo("foo"));     3
            assertThat((String) invocationData.getResult(), equalTo("FOO"));
        }
    
        @Test
      public void testOneWay() throws Exception {
            this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
            this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
            this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");
    
            InvocationData invocationData =
                this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS); 4
            Object[] args = invocationData.getArguments();
            assertThat((String) args[0], equalTo("bar"));
            assertThat((String) args[1], equalTo(queue2.getName()));
    
            invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
            args = invocationData.getArguments();
            assertThat((String) args[0], equalTo("baz"));
    
            invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
            args = invocationData.getArguments();
            assertThat((String) args[0], equalTo("ex"));
            assertEquals("ex", invocationData.getThrowable().getMessage()); 5
        }
    
    }

    1

    將harness注入進測試用例,以便我們能獲取spy的訪問.

    2

    使用 harness.getNextInvocationDataFor() 來獲取調用數據 - 在這種情況下,由于它處于request/reply 場景,因為沒有必要等待,因為測試線程在RabbitTemplate 中等待結果的時候,已經暫停過了.

    3

    我們可以驗證參數和結果是否與預期一致

    4

    這次,我們需要時間來等待數據,因為它在容器線程上是異步操作,我們需要暫停測試線程.

    5

    當監聽器拋出異常時,可用調用數據中的throwable 屬性


    4. Spring 整合- 參考

    這部分參考文檔提供了在Spring集成項目中提供AMQP支持的快速介紹.

    4.1 Spring 整合AMQP支持4.1.1 介紹

    Spring Integration 項目包含了構建于Spring AMQP項目之上的AMQP 通道適配器(Channel Adapters)和網關(Gateways). 那些適配器是在Spring集成項目中開發和發布的.在Spring 集成中, "通道適配器" 是單向的,而網關是雙向的(請求-響應).
    我們提供了入站通道適配器(inbound-channel-adapter),出站通道適配器( outbound-channel-adapter), 入站網關(inbound-gateway),以及出站網關(outbound-gateway).

    由于AMQP 適配器只是Spring集成版本的一部分,因為文檔也只針對Spring集成發行版本部分可用. 

    作為一個品酒師,我們只快速了解這里的主要特征。

    4.1.2 入站通道適配器

    為了從隊列中接收AMQP消息,需要配置一個個<inbound-channel-adapter>

    <amqp:inbound-channel-adapter channel="fromAMQP" queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>

    4.1.3 站通道適配器
    為了向交換器發送AMQP消息,需要配置一個<outbound-channel-adapter>. 除了交換名稱外,還可選擇提供路由鍵。

    <amqp:outbound-channel-adapter channel="toAMQP" exchange-name="some.exchange"    routing-key="foo" amqp-template="rabbitTemplate"/>

    4.1.4 入站網關

    為了從隊列中接收AMQP消息,并回復到它的reply-to地址,需要配置一個<inbound-gateway>.

    <amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP"         queue-names="some.queue" connection-factory="rabbitConnectionFactory"/>

    4.1.5 出站網關

    為了向交換器發送AMQP消息并接收來自遠程客戶端的響應,需要配置一個<outbound-gateway>.

    除了交換名稱外,還可選擇提供路由鍵。

    <amqp:outbound-gateway request-channel="toAMQP" reply-channel="fromAMQP"       exchange-name="some.exchange" routing-key="foo" amqp-template="rabbitTemplate"/>

    5. 其它資源

    除了這份參考文檔,還有其它資源可幫助你了解AMQP.

    5.1 進階閱讀

    對于那些不熟悉AMQP的人來說,  規范 實際上具有相當的可讀性. 

    這當然是信息的權威來源,對于熟悉規范的人來說,Spring AMQP代碼應該很容易理解。
    目前RabbitMQ實現基于2.8.x版本,并正式支持AMQP 0.8和9.1。我們推薦閱讀9.1文檔。

    在RabbitMQ Getting Started 頁面上,還有許多精彩的文章,演示, 博客. 因為當前只有Spring AMQP實現, 但我們仍建議將其作為了解所有中間件相關概念的起點.




    posted on 2016-08-13 16:24 胡小軍 閱讀(6553) 評論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 亚洲精品无码人妻无码| 免费一区二区无码东京热| 亚洲精品NV久久久久久久久久| 午夜免费福利小电影| 国产亚洲日韩在线a不卡| 亚洲国产精品久久66| 国产伦精品一区二区三区免费迷| 16女性下面无遮挡免费| 一区二区在线免费视频| 亚洲精品理论电影在线观看| 亚洲日本香蕉视频| 久久亚洲AV无码精品色午夜麻| 久久久久亚洲精品中文字幕| 免费大片在线观看网站| 国产免费久久精品| 亚洲精品国产精品乱码视色 | 亚洲日韩国产一区二区三区| 夜色阁亚洲一区二区三区| 成人免费看黄20分钟| 皇色在线视频免费网站| 91高清免费国产自产拍2021| 91精品国产免费入口| 免费无码黄动漫在线观看| 操美女视频免费网站| 亚洲精品无码久久不卡| 亚洲综合综合在线| 久久综合亚洲色一区二区三区| 亚洲影院在线观看| 亚洲视频免费一区| 黄网站色视频免费观看45分钟| 精品视频免费在线| 久久久久久夜精品精品免费啦| 日韩免费观看一区| 久久精品免费观看国产| 国产精品深夜福利免费观看| 亚洲精品国产va在线观看蜜芽| 亚洲欧洲自拍拍偷午夜色| 一级黄色免费大片| 免费可以在线看A∨网站| 暖暖日本免费在线视频| 亚洲乱码国产一区网址|