<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    3.1.9 Request/Reply 消息

    介紹

    AmqpTemplate 也提供了各種各樣的sendAndReceive 方法,它們接受同樣的參數選項(exchange, routingKey, and Message)來執行單向發送操作.
    這些方法對于request/reply 場景也是有用的,因為它們在發送前處理了必要的"reply-to"屬性配置,并能通過它在專用隊列(基于回復功能臨時創建的隊列)上監聽回復消息.

    類似的request/reply方法也是可用的, MessageConverter 可應用于請求和回復上.這些方法被稱為convertSendAndReceive.參考 AmqpTemplate 的JavaDoc來了解詳情.

    從1.5.0版本開始,每個sendAndReceive 方法變種都有一個接受CorrelationData的重載版本. 連同正確配置的連接工廠,這使得發布者可以確認發送方的操作.
    參考 the section called “Publisher Confirms and Returns” 來了解詳情.

    Reply 超時

    默認情況下,這些 send和receive方法會在5秒后超時并返回null. 這可以通過設置replyTimeout屬性來修改.
    從1.5版本開始,如果你設置了 mandatory 屬性為true (或特定消息上的 mandatory-expression 評估為true),如果消息不能投遞到隊列中,將拋出AmqpMessageReturnedException.
    這個 exception 有 returnedMessagereplyCodereplyText 屬性, 如同用于發送的exchangeroutingKey


    這個功能使用了發布者返回特性,可通過在CachingConnectionFactory設置publisherReturns 為true來啟用(參考the section called “Publisher Confirms and Returns”).
    此外,你不必在RabbitTemplate上注冊你自己的ReturnCallback.

    RabbitMQ Direct reply-to

    重要
    從3.4.0版本開始,RabbitMQ 服務器現在支持 Direct reply-to,基于主要原因,它消除了固定回復隊列(為了避免為每個請求創建臨時隊列).
    Spring AMQP 1.4.1 版本開始,Direct reply-to 就已經做為了默認使用(如果服務器支持的話),而不再創建臨時隊列.
    當沒有提供replyQueue(或設置名稱為amq.rabbitmq.reply-to), RabbitTemplate 會自動探測是否支持Direct reply-to, 要么使用它或使用臨時回復隊列來回退. 當使用Direct reply-to, reply-listener不是必需的,不應該被配置。

    Reply 監聽器仍然運行命名隊列(不是amq.rabbitmq.reply-to),允許控制并發回復.

    從.16版本開始,出于某些原因,你想為每個回復使用臨時的,專用的,自動刪除的隊列,你可以設置useTemporaryReplyQueues 屬性為true. 如果你設置了replyAddress,此屬性會被忽略.

    決定是否使用 direct reply-to,可以通過繼承RabbitTemplate 并覆蓋useDirectReplyTo()來修改. 

    此方法只在發出第一個請求時,調用一次.

    應答隊列的消息相關性

    當使用固定回復隊列時(不是amq.rabbitmq.reply-to), 必須要提供 correlation data,這樣回復才能關聯請求.參考RabbitMQ Remote Procedure Call (RPC).
    默認情況下,標準correlationId 屬性會用來持有correlation data. 然而,如果你想使用自定義屬性來持有correlation data, 你可在 <rabbit-template/>中設置 correlation-key 屬性.
    顯示設置屬性為correlationId 將與缺省屬性相同. 當然,客戶端和服務器對于correlation data必須要有相同的頭.

    Spring AMQP 1.1版本為這個data使用自定義屬性spring_reply_correlation.如果你想在當前版本中恢復這種行為,也許是為了保持1.1中的另一個應用程序的兼容性,你必須設置屬性以spring_reply_correlation。

    回復監聽器容器

    當使用3.4.0之前的Rabbit版本,每個回復都會使用一個新臨時隊列. 然而,可在模板上配置單個回復隊列, 這將更加高效,同時也允許你在隊列上設置參數.然而,在這種情況下,你必須提供<reply-listener/>子元素. 

    這個元素為回復隊列提供了監聽器容器, 以模板為監聽器.
    所有 Section 3.1.15, “Message Listener Container Configuration” 中的屬性都可以配置在<listener-container/> 元素中,除了connection-factory 和 message-converter(它們是模塊配置中繼承下來的).

    重要

    如果運行了多個應用程序實例或者使用了多個RabbitTemplate,那么你必須為每個都使用唯一的回復隊列- RabbitMQ 沒有在隊列中選擇消息的能力,如果它們都使用相同隊列,每個實例都將競爭的答復,而不一定是收到他們自己的。

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
    reply-queue="replies" reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
    </rabbit:template>

    由于容器和模板可共享一個連接工廠,它們不會共享一個通道,因此請求和回復不是在同一個事務中執行的(如果是事務的).

    重要

    在1.5.0版本之前, reply-address 屬性不可用,回復總是通過默認交換器和reply-queue作路由鍵來進行的. 現在這依然是默認的,但現在你可以指定新的reply-address 屬性. 
    reply-address 可以包含<exchange>/<routingKey> 形式的地址,回復將會路由到設定的exchange和路由到routing key綁定的隊列上. 
    reply-address 優先于 reply-queue<reply-listener> 必須配置為一個單獨的<listener-container> 組件, 當只使用reply-address 時,無論是reply-address 還是 reply-queue (在<listener-container>中的queue屬性) 必須指的是同一個隊列.

    在這個配置中,SimpleListenerContainer 用于接收回復; 而RabbitTemplate 將成為MessageListener. 當使用<rabbit:template/> 命名空間元素定義模板時, 正如上面所展示的, 分析器會定義容器,并將模板作為監聽器進行包裝.

    重要

    當模板不使用固定 replyQueue (或使用Direct reply-to - 參考 the section called “RabbitMQ Direct reply-to”) ,則不需要監聽器容器. 當在RabbitMQ3.4.0+使用時,Direct reply-to 是更好的機制.


    如果你將 RabbitTemplate 定義為 <bean/>, 或使用 @Configuration 類將其定義為@Bean,或者通過編程來創建模板,你需要自己定義和包裝回復監聽器容器.
    如果這樣做失敗了,模板將不會收到回復,并最終會超時并返回null作為對sendAndReceive 方法調用者的回復.

    從1.5版本開始, RabbitTemplate 會探測是否配置了MessageListener 來接收回復.如果沒有,它會嘗試發送并使用回復地地址來接收消息,如果失敗了,則會拋出 IllegalStateException (因為不會收到回復).

    此外,如果使用了簡單的replyAddress (隊列名稱),回復監聽器容器會驗證與監聽的隊列是否是一樣的名稱.但如果這個地址是交換器和路由鍵,這種檢查不會被執行,會輸出調試日志信息.

    重要

    當在編寫回復監聽器和模板時,重要的一點是要保證模板的replyQueue 與容器的queues (或queueNames) 屬性指的是相同的隊列. 模板會將回復隊列插入到出站消息的replyTo屬性中.

    下面的例子展示了如何來包裝這些beans.

    <bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    </bean>
    <bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
    </bean>
    <rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {         
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(msgConv());
    rabbitTemplate.setReplyQueue(replyQueue());
    rabbitTemplate.setReplyTimeout(60000);
    return rabbitTemplate; }
    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {         
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(amqpTemplate());
    return container;
    }
    @Bean
    public Queue replyQueue() {         
    return new Queue("my.reply.queue");
    }

    完整的RabbitTemplate 包裝固定回復隊列,與遠程監聽器容器的請求回復處理展示在 this test case.

    重要

    當回復超時時(replyTimeout), sendAndReceive() 方法會返回null.


    在1.3.6版本之前, 消息超時回復只是簡單地記錄下來.現在,如果收到了遲到回復,將會拒絕(模板會拋出AmqpRejectAndDontRequeueException).
    如果回復隊列配置了將拒絕消息到死信交換器中, 可獲取回復來作后面的分析. 只須將隊列以隊列名稱作為路由鍵綁定到死信交換器中.

    參考RabbitMQ Dead Letter Documentation 來了解更多關于死信的配置信息.

    你也可以看示例中關于FixedReplyQueueDeadLetterTests 測試用例.

    AsyncRabbitTemplate

    1.6版本引入了 AsyncRabbitTemplate

    它有與 AmqpTemplate 上類似的sendAndReceive (和 convertSendAndReceive) 方法,但不是阻塞的,它們會返回一個 ListenableFuture.

    sendAndReceive 方法返回一個RabbitMessageFutureconvertSendAndReceive 方法會返回一個RabbitConverterFuture.

    你可以同步稍后在future上調用get()方法來獲取結果,也可以注冊一個回調異步來獲取結果.

    @Autowired
    private AsyncRabbitTemplate template;  
    ...
    public void doSomeWorkAndGetResultLater() {
    ...
    ListenableFuture<String> future = this.template.convertSendAndReceive("foo");
    // do some more work
    String reply = null;
    try {
    reply = future.get();
    }
    catch (ExecutionException e) {
    ...
    } ...
    }
    public void doSomeWorkAndGetResultAsync() {
    ...
    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.addCallback(new ListenableFutureCallback<String>() {
    @Override
    publicvoid onSuccess(String result) { 
    ...
    }
    @Override
    publicvoid onFailure(Throwable ex) { 
    ...
    }
    });
    ...
    }

    如果設置了mandatory ,且消息不能投遞,future 會拋出一個ExecutionException ,并帶有AmqpMessageReturnedException 原因,它封裝了返回的消息和以及關于返回的信息.

    如果設置了enableConfirms ,future會包含一個屬性confirm ,它是 ListenableFuture<Boolean> , true 表示成功的發布.

    如果confirm future是false,RabbitFuture 會有一個屬性nackCause - 如果可用的話,則代表的是失敗的原因.

    重要

    發布者確認已被廢棄了(如果在回復之后收到),-因為回復已經暗示了成功發布.

    在模板上設置receiveTimeout 屬性來表示回復超時時間(它默認為 30 秒).如果發生了超時,future會以AmqpReplyTimeoutException結束.

    模板可實現SmartLifecycle; 這樣可阻止模板在等待回復時Future 退出.

    Spring 遠程調用 AMQP

    Spring Framework 有一個普遍的遠程處理能力, 允許 Remote Procedure Calls (RPC) 使用多種傳輸協議. Spring-AMQP 通過在客戶端使用AmqpProxyFactoryBean ,在服務端使用AmqpInvokerServiceExporter也可以提供類似的機制.
    它提供了基于AMQP的RPC. 在客戶端,
    RabbitTemplate 可以按照上面一樣來使用,在服務器端, invoker (配置為MessageListener) 會收到消息, 調用配置的服務,使用入站消息的replyTo信息來返回回復.

    客戶端工廠可注入任何bean (使用它的serviceInterface);客戶端然后可以調用代理上的方法,導致在AMQP上遠程執行.

    重要

    使用默認 MessageConverter 器,方法參數和返回值必須是Serializable的實例.

    在服務器端,AmqpInvokerServiceExporter 包含AmqpTemplate 和 MessageConverter屬性. 

    目前,未使用模板的MessageConverter.如果你需要提供定制的消息轉換器,那么你需要使用messageConverter 屬性進行提供.在客戶端,可在AmqpTemplate 中添加定制消息轉換器,它是使用其amqpTemplate 屬性提供給 AmqpProxyFactoryBean 的.

    樣例 client 和server 配置如下所示.

    <bean id="client" class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
    <propertyname="amqpTemplate "ref="template" />
    <propertyname="serviceInterface"value="foo.ServiceInterface" />
    </bean>
    <rabbit:connection-factory id="connectionFactory" />
    <rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"routing-key="remoting.binding" exchange="remoting.exchange" />
    <rabbit:admin connection-factory="connectionFactory" />
    <rabbit:queue name="remoting.queue" />
    <rabbit:direct-exchange name="remoting.exchange">
    <rabbit:bindings>
    <rabbit:binding queue="remoting.queue" key="remoting.binding" />
    </rabbit:bindings>
    </rabbit:direct-exchange>
    <bean id="listener" class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
    <property name="serviceInterface" value="foo.ServiceInterface" />
    <property name="service" ref="service" />
    <property name="amqpTemplate" ref="template" />
    </bean><bean id="service" class="foo.ServiceImpl" />
    <rabbit:connection-factory id="connectionFactory" />
    <rabbit:template id="template" connection-factory="connectionFactory" />
    <rabbit:queue name="remoting.queue" />
    <rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="listener" queue-names="remoting.queue" />
    </rabbit:listener-container>
    重要
    AmqpInvokerServiceExporter 只能處理適當格式的消息,如果從AmqpProxyFactoryBean中發出的消息. 如果它接收到一個不能解釋的消息,那么將發送一個序列化的RuntimeException 作為回復.
    如果這些消息無re
    plyToAddress 屬性,消息會被拒絕且在沒有配置死信交換器時會永久丟失.
    默認情況下,如果請求消息不能投遞,調用線程最終會超時,并會拋出RemoteProxyFailureException. 超時時間是5秒,可在RabbitTemplate通過設置replyTimeout 屬性來修改.
    從1.5版本開始,如果設置 mandatory 屬性為true, 并在連接工廠中啟用了返回(參考 the section called “Publisher Confirms and Returns”), 調用線程會拋出一個AmqpMessageReturnedException.
    參考 the section called “Reply Timeout” 來了解更多信息.



    posted on 2016-08-13 15:59 胡小軍 閱讀(6653) 評論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 8x成人永久免费视频| 国产成人精品免费视频软件| 亚洲无人区视频大全| 男女交性永久免费视频播放| 又黄又大的激情视频在线观看免费视频社区在线 | 亚洲人成小说网站色| 亚洲国产成人AV在线播放| 国产亚洲成av片在线观看| 日本免费网站视频www区| 亚洲国产成人久久综合| 亚洲人成网77777色在线播放| 波多野结衣在线免费视频| 一级特黄特色的免费大片视频| 18gay台湾男同亚洲男同| 啊灬啊灬别停啊灬用力啊免费看| 免费精品久久天干天干| 亚洲av无码专区在线电影天堂| 亚洲精品无码专区久久久| 日韩中文无码有码免费视频 | 成人免费a级毛片无码网站入口| av午夜福利一片免费看久久| 亚洲AV一二三区成人影片| 国产AV无码专区亚洲AWWW| 野花高清在线观看免费完整版中文 | 亚洲国产精品成人AV无码久久综合影院 | 一级毛片成人免费看免费不卡| 老子影院午夜伦不卡亚洲| 亚洲一本综合久久| 亚洲精品色婷婷在线影院| 无限动漫网在线观看免费| 久久亚洲免费视频| 一级毛片免费播放试看60分钟| 亚洲AV成人影视在线观看| 久久精品国产亚洲av麻豆小说 | 毛片免费观看视频| 2015日韩永久免费视频播放| 国产va免费观看| 全部在线播放免费毛片| 亚洲国产精品无码中文lv| 亚洲日韩乱码久久久久久| 亚洲高清在线视频|