5分鐘開啟Spring AMQP的旅行.
由于默認(rèn)的Spring Framework 版本依賴是4.2.x, Spring AMQP 一般來(lái)說可兼容早期版本的Spring Framework.但基于注解的監(jiān)聽器和RabbitMessagingTemplate
需要Spring Framework 4.1+.
非常非常快速
使用純java來(lái)發(fā)送和接收消息:
ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
注意,在原生的Java Rabbit client中也有一個(gè)ConnectionFactory
. 在上面的代碼中,我們使用的是Spring 抽象. 我們依賴的是broker中的默認(rèn)交換器 (因?yàn)樵诎l(fā)送操作中沒有指定交換器),以及默認(rèn)綁定(通過其名稱作為路由鍵將所有隊(duì)列綁定到默認(rèn)交換器中).那些行為是由AMQP規(guī)范來(lái)定義的.
使用 XML配置
同上面的例子一樣,只不過將外部資源配置到了XML:
ApplicationContext context =new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate"connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>
<rabbit:admin/>
聲明會(huì)默認(rèn)自動(dòng)查找類型為Queue
, Exchange
和Binding的bean,
并宣稱他們代表的broker的用戶, 因此在簡(jiǎn)單的Java driver中沒有必要明確的使用那個(gè)bean.
有大量的選項(xiàng)來(lái)配置XML schema 中的組件屬性- 你可以使用xml編輯的自動(dòng)完成功能來(lái)探索它們并查看它們的相關(guān)文檔.
使用 Java 配置
同樣的例子可使用java來(lái)外部配置:
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
returnnew CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
returnnew RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
returnnew RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
returnnew Queue("myqueue");
}
}
2.2 新特性
2.2.1從1.5到1.6的變化
測(cè)試支持
Builder
命名空間變化
連接工廠
Queue 定義
監(jiān)聽器容器變化
空閑消息監(jiān)聽器檢測(cè)
不匹配的隊(duì)列檢測(cè)
默認(rèn)情況下,當(dāng)監(jiān)聽器容器啟動(dòng)時(shí),如果探測(cè)到了隊(duì)列的mismatched屬性或參數(shù),容器會(huì)記錄異常但會(huì)繼續(xù)監(jiān)聽.現(xiàn)在容器有了一個(gè)mismatchedQueuesFatal
屬性,當(dāng)啟動(dòng)時(shí)存在問題的話,這可以阻止容器(以及上下文)啟動(dòng).如果隨后檢測(cè)到問題,它也會(huì)停止容器,如從連接故障中恢復(fù). 參考Section 3.1.15, “Message Listener Container Configuration” 來(lái)了解更多信息.
監(jiān)聽器容器日志
現(xiàn)在監(jiān)聽器容器提供了它的beanName
給內(nèi)部 SimpleAsyncTaskExecutor
作為threadNamePrefix
.對(duì)于日志分析非常有用.
默認(rèn)錯(cuò)誤處理器
默認(rèn)錯(cuò)誤處理器(ConditionalRejectingErrorHandler
) 現(xiàn)在認(rèn)為無(wú)可挽救的@RabbitListener
異常是致命的. 參考Section 3.1.13, “Exception Handling” 來(lái)了解更多信息.
AutoDeclare 與 RabbitAdmins
AmqpTemplate: receive 與 timeout
AmqpTemplate
和它的RabbitTemplate
實(shí)現(xiàn)引入了
許多新的帶有timeout的receive()
方法. 參考the section called “Polling Consumer” 來(lái)了解更多信息.
AsyncRabbitTemplate
引入了新的AsyncRabbitTemplate
. 此模塊提供了許多發(fā)送和接收的方法,其返回值為ListenableFuture
,此后可通過它來(lái)同步或異步地獲取結(jié)果. 參考 the section called “AsyncRabbitTemplate” 來(lái)了解更多信息
RabbitTemplate 變化
1.4.1 在broker支持時(shí),引入了Direct reply-to 的能力;它比使用臨時(shí)隊(duì)列來(lái)回應(yīng)更高效. 這個(gè)版本允許你覆蓋這種默認(rèn)行為,通常設(shè)置useTemporaryReplyQueues屬性為true來(lái)使用臨時(shí)隊(duì)列. 參考 the section called “RabbitMQ Direct reply-to” 來(lái)了解更多信息.
RabbitTemplate
現(xiàn)在支持user-id-expression
(userIdExpression
當(dāng)使用Java配置時(shí)). 參考See Validated User-ID RabbitMQ documentationand the section called “Validated User Id” 來(lái)了解更多信息.
消息屬性
CorrelationId
長(zhǎng)字符串頭
以前,DefaultMessagePropertiesConverter
會(huì)將頭轉(zhuǎn)換成(其頭長(zhǎng)度不超過長(zhǎng)字符串限制(默認(rèn)為1024) )成一個(gè)DataInputStream
(實(shí)際上它只是引用LongString的
DataInputStream
).
在輸出時(shí),這個(gè)頭不會(huì)進(jìn)行轉(zhuǎn)換(除了字符串, 例如. 在java.io.DataInputStream@1d057a39
上調(diào)用toString()方法)
.
在這個(gè)版本中, Long LongString
默認(rèn)作為 LongString
s ;你可以通過其getBytes[]
, toString()
, 或 getStream()
方法來(lái)訪問它的內(nèi)容. 更大的輸入LongString
現(xiàn)在也會(huì)在輸出上正確轉(zhuǎn)換了.
參考 the section called “Message Properties Converters” 來(lái)了解更多信息.
Inbound Delivery Mode
deliveryMode
屬性不再映射MessageProperties.deliveryMode
;這是為了避免意外傳播,如果同一個(gè)MessageProperties
對(duì)象用來(lái)發(fā)送出站消息.
相反, 入站deliveryMode
頭映射為MessageProperties.receivedDeliveryMode
.
參考 the section called “Message Properties Converters” 來(lái)了解更多信息.
當(dāng)使用注解endpoints時(shí),頭將在名為AmqpHeaders.RECEIVED_DELIVERY_MODE
的頭中提供
.
參考the section called “Annotated Endpoint Method Signature” 來(lái)了解更多信息.
Inbound User ID
user_id
屬性不再映射MessageProperties.userId
; 這是為了避免意外傳播,如果同一個(gè)MessageProperties
對(duì)象用來(lái)發(fā)送出站消息.相反, 入站userId
頭會(huì)映射到MessageProperties.receivedUserId
.
參考 the section called “Message Properties Converters” 來(lái)了解更多信息.
當(dāng)使用注解endpoints時(shí),頭將在名為AmqpHeaders.RECEIVED_USER_ID的頭中提供
.
參考 the section called “Annotated Endpoint Method Signature” 來(lái)了解更多信息.
RabbitAdmin 變化
Declaration Failures
之間,ignoreDeclarationFailures
標(biāo)志只在channel上發(fā)生IOException
才會(huì)生效(如未匹配參數(shù)). 現(xiàn)在它對(duì)于任何異常都可以生效(如:TimeoutException
).
此外,無(wú)論何時(shí)聲明失敗,都會(huì)發(fā)布DeclarationExceptionEvent
.RabbitAdmin
最后聲明事件將作為屬性lastDeclarationExceptionEvent也是可用的.
參考 Section 3.1.10, “Configuring the broker” 來(lái)了解更多信息.
@RabbitListener Changes
Multiple Containers per Bean
當(dāng)使用Java 8+,可以添加多個(gè)@RabbitListener
注解到 @Bean
類或它們的方法上.當(dāng)使用Java 7-,你可以使用 @RabbitListeners
容器注解來(lái)提供同樣的功能.
參考the section called “@Repeatable @RabbitListener” 來(lái)了解更多信息.
@SendTo SpEL Expressions
@SendTo
for routing replies with no replyTo
property can now be SpEL expressions evaluated against the request/reply.
參考 the section called “Reply Management”來(lái)了解更多信息.
@QueueBinding Improvements
現(xiàn)在你可以在@QueueBinding
注解中為queues, exchanges 和bindings 指定參數(shù).
Header交換器可通過@QueueBinding來(lái)支持
.
參考the section called “Annotation-driven Listener Endpoints” 來(lái)了解更多信息.
延遲消息交換器(Delayed Message Exchange)
交換器內(nèi)部標(biāo)志(Exchange internal flag)
任何Exchange
定義現(xiàn)在都可標(biāo)記為internal
,當(dāng)聲明交換器時(shí),RabbitAdmin
會(huì)將值傳遞給broker.
參考 Section 3.1.10, “Configuring the broker”來(lái)了解更多信息.
CachingConnectionFactory 變化
CachingConnectionFactory Cache Statistics
訪問底層RabbitMQ連接工廠
Channel Cache
默認(rèn)的channel 緩存大小已從1增加到了25. 參考Section 3.1.2, “Connection and Resource Management”來(lái)了解更多信息.
此外, SimpleMessageListenerContainer
不再設(shè)置緩存大小至少要與concurrentConsumers的數(shù)目一樣大
- 這是多余的,因?yàn)槿萜飨M(fèi)者channels是不會(huì)緩存的.
RabbitConnectionFactoryBean
factory bean現(xiàn)在暴露了一個(gè)屬性來(lái)將client連接屬性添加到由工廠產(chǎn)生的連接中.
Java 反序列化(Deserialization)
當(dāng)使用Java反序列化時(shí),現(xiàn)在允許配置類的白名單. 如果你從不可信來(lái)源接收消息,考慮創(chuàng)建白名單是很重要的. 參考 the section called “Java Deserialization” 來(lái)了解更多信息.
JSON MessageConverter
Logging Appenders
Log4j2
加入了log4j2 appender,此appenders現(xiàn)在可配置addresses
屬性來(lái)連接broker集群.
Client 連接屬性
2.2.2 早期版本