介紹
AMQP 規范描述了協議是如何用于broker中隊列,交換器以及綁定上的.這些操作是從0.8規范中移植的,更高的存在于org.springframework.amqp.core包中的AmqpAdmin 接口中.
那個接口的RabbitMQ 實現是RabbitAdmin,它位于org.springframework.amqp.rabbit.core 包.
AmqpAdmin接口是基于Spring AMQP 域抽象,展示如下:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
getQueueProperties()
方法會返回關于隊列的的一些有限信息(消息個數和消費者數目). 屬性返回的keys像RabbitTemplate
(QUEUE_NAME
, QUEUE_MESSAGE_COUNT
, QUEUE_CONSUMER_COUNT
)中的常量一樣是可用的.
RabbitMQ REST API 提供了更多關于 QueueInfo
對象的信息.
無參 declareQueue()
方法在broker上定義了一個隊列,其名稱是自動生成的. 自動生成隊列的其它屬性是exclusive=true
, autoDelete=true
, and durable=false
.
declareQueue(Queue queue)
方法接受一個 Queue
對象,并且返回聲明隊列的名稱.如果提供的隊列名稱是空字符串,broker 使用生成的名稱來聲明隊列再將名稱返回給調用者. Queue
對象本身是不會變化的.
這種功能只能用于編程下直接調用RabbitAdmin
. 它不支持在應用程序上下文中由admin來定義隊列的自動聲明.
與此形成鮮明對比的是,AnonymousQueue,
框架會為其生成唯一名稱(UUID),durable為false,exclusive
, autoDelete
為true的匿名隊列
. <rabbit:queue/>
帶空的或缺失的name
屬性總會創建 一個AnonymousQueue
.
參考the section called “AnonymousQueue” 來理解為什么 AnonymousQueue
會優先選擇broker生成隊列名稱,以及如何來控制名稱格式. 聲明隊列必須有固定的名稱,因為它們可能會上下文的其它地方引用,例如,在監聽器中:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
參考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”.
此接口的RabbitMQ實現是RabbitAdmin,當用Spring XML配置時,看起來像下面這樣:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
當CachingConnectionFactory
緩存模式是CHANNEL
時(默認的), RabbitAdmin
實現會在同一個ApplicationContext中自動延遲聲明 Queues
,Exchanges
和 Bindings
.
只要Connection打開了與Broker的連接,這些組件就會被聲明.有一些命名空間特性可以使這些變得便利,如,在Stocks 樣例程序中有:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses" xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata" xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue"pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在上面的例子中,我們使用匿名隊列(實際上由框架內部生成,而非由broker生成的隊列),并用ID進行了指定.我們也可以使用明確的名稱來聲明隊列,也作為上下文中bean定義的標識符.如.
<rabbit:queue name="stocks.trade.queue"/>
重要
你可以提供id 和 name 屬性.這允許你獨立于隊列名稱通過id來指定隊列.它允許使用標準的Spring 屬性,如屬性占位符和隊列名稱的SpEL 表達式; 當使用名稱來作為標識符,這些特性是不可用的.
隊列也可以使用其它的參數進行配置,例如x-message-ttl 或 x-ha-policy.通過命名空間支持,它們可以通過<rabbit:queue-arguments>元素以參數名/參數值的MAP形式來提供 .
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
默認情況下,參數假設為字符串.對于其它類型的參數,需要提供類型.
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
當提供混合類型的參數時,可為每個entry元素提供type:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
在Spring Framework 3.2或以后,聲明起來更加簡潔:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
重要
RabbitMQ broker 不允許使用不匹配的參數來聲明隊列. 例如,如果一個無time to live參數的隊列已經存在,然后你試圖使用 key="x-message-ttl" value="100"進行聲明
,那么會拋出一個異常.
默認情況下,當出現異常時, RabbitAdmin
會立即停止所有聲明的處理過程;這可能會導致下游問題- 如監聽器容器會初始化失敗,因另外的隊列沒有聲明.
這種行為可以通過在RabbitAdmin上設置 ignore-declaration-exceptions
為true來修改. 此選項會指示RabbitAdmin
記錄異常,并繼續聲明其它元素.當使用Java來配置RabbitAdmin
時, 此屬性為ignoreDeclarationExceptions
.
這是一個全局設置,它將應用到所有元素上,如應用到queues, exchanges 和bindings這些具有相似屬性的元素上.
在1.6版本之前, 此屬性只會在channel上發生IOExcepton時才會起作用- 如當目前和期望屬性發生錯配時. 現在, 這個屬性可在任何異常上起作用,包括TimeoutException
等等.
此外,任何聲明異常都會導致發布DeclarationExceptionEvent
, 這是一個ApplicationEvent
,在上下文中可通過任何ApplicationListener
消費. 此事件包含了admin的引用, 正在聲明的元素以及Throwable
.
從1.3版本開始, HeadersExchange
可配置匹配多個headers; 你也可以指定是否需要必須匹配任何一個或全部headers:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entrykey="foo"value="bar"/>
<entrykey="baz"value="qux"/>
<entrykey="x-match"value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
從1.6版本開始,Exchanges
可使用internal
標志來配置(默認為false
) ,當然,這樣的Exchange
也可以通過 RabbitAdmin
來配置(如果在應用程序上下文中存在).
如果對于交換器來說,internal
標志為true
, RabbitMQ 會允許客戶端來使用交換器.這對于死信交換器來說或交換器到交換器綁定來說,是很用的,因為在這些地方你不想讓發布者直接使用交換器.
要看如何使用Java來配置AMQP基礎設施,可查看Stock樣例程序,在那里有一個帶@Configuration
注解的抽象AbstractStockRabbitConfiguration
類,它依次有RabbitClientConfiguration
和 RabbitServerConfiguration
子類. AbstractStockRabbitConfiguration
的代碼展示如下:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public MessageConverter jsonMessageConverter() {
returnnew JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
returnnew TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在Stock 程序中,服務器使用下面的@Configuration注解來配置:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
returnnew Queue("app.stock.request");
}
}
這是整個@Configuration 類繼承鏈結束的地方. 最終結果是TopicExchange 和隊列會在應用程序啟動時被聲明.在服務器配置中,沒有TopicExchange與隊列的綁定,因為這是在客戶端程序完成的.
然后stock 請求隊列是自動綁定到AMQP 默認交換器上的 - 這種行為是由規范來定義的.
客戶端 @Configuration 類令人關注的地方展示如下.
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
客戶端使用AmqpAdmin的declareQueue()方法聲明了另一個隊列,并將其綁定到了market data 交換器上(路由鍵模式是通常外部properties文件來定義的).
Queues 和Exchanges的Builder API
當使用Java配置時,1.6版本引入了一個便利的API來配置Queue
和Exchange
對象:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
查看 org.springframework.amqp.core.QueueBuilder
和 org.springframework.amqp.core.ExchangeBuilder
的JavaDoc來了解更多信息.
Declaring Collections of Exchanges, Queues, Bindings
從1.5版本開始,可以在一個@Bean聲明多個條目來返回集合.
只有集合中的第一個元素可認為是Declarablea的,并且只有集合中的Declarable
元素會被處理.(
Only collections where the first element is a Declarable
are considered, and only Declarable
elements from such collections are processed.)
@Configuration
public static class Config {
@Bean
public ConnectionFactory cf() {
returnnew CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
returnnew RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
returnnew DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
returnnew Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public List<Exchange> es() {
return Arrays.<Exchange>asList(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true)
);
}
@Bean
public List<Queue> qs() {
return Arrays.asList(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true)
);
}
@Bean
public List<Binding> bs() {
return Arrays.asList(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null)
);
}
@Bean
public List<Declarable> ds() {
return Arrays.<Declarable>asList(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null)
);
}
}
條件聲明
默認情況下,所有queues, exchanges,和bindings 都可通過應用程序上下文中所有RabbitAdmin
實例來聲明(設置了auto-startup="true"
).
重要
從1.2版本開始,可以有條件地聲明元素.當程序連接了多個brokers,并需要在哪些brokers上聲明特定元素時,特別有用.
代表這些元素要實現Declarable
接口,此接口有兩個方法: shouldDeclare()
和 getDeclaringAdmins()
. RabbitAdmin
使用這些方法來確定某個特定實例是否應該在其Connection上處理聲明.
這些屬性作為命名空間的屬性是可用的,如下面的例子所示.
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:queue id="declaredByBothAdminsImplicitly" />
<rabbit:queue id="declaredByBothAdmins" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAny" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:bindingkey="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
重要
默認情況下,如果沒有提供declared-by(或是空的), auto-declare
屬性則為 true,那么所有
RabbitAdmin將聲明對象
(只要admin的auto-startup
屬性為true,默認值).
現樣的,你可以使用基于Java的@Configuration
注解來達到同樣的效果.在這個例子中,組件會由admin1來聲明,而不是admin2
:
@Bean
public RabbitAdmin admin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
public RabbitAdmin admin2() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin());
return binding;
}
AnonymousQueue
一般來說,當需要一個獨特命名,專用的,自動刪除隊列時,建議使用AnonymousQueue
來代替中間件定義的隊列名稱(使用 ""
作為隊列名稱會導致中間件生成隊列名稱).
這是因為:
- 隊列實際上是在與broker的連接建立時聲明的;這在bean創建和包裝之后要很長時間;使用這個隊列的beans需要知道其名稱.而事實上,當app啟動時,broker甚至還沒有運行.
- 如果與broker的連接因某種原因丟失了,admin會使用相同的名稱會重新聲明
AnonymousQueue
.如果我們使用broker-聲明隊列,隊列名稱可能會改變.
從1.5.3版本開始,你可通過AnonymousQueue 來控制隊列名稱的格式.
默認情況下,隊列名稱是UUID的字符串表示; 例如: 07afcfe9-fe77-4983-8645-0061ec61a47a
.
現在,你可以提供一個 AnonymousQueue.NamingStrategy
實現作為其構造器參數:
@Bean
public Queue anon1() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy());
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("foo-"));
}
第一個會生成隊列名稱前輟spring.gen-
其后為UUID base64 的表示,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
. 第二個會生成隊列名稱前輟為foo-
其后為UUID的 base64 表示.
base64 編碼使用RFC 4648的"URL and Filename Safe Alphabet" ; 刪除了字符(=
).
你可以提供你自己的命名策略, 可以包括隊列名稱中的其他信息(例如應用程序、客戶端主機)。
從1.6版本開始,當使用XML配置時,可以指定命名策略; naming-strategy
屬性出現在<rabbit:queue>元素的屬性中,對于bean引用來說,它們實現了
AnonymousQueue.NamingStrategy
.
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="springNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="springNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一個創建了UUID字符串表示的名稱.第二個創建了類似spring.gen-MRBv9sqISkuCiPfOYfpo4g的名稱
. 第三個創建了類似custom.gen-MRBv9sqISkuCiPfOYfpo4g
的名稱.
當然,你可以提供你自己的命名策略bean.
3.1.11 延遲的消息交換器
1.6版本引入了 Delayed Message Exchange Plugin支持.
該插件目前被標記為實驗性質,但可用已超過一年(在寫作的時間)。如果插件的變化是必要的,我們將盡快添加支持這樣的變化。因此,這種在Spring AMQP支持同樣也應考慮為實驗性質.這個功能在RabbitMQ 3.6.0版本和0.0.1插件版本中經過測試。
要使用RabbitAdmin
來聲明一個延遲交換器,只需要在交換器上簡單地設置delayed
屬性為true. RabbitAdmin
會使用交換器類型(Direct
, Fanout
等)來設置x-delayed-type
參數,并使用x-delayed-message來聲明交換器
.
當使用XML來配置交換器beans時,delayed
屬性 (默認為false
)是可用的.
<rabbit:topic-exchange name="topic" delayed="true" />
要發送延遲消息,只需要通過MessageProperties設置x-delay
header:
MessageProperties properties = new MessageProperties();
properties.setXDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
或
rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setXDelay(15000);
return message;
}
});
要檢查消息是否是延遲的,可調用MessageProperties的getReceivedDelay()
. 它是一個獨立的屬性,以避免從一個輸入消息意外的傳播到一個輸出消息。
3.1.12 RabbitMQ REST API
當啟用了管理插件時,RabbitMQ 服務器公開了 REST API 來監控和配置broker.
現在提供了 Java Binding for the API.一般來說,你可以直接使用API,但提供了便利的包裝器來使用熟悉的Spring AMQP Queue
, Exchange
, 和 Binding
域對象.
當直接使用 com.rabbitmq.http.client.Client
API (分別使用QueueInfo
, ExchangeInfo
, 和BindingInfo
),那些對象的更多信息將可用. 下面是RabbitManagementTemplate上的可用操作
:
public interface AmqpManagementOperations {
void addExchange(Exchange exchange);
void addExchange(String vhost, Exchange exchange);
void purgeQueue(Queue queue);
void purgeQueue(String vhost, Queue queue);
void deleteQueue(Queue queue);
void deleteQueue(String vhost, Queue queue);
Queue getQueue(String name);
Queue getQueue(String vhost, String name);
List<Queue> getQueues();
List<Queue> getQueues(String vhost);
void addQueue(Queue queue);
void addQueue(String vhost, Queue queue);
void deleteExchange(Exchange exchange);
void deleteExchange(String vhost, Exchange exchange);
Exchange getExchange(String name);
Exchange getExchange(String vhost, String name);
List<Exchange> getExchanges();
List<Exchange> getExchanges(String vhost);
List<Binding> getBindings();
List<Binding> getBindings(String vhost);
List<Binding> getBindingsForExchange(String vhost, String exchange);
}
參考javadocs 來了解更多信息.
3.1.13 異常處理
RabbitMQ Java client的許多操作會拋出受查異常. 例如,有許多可能拋出IOExceptions的地方. RabbitTemplate, SimpleMessageListenerContainer, 和其它Spring AMQP 組件會捕獲這些異常,并將它們轉換為運行時層次的異常.
這些是定義在org.springframework.amqp 包中的,且 AmqpException 是層次結構的基礎.
當監聽器拋出異常時,它會包裝在一個 ListenerExecutionFailedException
中,正常情況下消息會被拒絕并由broker重新排隊.將defaultRequeueRejected
設置為false 可導致消息丟棄(或路由到死信交換器中).
正如 the section called “Message Listeners and the Asynchronous Case”討論的,監聽器可拋出 AmqpRejectAndDontRequeueException
來有條件地控制這種行為。
然而,有一種類型的錯誤,監聽器無法控制其行為. 當遇到消息不能轉換時(例如,無效的content_encoding
頭),那么消息在到達用戶代碼前會拋出一些異常.當設置 defaultRequeueRejected
為 true
(默認),這樣的消息可能會一遍又一遍地重新投遞.
在1.3.2版本之前,用戶需要編寫定制ErrorHandler
, 正如Section 3.1.13, “Exception Handling” 描述的內容來避免這種情況.
從1.3.2版本開始,默認的ErrorHandler
是 ConditionalRejectingErrorHandler
,它將拒絕那些失敗且不可恢復的消息 (不會重新排隊):
o.s.amqp...MessageConversionException
o.s.messaging...MessageConversionException
o.s.messaging...MethodArgumentNotValidException
o.s.messaging...MethodArgumentTypeMismatchException
第一個是在使用MessageConverter轉換傳入消息負荷時拋出的.
第二個是當映射到@RabbitListener方法時,轉換服務需要其它轉換拋出的.
第三個是在監聽器上使用了驗證(如.@Valid),且驗證失敗時拋出的.
第四個是對于目標方法傳入消息類型轉換失敗拋出的.例如,參數聲明為Message<Foo>
,但收到的是Message<Bar>
.
錯誤處理器的實例可使用FatalExceptionStrategy
來配置,因為用戶可以提供它們的規則來有條件的拒絕消息,如. 來自 Spring Retry (the section called “Message Listeners and the Asynchronous Case”)中的BinaryExceptionClassifier代理實現.
此外, ListenerExecutionFailedException
現在有一個可用于決策的failedMessage
屬性.如果FatalExceptionStrategy.isFatal()
方法返回true,錯誤處理器會拋出AmqpRejectAndDontRequeueException
.
默認FatalExceptionStrategy
會記錄warning信息.
3.1.14 事務(Transactions)
介紹
Spring Rabbit 框架支持在同步和異步使用中使用不同語義(這一點對于現有Spring事務的用戶是很熟悉的)來支持自動事務管理. 它做了很多,不是常見消息模式能輕易實現的.
有兩種方法可用來向框架發出期望事務語義的信號.在RabbitTemplate
和 SimpleMessageListenerContainer
中,這里有一個channelTransacted
標記,如果它為true,就會告知框架使用事務通道,并根據結果使用提交或回滾來結束所有操作,出現異常時則發出回滾信號.
另一個提供的信號是Spring的PlatformTransactionManager實現(作為正在進行的操作的上下文)的外部事務.
當框架發送或接收消息時,如果過程中已經存在一個事務,且channelTransacted
標記為true, 那么當前消息事務的提交或回滾操作會延遲直到在當前事務結束.如果channelTransacted
標記為false,那么消息操作是不會應用事務語義(它是自動應答的).
channelTransacted
標記是一個配置時設置:它只在AMQP組件聲明時執行一次,通常在應用程序啟動時.原則上,外部事務更加動態化,因為需要在運行時根據當前線程狀態來響應,當事務分層到應用程序上時,原則上來說它通常也是一個配置設置.
對于使用RabbitTemplate
的同步使用,外部事務是由調用者提供的, 要么是聲明的,要么是強制的(日常Spring事務模式).
下面是聲明方法的一個例子(通常選擇這個,因為它是非侵入的), 下面的例子中,模板已經配置了channelTransacted=true
:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
收到字符負荷,轉換,并以消息體發送到@Transactional標記的方法中,因此如果數據處理因異常失敗了,傳入消息將返回到broker,并且輸出消息不會被發送.
在事務方法鏈中,這適用于RabbitTemplate
中的所有操作(除非Channel
較早地直接控制了提交事務).
對于SimpleMessageListenerContainer
的異步使用情況,如果需要外部事務,當設置了監聽器時,必須由容器來發出請求.
為了表示需要外部事務,當配置時,用戶為容器提供了PlatformTransactionManager
實現.例如:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在上面的例子中,事務管理器是通過其它bean中注入添加的(未顯示),并且channelTransacted
也設置為了true.其效果是如果監聽器因異常失敗了,那么事務將回滾,消息也會退回到broker中.
明顯地,如果事務提交失敗(如.數據庫約束錯誤,或通過問題),那么AMQP 事務也要回滾,且消息也會回退到broker中.
有時候,這被稱為最好努力1階段提交(Best Efforts 1 Phase Commit),它是可靠消息非常強大的模式.
如果在上面的例子中將channelTransacted
標志設為false(默認為false),那么外部事務仍會提供給監聽器,但所有消息操作都是自動應答的, 因此其效果是即使發生了業務操作,也會提供消息操作.
AMQP 事務只適用于發送應答給broker, 所以當有 Spring 事務回滾且又收到了消息時,Spring AMQP做的不僅要回滾事務,還要手動拒絕消息.
消息上的拒絕操作獨立于事務,依賴于defaultRequeueRejected
屬性(默認為true
). 更多關于拒絕失敗消息的詳情,請參考the section called “Message Listeners and the Asynchronous Case”.
關于RabbitMQ 事務及其局限性的更多信息,參考RabbitMQ Broker Semantics.
重要
在 RabbitMQ 2.7.0前, 這樣的消息(當通道關閉或中斷時未應的消息)會回到隊列中,從2.7.0, 拒絕消息會跑到隊列前邊,與JMS回滾消息方式類似.
使用RabbitTransactionManager
RabbitTransactionManager 是執行同步,外部事務Rabbit操作的另一種選擇.這個事務管理器是PlatformTransactionManager 接口的實現類,應該在單個Rabbit ConnectionFactory中使用.
重要
此策略不能提供XA事務,比如,要在消息和數據庫之間共享事務.
應用代碼需要通過ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)來獲取事務性Rabbit資源而不是使用Connection.createChannel()
調用.
當使用Spring AMQP的 RabbitTemplate時, 它會自動檢測線程綁定通道和自動參與事務。
在 Java 配置中,你可以使用下面的代碼來設置一個新的RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
returnnew RabbitTransactionManager(connectionFactory);
}
如果你喜歡使用XML 配置,可以像下面進行聲明:
<bean id="rabbitTxManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<propertyname="connectionFactory" ref="connectionFactory"/>
</bean>
3.1.15 消息監聽器容器配置
有相當多的配置SimpleMessageListenerContainer
相關事務和服務質量的選項,它們之間可以互相交互.當使用命名空間來配置<rabbit:listener-container/>時,
下表顯示了容器屬性名稱和它們等價的屬性名稱(在括號中).
未被命名空間暴露的屬性,以`N/A`表示.