<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評(píng)論

    閱讀排行榜

    評(píng)論排行榜

    Consumer Tags

    從1.4.5版本開(kāi)始,你可以提供一種策略來(lái)生成consumer tags.默認(rèn)情況下,consumer tag是由broker來(lái)生成的.

    public interface ConsumerTagStrategy {      String createConsumerTag(String queue);  }
    該隊(duì)列是可用的,所以它可以(可選)在tag中使用。

    參考Section 3.1.15, “Message Listener Container Configuration”.

    注解驅(qū)動(dòng)的監(jiān)聽(tīng)器Endpoints

    介紹

    從1.4版本開(kāi)始,異步接收消息的最簡(jiǎn)單方式是使用注解監(jiān)聽(tīng)器端點(diǎn)基礎(chǔ)設(shè)施. 簡(jiǎn)而言之,它允許你暴露管理bean的方法來(lái)作為Rabbit 監(jiān)聽(tīng)器端點(diǎn).

    @Component
    public class MyService {      
     @RabbitListener(queues = "myQueue")
     public void processOrder(String data) {         
         ...
      }
    }
    上面例子的含義是,當(dāng)消息在org.springframework.amqp.core.Queue "myQueue"上可用時(shí), 會(huì)調(diào)用processOrder方法(在這種情況下,帶有消息的負(fù)載).

    通過(guò)使用RabbitListenerContainerFactory,注解端點(diǎn)基礎(chǔ)設(shè)施在每個(gè)注解方法的幕后都創(chuàng)建了一個(gè)消息監(jiān)聽(tīng)器容器.在上面的例子中,myQueue 必須是事先存在的,
    并綁定了某個(gè)交換器上.從1.5.0版本開(kāi)始
    ,只要在上下文中存在RabbitAdmin,隊(duì)列可自動(dòng)聲明和綁定.

    @Component
    public class MyService {
    
      @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "myQueue", durable = "true"),
            exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
            key = "orderRoutingKey")
      )public void processOrder(String data) {
        ...
      }
    
      @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "auto.exch"),
            key = "invoiceRoutingKey")
      )public void processInvoice(String data) {
        ...
      }
    
    }

    在第一個(gè)例子中,隊(duì)列myQueue 會(huì)與交換器一起自動(dòng)聲明(持久化的), 如果需要,可使用路由鍵來(lái)綁定到交換器上.在第二個(gè)例子中,匿名(專用的,自動(dòng)刪除的)隊(duì)列將會(huì)聲明并綁定.
    可提供多個(gè) QueueBinding 條目,允許監(jiān)聽(tīng)器監(jiān)聽(tīng)多個(gè)隊(duì)列.

    當(dāng)前只支持DIRECT, FANOUT, TOPIC 和HEADERS的交換器類型.當(dāng)需要高級(jí)配置時(shí),可使用@Bean 定義.

    注意第一個(gè)例子中交換器上的 ignoreDeclarationExceptions .這允許,例如, 綁定到有不同的設(shè)置(如.internal)的交換器上. 默認(rèn)情況下,現(xiàn)有交換器的屬性必須被匹配.

    從1.6版本開(kāi)始,你可為隊(duì)列,交換器和綁定的@QueueBinding 注解中指定參數(shù).示例:

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "auto.headers", autoDelete = "true",
                            arguments = @Argument(name = "x-message-ttl", value = "10000",
                                                    type = "java.lang.Integer")),
            exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
            arguments = {
                    @Argument(name = "x-match", value = "all"),
                    @Argument(name = "foo", value = "bar"),
                    @Argument(name = "baz")
            })
    )
    public String handleWithHeadersExchange(String foo) { ... }

    注意隊(duì)列的x-message-ttl 參數(shù)設(shè)為了10秒鐘,因?yàn)閰?shù)類型不是String, 因此我們指定了它的類型,在這里是Integer.有了這些聲明后,如果隊(duì)列已經(jīng)存在了,參數(shù)必須匹配現(xiàn)有隊(duì)列上的參數(shù).對(duì)于header交換器,我們?cè)O(shè)置binding arguments 要匹配頭中foo為bar,且baz可為任意值的消息. x-match 參數(shù)則意味著必須同時(shí)滿足兩個(gè)條件.

    參數(shù)名稱,參數(shù)值,及類型可以是屬性占位符(${...}) 或SpEL 表達(dá)式(#{...}). name 必須要能解析為String; type的表達(dá)式必須能解析為Class 或類的全限定名. value 必須能由DefaultConversionService 類型進(jìn)行轉(zhuǎn)換(如上面例子中x-message-ttl).

    如果name 解析為null 或空字符串,那么將忽略 @Argument.

    元注解(Meta-Annotations)

    有時(shí),你想將同樣的配置用于多個(gè)監(jiān)聽(tīng)器上. 為減少重復(fù)配置,你可以使用元注解來(lái)創(chuàng)建你自己的監(jiān)聽(tīng)器注解:

    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
    public@interface MyAnonFanoutListener {
    }
    
    public class MetaListener {
    
        @MyAnonFanoutListener
    public void handle1(String foo) {
            ...
        }
    
        @MyAnonFanoutListener
    public void handle2(String foo) {
            ...
        }
    
    }

    在這個(gè)例子中,每個(gè)通過(guò)@MyAnonFanoutListener創(chuàng)建的監(jiān)聽(tīng)器都會(huì)綁定一個(gè)匿名,自動(dòng)刪除的隊(duì)列到fanout交換器 metaFanout. 元注解機(jī)制是簡(jiǎn)單的,在那些用戶定義注解中的屬性是不會(huì)經(jīng)過(guò)檢查的- 因此你不能從元注解中覆蓋設(shè)置.當(dāng)需要高級(jí)配置時(shí),使用一般的 @Bean 定義.

    Enable Listener Endpoint Annotations

    為了啟用 @RabbitListener 注解,需要在你的某個(gè)@Configuration類中添加@EnableRabbit 注解.

    @Configuration
    @EnableRabbit
    publicclass AppConfig {
    
        @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setConcurrentConsumers(3);
            factory.setMaxConcurrentConsumers(10);
            return factory;
        }
    }

    默認(rèn)情況下,基礎(chǔ)設(shè)施會(huì)查找一個(gè)名為rabbitListenerContainerFactory 的bean作為工廠來(lái)源來(lái)創(chuàng)建消息監(jiān)聽(tīng)器容器. 在這種情況下,會(huì)忽略RabbitMQ 基礎(chǔ)設(shè)施計(jì)劃, processOrder 方法可使用核心輪詢大小為3個(gè)線程最大10個(gè)線程的池大小來(lái)調(diào)用.

    可通過(guò)使用注解或?qū)崿F(xiàn)RabbitListenerConfigurer

    接口來(lái)自定義監(jiān)聽(tīng)器容器工廠. 默認(rèn)只需要注冊(cè)至少一個(gè)Endpoints,而不需要一個(gè)特定的容器工廠.查看javadoc來(lái)了解詳情和例子.

    如果你更喜歡XML配置,可使用 <rabbit:annotation-driven> 元素.

    <rabbit:annotation-driven/>
    <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrentConsumers "value="3"/>
    <property name="maxConcurrentConsumers"value="10"/>
    </bean>
    注解方法的消息轉(zhuǎn)換

    在調(diào)用監(jiān)聽(tīng)器之前,在管道中有兩個(gè)轉(zhuǎn)換步驟. 第一個(gè)使用 MessageConverter 來(lái)將傳入的Spring AMQP Message 轉(zhuǎn)換成spring-消息系統(tǒng)的消息. 當(dāng)目標(biāo)方法調(diào)用時(shí),消息負(fù)載將被轉(zhuǎn)換,如果有必要,也會(huì)參考消息參數(shù)類型來(lái)進(jìn)行.

    第一步中的默認(rèn) MessageConverter 是一個(gè)Spring AMQP SimpleMessageConverter ,它可以處理String 和 java.io.Serializable對(duì)象之間的轉(zhuǎn)換; 其它所有的將保留為byte[]. 在下面的討論中,我們稱其為消息轉(zhuǎn)換器.

    第二個(gè)步驟的默認(rèn)轉(zhuǎn)換器是GenericMessageConverter ,它將委派給轉(zhuǎn)換服務(wù)(DefaultFormattingConversionService的實(shí)例). 在下面的討論中,我們稱其為方法參數(shù)轉(zhuǎn)換器.

    要改變消息轉(zhuǎn)換器,可在連接工廠bean中設(shè)置其相關(guān)屬性:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ...
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        ...
        return factory;
    }

    這配置了一個(gè)Jackson2 轉(zhuǎn)換器,希望頭信息能通過(guò)它來(lái)指導(dǎo)轉(zhuǎn)換.

    你也可以考慮使用ContentTypeDelegatingMessageConverter ,它可以處理不同內(nèi)容類型的轉(zhuǎn)換.

    大多數(shù)情況下,沒(méi)有必要來(lái)定制方法參數(shù)轉(zhuǎn)換器,除非你想要用自定義的ConversionService.

    在1.6版本之前,用于轉(zhuǎn)換JSON的類型信息必須在消息頭中提供或者需要一個(gè)自定義的ClassMapper. 從1.6版本開(kāi)始,如果沒(méi)有類型信息頭,類型可根據(jù)目標(biāo)方法參數(shù)推斷.

    類型推斷只能用于 @RabbitListener 的方法級(jí).

    參考 the section called “Jackson2JsonMessageConverter” 來(lái)了解更多信息.

    如果您希望自定義方法參數(shù)轉(zhuǎn)換器,您可以這樣做如下:
    @Configuration
    @EnableRabbit
    public class AppConfig implements RabbitListenerConfigurer {
    
        ...
    
        @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        	DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        	factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
        	return factory;
        }
    
        @Bean
    public ConversionService myConversionService() {
        	DefaultConversionService conv = new DefaultConversionService();
        	conv.addConverter(mySpecialConverter());
        	return conv;
        }
    
        @Override
    publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        	registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    
        ...
    
    }
    重要
    對(duì)于多方法監(jiān)聽(tīng)器(參考 the section called “Multi-Method Listeners”), 方法選擇是基于消息轉(zhuǎn)換后的消息負(fù)載,方法參數(shù)轉(zhuǎn)換器只在方法被選擇后才會(huì)調(diào)用.
    編程式 Endpoint 注冊(cè)

    RabbitListenerEndpoint 提供了一個(gè)Rabbit endpoint 模型并負(fù)責(zé)為那個(gè)模型配置容器.除了通過(guò)RabbitListener注解檢測(cè)外這個(gè)基礎(chǔ)設(shè)施允許你通過(guò)編程來(lái)配置endpoints.

    @Configuration
    @EnableRabbit
    publicclass AppConfig implements RabbitListenerConfigurer {
    
        @Override
    publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
            endpoint.setQueueNames("anotherQueue");
            endpoint.setMessageListener(message -> {
                // processing
            });
            registrar.registerEndpoint(endpoint);
        }
    }

    在上面的例子中,我們使用了SimpleRabbitListenerEndpoint (它使用MessageListener 來(lái)進(jìn)行處理),但你也可以構(gòu)建你自己的endpoint變種來(lái)描述自定義的調(diào)用機(jī)制.

    應(yīng)該指出的是,你也可以跳過(guò)@RabbitListener 的使用,通過(guò)RabbitListenerConfigurer來(lái)編程注冊(cè)你的endpoints.

    Annotated Endpoint Method Signature
    到目前為止,我們已經(jīng)在我們的端點(diǎn)上注入了一個(gè)簡(jiǎn)單的字符串,但它實(shí)際上可以有一個(gè)非常靈活的方法簽名。讓我們重寫(xiě)它,以一個(gè)自定義的頭來(lái)控制注入順序:
    @Component
    publicclass MyService {
    
        @RabbitListener(queues = "myQueue")
    publicvoid processOrder(Order order, @Header("order_type") String orderType) {
            ...
        }
    }

    下面是你可以在監(jiān)聽(tīng)端點(diǎn)上注入的主要元素:

    原生org.springframework.amqp.core.Message.

    用于接收消息的com.rabbitmq.client.Channel

     org.springframework.messaging.Message 代表的是傳入的AMQP消息.注意,這個(gè)消息持有自定義和標(biāo)準(zhǔn)的頭部信息 (AmqpHeaders定義).


    從1.6版本開(kāi)始, 入站deliveryMode 頭可以AmqpHeaders.RECEIVED_DELIVERY_MODE 使用,代替了AmqpHeaders.DELIVERY_MODE.

    @Header-注解方法參數(shù)可 提取一個(gè)特定頭部值,包括標(biāo)準(zhǔn)的AMQP頭.

    @Headers-注解參數(shù)為了訪問(wèn)所有頭信息,必須能指定為java.util.Map.

    非注解元素(非支持類型(如. Message 和Channel))可認(rèn)為是負(fù)荷(payload).你可以使用 @Payload來(lái)明確標(biāo)識(shí). 你也可以添加額外的 @Valid來(lái)進(jìn)行驗(yàn)證.

    注入Spring消息抽象的能力是特別有用的,它可受益于存儲(chǔ)在特定傳輸消息中的信息,而不需要依賴于特定傳輸API.

    @RabbitListener(queues = "myQueue")
    public void processOrder(Message<Order> order) { ...
    }

    方法參數(shù)的處理是由DefaultMessageHandlerMethodFactory 提供的,它可以更進(jìn)一步地定制以支持其它的方法參數(shù). 轉(zhuǎn)換和驗(yàn)證支持也可以定制.

    例如,如果我們想確保我們的Order在處理之前是有效的,我們可以使用@Valid 來(lái)注解負(fù)荷,并配置必須驗(yàn)證器,就像下面這樣:

    @Configuration
    @EnableRabbit
    public class AppConfig implements RabbitListenerConfigurer {
    
        @Override
    publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    
        @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setValidator(myValidator());
            return factory;
        }
    }
    監(jiān)聽(tīng)多個(gè)隊(duì)列

    當(dāng)使用queues 屬性時(shí),你可以指定相關(guān)的容器來(lái)監(jiān)聽(tīng)多個(gè)隊(duì)列. 你可以使用 @Header 注解來(lái)指定對(duì)于那些隊(duì)列中收到的消息對(duì)POJO方法可用:

    @Component
    public class MyService {
    
        @RabbitListener(queues = { "queue1", "queue2" } )
    public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
            ...
        }
    
    }

    從1.5版本開(kāi)始,隊(duì)列名稱可以使用屬性占位符和SpEL:

    @Component
    public class MyService {
    
        @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
    public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
            ...
        }
    
    }

    1.5版本之前,只有單個(gè)隊(duì)列可以這種方法進(jìn)行指定,每個(gè)隊(duì)列需要一個(gè)單獨(dú)的屬性.

    回復(fù)管理

    MessageListenerAdapter 現(xiàn)有的支持已經(jīng)允許你的方法有一個(gè)非void的返回類型.在這種情況下,調(diào)用的結(jié)果被封裝在一個(gè)發(fā)送消息中,其消息發(fā)送地址要么是原始消息的ReplyToAddress頭指定的地址要么是監(jiān)聽(tīng)器上配置的默認(rèn)地址.默認(rèn)地址現(xiàn)在可通過(guò)@SendTo 注解進(jìn)行設(shè)置.

    假設(shè)我們的processOrder 方法現(xiàn)在需要返回一個(gè)OrderStatus, 可將其寫(xiě)成下面這樣來(lái)自動(dòng)發(fā)送一個(gè)回復(fù):

    @RabbitListener(destination = "myQueue")
    @SendTo("status")
    public OrderStatus processOrder(Order order) {
        // order processing
     return status;
    }

    如果你需要以傳輸獨(dú)立的方式來(lái)設(shè)置其它頭,你可以返回Message,就像這樣:

    @RabbitListener(destination = "myQueue")
    @SendTo("status")
    public Message<OrderStatus> processOrder(Order order) {
        // order processing
    return MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
    }

    @SendTo 值按照exchange/routingKey模式(其中的一部分可以省略)來(lái)作為對(duì)exchange 和 routingKey 的回復(fù).有效值為:

    foo/bar - 以交換器和路由鍵進(jìn)行回復(fù).

    foo/ - 以交換器和默認(rèn)路由鍵進(jìn)行回復(fù).

    bar or /bar - 以路由鍵和默認(rèn)交換器進(jìn)行回復(fù).

    / or empty - 以默認(rèn)交換器和默認(rèn)路由鍵進(jìn)行回復(fù).

     @SendTo 也可以沒(méi)有value 屬性. 這種情況等價(jià)于空的sendTo 模式. @SendTo 只能應(yīng)用于沒(méi)有replyToAddress 屬性的入站消息中.

    從1.5版本開(kāi)始, @SendTo 值可以通過(guò)bean SpEL 表達(dá)式初始化,例如…​

    @RabbitListener(queues = "test.sendTo.spel")
    @SendTo("#{spelReplyTo}")
    public String capitalizeWithSendToSpel(String foo) {
        return foo.toUpperCase();
    }
    ...
    @Bean
    public String spelReplyTo() {
        return"test.sendTo.reply.spel";
    }

    表達(dá)式必須能評(píng)估為String,它可以是簡(jiǎn)單的隊(duì)列名稱(將發(fā)送到默認(rèn)交換器中) 或者是上面談到的exchange/routingKey 形式.

    在初始化時(shí),#{...} 表達(dá)式只評(píng)估一次.

    對(duì)于動(dòng)態(tài)路由回復(fù),消息發(fā)送者應(yīng)該包含一個(gè)reply_to 消息屬性或使用運(yùn)行時(shí)SpEL 表達(dá)式.

    從1.6版本開(kāi)始, @SendTo 可以是SpEL 表達(dá)式,它可在運(yùn)行時(shí)根據(jù)請(qǐng)求和回復(fù)來(lái)評(píng)估:

    @RabbitListener(queues = "test.sendTo.spel")
    @SendTo("!{'some.reply.queue.with.' + result.queueName}")
    public Bar capitalizeWithSendToSpel(Foo foo) {
        return processTheFooAndReturnABar(foo);
    }

    SpEL 表達(dá)式的運(yùn)行時(shí)性質(zhì)是由 !{...} 定界符表示的. 表達(dá)式評(píng)估上下文的#root 對(duì)象有三個(gè)屬性:

    • request - o.s.amqp.core.Message 請(qǐng)求對(duì)象.
    • source - 轉(zhuǎn)換后的 o.s.messaging.Message<?>.
    • result - 方法結(jié)果.

    上下文有一個(gè)map 屬性訪問(wèn)器,標(biāo)準(zhǔn)類型轉(zhuǎn)換器以及一個(gè)bean解析器,允許引用上下文中的其它beans (如.@someBeanName.determineReplyQ(request, result)).

    總結(jié)一下, #{...} 只在初始化的時(shí)候評(píng)估一次, #root 對(duì)象代表的是應(yīng)用程序上下文; beans可通過(guò)其名稱來(lái)引用. !{...} 會(huì)在運(yùn)行時(shí),對(duì)于每個(gè)消息,都將使用root對(duì)象的屬性進(jìn)行評(píng)估,bean可以使用其名稱進(jìn)行引用,前輟為@.

    多方法監(jiān)聽(tīng)器

    從1.5.0版本開(kāi)始,@RabbitListener 注解現(xiàn)在可以在類級(jí)上進(jìn)行指定.與新的@RabbitHandler 注解一起,基于傳入消息的負(fù)荷類型,這可以允許在單個(gè)監(jiān)聽(tīng)器上調(diào)用不同的方法.這可以用一個(gè)例子來(lái)描述:

    @RabbitListener(id="multi", queues = "someQueue")
    publicclass MultiListenerBean {
    
        @RabbitHandler
    @SendTo("my.reply.queue")
    public String bar(Bar bar) {
            ...
        }
    
        @RabbitHandler
    public String baz(Baz baz) {
            ...
        }
    
        @RabbitHandler
    public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
            ...
        }
    
    }

    在這種情況下,獨(dú)立的 @RabbitHandler 方法會(huì)被調(diào)用,如果轉(zhuǎn)換后負(fù)荷是BarBaz 或Qux. 理解基于負(fù)荷類型系統(tǒng)來(lái)確定唯一方法是很重要的.類型檢查是通過(guò)單個(gè)無(wú)注解參數(shù)來(lái)執(zhí)行的,否則就要使用@Payload 進(jìn)行注解. 注意同樣的方法簽名可應(yīng)用于方法級(jí) @RabbitListener 之上.

    注意,如果有必要,需要在每個(gè)方法上指定@SendTo, 在類級(jí)上它是不支持的.

    @Repeatable @RabbitListener

    從1.6版本開(kāi)始,@RabbitListener 注解可用 @Repeatable進(jìn)行標(biāo)記. 這就是說(shuō),這個(gè)注解可多次出現(xiàn)在相同的注解元素上(方法或類).在這種情況下,對(duì)于每個(gè)注解,都會(huì)創(chuàng)建獨(dú)立的監(jiān)聽(tīng)容器,它們每個(gè)都會(huì)調(diào)用相同的監(jiān)聽(tīng)器@Bean. Repeatable 注解能用于 Java 8+;當(dāng)在Java 7-使用時(shí),同樣的效果可以使用 @RabbitListeners "container" 注解(包含@RabbitListener注解的數(shù)組)來(lái)達(dá)到.

    Proxy @RabbitListener and Generics

    如果你的服務(wù)是用于代理(如,在 @Transactional的情況中) ,當(dāng)接口有泛型參數(shù)時(shí),需要要一些考慮.要有一個(gè)泛型接口和特定實(shí)現(xiàn),如:

    interface TxService<P> {
    
       String handle(P payload, String header);
    
    }
    
    static class TxServiceImpl implements TxService<Foo> {
    
        @Override
     @RabbitListener(...)
     public String handle(Foo foo, String rk) {
             ...
        }
    
    }

    你被迫切換到CGLIB目標(biāo)類代理,因?yàn)榻涌趆andle方法的實(shí)際實(shí)現(xiàn)只是一個(gè)橋接方法.在事務(wù)管理的情況下, CGLIB是通過(guò)注解選項(xiàng)來(lái)配置的- @EnableTransactionManagement(proxyTargetClass = true). 在這種情況下,所有注解都需要在實(shí)現(xiàn)類的目標(biāo)方法上進(jìn)行聲明:

    static class TxServiceImpl implements TxService<Foo> {
    
     @Override
    @Transactional
    @RabbitListener(...)
    public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
            ...
        }
    
    }
    容器管理

    由注解創(chuàng)建的容器不會(huì)在上下文中進(jìn)行注冊(cè).你可以調(diào)用 RabbitListenerEndpointRegistrygetListenerContainers()方法來(lái)獲取所有容器集合.然后,你可以迭代這個(gè)集合,例如,停止/啟動(dòng)所有容器或調(diào)用在其注冊(cè)上調(diào)用Lifecycle 方法(調(diào)用每個(gè)容器中的操作).

    你也可以使用id來(lái)獲取單個(gè)容器的引用,即 getListenerContainer(String id); 例如registry.getListenerContainer("multi") .

    從1.5.2版本開(kāi)始,你可以調(diào)用getListenerContainerIds()方法來(lái)獲取所有注冊(cè)容器的id.

    從1.5版本開(kāi)始,你可在RabbitListener端點(diǎn)上為容器分配一個(gè)組(group).這提供了一種機(jī)制來(lái)獲取子集容器的引用; 添加一個(gè)group 屬性會(huì)使Collection<MessageListenerContainer> 類型的bean使用組名稱注冊(cè)在上下文中.

    線程和異步消費(fèi)者

    一些不同的線程可與異步消費(fèi)者關(guān)聯(lián)。

    當(dāng)RabbitMQ Client投遞消息時(shí),來(lái)自于SimpleMessageListener 配置的TaskExecutor中的線程會(huì)調(diào)用MessageListener.如果沒(méi)有配置,將會(huì)使用SimpleAsyncTaskExecutor. 如果使用了池化的executor,須確保池大小可以支撐并發(fā)處理.

    當(dāng)使用默認(rèn)SimpleAsyncTaskExecutor時(shí),對(duì)于調(diào)用監(jiān)聽(tīng)器的線程,監(jiān)聽(tīng)器容器的beanName 將用作threadNamePrefix. 這有益于日志分析,在日志appender配置中,一般建議總是包含線程名稱.當(dāng)在SimpleMessageListenerContainertaskExecutor屬性中指定TaskExecutor 時(shí),線程名稱是不能修改的.建議你使用相似的技術(shù)來(lái)命名線程, 幫助在日志消息中的線程識(shí)別。

    當(dāng)創(chuàng)建連接時(shí),在CachingConnectionFactory 配置的Executor將傳遞給RabbitMQ Client ,并且它的線程將用于投遞新消息到監(jiān)聽(tīng)器容器.在寫(xiě)作的時(shí)候,如果沒(méi)有配置,client會(huì)使用池大小為5的內(nèi)部線程池executor.

    RabbitMQ client 使用ThreadFactory 來(lái)為低端I/O(socket)操作創(chuàng)建線程.要改變這個(gè)工廠,你需要配置底層RabbitMQ ConnectionFactory, 正如the section called “Configuring the Underlying Client Connection Factory”中所描述.

    檢測(cè)空閑異步消費(fèi)者

    雖然高效,但異步消費(fèi)者存在一個(gè)問(wèn)題:如何來(lái)探測(cè)它們什么是空閑的 - 當(dāng)有一段時(shí)間沒(méi)有收到消息時(shí),用戶可能想要采取某些動(dòng)作.

    從1.6版本開(kāi)始, 當(dāng)沒(méi)有消息投遞時(shí),可配置監(jiān)聽(tīng)器容器來(lái)發(fā)布ListenerContainerIdleEvent 事件. 當(dāng)容器是空閑的,事件會(huì)每隔idleEventInterval 毫秒發(fā)布事件.

    要配置這個(gè)功能,須在容器上設(shè)置idleEventInterval:

    xml
    <rabbit:listener-container connection-factory="connectionFactory"...idle-event-interval="60000"...
            >
    <rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
    </rabbit:listener-container>
    Java
    @Bean
    public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        ...
        container.setIdleEventInterval(60000L);
        ...
        return container;
    }
    @RabbitListener
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setIdleEventInterval(60000L);
        ...
        return factory;
    }

    在上面這些情況中,當(dāng)容器空閑時(shí),每隔60秒就會(huì)發(fā)布事件.

    事件消費(fèi)

    通過(guò)實(shí)現(xiàn)ApplicationListener 可捕獲這些事件- 要么是一個(gè)一般的監(jiān)聽(tīng)器,要么是一個(gè)窄化的只接受特定事件的監(jiān)聽(tīng)器. 你也可以使用Spring Framework 4.2中引入的@EventListener.

    下面的例子在單個(gè)類中組合使用了@RabbitListener 和@EventListener .重點(diǎn)要理解,應(yīng)用程序監(jiān)聽(tīng)器會(huì)收到所有容器的事件,因此如果你只對(duì)某個(gè)容器采取措施,那么你需要檢查監(jiān)聽(tīng)器id.你也可以使用@EventListener 條件來(lái)達(dá)到此目的.

    事件有4個(gè)屬性:

    • source - 監(jiān)聽(tīng)容器實(shí)例
    • id - 監(jiān)聽(tīng)器id(或容器bean名稱)
    • idleTime - 當(dāng)事件發(fā)布時(shí),容器已經(jīng)空閑的時(shí)間
    • queueNames - 容器監(jiān)聽(tīng)的隊(duì)列名稱
    public class Listener {
    
        @RabbitListener(id="foo", queues="#{queue.name}")
        public String listen(String foo) {
            return foo.toUpperCase();
        }
    
        @EventListener(condition = "event.listenerId == 'foo'")
        public void onApplicationEvent(ListenerContainerIdleEvent event) {
            ...
        }
    
    }
    重要
    事件監(jiān)聽(tīng)器會(huì)查看所有容器的事件,因此,在上面的例子中,我們根據(jù)監(jiān)聽(tīng)器ID縮小了要接收的事件.
    警告
    如果你想使用idle事件來(lái)停止監(jiān)聽(tīng)器容器,你不應(yīng)該在調(diào)用監(jiān)聽(tīng)器的線程上來(lái)調(diào)用container.stop() 方法- 它會(huì)導(dǎo)致延遲和不必要的日志消息. 相反,你應(yīng)該把事件交給一個(gè)不同的線程,然后可以停止容器。

    3.1.7 消息轉(zhuǎn)換器

    介紹

    AmqpTemplate 同時(shí)也定義了多個(gè)發(fā)送和接收消息(委派給MessageConverter)的方法.

    MessageConverter 本身是很簡(jiǎn)單的. 在每個(gè)方向上它都提供了一個(gè)方法:一個(gè)用于轉(zhuǎn)換成Message,另一個(gè)用于從Message中轉(zhuǎn)換.注意,當(dāng)轉(zhuǎn)換成Message時(shí),除了object外,你還需要提供消息屬性. "object"參數(shù)通常對(duì)應(yīng)的是Message body.

    public interface MessageConverter {
    
        Message toMessage(Object object, MessageProperties messageProperties)
                throws MessageConversionException;
    
        Object fromMessage(Message message) throws MessageConversionException;
    
    }

    AmqpTemplate中相關(guān)的消息發(fā)送方法列舉在下邊. 這比我們前面提到的要簡(jiǎn)單,因?yàn)樗鼈儾恍枰?code style="font-family: Monaco, Consolas, Courier, 'Lucida Console', monospace; color: #6d180b; background-color: #f2f2f2;">Message 實(shí)例. 相反地,  MessageConverter 負(fù)責(zé)創(chuàng)建每個(gè)消息(通過(guò)將提供的對(duì)象轉(zhuǎn)換成Message body的字節(jié)數(shù)組,以及添加提供的MessageProperties).

    void convertAndSend(Object message) throws AmqpException;
    
    void convertAndSend(String routingKey, Object message) throws AmqpException;
    
    void convertAndSend(String exchange, String routingKey, Object message)
        throws AmqpException;
    
    void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
        throws AmqpException;
    
    void convertAndSend(String routingKey, Object message,
        MessagePostProcessor messagePostProcessor) throws AmqpException;
    
    void convertAndSend(String exchange, String routingKey, Object message,
        MessagePostProcessor messagePostProcessor) throws AmqpException;

    在接收端,這里只有兩個(gè)方法:一個(gè)接受隊(duì)列名稱,另一個(gè)依賴于模板設(shè)置的隊(duì)列屬性.

    Object receiveAndConvert() throws AmqpException;
    
    Object receiveAndConvert(String queueName) throws AmqpException;
    在 the section called “Asynchronous Consumer” 中提到的MessageListenerAdapter也使用了MessageConverter.

    SimpleMessageConverter

    MessageConverter 策略的默認(rèn)實(shí)現(xiàn)被稱為SimpleMessageConverter. 如果你沒(méi)有明確配置,RabbitTemplate實(shí)例會(huì)使用此轉(zhuǎn)換器的實(shí)例.它能處理基于文本內(nèi)容,序列化Java對(duì)象,以及簡(jiǎn)單的字節(jié)數(shù)組.

    從 Message中轉(zhuǎn)換

    如果傳入消息的內(nèi)容類型以"text" (如. "text/plain")開(kāi)頭,它同時(shí)也會(huì)檢查內(nèi)容編碼屬性,以確定將消息body字節(jié)數(shù)組轉(zhuǎn)換成字符串所要使用的字符集. 如果在輸入消息中沒(méi)有指定內(nèi)容編碼屬性, 它默認(rèn)會(huì)使用"UTF-8"字符集.如果你需要覆蓋默認(rèn)設(shè)置,你可以配置一個(gè)SimpleMessageConverter實(shí)例,設(shè)置其"defaultCharset" 屬性,再將其注入到RabbitTemplate 實(shí)例中.

    如果傳入消息的內(nèi)容類型屬性值為"application/x-java-serialized-object", SimpleMessageConverter 將嘗試將字節(jié)數(shù)組反序列化為一個(gè)Java object. 雖然這對(duì)于簡(jiǎn)單的原型是有用的,但一般不推薦依賴于Java序列化機(jī)制,因?yàn)樗鼤?huì)生產(chǎn)者和消費(fèi)者之間的緊密耦合。當(dāng)然,這也排除了在兩邊使用非Java的可能性.由于AMQP 是線路級(jí)協(xié)議, 因這樣的限制失去了許多優(yōu)勢(shì),這是不幸的. 在后面的兩個(gè)章節(jié)中,我們將探討通過(guò)豐富的域?qū)ο蟮膬?nèi)容來(lái)替代java序列化.

    對(duì)于其它內(nèi)容類型,SimpleMessageConverter 會(huì)以字節(jié)數(shù)組形式直接返回消息body內(nèi)容.

    參考the section called “Java Deserialization” 來(lái)了解更多信息.

    轉(zhuǎn)換成消息

    當(dāng)從任意Java對(duì)象轉(zhuǎn)換成Message時(shí), SimpleMessageConverter 同樣可以處理字節(jié)數(shù)組,字符串,以及序列化實(shí)例.它會(huì)將每一種都轉(zhuǎn)換成字節(jié)(在字節(jié)數(shù)組的情況下,不需要任何轉(zhuǎn)換), 并且會(huì)相應(yīng)地設(shè)置內(nèi)容類型屬性.如果要轉(zhuǎn)換的對(duì)象不匹配這些類型,Message body 將是null.

    SerializerMessageConverter

    除了它可以使用其它application/x-java-serialized-object轉(zhuǎn)換的Spring框架Serializer 和 Deserializer 實(shí)現(xiàn)來(lái)配置外,此轉(zhuǎn)換器類似于SimpleMessageConverter

    參考the section called “Java Deserialization” 來(lái)了解更多信息.

    Jackson2JsonMessageConverter

    轉(zhuǎn)換成消息

    正如前面章節(jié)提到的,一般來(lái)說(shuō)依賴于Java序列化機(jī)制不是推薦的.另一個(gè)常見(jiàn)更靈活且可跨語(yǔ)言平臺(tái)的選擇JSON (JavaScript Object Notation).可通過(guò)在RabbitTemplate實(shí)例上配置轉(zhuǎn)換器來(lái)覆蓋默認(rèn)SimpleMessageConverter.Jackson2JsonMessageConverter 使用的是com.fasterxml.jackson 2.x 包.

    <bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
    <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
    <!-- if necessary, override the DefaultClassMapper -->
    <property name="classMapper"  ref="customClassMapper"/>
    </bean>
    </property>
    </bean>

    正如上面展示的, Jackson2JsonMessageConverter 默認(rèn)使用的是DefaultClassMapper. 類型信息是添加到MessageProperties中的(也會(huì)從中獲取). 如果入站消息在MessageProperties沒(méi)有包含類型信息,但你知道預(yù)期類型,你可以使用defaultType 屬性來(lái)配置靜態(tài)類型

    <bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
    <property name="classMapper">
    <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
    <property name="defaultType" value="foo.PurchaseOrder"/>
    </bean>
    </property>
    </bean>
    轉(zhuǎn)換Message

    入站消息會(huì)根據(jù)發(fā)送系統(tǒng)頭部中添加的類型信息來(lái)轉(zhuǎn)換成對(duì)象.

    在1.6之前的版本中,如果不存在類型信息,轉(zhuǎn)換將失敗。從1.6版開(kāi)始,如果類型信息丟失,轉(zhuǎn)換器將使用Jsckson默認(rèn)值(通常是一個(gè)map)來(lái)轉(zhuǎn)換JSON.
    此外,從1.6版本開(kāi)始,當(dāng)在方法上使用@RabbitListener 注解時(shí), 推斷類型信息會(huì)添加到MessageProperties; 這允許轉(zhuǎn)換器轉(zhuǎn)換成目標(biāo)方法的參數(shù)類型.這只適用于無(wú)注解的參數(shù)或使用@Payload注解的單個(gè)參數(shù). 在分析過(guò)程中忽略類型消息的參數(shù)。

    重要
    默認(rèn)情況下,推斷類型信息會(huì)覆蓋inbound __TypeId__ 和發(fā)送系統(tǒng)創(chuàng)建的相關(guān)headers. 這允許允許接收系統(tǒng)自動(dòng)轉(zhuǎn)換成不同的領(lǐng)域?qū)ο? 這只適用于具體的參數(shù)類型(不是抽象的或不是接口)或者來(lái)自java.util 包中的對(duì)象.其它情況下,將使用 __TypeId__ 和相關(guān)的頭.也可能有你想覆蓋默認(rèn)行為以及總是使用__TypeId__信息的情況. 例如, 讓我們假設(shè)你有一個(gè)接受Foo參數(shù)的@RabbitListener ,但消息中包含了Bar( 它是的Foo (具體類)的子類). 推斷類型是不正確的.要處理這種情況,需要設(shè)置Jackson2JsonMessageConverter 的TypePrecedence 屬性為TYPE_ID 而替換默認(rèn)的INFERRED. 這個(gè)屬性實(shí)際上轉(zhuǎn)換器的DefaultJackson2JavaTypeMapper ,但為了方便在轉(zhuǎn)換器上提供了一個(gè)setter方法. 如果你想注入一個(gè)自定義類型mapper, 你應(yīng)該設(shè)置屬性mapper.
    @RabbitListener
    public void foo(Foo foo) {...}
    
    @RabbitListener
    public void foo(@Payload Foo foo, @Header("amqp_consumerQueue") String queue) {...}
    
    @RabbitListener
    public void foo(Foo foo, o.s.amqp.core.Message message) {...}
    
    @RabbitListener
    public void foo(Foo foo, o.s.messaging.Message<Foo> message) {...}
    
    @RabbitListener
    public void foo(Foo foo, String bar) {...}
    
    @RabbitListener
    public void foo(Foo foo, o.s.messaging.Message<?> message) {...}

    上面前4種情況下,轉(zhuǎn)換器會(huì)嘗試轉(zhuǎn)換成Foo 類型. 第五個(gè)例子是無(wú)效的,因?yàn)槲覀儾荒艽_定使用哪個(gè)參數(shù)來(lái)接收消息負(fù)荷. 在第六個(gè)例子中, Jackson 會(huì)根據(jù)泛型WildcardType來(lái)應(yīng)用.

    然而,你也可以創(chuàng)建一個(gè)自定義轉(zhuǎn)換器,并使用targetMethod 消息屬性來(lái)決定將JSON轉(zhuǎn)換成哪種類型.

    這種類型接口只能在@RabbitListener 注解聲明在方法級(jí)上才可實(shí)現(xiàn).在類級(jí)@RabbitListener, 轉(zhuǎn)換類型用來(lái)選擇調(diào)用哪個(gè)@RabbitHandler 方法.基于這個(gè)原因,基礎(chǔ)設(shè)施提供了targetObject 消息屬性,它可用于自定義轉(zhuǎn)換器來(lái)確定類型.

    MarshallingMessageConverter

    還有一個(gè)選擇是MarshallingMessageConverter.它會(huì)委派到Spring OXM 包的 Marshaller 和 Unmarshaller 策略接口實(shí)現(xiàn). 

    你可從here了解更多. 在配置方面,最常見(jiàn)的是只提供構(gòu)造器參數(shù),因?yàn)榇蟛糠?code style="font-family: Monaco, Consolas, Courier, 'Lucida Console', monospace; color: #6d180b; background-color: #f2f2f2;">Marshaller 的實(shí)現(xiàn)都將實(shí)現(xiàn)Unmarshaller.

    <bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
    <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
    </bean>
    </property>
    </bean>

    ContentTypeDelegatingMessageConverter

    這個(gè)類是在1.4.2版本中引入的,并可基于MessagePropertiescontentType屬性允許委派給一個(gè)特定的MessageConverter.默認(rèn)情況下,如果沒(méi)有contentType屬性或值沒(méi)有匹配配置轉(zhuǎn)換器時(shí),它會(huì)委派給SimpleMessageConverter.

    <bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
    <property name="delegates">
    <map>
    <entry key="application/json" value-ref="jsonMessageConverter" />
    <entry key="application/xml" value-ref="xmlMessageConverter" />
    </map> 
    </property>
    </bean>

    Java 反序列化

    重要
    當(dāng)從不可信任的來(lái)源反序列化Java對(duì)象時(shí),存在一個(gè)可能的漏洞.如果從不可信來(lái)源,使用內(nèi)容類型 application/x-java-serialized-object來(lái)接收消息,你可以考慮配置允許哪些包/類能反序列化.這既適用于SimpleMessageConverter,也適用于SerializerMessageConverter,當(dāng)它被配置為使用一個(gè)DefaultDeserializer時(shí) -或含蓄地或通過(guò)配置方式的。

    默認(rèn)情況下,白名單列表是空的,這意味著所有類都會(huì)反序列化.你可以設(shè)置模式列表,如 foo.*foo.bar.Baz 或 *.MySafeClass.模式會(huì)按照順序進(jìn)行檢查,直到找到匹配的模式.如果沒(méi)有找到匹配,將拋出SecurityException.在這些轉(zhuǎn)換器上,可使用whiteListPatterns 屬性來(lái)設(shè)置.

    消息屬性轉(zhuǎn)換器

     MessagePropertiesConverter 策略接口用于Rabbit Client BasicProperties 與Spring AMQP MessageProperties之間轉(zhuǎn)換. 默認(rèn)實(shí)現(xiàn)(DefaultMessagePropertiesConverter)通常可滿雖大部分需求,但如果有需要,你可以自己實(shí)現(xiàn). 當(dāng)大小不超過(guò)1024字節(jié)時(shí),默認(rèn)屬性轉(zhuǎn)換器將 BasicProperties 中的LongString 轉(zhuǎn)換成String . 更大的 LongString 將不會(huì)進(jìn)行轉(zhuǎn)換(參考下面的內(nèi)容.這個(gè)限制可通過(guò)構(gòu)造器參數(shù)來(lái)覆蓋.

    從1.6版本開(kāi)始, 現(xiàn)在headers 長(zhǎng)超過(guò) long string 限制(默認(rèn)為1024) 將被DefaultMessagePropertiesConverter保留作為 LongString . 你可以通過(guò) the getBytes[]toString(), 或getStream() 方法來(lái)訪問(wèn)內(nèi)容.

    此前, DefaultMessagePropertiesConverter 會(huì)將這樣的頭轉(zhuǎn)換成一個(gè) DataInputStream (實(shí)際上它只是引用了LongStringDataInputStream). 在輸出時(shí),這個(gè)頭不會(huì)進(jìn)行轉(zhuǎn)換(除字符串外,如在流上調(diào)用toString()方法 java.io.DataInputStream@1d057a39).

    更大輸入LongString 頭現(xiàn)在可正確地轉(zhuǎn)換,在輸出時(shí)也一樣.

    它提供了一個(gè)新的構(gòu)造器來(lái)配置轉(zhuǎn)換器,這樣可像以前一樣來(lái)工作:

    /**
     * Construct an instance where LongStrings will be returned
     * unconverted or as a java.io.DataInputStream when longer than this limit.
     * Use this constructor with 'true' to restore pre-1.6 behavior.
     * @param longStringLimit the limit.
     * @param convertLongLongStrings LongString when false,
     * DataInputStream when true.
     * @since 1.6
     */
    public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

    另外,從1.6版本開(kāi)始,在 MessageProperties中添加了一個(gè)新屬性correlationIdString.此前,當(dāng)在RabbitMQ 客戶端中轉(zhuǎn)換BasicProperties 時(shí),將會(huì)執(zhí)行不必要的byte[] <-> String 轉(zhuǎn)換,這是因?yàn)?nbsp;MessageProperties.correlationId 是一個(gè)byte[] 而 BasicProperties 使用的是String

    (最終,RabbitMQ客戶端使用UTF-8字符串轉(zhuǎn)化為字節(jié)并放在協(xié)議消息中).


    為提供最大向后兼容性,新屬性correlationIdPolicy 已經(jīng)被加入到了DefaultMessagePropertiesConverter.它接受DefaultMessagePropertiesConverter.CorrelationIdPolicy 枚舉參數(shù).

    默認(rèn)情況下,它設(shè)置為BYTES (復(fù)制先前的行為).

    對(duì)于入站消息:

    • STRING - 只映射correlationIdString 屬性
    • BYTES - 只映射correlationId 屬性
    • BOTH - 會(huì)同時(shí)映射兩個(gè)屬性
    對(duì)于出站消息:
    • STRING - 只映射correlationIdString 屬性
    • BYTES - 只映射correlationId 屬性
    • BOTH - 兩種屬性都會(huì)考慮,但會(huì)優(yōu)先考慮String 屬性

    也從1.6版本開(kāi)始,入站deliveryMode 屬性不再需要映射 MessageProperties.deliveryMode,相反使用MessageProperties.receivedDeliveryMode 來(lái)代替.另外,入站userId 屬性也不需要再映射MessageProperties.userId,相反使用MessageProperties.receivedUserId 來(lái)映射. 

    這種變化是為了避免這些屬性的意外傳播,如果同樣的MessageProperties 對(duì)象用于出站消息時(shí).

    3.1.8 修改消息- 壓縮以及更多

    提供了許多的擴(kuò)展點(diǎn),通過(guò)它們你可以對(duì)消息執(zhí)行預(yù)處理,要么在發(fā)送RabbitMQ之前,要么在接收到消息之后.

    正如你在Section 3.1.7, “Message Converters”看到的,這樣的擴(kuò)展點(diǎn)存在于AmqpTemplate convertAndReceive 操作中,在那里你可以提供一個(gè)MessagePostProcessor

    例如,你的POJO轉(zhuǎn)換之后, MessagePostProcessor 允許你在Message上設(shè)置自定義的頭或?qū)傩裕?/font>

    從1.4.2版本開(kāi)始,額外的擴(kuò)展點(diǎn)已經(jīng)添加到RabbitTemplate - setBeforePublishPostProcessors() 和setAfterReceivePostProcessors(). 第一個(gè)開(kāi)啟了一個(gè)post processor來(lái)在發(fā)送消息到RabbitMQ之前立即運(yùn)行.當(dāng)使用批量時(shí)(參考 the section called “Batching”), 這會(huì)在批處理裝配之后發(fā)送之前調(diào)用. 

    第二個(gè)會(huì)在收到消息后立即調(diào)用.

    這些擴(kuò)展點(diǎn)對(duì)于壓縮這此功能是有用的,基于這些目的,提供了多個(gè)MessagePostProcessor:

    • GZipPostProcessor
    • ZipPostProcessor

    針對(duì)于發(fā)送前的消息壓縮,以及

    • GUnzipPostProcessor
    • UnzipPostProcessor

    針對(duì)于消息解壓.

    類似地, SimpleMessageListenerContainer 也有一個(gè) setAfterReceivePostProcessors() 方法,

    允許在消息收到由容器來(lái)執(zhí)行解壓縮.


    posted on 2016-08-13 12:48 胡小軍 閱讀(13047) 評(píng)論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 久久精品亚洲综合一品| 无码少妇精品一区二区免费动态| 国产在线观看免费不卡| 亚洲欧美自偷自拍另类视| 毛片免费视频在线观看| 亚洲国产熟亚洲女视频| 18国产精品白浆在线观看免费 | 最近中文字幕mv免费高清视频7| 2020久久精品亚洲热综合一本| 永久免费av无码不卡在线观看| 亚洲国产成人99精品激情在线| 三年片在线观看免费观看高清电影 | 最近免费中文在线视频| 亚洲理论在线观看| 麻豆视频免费播放| 亚洲中文字幕AV在天堂| 白白国产永久免费视频| 亚洲欧美日韩中文高清www777| 日韩一级在线播放免费观看| 日韩亚洲综合精品国产| 亚洲国产激情一区二区三区| jzzjzz免费观看大片免费| 亚洲午夜久久久久久噜噜噜| 久久精品免费观看国产| 亚洲欧洲日韩综合| 成全视频在线观看免费高清动漫视频下载| 亚洲一区二区三区写真| 国产精品四虎在线观看免费| 免费国产黄网站在线看| 亚洲人成人网站色www| 免费女人高潮流视频在线观看| 亚洲成a人片在线网站| 成人a免费α片在线视频网站| 蜜臀亚洲AV无码精品国产午夜.| 亚洲国产精品成人一区| 97国免费在线视频| 亚洲精品国产电影午夜| 成人毛片18女人毛片免费96| 免费高清A级毛片在线播放| 国产亚洲成av人片在线观看| 69影院毛片免费观看视频在线 |