<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 41  文章 - 7  trackbacks - 0
    <2016年8月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿

    隨筆分類

    隨筆檔案

    搜索

    •  

    最新評(píng)論

    閱讀排行榜

    評(píng)論排行榜

    3.1.10 配置broker

    介紹

    AMQP 規(guī)范描述了協(xié)議是如何用于broker中隊(duì)列,交換器以及綁定上的.這些操作是從0.8規(guī)范中移植的,更高的存在于org.springframework.amqp.core包中的AmqpAdmin 接口中.
    那個(gè)接口的RabbitMQ 實(shí)現(xiàn)是RabbitAdmin,它位于org.springframework.amqp.rabbit.core 包.

    AmqpAdmin接口是基于Spring AMQP 域抽象,展示如下:

    public interface AmqpAdmin {      
    // Exchange Operations
    void declareExchange(Exchange exchange);      
    void deleteExchange(String exchangeName);
    // Queue Operations
    Queue declareQueue();
    String declareQueue(Queue queue);
    void deleteQueue(String queueName);
    void deleteQueue(String queueName, boolean unused, boolean empty);
    void purgeQueue(String queueName, boolean noWait);
    // Binding Operations
    void declareBinding(Binding binding);      
    void removeBinding(Binding binding);
    Properties getQueueProperties(String queueName);
    }

    getQueueProperties() 方法會(huì)返回關(guān)于隊(duì)列的的一些有限信息(消息個(gè)數(shù)和消費(fèi)者數(shù)目). 屬性返回的keys像RabbitTemplate (QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)中的常量一樣是可用的. 
    RabbitMQ REST API 提供了更多關(guān)于 QueueInfo 對(duì)象的信息.

    無(wú)參 declareQueue() 方法在broker上定義了一個(gè)隊(duì)列,其名稱是自動(dòng)生成的. 自動(dòng)生成隊(duì)列的其它屬性是exclusive=trueautoDelete=true, and durable=false.

    declareQueue(Queue queue) 方法接受一個(gè) Queue 對(duì)象,并且返回聲明隊(duì)列的名稱.如果提供的隊(duì)列名稱是空字符串,broker 使用生成的名稱來(lái)聲明隊(duì)列再將名稱返回給調(diào)用者. Queue 對(duì)象本身是不會(huì)變化的. 

    這種功能只能用于編程下直接調(diào)用RabbitAdmin. 它不支持在應(yīng)用程序上下文中由admin來(lái)定義隊(duì)列的自動(dòng)聲明.

    與此形成鮮明對(duì)比的是,AnonymousQueue,框架會(huì)為其生成唯一名稱(UUID),durable為false,exclusiveautoDelete 為true的匿名隊(duì)列<rabbit:queue/> 帶空的或缺失的name 屬性總會(huì)創(chuàng)建 一個(gè)AnonymousQueue.

    參考the section called “AnonymousQueue” 來(lái)理解為什么 AnonymousQueue 會(huì)優(yōu)先選擇broker生成隊(duì)列名稱,以及如何來(lái)控制名稱格式. 聲明隊(duì)列必須有固定的名稱,因?yàn)樗鼈兛赡軙?huì)上下文的其它地方引用,例如,在監(jiān)聽器中

    <rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
    </rabbit:listener-container>

    參考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”.

    此接口的RabbitMQ實(shí)現(xiàn)是RabbitAdmin,當(dāng)用Spring XML配置時(shí),看起來(lái)像下面這樣:

    <rabbit:connection-factory id="connectionFactory"/>
    <rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

    當(dāng)CachingConnectionFactory 緩存模式是CHANNEL 時(shí)(默認(rèn)的),  RabbitAdmin 實(shí)現(xiàn)會(huì)在同一個(gè)ApplicationContext中自動(dòng)延遲聲明 Queues,Exchanges 和 Bindings.
    只要Connection打開了與Broker的連接,這些組件就會(huì)被聲明.有一些命名空間特性可以使這些變得便利,如,在Stocks 樣例程序中有:

    <rabbit:queue id="tradeQueue"/>
    <rabbit:queue id="marketDataQueue"/>
    <fanout-exchange name="broadcast.responses" xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
    <binding queue="tradeQueue"/>
    </bindings>
    </fanout-exchange>
    <topic-exchange name="app.stock.marketdata" xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
    <binding queue="marketDataQueue"pattern="${stocks.quote.pattern}"/>
    </bindings>
    </topic-exchange>

    在上面的例子中,我們使用匿名隊(duì)列(實(shí)際上由框架內(nèi)部生成,而非由broker生成的隊(duì)列),并用ID進(jìn)行了指定.我們也可以使用明確的名稱來(lái)聲明隊(duì)列,也作為上下文中bean定義的標(biāo)識(shí)符.如.

    <rabbit:queue name="stocks.trade.queue"/>
    重要
    你可以提供id 和 name 屬性.這允許你獨(dú)立于隊(duì)列名稱通過(guò)id來(lái)指定隊(duì)列.它允許使用標(biāo)準(zhǔn)的Spring 屬性,如屬性占位符和隊(duì)列名稱的SpEL 表達(dá)式; 當(dāng)使用名稱來(lái)作為標(biāo)識(shí)符,這些特性是不可用的.

    隊(duì)列也可以使用其它的參數(shù)進(jìn)行配置,例如x-message-ttl 或 x-ha-policy.通過(guò)命名空間支持,它們可以通過(guò)<rabbit:queue-arguments>元素以參數(shù)名/參數(shù)值的MAP形式來(lái)提供 .

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
    <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
    </rabbit:queue>

    默認(rèn)情況下,參數(shù)假設(shè)為字符串.對(duì)于其它類型的參數(shù),需要提供類型.

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
    <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
    </rabbit:queue>

    當(dāng)提供混合類型的參數(shù)時(shí),可為每個(gè)entry元素提供type:

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
    <entry key="x-message-ttl">
    <value type="java.lang.Long">100</value>
    </entry>
    <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
    </rabbit:queue>

    在Spring Framework 3.2或以后,聲明起來(lái)更加簡(jiǎn)潔:

    <rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
    <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
    <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
    </rabbit:queue>
    重要
    RabbitMQ broker 不允許使用不匹配的參數(shù)來(lái)聲明隊(duì)列. 例如,如果一個(gè)無(wú)time to live參數(shù)的隊(duì)列已經(jīng)存在,然后你試圖使用 key="x-message-ttl" value="100"進(jìn)行聲明,那么會(huì)拋出一個(gè)異常.

    默認(rèn)情況下,當(dāng)出現(xiàn)異常時(shí), RabbitAdmin 會(huì)立即停止所有聲明的處理過(guò)程;這可能會(huì)導(dǎo)致下游問(wèn)題- 如監(jiān)聽器容器會(huì)初始化失敗,因另外的隊(duì)列沒有聲明.

    這種行為可以通過(guò)在RabbitAdmin上設(shè)置 ignore-declaration-exceptions 為true來(lái)修改. 此選項(xiàng)會(huì)指示RabbitAdmin 記錄異常,并繼續(xù)聲明其它元素.當(dāng)使用Java來(lái)配置RabbitAdmin 時(shí), 此屬性為ignoreDeclarationExceptions.
    這是一個(gè)全局設(shè)置,它將應(yīng)用到所有元素上,如應(yīng)用到queues, exchanges 和bindings這些具有相似屬性的元素上.

    在1.6版本之前, 此屬性只會(huì)在channel上發(fā)生IOExcepton時(shí)才會(huì)起作用- 如當(dāng)目前和期望屬性發(fā)生錯(cuò)配時(shí). 現(xiàn)在, 這個(gè)屬性可在任何異常上起作用,包括TimeoutException 等等.

    此外,任何聲明異常都會(huì)導(dǎo)致發(fā)布DeclarationExceptionEvent, 這是一個(gè)ApplicationEvent ,在上下文中可通過(guò)任何ApplicationListener 消費(fèi). 此事件包含了admin的引用, 正在聲明的元素以及Throwable.

    從1.3版本開始, HeadersExchange 可配置匹配多個(gè)headers; 你也可以指定是否需要必須匹配任何一個(gè)或全部headers:

    <rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
    <rabbit:binding queue="bucket">
    <rabbit:binding-arguments>
    <entrykey="foo"value="bar"/>
    <entrykey="baz"value="qux"/>
    <entrykey="x-match"value="all"/>
    </rabbit:binding-arguments>
    </rabbit:binding>
    </rabbit:bindings>
    </rabbit:headers-exchange>

    從1.6版本開始,Exchanges 可使用internal 標(biāo)志來(lái)配置(默認(rèn)為false) ,當(dāng)然,這樣的Exchange 也可以通過(guò) RabbitAdmin 來(lái)配置(如果在應(yīng)用程序上下文中存在).
    如果對(duì)于交換器來(lái)說(shuō),internal 標(biāo)志為true , RabbitMQ 會(huì)允許客戶端來(lái)使用交換器.這對(duì)于死信交換器來(lái)說(shuō)或交換器到交換器綁定來(lái)說(shuō),是很用的,因?yàn)樵谶@些地方你不想讓發(fā)布者直接使用交換器.

    要看如何使用Java來(lái)配置AMQP基礎(chǔ)設(shè)施,可查看Stock樣例程序,在那里有一個(gè)帶@Configuration 注解的抽象AbstractStockRabbitConfiguration 類,它依次有RabbitClientConfiguration 和 RabbitServerConfiguration 子類. AbstractStockRabbitConfiguration 的代碼展示如下:

    @Configuration
    public abstract class AbstractStockAppRabbitConfiguration {
    
        @Bean
     public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            return connectionFactory;
        }
    
        @Bean
     public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            template.setMessageConverter(jsonMessageConverter());
            configureRabbitTemplate(template);
            return template;
        }
    
        @Bean
     public MessageConverter jsonMessageConverter() {
            returnnew JsonMessageConverter();
        }
    
        @Bean
     public TopicExchange marketDataExchange() {
            returnnew TopicExchange("app.stock.marketdata");
        }
    
        // additional code omitted for brevity
    
    }

    在Stock 程序中,服務(wù)器使用下面的@Configuration注解來(lái)配置:

    @Configuration
    public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {
    
        @Bean
     public Queue stockRequestQueue() {
            returnnew Queue("app.stock.request");
        }
    }

    這是整個(gè)@Configuration 類繼承鏈結(jié)束的地方. 最終結(jié)果是TopicExchange 和隊(duì)列會(huì)在應(yīng)用程序啟動(dòng)時(shí)被聲明.在服務(wù)器配置中,沒有TopicExchange與隊(duì)列的綁定,因?yàn)檫@是在客戶端程序完成的.
    然后stock 請(qǐng)求隊(duì)列是自動(dòng)綁定到AMQP 默認(rèn)交換器上的 - 這種行為是由規(guī)范來(lái)定義的.

    客戶端 @Configuration 類令人關(guān)注的地方展示如下.

    @Configuration
    public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
    
        @Value("${stocks.quote.pattern}")
     private String marketDataRoutingKey;
    
        @Bean
     public Queue marketDataQueue() {
            return amqpAdmin().declareQueue();
        }
    
        /**
         * Binds to the market data exchange.
         * Interested in any stock quotes
         * that match its routing key.
         */@Bean
      public Binding marketDataBinding() {
            return BindingBuilder.bind(
                    marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
        }
    
        // additional code omitted for brevity
    
    }

    客戶端使用AmqpAdmin的declareQueue()方法聲明了另一個(gè)隊(duì)列,并將其綁定到了market data 交換器上(路由鍵模式是通常外部properties文件來(lái)定義的).

    Queues 和Exchanges的Builder API

    當(dāng)使用Java配置時(shí),1.6版本引入了一個(gè)便利的API來(lái)配置Queue 和Exchange 對(duì)象:

    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable("foo")
            .autoDelete()
            .exclusive()
            .withArgument("foo", "bar")
            .build();
    }
    
    @Bean
    public Exchange exchange() {
      return ExchangeBuilder.directExchange("foo")
          .autoDelete()
          .internal()
          .withArgument("foo", "bar")
          .build();
    }

    查看 org.springframework.amqp.core.QueueBuilder 

    和 org.springframework.amqp.core.ExchangeBuilder 的JavaDoc來(lái)了解更多信息.

    Declaring Collections of Exchanges, Queues, Bindings

    從1.5版本開始,可以在一個(gè)@Bean聲明多個(gè)條目來(lái)返回集合.

    只有集合中的第一個(gè)元素可認(rèn)為是Declarablea的,并且只有集合中的Declarable 元素會(huì)被處理.(

    Only collections where the first element is a Declarable are considered, and only Declarable elements from such collections are processed.)

    @Configuration
    public static class Config {
    
        @Bean
    public ConnectionFactory cf() {
            returnnew CachingConnectionFactory("localhost");
        }
    
        @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
            returnnew RabbitAdmin(cf);
        }
    
        @Bean
    public DirectExchange e1() {
        	returnnew DirectExchange("e1", false, true);
        }
    
        @Bean
    public Queue q1() {
        	returnnew Queue("q1", false, false, true);
        }
    
        @Bean
    public Binding b1() {
        	return BindingBuilder.bind(q1()).to(e1()).with("k1");
        }
    
        @Bean
    public List<Exchange> es() {
        	return Arrays.<Exchange>asList(
        			new DirectExchange("e2", false, true),
        			new DirectExchange("e3", false, true)
        	);
        }
    
        @Bean
    public List<Queue> qs() {
        	return Arrays.asList(
        			new Queue("q2", false, false, true),
        			new Queue("q3", false, false, true)
        	);
        }
    
        @Bean
    public List<Binding> bs() {
        	return Arrays.asList(
        			new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
        			new Binding("q3", DestinationType.QUEUE, "e3", "k3", null)
        	);
        }
    
        @Bean
    public List<Declarable> ds() {
        	return Arrays.<Declarable>asList(
        			new DirectExchange("e4", false, true),
        			new Queue("q4", false, false, true),
        			new Binding("q4", DestinationType.QUEUE, "e4", "k4", null)
        	);
        }
    
    }

    條件聲明

    默認(rèn)情況下,所有queues, exchanges,和bindings 都可通過(guò)應(yīng)用程序上下文中所有RabbitAdmin 實(shí)例來(lái)聲明(設(shè)置了auto-startup="true").

    重要

    從1.2版本開始,可以有條件地聲明元素.當(dāng)程序連接了多個(gè)brokers,并需要在哪些brokers上聲明特定元素時(shí),特別有用.

    代表這些元素要實(shí)現(xiàn)Declarable 接口,此接口有兩個(gè)方法: shouldDeclare() 和 getDeclaringAdmins()RabbitAdmin 使用這些方法來(lái)確定某個(gè)特定實(shí)例是否應(yīng)該在其Connection上處理聲明.

    這些屬性作為命名空間的屬性是可用的,如下面的例子所示.

    <rabbit:admin id="admin1" connection-factory="CF1" />
    <rabbit:admin id="admin2" connection-factory="CF2" />
    <rabbit:queue id="declaredByBothAdminsImplicitly" />
    <rabbit:queue id="declaredByBothAdmins" declared-by="admin1, admin2" />
    <rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
    <rabbit:queue id="notDeclaredByAny" auto-declare="false" />
    <rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
    <rabbit:bindingkey="foo" queue="bar"/>
    </rabbit:bindings>
    </rabbit:direct-exchange>
    重要
    默認(rèn)情況下,如果沒有提供declared-by(或是空的) auto-declare 屬性則為 true,那么所有RabbitAdmin將聲明對(duì)象(只要admin的auto-startup 屬性為true,默認(rèn)值).

    現(xiàn)樣的,你可以使用基于Java的@Configuration 注解來(lái)達(dá)到同樣的效果.在這個(gè)例子中,組件會(huì)由admin1來(lái)聲明,而不是admin2:

    @Bean
    public RabbitAdmin admin() {
    	RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1());
    	rabbitAdmin.afterPropertiesSet();
    	return rabbitAdmin;
    }
    
    @Bean
    public RabbitAdmin admin2() {
    	RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2());
    	rabbitAdmin.afterPropertiesSet();
    	return rabbitAdmin;
    }
    
    @Bean
    public Queue queue() {
    	Queue queue = new Queue("foo");
    	queue.setAdminsThatShouldDeclare(admin());
    	return queue;
    }
    
    @Bean
    public Exchange exchange() {
    	DirectExchange exchange = new DirectExchange("bar");
    	exchange.setAdminsThatShouldDeclare(admin());
    	return exchange;
    }
    
    @Bean
    public Binding binding() {
    	Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    	binding.setAdminsThatShouldDeclare(admin());
    	return binding;
    }

    AnonymousQueue

    一般來(lái)說(shuō),當(dāng)需要一個(gè)獨(dú)特命名,專用的,自動(dòng)刪除隊(duì)列時(shí),建議使用AnonymousQueue 來(lái)代替中間件定義的隊(duì)列名稱(使用 "" 作為隊(duì)列名稱會(huì)導(dǎo)致中間件生成隊(duì)列名稱).

    這是因?yàn)?

    1. 隊(duì)列實(shí)際上是在與broker的連接建立時(shí)聲明的;這在bean創(chuàng)建和包裝之后要很長(zhǎng)時(shí)間;使用這個(gè)隊(duì)列的beans需要知道其名稱.而事實(shí)上,當(dāng)app啟動(dòng)時(shí),broker甚至還沒有運(yùn)行.
    2. 如果與broker的連接因某種原因丟失了,admin會(huì)使用相同的名稱會(huì)重新聲明AnonymousQueue.如果我們使用broker-聲明隊(duì)列,隊(duì)列名稱可能會(huì)改變.

    從1.5.3版本開始,你可通過(guò)AnonymousQueue 來(lái)控制隊(duì)列名稱的格式.

    默認(rèn)情況下,隊(duì)列名稱是UUID的字符串表示; 例如: 07afcfe9-fe77-4983-8645-0061ec61a47a.

    現(xiàn)在,你可以提供一個(gè) AnonymousQueue.NamingStrategy 實(shí)現(xiàn)作為其構(gòu)造器參數(shù):

    @Bean
    public Queue anon1() {
        return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy());
    }
    
    @Bean
    public Queue anon2() {
        return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("foo-"));
    }

    第一個(gè)會(huì)生成隊(duì)列名稱前輟spring.gen- 其后為UUID base64 的表示,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g. 第二個(gè)會(huì)生成隊(duì)列名稱前輟為foo- 其后為UUID的 base64 表示.

    base64 編碼使用RFC 4648的"URL and Filename Safe Alphabet" ; 刪除了字符(=).

    你可以提供你自己的命名策略, 可以包括隊(duì)列名稱中的其他信息(例如應(yīng)用程序、客戶端主機(jī))。

    從1.6版本開始,當(dāng)使用XML配置時(shí),可以指定命名策略; naming-strategy 屬性出現(xiàn)在<rabbit:queue>元素的屬性中,對(duì)于bean引用來(lái)說(shuō),它們實(shí)現(xiàn)了AnonymousQueue.NamingStrategy.

    <rabbit:queue id="uuidAnon" />
    <rabbit:queue id="springAnon" naming-strategy="springNamer" />
    <rabbit:queue id="customAnon" naming-strategy="customNamer" />
    <bean id="springNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy" />
    <bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
    </bean>
    第一個(gè)創(chuàng)建了UUID字符串表示的名稱.第二個(gè)創(chuàng)建了類似spring.gen-MRBv9sqISkuCiPfOYfpo4g的名稱. 第三個(gè)創(chuàng)建了類似custom.gen-MRBv9sqISkuCiPfOYfpo4g的名稱.

    當(dāng)然,你可以提供你自己的命名策略bean.

    3.1.11 延遲的消息交換器

    1.6版本引入了 Delayed Message Exchange Plugin支持.

    該插件目前被標(biāo)記為實(shí)驗(yàn)性質(zhì),但可用已超過(guò)一年(在寫作的時(shí)間)。如果插件的變化是必要的,我們將盡快添加支持這樣的變化。因此,這種在Spring AMQP支持同樣也應(yīng)考慮為實(shí)驗(yàn)性質(zhì).這個(gè)功能在RabbitMQ 3.6.0版本和0.0.1插件版本中經(jīng)過(guò)測(cè)試。

    要使用RabbitAdmin 來(lái)聲明一個(gè)延遲交換器,只需要在交換器上簡(jiǎn)單地設(shè)置delayed 屬性為true. RabbitAdmin 會(huì)使用交換器類型(DirectFanout 等)來(lái)設(shè)置x-delayed-type 參數(shù),并使用x-delayed-message來(lái)聲明交換器.

    當(dāng)使用XML來(lái)配置交換器beans時(shí),delayed 屬性 (默認(rèn)為false)是可用的.

    <rabbit:topic-exchange name="topic" delayed="true" />

    要發(fā)送延遲消息,只需要通過(guò)MessageProperties設(shè)置x-delay header:

    MessageProperties properties = new MessageProperties();
    properties.setXDelay(15000);
    template.send(exchange, routingKey,
            MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());

    rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {
    
        @Override
    public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setXDelay(15000);
            return message;
        }
    
    });

    要檢查消息是否是延遲的,可調(diào)用MessagePropertiesgetReceivedDelay()它是一個(gè)獨(dú)立的屬性,以避免從一個(gè)輸入消息意外的傳播到一個(gè)輸出消息。

    3.1.12 RabbitMQ REST API

    當(dāng)啟用了管理插件時(shí),RabbitMQ 服務(wù)器公開了 REST API 來(lái)監(jiān)控和配置broker. 

    現(xiàn)在提供了 Java Binding for the API.一般來(lái)說(shuō),你可以直接使用API,但提供了便利的包裝器來(lái)使用熟悉的Spring AMQP QueueExchange, 和 Binding 域?qū)ο?
    當(dāng)直接使用 com.rabbitmq.http.client.Client API  (分別使用QueueInfoExchangeInfo, 和BindingInfo),那些對(duì)象的更多信息將可用. 下面是RabbitManagementTemplate上的可用操作:

    public interface AmqpManagementOperations {
    
    	void addExchange(Exchange exchange);
    
    	void addExchange(String vhost, Exchange exchange);
    
    	void purgeQueue(Queue queue);
    
    	void purgeQueue(String vhost, Queue queue);
    
    	void deleteQueue(Queue queue);
    
    	void deleteQueue(String vhost, Queue queue);
    
    	Queue getQueue(String name);
    
    	Queue getQueue(String vhost, String name);
    
    	List<Queue> getQueues();
    
    	List<Queue> getQueues(String vhost);
    
    	void addQueue(Queue queue);
    
    	void addQueue(String vhost, Queue queue);
    
    	void deleteExchange(Exchange exchange);
    
    	void deleteExchange(String vhost, Exchange exchange);
    
    	Exchange getExchange(String name);
    
    	Exchange getExchange(String vhost, String name);
    
    	List<Exchange> getExchanges();
    
    	List<Exchange> getExchanges(String vhost);
    
    	List<Binding> getBindings();
    
    	List<Binding> getBindings(String vhost);
    
    	List<Binding> getBindingsForExchange(String vhost, String exchange);
    
    }

    參考javadocs 來(lái)了解更多信息.

    3.1.13 異常處理

    RabbitMQ Java client的許多操作會(huì)拋出受查異常. 例如,有許多可能拋出IOExceptions的地方. RabbitTemplate, SimpleMessageListenerContainer, 和其它Spring AMQP 組件會(huì)捕獲這些異常,并將它們轉(zhuǎn)換為運(yùn)行時(shí)層次的異常.
    這些是定義在
    org.springframework.amqp 包中的,且 AmqpException 是層次結(jié)構(gòu)的基礎(chǔ).

    當(dāng)監(jiān)聽器拋出異常時(shí),它會(huì)包裝在一個(gè) ListenerExecutionFailedException 中,正常情況下消息會(huì)被拒絕并由broker重新排隊(duì).將defaultRequeueRejected 設(shè)置為false 可導(dǎo)致消息丟棄(或路由到死信交換器中). 

    正如 the section called “Message Listeners and the Asynchronous Case”討論的,監(jiān)聽器可拋出 AmqpRejectAndDontRequeueException 來(lái)有條件地控制這種行為。

    然而,有一種類型的錯(cuò)誤,監(jiān)聽器無(wú)法控制其行為. 當(dāng)遇到消息不能轉(zhuǎn)換時(shí)(例如,無(wú)效的content_encoding 頭),那么消息在到達(dá)用戶代碼前會(huì)拋出一些異常.當(dāng)設(shè)置 defaultRequeueRejected 為 true (默認(rèn)),這樣的消息可能會(huì)一遍又一遍地重新投遞.
    在1.3.2版本之前,用戶需要編寫定制ErrorHandler, 正如Section 3.1.13, “Exception Handling” 描述的內(nèi)容來(lái)避免這種情況.

    從1.3.2版本開始,默認(rèn)的ErrorHandler 是 ConditionalRejectingErrorHandler ,它將拒絕那些失敗且不可恢復(fù)的消息 (不會(huì)重新排隊(duì)):

    • o.s.amqp...MessageConversionException
    • o.s.messaging...MessageConversionException
    • o.s.messaging...MethodArgumentNotValidException
    • o.s.messaging...MethodArgumentTypeMismatchException

    第一個(gè)是在使用MessageConverter轉(zhuǎn)換傳入消息負(fù)荷時(shí)拋出的.
    第二個(gè)是當(dāng)映射到@RabbitListener方法時(shí),轉(zhuǎn)換服務(wù)需要其它轉(zhuǎn)換拋出的.
    第三個(gè)是在監(jiān)聽器上使用了驗(yàn)證(如.@Valid),且驗(yàn)證失敗時(shí)拋出的.
    第四個(gè)是對(duì)于目標(biāo)方法傳入消息類型轉(zhuǎn)換失敗拋出的.例如,參數(shù)聲明為Message<Foo> ,但收到的是Message<Bar>

    錯(cuò)誤處理器的實(shí)例可使用FatalExceptionStrategy 來(lái)配置,因?yàn)橛脩艨梢蕴峁┧鼈兊囊?guī)則來(lái)有條件的拒絕消息,如. 來(lái)自 Spring Retry (the section called “Message Listeners and the Asynchronous Case”)中的BinaryExceptionClassifier代理實(shí)現(xiàn).
    此外, ListenerExecutionFailedException 現(xiàn)在有一個(gè)可用于決策的failedMessage 屬性.如果FatalExceptionStrategy.isFatal() 方法返回true,錯(cuò)誤處理器會(huì)拋出AmqpRejectAndDontRequeueException.
    默認(rèn)FatalExceptionStrategy 會(huì)記錄warning信息.

    3.1.14 事務(wù)(Transactions)

    介紹

    Spring Rabbit 框架支持在同步和異步使用中使用不同語(yǔ)義(這一點(diǎn)對(duì)于現(xiàn)有Spring事務(wù)的用戶是很熟悉的)來(lái)支持自動(dòng)事務(wù)管理. 它做了很多,不是常見消息模式能輕易實(shí)現(xiàn)的.

    有兩種方法可用來(lái)向框架發(fā)出期望事務(wù)語(yǔ)義的信號(hào).在RabbitTemplate 和 SimpleMessageListenerContainer 中,這里有一個(gè)channelTransacted 標(biāo)記,如果它為true,就會(huì)告知框架使用事務(wù)通道,并根據(jù)結(jié)果使用提交或回滾來(lái)結(jié)束所有操作,出現(xiàn)異常時(shí)則發(fā)出回滾信號(hào). 

    另一個(gè)提供的信號(hào)是Spring的PlatformTransactionManager實(shí)現(xiàn)(作為正在進(jìn)行的操作的上下文)外部事務(wù) 
    當(dāng)框架發(fā)送或接收消息時(shí),如果過(guò)程中已經(jīng)存在一個(gè)事
    務(wù),且channelTransacted 標(biāo)記為true, 那么當(dāng)前消息事務(wù)的提交或回滾操作會(huì)延遲直到在當(dāng)前事務(wù)結(jié)束.如果channelTransacted 標(biāo)記為false,那么消息操作是不會(huì)應(yīng)用事務(wù)語(yǔ)義(它是自動(dòng)應(yīng)答的).

    channelTransacted 標(biāo)記是一個(gè)配置時(shí)設(shè)置:它只在AMQP組件聲明時(shí)執(zhí)行一次,通常在應(yīng)用程序啟動(dòng)時(shí).原則上,外部事務(wù)更加動(dòng)態(tài)化,因?yàn)樾枰谶\(yùn)行時(shí)根據(jù)當(dāng)前線程狀態(tài)來(lái)響應(yīng),當(dāng)事務(wù)分層到應(yīng)用程序上時(shí),原則上來(lái)說(shuō)它通常也是一個(gè)配置設(shè)置.

    對(duì)于使用RabbitTemplate 的同步使用,外部事務(wù)是由調(diào)用者提供的, 要么是聲明的,要么是強(qiáng)制的(日常Spring事務(wù)模式). 

    下面是聲明方法的一個(gè)例子(通常選擇這個(gè),因?yàn)樗?/font>非侵入的), 下面的例子中,模板已經(jīng)配置了channelTransacted=true:

    @Transactional
    public void doSomething() {
        String incoming = rabbitTemplate.receiveAndConvert();
        // do some more database processing...
        String outgoing = processInDatabaseAndExtractReply(incoming);
        rabbitTemplate.convertAndSend(outgoing);
    }

    收到字符負(fù)荷,轉(zhuǎn)換,并以消息體發(fā)送到@Transactional標(biāo)記的方法中,因此如果數(shù)據(jù)處理因異常失敗了,傳入消息將返回到broker,并且輸出消息不會(huì)被發(fā)送.
    在事務(wù)方法鏈中,這適用于
    RabbitTemplate 中的所有操作(除非Channel 較早地直接控制了提交事務(wù)).

    對(duì)于SimpleMessageListenerContainer 的異步使用情況,如果需要外部事務(wù),當(dāng)設(shè)置了監(jiān)聽器時(shí),必須由容器來(lái)發(fā)出請(qǐng)求.
    為了表示需要外部事務(wù),當(dāng)配置時(shí),用戶為容器提供了PlatformTransactionManager 實(shí)現(xiàn).例如:

    @Configuration
    public class ExampleExternalTransactionAmqpConfiguration {
    
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(rabbitConnectionFactory());
            container.setTransactionManager(transactionManager());
            container.setChannelTransacted(true);
            container.setQueueName("some.queue");
            container.setMessageListener(exampleListener());
            return container;
        }
    
    }

    在上面的例子中,事務(wù)管理器是通過(guò)其它bean中注入添加的(未顯示),并且channelTransacted 也設(shè)置為了true.其效果是如果監(jiān)聽器因異常失敗了,那么事務(wù)將回滾,消息也會(huì)退回到broker中.
    明顯地,如果事務(wù)提交失敗(如.數(shù)據(jù)庫(kù)約束錯(cuò)誤,或通過(guò)問(wèn)題),那么AMQP 事務(wù)也要回滾,且消息也會(huì)回退到broker中.
    有時(shí)候,這被稱為最好努力1階段提交(Best Efforts 1 Phase Commit),它是可靠消息非常強(qiáng)大的模式.
    如果在上面的例子中將channelTransacted標(biāo)志設(shè)為false(默認(rèn)為false),那么外部事務(wù)仍會(huì)提供給監(jiān)聽器,但所有消息操作都是自動(dòng)應(yīng)答的, 因此其效果是即使發(fā)生了業(yè)務(wù)操作,也會(huì)提供消息操作.

    關(guān)于接收消息的回滾說(shuō)明

    AMQP 事務(wù)只適用于發(fā)送應(yīng)答給broker, 所以當(dāng)有 Spring 事務(wù)回滾且又收到了消息時(shí),Spring AMQP做的不僅要回滾事務(wù),還要手動(dòng)拒絕消息.
    消息上的拒絕操作獨(dú)立于事務(wù),依賴于defaultRequeueRejected 屬性(默認(rèn)為true). 更多關(guān)于拒絕失敗消息的詳情,請(qǐng)參考the section called “Message Listeners and the Asynchronous Case”.

    關(guān)于RabbitMQ 事務(wù)及其局限性的更多信息,參考RabbitMQ Broker Semantics.

    重要

    在 RabbitMQ 2.7.0前, 這樣的消息(當(dāng)通道關(guān)閉或中斷時(shí)未應(yīng)的消息)會(huì)回到隊(duì)列中,從2.7.0, 拒絕消息會(huì)跑到隊(duì)列前邊,與JMS回滾消息方式類似.

    使用RabbitTransactionManager

    RabbitTransactionManager 是執(zhí)行同步,外部事務(wù)Rabbit操作的另一種選擇.這個(gè)事務(wù)管理器是PlatformTransactionManager 接口的實(shí)現(xiàn)類,應(yīng)該在單個(gè)Rabbit ConnectionFactory中使用.

    重要

    此策略不能提供XA事務(wù),比如,要在消息和數(shù)據(jù)庫(kù)之間共享事務(wù).

    應(yīng)用代碼需要通過(guò)ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)來(lái)獲取事務(wù)性Rabbit資源而不是使用Connection.createChannel() 調(diào)用.
    當(dāng)使用Spring AMQP的 RabbitTemplate時(shí), 
    它會(huì)自動(dòng)檢測(cè)線程綁定通道和自動(dòng)參與事務(wù)。

    在 Java 配置中,你可以使用下面的代碼來(lái)設(shè)置一個(gè)新的RabbitTransactionManager:

    @Bean
    public RabbitTransactionManager rabbitTransactionManager() {
        returnnew RabbitTransactionManager(connectionFactory);
    }

    如果你喜歡使用XML 配置,可以像下面進(jìn)行聲明:

    <bean id="rabbitTxManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <propertyname="connectionFactory" ref="connectionFactory"/>
    </bean>

    3.1.15 消息監(jiān)聽器容器配置

    有相當(dāng)多的配置SimpleMessageListenerContainer 相關(guān)事務(wù)和服務(wù)質(zhì)量的選項(xiàng),它們之間可以互相交互.當(dāng)使用命名空間來(lái)配置<rabbit:listener-container/>時(shí),

    下表顯示了容器屬性名稱和它們等價(jià)的屬性名稱(在括號(hào)中).

    未被命名空間暴露的屬性,以`N/A`表示.

    posted on 2016-08-13 16:07 胡小軍 閱讀(4967) 評(píng)論(0)  編輯  收藏 所屬分類: RabbitMQ
    主站蜘蛛池模板: 在线a亚洲v天堂网2018| 国产男女猛烈无遮挡免费网站 | 日本高清免费中文字幕不卡| 色偷偷亚洲第一综合网| 亚洲综合色婷婷七月丁香| 亚洲精品视频免费看| 成人婷婷网色偷偷亚洲男人的天堂| 奇米影视亚洲春色| 曰批全过程免费视频播放网站| 成人精品国产亚洲欧洲| 亚洲精品国精品久久99热一| 成人毛片18女人毛片免费96 | 大胆亚洲人体视频| 91精品免费观看| 亚洲一区二区三区国产精华液| 国产色婷婷精品免费视频| 国产福利免费视频| 亚洲综合激情九月婷婷 | 国产偷国产偷亚洲高清日韩| 国产成人精品一区二区三区免费| 亚洲精品在线免费观看| 免费观看的a级毛片的网站| 亚洲免费日韩无码系列| 亚洲精品天天影视综合网| 免费看片免费播放| 国产激情久久久久影院老熟女免费| 亚洲AV之男人的天堂| 久久国产高潮流白浆免费观看| 亚洲日韩国产精品无码av| 黑人大战亚洲人精品一区| 成年私人影院免费视频网站| 成全视频高清免费观看电视剧| 精品亚洲国产成人av| 亚洲最新中文字幕| 亚洲AV无码精品无码麻豆| 亚洲区不卡顿区在线观看| 日本高清免费不卡在线| 久久久久久久免费视频| 日本免费高清视频| 成人免费一区二区三区| 免费手机在线看片|