<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    3.1.10 配置broker

    介紹

    AMQP 規范描述了協議是如何用于broker中隊列,交換器以及綁定上的.這些操作是從0.8規范中移植的,更高的存在于org.springframework.amqp.core包中的AmqpAdmin 接口中.
    那個接口的RabbitMQ 實現是RabbitAdmin,它位于org.springframework.amqp.rabbit.core 包.

    AmqpAdmin接口是基于Spring AMQP 域抽象,展示如下:

    public interface AmqpAdmin {      
    // Exchange Operations
    void declareExchange(Exchange exchange);      
    void deleteExchange(String exchangeName);
    // Queue Operations
    Queue declareQueue();
    String declareQueue(Queue queue);
    void deleteQueue(String queueName);
    void deleteQueue(String queueName, boolean unused, boolean empty);
    void purgeQueue(String queueName, boolean noWait);
    // Binding Operations
    void declareBinding(Binding binding);      
    void removeBinding(Binding binding);
    Properties getQueueProperties(String queueName);
    }

    getQueueProperties() 方法會返回關于隊列的的一些有限信息(消息個數和消費者數目). 屬性返回的keys像RabbitTemplate (QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)中的常量一樣是可用的. 
    RabbitMQ REST API 提供了更多關于 QueueInfo 對象的信息.

    無參 declareQueue() 方法在broker上定義了一個隊列,其名稱是自動生成的. 自動生成隊列的其它屬性是exclusive=trueautoDelete=true, and durable=false.

    declareQueue(Queue queue) 方法接受一個 Queue 對象,并且返回聲明隊列的名稱.如果提供的隊列名稱是空字符串,broker 使用生成的名稱來聲明隊列再將名稱返回給調用者. Queue 對象本身是不會變化的. 

    這種功能只能用于編程下直接調用RabbitAdmin. 它不支持在應用程序上下文中由admin來定義隊列的自動聲明.

    與此形成鮮明對比的是,AnonymousQueue,框架會為其生成唯一名稱(UUID),durable為false,exclusiveautoDelete 為true的匿名隊列<rabbit:queue/> 帶空的或缺失的name 屬性總會創建 一個AnonymousQueue.

    參考the section called “AnonymousQueue” 來理解為什么 AnonymousQueue 會優先選擇broker生成隊列名稱,以及如何來控制名稱格式. 聲明隊列必須有固定的名稱,因為它們可能會上下文的其它地方引用,例如,在監聽器中

    <rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
    </rabbit:listener-container>

    參考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”.

    此接口的RabbitMQ實現是RabbitAdmin,當用Spring XML配置時,看起來像下面這樣:

    <rabbit:connection-factory id="connectionFactory"/>
    <rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

    CachingConnectionFactory 緩存模式是CHANNEL 時(默認的),  RabbitAdmin 實現會在同一個ApplicationContext中自動延遲聲明 Queues,Exchanges 和 Bindings.
    只要Connection打開了與Broker的連接,這些組件就會被聲明.有一些命名空間特性可以使這些變得便利,如,在Stocks 樣例程序中有:

    <rabbit:queue id="tradeQueue"/>
    <rabbit:queue id="marketDataQueue"/>
    <fanout-exchange name="broadcast.responses" xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
    <binding queue="tradeQueue"/>
    </bindings>
    </fanout-exchange>
    <topic-exchange name="app.stock.marketdata" xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
    <binding queue="marketDataQueue"pattern="${stocks.quote.pattern}"/>
    </bindings>
    </topic-exchange>

    在上面的例子中,我們使用匿名隊列(實際上由框架內部生成,而非由broker生成的隊列),并用ID進行了指定.我們也可以使用明確的名稱來聲明隊列,也作為上下文中bean定義的標識符.如.

    <rabbit:queue name="stocks.trade.queue"/>
    重要
    你可以提供id 和 name 屬性.這允許你獨立于隊列名稱通過id來指定隊列.它允許使用標準的Spring 屬性,如屬性占位符和隊列名稱的SpEL 表達式; 當使用名稱來作為標識符,這些特性是不可用的.

    隊列也可以使用其它的參數進行配置,例如x-message-ttl 或 x-ha-policy.通過命名空間支持,它們可以通過<rabbit:queue-arguments>元素以參數名/參數值的MAP形式來提供 .

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
    <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
    </rabbit:queue>

    默認情況下,參數假設為字符串.對于其它類型的參數,需要提供類型.

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
    <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
    </rabbit:queue>

    當提供混合類型的參數時,可為每個entry元素提供type:

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
    <entry key="x-message-ttl">
    <value type="java.lang.Long">100</value>
    </entry>
    <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
    </rabbit:queue>

    在Spring Framework 3.2或以后,聲明起來更加簡潔:

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
    <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
    <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
    </rabbit:queue>
    重要
    RabbitMQ broker 不允許使用不匹配的參數來聲明隊列. 例如,如果一個無time to live參數的隊列已經存在,然后你試圖使用 key="x-message-ttl" value="100"進行聲明,那么會拋出一個異常.

    默認情況下,當出現異常時, RabbitAdmin 會立即停止所有聲明的處理過程;這可能會導致下游問題- 如監聽器容器會初始化失敗,因另外的隊列沒有聲明.

    這種行為可以通過在RabbitAdmin上設置 ignore-declaration-exceptions 為true來修改. 此選項會指示RabbitAdmin 記錄異常,并繼續聲明其它元素.當使用Java來配置RabbitAdmin 時, 此屬性為ignoreDeclarationExceptions.
    這是一個全局設置,它將應用到所有元素上,如應用到queues, exchanges 和bindings這些具有相似屬性的元素上.

    在1.6版本之前, 此屬性只會在channel上發生IOExcepton時才會起作用- 如當目前和期望屬性發生錯配時. 現在, 這個屬性可在任何異常上起作用,包括TimeoutException 等等.

    此外,任何聲明異常都會導致發布DeclarationExceptionEvent, 這是一個ApplicationEvent ,在上下文中可通過任何ApplicationListener 消費. 此事件包含了admin的引用, 正在聲明的元素以及Throwable.

    從1.3版本開始, HeadersExchange 可配置匹配多個headers; 你也可以指定是否需要必須匹配任何一個或全部headers:

    <rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
    <rabbit:binding queue="bucket">
    <rabbit:binding-arguments>
    <entrykey="foo"value="bar"/>
    <entrykey="baz"value="qux"/>
    <entrykey="x-match"value="all"/>
    </rabbit:binding-arguments>
    </rabbit:binding>
    </rabbit:bindings>
    </rabbit:headers-exchange>

    從1.6版本開始,Exchanges 可使用internal 標志來配置(默認為false) ,當然,這樣的Exchange 也可以通過 RabbitAdmin 來配置(如果在應用程序上下文中存在).
    如果對于交換器來說,internal 標志為true , RabbitMQ 會允許客戶端來使用交換器.這對于死信交換器來說或交換器到交換器綁定來說,是很用的,因為在這些地方你不想讓發布者直接使用交換器.

    要看如何使用Java來配置AMQP基礎設施,可查看Stock樣例程序,在那里有一個帶@Configuration 注解的抽象AbstractStockRabbitConfiguration 類,它依次有RabbitClientConfiguration 和 RabbitServerConfiguration 子類. AbstractStockRabbitConfiguration 的代碼展示如下:

    @Configuration
    public abstract class AbstractStockAppRabbitConfiguration {
    
        @Bean
     public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            return connectionFactory;
        }
    
        @Bean
     public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            template.setMessageConverter(jsonMessageConverter());
            configureRabbitTemplate(template);
            return template;
        }
    
        @Bean
     public MessageConverter jsonMessageConverter() {
            returnnew JsonMessageConverter();
        }
    
        @Bean
     public TopicExchange marketDataExchange() {
            returnnew TopicExchange("app.stock.marketdata");
        }
    
        // additional code omitted for brevity
    
    }

    在Stock 程序中,服務器使用下面的@Configuration注解來配置:

    @Configuration
    public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {
    
        @Bean
     public Queue stockRequestQueue() {
            returnnew Queue("app.stock.request");
        }
    }

    這是整個@Configuration 類繼承鏈結束的地方. 最終結果是TopicExchange 和隊列會在應用程序啟動時被聲明.在服務器配置中,沒有TopicExchange與隊列的綁定,因為這是在客戶端程序完成的.
    然后stock 請求隊列是自動綁定到AMQP 默認交換器上的 - 這種行為是由規范來定義的.

    客戶端 @Configuration 類令人關注的地方展示如下.

    @Configuration
    public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
    
        @Value("${stocks.quote.pattern}")
     private String marketDataRoutingKey;
    
        @Bean
     public Queue marketDataQueue() {
            return amqpAdmin().declareQueue();
        }
    
        /**
         * Binds to the market data exchange.
         * Interested in any stock quotes
         * that match its routing key.
         */@Bean
      public Binding marketDataBinding() {
            return BindingBuilder.bind(
                    marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
        }
    
        // additional code omitted for brevity
    
    }

    客戶端使用AmqpAdmin的declareQueue()方法聲明了另一個隊列,并將其綁定到了market data 交換器上(路由鍵模式是通常外部properties文件來定義的).

    Queues 和Exchanges的Builder API

    當使用Java配置時,1.6版本引入了一個便利的API來配置Queue 和Exchange 對象:

    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable("foo")
            .autoDelete()
            .exclusive()
            .withArgument("foo", "bar")
            .build();
    }
    
    @Bean
    public Exchange exchange() {
      return ExchangeBuilder.directExchange("foo")
          .autoDelete()
          .internal()
          .withArgument("foo", "bar")
          .build();
    }

    查看 org.springframework.amqp.core.QueueBuilder 

    和 org.springframework.amqp.core.ExchangeBuilder 的JavaDoc來了解更多信息.

    Declaring Collections of Exchanges, Queues, Bindings

    從1.5版本開始,可以在一個@Bean聲明多個條目來返回集合.

    只有集合中的第一個元素可認為是Declarablea的,并且只有集合中的Declarable 元素會被處理.(

    Only collections where the first element is a Declarable are considered, and only Declarable elements from such collections are processed.)

    @Configuration
    public static class Config {
    
        @Bean
    public ConnectionFactory cf() {
            returnnew CachingConnectionFactory("localhost");
        }
    
        @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
            returnnew RabbitAdmin(cf);
        }
    
        @Bean
    public DirectExchange e1() {
        	returnnew DirectExchange("e1", false, true);
        }
    
        @Bean
    public Queue q1() {
        	returnnew Queue("q1", false, false, true);
        }
    
        @Bean
    public Binding b1() {
        	return BindingBuilder.bind(q1()).to(e1()).with("k1");
        }
    
        @Bean
    public List<Exchange> es() {
        	return Arrays.<Exchange>asList(
        			new DirectExchange("e2", false, true),
        			new DirectExchange("e3", false, true)
        	);
        }
    
        @Bean
    public List<Queue> qs() {
        	return Arrays.asList(
        			new Queue("q2", false, false, true),
        			new Queue("q3", false, false, true)
        	);
        }
    
        @Bean
    public List<Binding> bs() {
        	return Arrays.asList(
        			new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
        			new Binding("q3", DestinationType.QUEUE, "e3", "k3", null)
        	);
        }
    
        @Bean
    public List<Declarable> ds() {
        	return Arrays.<Declarable>asList(
        			new DirectExchange("e4", false, true),
        			new Queue("q4", false, false, true),
        			new Binding("q4", DestinationType.QUEUE, "e4", "k4", null)
        	);
        }
    
    }

    條件聲明

    默認情況下,所有queues, exchanges,和bindings 都可通過應用程序上下文中所有RabbitAdmin 實例來聲明(設置了auto-startup="true").

    重要

    從1.2版本開始,可以有條件地聲明元素.當程序連接了多個brokers,并需要在哪些brokers上聲明特定元素時,特別有用.

    代表這些元素要實現Declarable 接口,此接口有兩個方法: shouldDeclare() 和 getDeclaringAdmins()RabbitAdmin 使用這些方法來確定某個特定實例是否應該在其Connection上處理聲明.

    這些屬性作為命名空間的屬性是可用的,如下面的例子所示.

    <rabbit:admin id="admin1" connection-factory="CF1" />
    <rabbit:admin id="admin2" connection-factory="CF2" />
    <rabbit:queue id="declaredByBothAdminsImplicitly" />
    <rabbit:queue id="declaredByBothAdmins" declared-by="admin1, admin2" />
    <rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
    <rabbit:queue id="notDeclaredByAny" auto-declare="false" />
    <rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
    <rabbit:bindingkey="foo" queue="bar"/>
    </rabbit:bindings>
    </rabbit:direct-exchange>
    重要
    默認情況下,如果沒有提供declared-by(或是空的) auto-declare 屬性則為 true,那么所有RabbitAdmin將聲明對象(只要admin的auto-startup 屬性為true,默認值).

    現樣的,你可以使用基于Java的@Configuration 注解來達到同樣的效果.在這個例子中,組件會由admin1來聲明,而不是admin2:

    @Bean
    public RabbitAdmin admin() {
    	RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1());
    	rabbitAdmin.afterPropertiesSet();
    	return rabbitAdmin;
    }
    
    @Bean
    public RabbitAdmin admin2() {
    	RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2());
    	rabbitAdmin.afterPropertiesSet();
    	return rabbitAdmin;
    }
    
    @Bean
    public Queue queue() {
    	Queue queue = new Queue("foo");
    	queue.setAdminsThatShouldDeclare(admin());
    	return queue;
    }
    
    @Bean
    public Exchange exchange() {
    	DirectExchange exchange = new DirectExchange("bar");
    	exchange.setAdminsThatShouldDeclare(admin());
    	return exchange;
    }
    
    @Bean
    public Binding binding() {
    	Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    	binding.setAdminsThatShouldDeclare(admin());
    	return binding;
    }

    AnonymousQueue

    一般來說,當需要一個獨特命名,專用的,自動刪除隊列時,建議使用AnonymousQueue 來代替中間件定義的隊列名稱(使用 "" 作為隊列名稱會導致中間件生成隊列名稱).

    這是因為:

    1. 隊列實際上是在與broker的連接建立時聲明的;這在bean創建和包裝之后要很長時間;使用這個隊列的beans需要知道其名稱.而事實上,當app啟動時,broker甚至還沒有運行.
    2. 如果與broker的連接因某種原因丟失了,admin會使用相同的名稱會重新聲明AnonymousQueue.如果我們使用broker-聲明隊列,隊列名稱可能會改變.

    從1.5.3版本開始,你可通過AnonymousQueue 來控制隊列名稱的格式.

    默認情況下,隊列名稱是UUID的字符串表示; 例如: 07afcfe9-fe77-4983-8645-0061ec61a47a.

    現在,你可以提供一個 AnonymousQueue.NamingStrategy 實現作為其構造器參數:

    @Bean
    public Queue anon1() {
        return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy());
    }
    
    @Bean
    public Queue anon2() {
        return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("foo-"));
    }

    第一個會生成隊列名稱前輟spring.gen- 其后為UUID base64 的表示,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g. 第二個會生成隊列名稱前輟為foo- 其后為UUID的 base64 表示.

    base64 編碼使用RFC 4648的"URL and Filename Safe Alphabet" ; 刪除了字符(=).

    你可以提供你自己的命名策略, 可以包括隊列名稱中的其他信息(例如應用程序、客戶端主機)。

    從1.6版本開始,當使用XML配置時,可以指定命名策略; naming-strategy 屬性出現在<rabbit:queue>元素的屬性中,對于bean引用來說,它們實現了AnonymousQueue.NamingStrategy.

    <rabbit:queue id="uuidAnon" />
    <rabbit:queue id="springAnon" naming-strategy="springNamer" />
    <rabbit:queue id="customAnon" naming-strategy="customNamer" />
    <bean id="springNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy" />
    <bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
    </bean>
    第一個創建了UUID字符串表示的名稱.第二個創建了類似spring.gen-MRBv9sqISkuCiPfOYfpo4g的名稱. 第三個創建了類似custom.gen-MRBv9sqISkuCiPfOYfpo4g的名稱.

    當然,你可以提供你自己的命名策略bean.

    3.1.11 延遲的消息交換器

    1.6版本引入了 Delayed Message Exchange Plugin支持.

    該插件目前被標記為實驗性質,但可用已超過一年(在寫作的時間)。如果插件的變化是必要的,我們將盡快添加支持這樣的變化。因此,這種在Spring AMQP支持同樣也應考慮為實驗性質.這個功能在RabbitMQ 3.6.0版本和0.0.1插件版本中經過測試。

    要使用RabbitAdmin 來聲明一個延遲交換器,只需要在交換器上簡單地設置delayed 屬性為true. RabbitAdmin 會使用交換器類型(DirectFanout 等)來設置x-delayed-type 參數,并使用x-delayed-message來聲明交換器.

    當使用XML來配置交換器beans時,delayed 屬性 (默認為false)是可用的.

    <rabbit:topic-exchange name="topic" delayed="true" />

    要發送延遲消息,只需要通過MessageProperties設置x-delay header:

    MessageProperties properties = new MessageProperties();
    properties.setXDelay(15000);
    template.send(exchange, routingKey,
            MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());

    rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {
    
        @Override
    public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setXDelay(15000);
            return message;
        }
    
    });

    要檢查消息是否是延遲的,可調用MessagePropertiesgetReceivedDelay()它是一個獨立的屬性,以避免從一個輸入消息意外的傳播到一個輸出消息。

    3.1.12 RabbitMQ REST API

    當啟用了管理插件時,RabbitMQ 服務器公開了 REST API 來監控和配置broker. 

    現在提供了 Java Binding for the API.一般來說,你可以直接使用API,但提供了便利的包裝器來使用熟悉的Spring AMQP QueueExchange, 和 Binding 域對象.
    當直接使用 com.rabbitmq.http.client.Client API  (分別使用QueueInfoExchangeInfo, 和BindingInfo),那些對象的更多信息將可用. 下面是RabbitManagementTemplate上的可用操作:

    public interface AmqpManagementOperations {
    
    	void addExchange(Exchange exchange);
    
    	void addExchange(String vhost, Exchange exchange);
    
    	void purgeQueue(Queue queue);
    
    	void purgeQueue(String vhost, Queue queue);
    
    	void deleteQueue(Queue queue);
    
    	void deleteQueue(String vhost, Queue queue);
    
    	Queue getQueue(String name);
    
    	Queue getQueue(String vhost, String name);
    
    	List<Queue> getQueues();
    
    	List<Queue> getQueues(String vhost);
    
    	void addQueue(Queue queue);
    
    	void addQueue(String vhost, Queue queue);
    
    	void deleteExchange(Exchange exchange);
    
    	void deleteExchange(String vhost, Exchange exchange);
    
    	Exchange getExchange(String name);
    
    	Exchange getExchange(String vhost, String name);
    
    	List<Exchange> getExchanges();
    
    	List<Exchange> getExchanges(String vhost);
    
    	List<Binding> getBindings();
    
    	List<Binding> getBindings(String vhost);
    
    	List<Binding> getBindingsForExchange(String vhost, String exchange);
    
    }

    參考javadocs 來了解更多信息.

    3.1.13 異常處理

    RabbitMQ Java client的許多操作會拋出受查異常. 例如,有許多可能拋出IOExceptions的地方. RabbitTemplate, SimpleMessageListenerContainer, 和其它Spring AMQP 組件會捕獲這些異常,并將它們轉換為運行時層次的異常.
    這些是定義在
    org.springframework.amqp 包中的,且 AmqpException 是層次結構的基礎.

    當監聽器拋出異常時,它會包裝在一個 ListenerExecutionFailedException 中,正常情況下消息會被拒絕并由broker重新排隊.將defaultRequeueRejected 設置為false 可導致消息丟棄(或路由到死信交換器中). 

    正如 the section called “Message Listeners and the Asynchronous Case”討論的,監聽器可拋出 AmqpRejectAndDontRequeueException 來有條件地控制這種行為。

    然而,有一種類型的錯誤,監聽器無法控制其行為. 當遇到消息不能轉換時(例如,無效的content_encoding 頭),那么消息在到達用戶代碼前會拋出一些異常.當設置 defaultRequeueRejected 為 true (默認),這樣的消息可能會一遍又一遍地重新投遞.
    在1.3.2版本之前,用戶需要編寫定制ErrorHandler, 正如Section 3.1.13, “Exception Handling” 描述的內容來避免這種情況.

    從1.3.2版本開始,默認的ErrorHandler 是 ConditionalRejectingErrorHandler ,它將拒絕那些失敗且不可恢復的消息 (不會重新排隊):

    • o.s.amqp...MessageConversionException
    • o.s.messaging...MessageConversionException
    • o.s.messaging...MethodArgumentNotValidException
    • o.s.messaging...MethodArgumentTypeMismatchException

    第一個是在使用MessageConverter轉換傳入消息負荷時拋出的.
    第二個是當映射到@RabbitListener方法時,轉換服務需要其它轉換拋出的.
    第三個是在監聽器上使用了驗證(如.@Valid),且驗證失敗時拋出的.
    第四個是對于目標方法傳入消息類型轉換失敗拋出的.例如,參數聲明為Message<Foo> ,但收到的是Message<Bar>

    錯誤處理器的實例可使用FatalExceptionStrategy 來配置,因為用戶可以提供它們的規則來有條件的拒絕消息,如. 來自 Spring Retry (the section called “Message Listeners and the Asynchronous Case”)中的BinaryExceptionClassifier代理實現.
    此外, ListenerExecutionFailedException 現在有一個可用于決策的failedMessage 屬性.如果FatalExceptionStrategy.isFatal() 方法返回true,錯誤處理器會拋出AmqpRejectAndDontRequeueException.
    默認FatalExceptionStrategy 會記錄warning信息.

    3.1.14 事務(Transactions)

    介紹

    Spring Rabbit 框架支持在同步和異步使用中使用不同語義(這一點對于現有Spring事務的用戶是很熟悉的)來支持自動事務管理. 它做了很多,不是常見消息模式能輕易實現的.

    有兩種方法可用來向框架發出期望事務語義的信號.在RabbitTemplate 和 SimpleMessageListenerContainer 中,這里有一個channelTransacted 標記,如果它為true,就會告知框架使用事務通道,并根據結果使用提交或回滾來結束所有操作,出現異常時則發出回滾信號. 

    另一個提供的信號是Spring的PlatformTransactionManager實現(作為正在進行的操作的上下文)外部事務 
    當框架發送或接收消息時,如果過程中已經存在一個事
    務,且channelTransacted 標記為true, 那么當前消息事務的提交或回滾操作會延遲直到在當前事務結束.如果channelTransacted 標記為false,那么消息操作是不會應用事務語義(它是自動應答的).

    channelTransacted 標記是一個配置時設置:它只在AMQP組件聲明時執行一次,通常在應用程序啟動時.原則上,外部事務更加動態化,因為需要在運行時根據當前線程狀態來響應,當事務分層到應用程序上時,原則上來說它通常也是一個配置設置.

    對于使用RabbitTemplate 的同步使用,外部事務是由調用者提供的, 要么是聲明的,要么是強制的(日常Spring事務模式). 

    下面是聲明方法的一個例子(通常選擇這個,因為它是非侵入的), 下面的例子中,模板已經配置了channelTransacted=true:

    @Transactional
    public void doSomething() {
        String incoming = rabbitTemplate.receiveAndConvert();
        // do some more database processing...
        String outgoing = processInDatabaseAndExtractReply(incoming);
        rabbitTemplate.convertAndSend(outgoing);
    }

    收到字符負荷,轉換,并以消息體發送到@Transactional標記的方法中,因此如果數據處理因異常失敗了,傳入消息將返回到broker,并且輸出消息不會被發送.
    在事務方法鏈中,這適用于
    RabbitTemplate 中的所有操作(除非Channel 較早地直接控制了提交事務).

    對于SimpleMessageListenerContainer 的異步使用情況,如果需要外部事務,當設置了監聽器時,必須由容器來發出請求.
    為了表示需要外部事務,當配置時,用戶為容器提供了PlatformTransactionManager 實現.例如:

    @Configuration
    public class ExampleExternalTransactionAmqpConfiguration {
    
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(rabbitConnectionFactory());
            container.setTransactionManager(transactionManager());
            container.setChannelTransacted(true);
            container.setQueueName("some.queue");
            container.setMessageListener(exampleListener());
            return container;
        }
    
    }

    在上面的例子中,事務管理器是通過其它bean中注入添加的(未顯示),并且channelTransacted 也設置為了true.其效果是如果監聽器因異常失敗了,那么事務將回滾,消息也會退回到broker中.
    明顯地,如果事務提交失敗(如.數據庫約束錯誤,或通過問題),那么AMQP 事務也要回滾,且消息也會回退到broker中.
    有時候,這被稱為最好努力1階段提交(Best Efforts 1 Phase Commit),它是可靠消息非常強大的模式.
    如果在上面的例子中將channelTransacted標志設為false(默認為false),那么外部事務仍會提供給監聽器,但所有消息操作都是自動應答的, 因此其效果是即使發生了業務操作,也會提供消息操作.

    關于接收消息的回滾說明

    AMQP 事務只適用于發送應答給broker, 所以當有 Spring 事務回滾且又收到了消息時,Spring AMQP做的不僅要回滾事務,還要手動拒絕消息.
    消息上的拒絕操作獨立于事務,依賴于defaultRequeueRejected 屬性(默認為true). 更多關于拒絕失敗消息的詳情,請參考the section called “Message Listeners and the Asynchronous Case”.

    關于RabbitMQ 事務及其局限性的更多信息,參考RabbitMQ Broker Semantics.

    重要

    在 RabbitMQ 2.7.0前, 這樣的消息(當通道關閉或中斷時未應的消息)會回到隊列中,從2.7.0, 拒絕消息會跑到隊列前邊,與JMS回滾消息方式類似.

    使用RabbitTransactionManager

    RabbitTransactionManager 是執行同步,外部事務Rabbit操作的另一種選擇.這個事務管理器是PlatformTransactionManager 接口的實現類,應該在單個Rabbit ConnectionFactory中使用.

    重要

    此策略不能提供XA事務,比如,要在消息和數據庫之間共享事務.

    應用代碼需要通過ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)來獲取事務性Rabbit資源而不是使用Connection.createChannel() 調用.
    當使用Spring AMQP的 RabbitTemplate時, 
    它會自動檢測線程綁定通道和自動參與事務。

    在 Java 配置中,你可以使用下面的代碼來設置一個新的RabbitTransactionManager:

    @Bean
    public RabbitTransactionManager rabbitTransactionManager() {
        returnnew RabbitTransactionManager(connectionFactory);
    }

    如果你喜歡使用XML 配置,可以像下面進行聲明:

    <bean id="rabbitTxManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <propertyname="connectionFactory" ref="connectionFactory"/>
    </bean>

    3.1.15 消息監聽器容器配置

    有相當多的配置SimpleMessageListenerContainer 相關事務和服務質量的選項,它們之間可以互相交互.當使用命名空間來配置<rabbit:listener-container/>時,

    下表顯示了容器屬性名稱和它們等價的屬性名稱(在括號中).

    未被命名空間暴露的屬性,以`N/A`表示.

    posted on 2016-08-13 16:07 胡小軍 閱讀(4965) 評論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 国产亚洲成在线播放va| 亚洲av成人一区二区三区| 亚洲av乱码一区二区三区| 美女内射无套日韩免费播放 | 9久热这里只有精品免费| 国产色爽女小说免费看| 亚洲乱码国产乱码精华| 成人免费无码精品国产电影| 亚洲乱妇老熟女爽到高潮的片| 中文字幕av无码无卡免费| 亚洲资源最新版在线观看| 18勿入网站免费永久| 亚洲免费在线视频观看| 成人毛片免费观看视频在线| 亚洲国产综合AV在线观看| 国产精品久久香蕉免费播放| 黄色免费网址在线观看| 亚洲人成影院在线观看| 中文字幕免费播放| 亚洲日韩图片专区第1页| 67pao强力打造国产免费| 日韩亚洲人成在线| 免费va人成视频网站全| 亚洲第一视频在线观看免费| 亚洲国产精品SSS在线观看AV| 久草免费手机视频| 亚洲香蕉在线观看| 四虎影视精品永久免费网站| 国产成人无码免费网站| 久久亚洲日韩精品一区二区三区| 日韩吃奶摸下AA片免费观看| 日韩国产欧美亚洲v片| 精品亚洲综合在线第一区| 日本免费一区二区在线观看| 亚洲国产精品日韩av不卡在线| 亚洲一本大道无码av天堂| 亚州免费一级毛片| 免费无码国产V片在线观看| 亚洲av永久无码精品秋霞电影影院| 中文字幕无码不卡免费视频| selaoban在线视频免费精品|