5分鐘開啟Spring AMQP的旅行.
由于默認的Spring Framework 版本依賴是4.2.x, Spring AMQP 一般來說可兼容早期版本的Spring Framework.但基于注解的監聽器和RabbitMessagingTemplate
需要Spring Framework 4.1+.
非常非常快速
使用純java來發送和接收消息:
ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
注意,在原生的Java Rabbit client中也有一個ConnectionFactory
. 在上面的代碼中,我們使用的是Spring 抽象. 我們依賴的是broker中的默認交換器 (因為在發送操作中沒有指定交換器),以及默認綁定(通過其名稱作為路由鍵將所有隊列綁定到默認交換器中).那些行為是由AMQP規范來定義的.
使用 XML配置
同上面的例子一樣,只不過將外部資源配置到了XML:
ApplicationContext context =new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate"connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>
<rabbit:admin/>
聲明會默認自動查找類型為Queue
, Exchange
和Binding的bean,
并宣稱他們代表的broker的用戶, 因此在簡單的Java driver中沒有必要明確的使用那個bean.
有大量的選項來配置XML schema 中的組件屬性- 你可以使用xml編輯的自動完成功能來探索它們并查看它們的相關文檔.
使用 Java 配置
同樣的例子可使用java來外部配置:
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
returnnew CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
returnnew RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
returnnew RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
returnnew Queue("myqueue");
}
}
2.2 新特性
2.2.1從1.5到1.6的變化
測試支持
Builder
命名空間變化
連接工廠
Queue 定義
監聽器容器變化
空閑消息監聽器檢測
不匹配的隊列檢測
默認情況下,當監聽器容器啟動時,如果探測到了隊列的mismatched屬性或參數,容器會記錄異常但會繼續監聽.現在容器有了一個mismatchedQueuesFatal
屬性,當啟動時存在問題的話,這可以阻止容器(以及上下文)啟動.如果隨后檢測到問題,它也會停止容器,如從連接故障中恢復. 參考Section 3.1.15, “Message Listener Container Configuration” 來了解更多信息.
監聽器容器日志
現在監聽器容器提供了它的beanName
給內部 SimpleAsyncTaskExecutor
作為threadNamePrefix
.對于日志分析非常有用.
默認錯誤處理器
默認錯誤處理器(ConditionalRejectingErrorHandler
) 現在認為無可挽救的@RabbitListener
異常是致命的. 參考Section 3.1.13, “Exception Handling” 來了解更多信息.
AutoDeclare 與 RabbitAdmins
AmqpTemplate: receive 與 timeout
AsyncRabbitTemplate
引入了新的AsyncRabbitTemplate
. 此模塊提供了許多發送和接收的方法,其返回值為ListenableFuture
,此后可通過它來同步或異步地獲取結果. 參考 the section called “AsyncRabbitTemplate” 來了解更多信息
RabbitTemplate 變化
消息屬性
CorrelationId
長字符串頭
以前,DefaultMessagePropertiesConverter
會將頭轉換成(其頭長度不超過長字符串限制(默認為1024) )成一個DataInputStream
(實際上它只是引用LongString的
DataInputStream
).
在輸出時,這個頭不會進行轉換(除了字符串, 例如. 在java.io.DataInputStream@1d057a39
上調用toString()方法)
.
在這個版本中, Long LongString
默認作為 LongString
s ;你可以通過其getBytes[]
, toString()
, 或 getStream()
方法來訪問它的內容. 更大的輸入LongString
現在也會在輸出上正確轉換了.
參考 the section called “Message Properties Converters” 來了解更多信息.
Inbound Delivery Mode
deliveryMode
屬性不再映射MessageProperties.deliveryMode
;這是為了避免意外傳播,如果同一個MessageProperties
對象用來發送出站消息.
相反, 入站deliveryMode
頭映射為MessageProperties.receivedDeliveryMode
.
參考 the section called “Message Properties Converters” 來了解更多信息.
當使用注解endpoints時,頭將在名為AmqpHeaders.RECEIVED_DELIVERY_MODE
的頭中提供
.
參考the section called “Annotated Endpoint Method Signature” 來了解更多信息.
Inbound User ID
RabbitAdmin 變化
Declaration Failures
之間,ignoreDeclarationFailures
標志只在channel上發生IOException
才會生效(如未匹配參數). 現在它對于任何異常都可以生效(如:TimeoutException
).
此外,無論何時聲明失敗,都會發布DeclarationExceptionEvent
.RabbitAdmin
最后聲明事件將作為屬性lastDeclarationExceptionEvent也是可用的.
參考 Section 3.1.10, “Configuring the broker” 來了解更多信息.
@RabbitListener Changes
Multiple Containers per Bean
當使用Java 8+,可以添加多個@RabbitListener
注解到 @Bean
類或它們的方法上.當使用Java 7-,你可以使用 @RabbitListeners
容器注解來提供同樣的功能.
參考the section called “@Repeatable @RabbitListener” 來了解更多信息.
@SendTo SpEL Expressions
@SendTo
for routing replies with no replyTo
property can now be SpEL expressions evaluated against the request/reply.
參考 the section called “Reply Management”來了解更多信息.
@QueueBinding Improvements
延遲消息交換器(Delayed Message Exchange)
交換器內部標志(Exchange internal flag)
CachingConnectionFactory 變化
CachingConnectionFactory Cache Statistics
訪問底層RabbitMQ連接工廠
Channel Cache
默認的channel 緩存大小已從1增加到了25. 參考Section 3.1.2, “Connection and Resource Management”來了解更多信息.
此外, SimpleMessageListenerContainer
不再設置緩存大小至少要與concurrentConsumers的數目一樣大
- 這是多余的,因為容器消費者channels是不會緩存的.
RabbitConnectionFactoryBean
factory bean現在暴露了一個屬性來將client連接屬性添加到由工廠產生的連接中.
Java 反序列化(Deserialization)
JSON MessageConverter
Logging Appenders
Log4j2
加入了log4j2 appender,此appenders現在可配置addresses
屬性來連接broker集群.
Client 連接屬性
2.2.2 早期版本