介紹
AMQP 規(guī)范描述了協(xié)議是如何用于broker中隊(duì)列,交換器以及綁定上的.這些操作是從0.8規(guī)范中移植的,更高的存在于org.springframework.amqp.core包中的AmqpAdmin 接口中.
那個(gè)接口的RabbitMQ 實(shí)現(xiàn)是RabbitAdmin,它位于org.springframework.amqp.rabbit.core 包.
AmqpAdmin接口是基于Spring AMQP 域抽象,展示如下:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
getQueueProperties()
方法會(huì)返回關(guān)于隊(duì)列的的一些有限信息(消息個(gè)數(shù)和消費(fèi)者數(shù)目). 屬性返回的keys像RabbitTemplate
(QUEUE_NAME
, QUEUE_MESSAGE_COUNT
, QUEUE_CONSUMER_COUNT
)中的常量一樣是可用的.
RabbitMQ REST API 提供了更多關(guān)于 QueueInfo
對(duì)象的信息.
無(wú)參 declareQueue()
方法在broker上定義了一個(gè)隊(duì)列,其名稱是自動(dòng)生成的. 自動(dòng)生成隊(duì)列的其它屬性是exclusive=true
, autoDelete=true
, and durable=false
.
declareQueue(Queue queue)
方法接受一個(gè) Queue
對(duì)象,并且返回聲明隊(duì)列的名稱.如果提供的隊(duì)列名稱是空字符串,broker 使用生成的名稱來(lái)聲明隊(duì)列再將名稱返回給調(diào)用者. Queue
對(duì)象本身是不會(huì)變化的.
這種功能只能用于編程下直接調(diào)用RabbitAdmin
. 它不支持在應(yīng)用程序上下文中由admin來(lái)定義隊(duì)列的自動(dòng)聲明.
與此形成鮮明對(duì)比的是,AnonymousQueue,
框架會(huì)為其生成唯一名稱(UUID),durable為false,exclusive
, autoDelete
為true的匿名隊(duì)列
. <rabbit:queue/>
帶空的或缺失的name
屬性總會(huì)創(chuàng)建 一個(gè)AnonymousQueue
.
參考the section called “AnonymousQueue” 來(lái)理解為什么 AnonymousQueue
會(huì)優(yōu)先選擇broker生成隊(duì)列名稱,以及如何來(lái)控制名稱格式. 聲明隊(duì)列必須有固定的名稱,因?yàn)樗鼈兛赡軙?huì)上下文的其它地方引用,例如,在監(jiān)聽器中:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
參考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”.
此接口的RabbitMQ實(shí)現(xiàn)是RabbitAdmin,當(dāng)用Spring XML配置時(shí),看起來(lái)像下面這樣:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
當(dāng)CachingConnectionFactory
緩存模式是CHANNEL
時(shí)(默認(rèn)的), RabbitAdmin
實(shí)現(xiàn)會(huì)在同一個(gè)ApplicationContext中自動(dòng)延遲聲明 Queues
,Exchanges
和 Bindings
.
只要Connection打開了與Broker的連接,這些組件就會(huì)被聲明.有一些命名空間特性可以使這些變得便利,如,在Stocks 樣例程序中有:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses" xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata" xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue"pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在上面的例子中,我們使用匿名隊(duì)列(實(shí)際上由框架內(nèi)部生成,而非由broker生成的隊(duì)列),并用ID進(jìn)行了指定.我們也可以使用明確的名稱來(lái)聲明隊(duì)列,也作為上下文中bean定義的標(biāo)識(shí)符.如.
<rabbit:queue name="stocks.trade.queue"/>
重要
你可以提供id 和 name 屬性.這允許你獨(dú)立于隊(duì)列名稱通過(guò)id來(lái)指定隊(duì)列.它允許使用標(biāo)準(zhǔn)的Spring 屬性,如屬性占位符和隊(duì)列名稱的SpEL 表達(dá)式; 當(dāng)使用名稱來(lái)作為標(biāo)識(shí)符,這些特性是不可用的.
隊(duì)列也可以使用其它的參數(shù)進(jìn)行配置,例如x-message-ttl 或 x-ha-policy.通過(guò)命名空間支持,它們可以通過(guò)<rabbit:queue-arguments>元素以參數(shù)名/參數(shù)值的MAP形式來(lái)提供 .
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
默認(rèn)情況下,參數(shù)假設(shè)為字符串.對(duì)于其它類型的參數(shù),需要提供類型.
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
當(dāng)提供混合類型的參數(shù)時(shí),可為每個(gè)entry元素提供type:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
在Spring Framework 3.2或以后,聲明起來(lái)更加簡(jiǎn)潔:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
重要
RabbitMQ broker 不允許使用不匹配的參數(shù)來(lái)聲明隊(duì)列. 例如,如果一個(gè)無(wú)time to live參數(shù)的隊(duì)列已經(jīng)存在,然后你試圖使用 key="x-message-ttl" value="100"進(jìn)行聲明
,那么會(huì)拋出一個(gè)異常.
默認(rèn)情況下,當(dāng)出現(xiàn)異常時(shí), RabbitAdmin
會(huì)立即停止所有聲明的處理過(guò)程;這可能會(huì)導(dǎo)致下游問(wèn)題- 如監(jiān)聽器容器會(huì)初始化失敗,因另外的隊(duì)列沒有聲明.
這種行為可以通過(guò)在RabbitAdmin上設(shè)置 ignore-declaration-exceptions
為true來(lái)修改. 此選項(xiàng)會(huì)指示RabbitAdmin
記錄異常,并繼續(xù)聲明其它元素.當(dāng)使用Java來(lái)配置RabbitAdmin
時(shí), 此屬性為ignoreDeclarationExceptions
.
這是一個(gè)全局設(shè)置,它將應(yīng)用到所有元素上,如應(yīng)用到queues, exchanges 和bindings這些具有相似屬性的元素上.
在1.6版本之前, 此屬性只會(huì)在channel上發(fā)生IOExcepton時(shí)才會(huì)起作用- 如當(dāng)目前和期望屬性發(fā)生錯(cuò)配時(shí). 現(xiàn)在, 這個(gè)屬性可在任何異常上起作用,包括TimeoutException
等等.
此外,任何聲明異常都會(huì)導(dǎo)致發(fā)布DeclarationExceptionEvent
, 這是一個(gè)ApplicationEvent
,在上下文中可通過(guò)任何ApplicationListener
消費(fèi). 此事件包含了admin的引用, 正在聲明的元素以及Throwable
.
從1.3版本開始, HeadersExchange
可配置匹配多個(gè)headers; 你也可以指定是否需要必須匹配任何一個(gè)或全部headers:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entrykey="foo"value="bar"/>
<entrykey="baz"value="qux"/>
<entrykey="x-match"value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
從1.6版本開始,Exchanges
可使用internal
標(biāo)志來(lái)配置(默認(rèn)為false
) ,當(dāng)然,這樣的Exchange
也可以通過(guò) RabbitAdmin
來(lái)配置(如果在應(yīng)用程序上下文中存在).
如果對(duì)于交換器來(lái)說(shuō),internal
標(biāo)志為true
, RabbitMQ 會(huì)允許客戶端來(lái)使用交換器.這對(duì)于死信交換器來(lái)說(shuō)或交換器到交換器綁定來(lái)說(shuō),是很用的,因?yàn)樵谶@些地方你不想讓發(fā)布者直接使用交換器.
要看如何使用Java來(lái)配置AMQP基礎(chǔ)設(shè)施,可查看Stock樣例程序,在那里有一個(gè)帶@Configuration
注解的抽象AbstractStockRabbitConfiguration
類,它依次有RabbitClientConfiguration
和 RabbitServerConfiguration
子類. AbstractStockRabbitConfiguration
的代碼展示如下:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public MessageConverter jsonMessageConverter() {
returnnew JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
returnnew TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在Stock 程序中,服務(wù)器使用下面的@Configuration注解來(lái)配置:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
returnnew Queue("app.stock.request");
}
}
這是整個(gè)@Configuration 類繼承鏈結(jié)束的地方. 最終結(jié)果是TopicExchange 和隊(duì)列會(huì)在應(yīng)用程序啟動(dòng)時(shí)被聲明.在服務(wù)器配置中,沒有TopicExchange與隊(duì)列的綁定,因?yàn)檫@是在客戶端程序完成的.
然后stock 請(qǐng)求隊(duì)列是自動(dòng)綁定到AMQP 默認(rèn)交換器上的 - 這種行為是由規(guī)范來(lái)定義的.
客戶端 @Configuration 類令人關(guān)注的地方展示如下.
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
客戶端使用AmqpAdmin的declareQueue()方法聲明了另一個(gè)隊(duì)列,并將其綁定到了market data 交換器上(路由鍵模式是通常外部properties文件來(lái)定義的).
Queues 和Exchanges的Builder API
當(dāng)使用Java配置時(shí),1.6版本引入了一個(gè)便利的API來(lái)配置Queue
和Exchange
對(duì)象:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
查看 org.springframework.amqp.core.QueueBuilder
和 org.springframework.amqp.core.ExchangeBuilder
的JavaDoc來(lái)了解更多信息.
Declaring Collections of Exchanges, Queues, Bindings
從1.5版本開始,可以在一個(gè)@Bean聲明多個(gè)條目來(lái)返回集合.
只有集合中的第一個(gè)元素可認(rèn)為是Declarablea的,并且只有集合中的Declarable
元素會(huì)被處理.(
Only collections where the first element is a Declarable
are considered, and only Declarable
elements from such collections are processed.)
@Configuration
public static class Config {
@Bean
public ConnectionFactory cf() {
returnnew CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
returnnew RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
returnnew DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
returnnew Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public List<Exchange> es() {
return Arrays.<Exchange>asList(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true)
);
}
@Bean
public List<Queue> qs() {
return Arrays.asList(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true)
);
}
@Bean
public List<Binding> bs() {
return Arrays.asList(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null)
);
}
@Bean
public List<Declarable> ds() {
return Arrays.<Declarable>asList(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null)
);
}
}
條件聲明
默認(rèn)情況下,所有queues, exchanges,和bindings 都可通過(guò)應(yīng)用程序上下文中所有RabbitAdmin
實(shí)例來(lái)聲明(設(shè)置了auto-startup="true"
).
重要
從1.2版本開始,可以有條件地聲明元素.當(dāng)程序連接了多個(gè)brokers,并需要在哪些brokers上聲明特定元素時(shí),特別有用.
代表這些元素要實(shí)現(xiàn)Declarable
接口,此接口有兩個(gè)方法: shouldDeclare()
和 getDeclaringAdmins()
. RabbitAdmin
使用這些方法來(lái)確定某個(gè)特定實(shí)例是否應(yīng)該在其Connection上處理聲明.
這些屬性作為命名空間的屬性是可用的,如下面的例子所示.
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:queue id="declaredByBothAdminsImplicitly" />
<rabbit:queue id="declaredByBothAdmins" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAny" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:bindingkey="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
重要
默認(rèn)情況下,如果沒有提供declared-by(或是空的), auto-declare
屬性則為 true,那么所有
RabbitAdmin將聲明對(duì)象
(只要admin的auto-startup
屬性為true,默認(rèn)值).
現(xiàn)樣的,你可以使用基于Java的@Configuration
注解來(lái)達(dá)到同樣的效果.在這個(gè)例子中,組件會(huì)由admin1來(lái)聲明,而不是admin2
:
@Bean
public RabbitAdmin admin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
public RabbitAdmin admin2() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin());
return binding;
}
AnonymousQueue
一般來(lái)說(shuō),當(dāng)需要一個(gè)獨(dú)特命名,專用的,自動(dòng)刪除隊(duì)列時(shí),建議使用AnonymousQueue
來(lái)代替中間件定義的隊(duì)列名稱(使用 ""
作為隊(duì)列名稱會(huì)導(dǎo)致中間件生成隊(duì)列名稱).
這是因?yàn)?
- 隊(duì)列實(shí)際上是在與broker的連接建立時(shí)聲明的;這在bean創(chuàng)建和包裝之后要很長(zhǎng)時(shí)間;使用這個(gè)隊(duì)列的beans需要知道其名稱.而事實(shí)上,當(dāng)app啟動(dòng)時(shí),broker甚至還沒有運(yùn)行.
- 如果與broker的連接因某種原因丟失了,admin會(huì)使用相同的名稱會(huì)重新聲明
AnonymousQueue
.如果我們使用broker-聲明隊(duì)列,隊(duì)列名稱可能會(huì)改變.
從1.5.3版本開始,你可通過(guò)AnonymousQueue 來(lái)控制隊(duì)列名稱的格式.
默認(rèn)情況下,隊(duì)列名稱是UUID的字符串表示; 例如: 07afcfe9-fe77-4983-8645-0061ec61a47a
.
現(xiàn)在,你可以提供一個(gè) AnonymousQueue.NamingStrategy
實(shí)現(xiàn)作為其構(gòu)造器參數(shù):
@Bean
public Queue anon1() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy());
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("foo-"));
}
第一個(gè)會(huì)生成隊(duì)列名稱前輟spring.gen-
其后為UUID base64 的表示,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
. 第二個(gè)會(huì)生成隊(duì)列名稱前輟為foo-
其后為UUID的 base64 表示.
base64 編碼使用RFC 4648的"URL and Filename Safe Alphabet" ; 刪除了字符(=
).
你可以提供你自己的命名策略, 可以包括隊(duì)列名稱中的其他信息(例如應(yīng)用程序、客戶端主機(jī))。
從1.6版本開始,當(dāng)使用XML配置時(shí),可以指定命名策略; naming-strategy
屬性出現(xiàn)在<rabbit:queue>元素的屬性中,對(duì)于bean引用來(lái)說(shuō),它們實(shí)現(xiàn)了
AnonymousQueue.NamingStrategy
.
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="springNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="springNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一個(gè)創(chuàng)建了UUID字符串表示的名稱.第二個(gè)創(chuàng)建了類似spring.gen-MRBv9sqISkuCiPfOYfpo4g的名稱
. 第三個(gè)創(chuàng)建了類似custom.gen-MRBv9sqISkuCiPfOYfpo4g
的名稱.
當(dāng)然,你可以提供你自己的命名策略bean.
3.1.11 延遲的消息交換器
1.6版本引入了 Delayed Message Exchange Plugin支持.
該插件目前被標(biāo)記為實(shí)驗(yàn)性質(zhì),但可用已超過(guò)一年(在寫作的時(shí)間)。如果插件的變化是必要的,我們將盡快添加支持這樣的變化。因此,這種在Spring AMQP支持同樣也應(yīng)考慮為實(shí)驗(yàn)性質(zhì).這個(gè)功能在RabbitMQ 3.6.0版本和0.0.1插件版本中經(jīng)過(guò)測(cè)試。
要使用RabbitAdmin
來(lái)聲明一個(gè)延遲交換器,只需要在交換器上簡(jiǎn)單地設(shè)置delayed
屬性為true. RabbitAdmin
會(huì)使用交換器類型(Direct
, Fanout
等)來(lái)設(shè)置x-delayed-type
參數(shù),并使用x-delayed-message來(lái)聲明交換器
.
當(dāng)使用XML來(lái)配置交換器beans時(shí),delayed
屬性 (默認(rèn)為false
)是可用的.
<rabbit:topic-exchange name="topic" delayed="true" />
要發(fā)送延遲消息,只需要通過(guò)MessageProperties設(shè)置x-delay
header:
MessageProperties properties = new MessageProperties();
properties.setXDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
或
rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setXDelay(15000);
return message;
}
});
要檢查消息是否是延遲的,可調(diào)用MessageProperties的getReceivedDelay()
. 它是一個(gè)獨(dú)立的屬性,以避免從一個(gè)輸入消息意外的傳播到一個(gè)輸出消息。
3.1.12 RabbitMQ REST API
當(dāng)啟用了管理插件時(shí),RabbitMQ 服務(wù)器公開了 REST API 來(lái)監(jiān)控和配置broker.
現(xiàn)在提供了 Java Binding for the API.一般來(lái)說(shuō),你可以直接使用API,但提供了便利的包裝器來(lái)使用熟悉的Spring AMQP Queue
, Exchange
, 和 Binding
域?qū)ο?
當(dāng)直接使用 com.rabbitmq.http.client.Client
API (分別使用QueueInfo
, ExchangeInfo
, 和BindingInfo
),那些對(duì)象的更多信息將可用. 下面是RabbitManagementTemplate上的可用操作
:
public interface AmqpManagementOperations {
void addExchange(Exchange exchange);
void addExchange(String vhost, Exchange exchange);
void purgeQueue(Queue queue);
void purgeQueue(String vhost, Queue queue);
void deleteQueue(Queue queue);
void deleteQueue(String vhost, Queue queue);
Queue getQueue(String name);
Queue getQueue(String vhost, String name);
List<Queue> getQueues();
List<Queue> getQueues(String vhost);
void addQueue(Queue queue);
void addQueue(String vhost, Queue queue);
void deleteExchange(Exchange exchange);
void deleteExchange(String vhost, Exchange exchange);
Exchange getExchange(String name);
Exchange getExchange(String vhost, String name);
List<Exchange> getExchanges();
List<Exchange> getExchanges(String vhost);
List<Binding> getBindings();
List<Binding> getBindings(String vhost);
List<Binding> getBindingsForExchange(String vhost, String exchange);
}
參考javadocs 來(lái)了解更多信息.
3.1.13 異常處理
RabbitMQ Java client的許多操作會(huì)拋出受查異常. 例如,有許多可能拋出IOExceptions的地方. RabbitTemplate, SimpleMessageListenerContainer, 和其它Spring AMQP 組件會(huì)捕獲這些異常,并將它們轉(zhuǎn)換為運(yùn)行時(shí)層次的異常.
這些是定義在org.springframework.amqp 包中的,且 AmqpException 是層次結(jié)構(gòu)的基礎(chǔ).
當(dāng)監(jiān)聽器拋出異常時(shí),它會(huì)包裝在一個(gè) ListenerExecutionFailedException
中,正常情況下消息會(huì)被拒絕并由broker重新排隊(duì).將defaultRequeueRejected
設(shè)置為false 可導(dǎo)致消息丟棄(或路由到死信交換器中).
正如 the section called “Message Listeners and the Asynchronous Case”討論的,監(jiān)聽器可拋出 AmqpRejectAndDontRequeueException
來(lái)有條件地控制這種行為。
然而,有一種類型的錯(cuò)誤,監(jiān)聽器無(wú)法控制其行為. 當(dāng)遇到消息不能轉(zhuǎn)換時(shí)(例如,無(wú)效的content_encoding
頭),那么消息在到達(dá)用戶代碼前會(huì)拋出一些異常.當(dāng)設(shè)置 defaultRequeueRejected
為 true
(默認(rèn)),這樣的消息可能會(huì)一遍又一遍地重新投遞.
在1.3.2版本之前,用戶需要編寫定制ErrorHandler
, 正如Section 3.1.13, “Exception Handling” 描述的內(nèi)容來(lái)避免這種情況.
從1.3.2版本開始,默認(rèn)的ErrorHandler
是 ConditionalRejectingErrorHandler
,它將拒絕那些失敗且不可恢復(fù)的消息 (不會(huì)重新排隊(duì)):
o.s.amqp...MessageConversionException
o.s.messaging...MessageConversionException
o.s.messaging...MethodArgumentNotValidException
o.s.messaging...MethodArgumentTypeMismatchException
第一個(gè)是在使用MessageConverter轉(zhuǎn)換傳入消息負(fù)荷時(shí)拋出的.
第二個(gè)是當(dāng)映射到@RabbitListener方法時(shí),轉(zhuǎn)換服務(wù)需要其它轉(zhuǎn)換拋出的.
第三個(gè)是在監(jiān)聽器上使用了驗(yàn)證(如.@Valid),且驗(yàn)證失敗時(shí)拋出的.
第四個(gè)是對(duì)于目標(biāo)方法傳入消息類型轉(zhuǎn)換失敗拋出的.例如,參數(shù)聲明為Message<Foo>
,但收到的是Message<Bar>
.
錯(cuò)誤處理器的實(shí)例可使用FatalExceptionStrategy
來(lái)配置,因?yàn)橛脩艨梢蕴峁┧鼈兊囊?guī)則來(lái)有條件的拒絕消息,如. 來(lái)自 Spring Retry (the section called “Message Listeners and the Asynchronous Case”)中的BinaryExceptionClassifier代理實(shí)現(xiàn).
此外, ListenerExecutionFailedException
現(xiàn)在有一個(gè)可用于決策的failedMessage
屬性.如果FatalExceptionStrategy.isFatal()
方法返回true,錯(cuò)誤處理器會(huì)拋出AmqpRejectAndDontRequeueException
.
默認(rèn)FatalExceptionStrategy
會(huì)記錄warning信息.
3.1.14 事務(wù)(Transactions)
介紹
Spring Rabbit 框架支持在同步和異步使用中使用不同語(yǔ)義(這一點(diǎn)對(duì)于現(xiàn)有Spring事務(wù)的用戶是很熟悉的)來(lái)支持自動(dòng)事務(wù)管理. 它做了很多,不是常見消息模式能輕易實(shí)現(xiàn)的.
有兩種方法可用來(lái)向框架發(fā)出期望事務(wù)語(yǔ)義的信號(hào).在RabbitTemplate
和 SimpleMessageListenerContainer
中,這里有一個(gè)channelTransacted
標(biāo)記,如果它為true,就會(huì)告知框架使用事務(wù)通道,并根據(jù)結(jié)果使用提交或回滾來(lái)結(jié)束所有操作,出現(xiàn)異常時(shí)則發(fā)出回滾信號(hào).
另一個(gè)提供的信號(hào)是Spring的PlatformTransactionManager實(shí)現(xiàn)(作為正在進(jìn)行的操作的上下文)的外部事務(wù).
當(dāng)框架發(fā)送或接收消息時(shí),如果過(guò)程中已經(jīng)存在一個(gè)事務(wù),且channelTransacted
標(biāo)記為true, 那么當(dāng)前消息事務(wù)的提交或回滾操作會(huì)延遲直到在當(dāng)前事務(wù)結(jié)束.如果channelTransacted
標(biāo)記為false,那么消息操作是不會(huì)應(yīng)用事務(wù)語(yǔ)義(它是自動(dòng)應(yīng)答的).
channelTransacted
標(biāo)記是一個(gè)配置時(shí)設(shè)置:它只在AMQP組件聲明時(shí)執(zhí)行一次,通常在應(yīng)用程序啟動(dòng)時(shí).原則上,外部事務(wù)更加動(dòng)態(tài)化,因?yàn)樾枰谶\(yùn)行時(shí)根據(jù)當(dāng)前線程狀態(tài)來(lái)響應(yīng),當(dāng)事務(wù)分層到應(yīng)用程序上時(shí),原則上來(lái)說(shuō)它通常也是一個(gè)配置設(shè)置.
對(duì)于使用RabbitTemplate
的同步使用,外部事務(wù)是由調(diào)用者提供的, 要么是聲明的,要么是強(qiáng)制的(日常Spring事務(wù)模式).
下面是聲明方法的一個(gè)例子(通常選擇這個(gè),因?yàn)樗?/font>非侵入的), 下面的例子中,模板已經(jīng)配置了channelTransacted=true
:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
收到字符負(fù)荷,轉(zhuǎn)換,并以消息體發(fā)送到@Transactional標(biāo)記的方法中,因此如果數(shù)據(jù)處理因異常失敗了,傳入消息將返回到broker,并且輸出消息不會(huì)被發(fā)送.
在事務(wù)方法鏈中,這適用于RabbitTemplate
中的所有操作(除非Channel
較早地直接控制了提交事務(wù)).
對(duì)于SimpleMessageListenerContainer
的異步使用情況,如果需要外部事務(wù),當(dāng)設(shè)置了監(jiān)聽器時(shí),必須由容器來(lái)發(fā)出請(qǐng)求.
為了表示需要外部事務(wù),當(dāng)配置時(shí),用戶為容器提供了PlatformTransactionManager
實(shí)現(xiàn).例如:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在上面的例子中,事務(wù)管理器是通過(guò)其它bean中注入添加的(未顯示),并且channelTransacted
也設(shè)置為了true.其效果是如果監(jiān)聽器因異常失敗了,那么事務(wù)將回滾,消息也會(huì)退回到broker中.
明顯地,如果事務(wù)提交失敗(如.數(shù)據(jù)庫(kù)約束錯(cuò)誤,或通過(guò)問(wèn)題),那么AMQP 事務(wù)也要回滾,且消息也會(huì)回退到broker中.
有時(shí)候,這被稱為最好努力1階段提交(Best Efforts 1 Phase Commit),它是可靠消息非常強(qiáng)大的模式.
如果在上面的例子中將channelTransacted
標(biāo)志設(shè)為false(默認(rèn)為false),那么外部事務(wù)仍會(huì)提供給監(jiān)聽器,但所有消息操作都是自動(dòng)應(yīng)答的, 因此其效果是即使發(fā)生了業(yè)務(wù)操作,也會(huì)提供消息操作.
AMQP 事務(wù)只適用于發(fā)送應(yīng)答給broker, 所以當(dāng)有 Spring 事務(wù)回滾且又收到了消息時(shí),Spring AMQP做的不僅要回滾事務(wù),還要手動(dòng)拒絕消息.
消息上的拒絕操作獨(dú)立于事務(wù),依賴于defaultRequeueRejected
屬性(默認(rèn)為true
). 更多關(guān)于拒絕失敗消息的詳情,請(qǐng)參考the section called “Message Listeners and the Asynchronous Case”.
關(guān)于RabbitMQ 事務(wù)及其局限性的更多信息,參考RabbitMQ Broker Semantics.
重要
在 RabbitMQ 2.7.0前, 這樣的消息(當(dāng)通道關(guān)閉或中斷時(shí)未應(yīng)的消息)會(huì)回到隊(duì)列中,從2.7.0, 拒絕消息會(huì)跑到隊(duì)列前邊,與JMS回滾消息方式類似.
使用RabbitTransactionManager
RabbitTransactionManager 是執(zhí)行同步,外部事務(wù)Rabbit操作的另一種選擇.這個(gè)事務(wù)管理器是PlatformTransactionManager 接口的實(shí)現(xiàn)類,應(yīng)該在單個(gè)Rabbit ConnectionFactory中使用.
重要
此策略不能提供XA事務(wù),比如,要在消息和數(shù)據(jù)庫(kù)之間共享事務(wù).
應(yīng)用代碼需要通過(guò)ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)來(lái)獲取事務(wù)性Rabbit資源而不是使用Connection.createChannel()
調(diào)用.
當(dāng)使用Spring AMQP的 RabbitTemplate時(shí), 它會(huì)自動(dòng)檢測(cè)線程綁定通道和自動(dòng)參與事務(wù)。
在 Java 配置中,你可以使用下面的代碼來(lái)設(shè)置一個(gè)新的RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
returnnew RabbitTransactionManager(connectionFactory);
}
如果你喜歡使用XML 配置,可以像下面進(jìn)行聲明:
<bean id="rabbitTxManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<propertyname="connectionFactory" ref="connectionFactory"/>
</bean>
3.1.15 消息監(jiān)聽器容器配置
有相當(dāng)多的配置SimpleMessageListenerContainer
相關(guān)事務(wù)和服務(wù)質(zhì)量的選項(xiàng),它們之間可以互相交互.當(dāng)使用命名空間來(lái)配置<rabbit:listener-container/>時(shí),
下表顯示了容器屬性名稱和它們等價(jià)的屬性名稱(在括號(hào)中).
未被命名空間暴露的屬性,以`N/A`表示.