Queue Affinity 和 LocalizedQueueConnectionFactory
當(dāng)在集群中使用HA隊(duì)列時,為了獲取最佳性能,可以希望連接到主隊(duì)列所在的物理broker. 雖然CachingConnectionFactory
可以配置為使用多個broker 地址; 這會失敗的,client會嘗試按順序來連接. LocalizedQueueConnectionFactory
使用管理插件提供的 REST API來確定包含master隊(duì)列的節(jié)點(diǎn).然后,它會創(chuàng)建(或從緩存中獲取)一個只連接那個節(jié)點(diǎn)的CachingConnectionFactory
.如果連接失敗了,將會確定一個新的消費(fèi)者可連接的master節(jié)點(diǎn). LocalizedQueueConnectionFactory
使用默認(rèn)的連接工廠進(jìn)行配置,在隊(duì)列物理位置不能確定的情況下,它會按照正常情況來連接集群.
LocalizedQueueConnectionFactory
是一個RoutingConnectionFactory
, SimpleMessageListenerContainer
會使用隊(duì)列名稱作為其lookup key ,這些已經(jīng)在上面的 the section called “Routing Connection Factory” 討論過了.
基于這個原因(使用隊(duì)列名稱來作查找鍵),LocalizedQueueConnectionFactory
只在容器配置為監(jiān)聽某個單一隊(duì)列時才可使用.
RabbitMQ 管理插件應(yīng)該在每個節(jié)點(diǎn)上開啟.
警告
這種連接工廠用于長連接,如用在SimpleMessageListenerContainer的連接
.它的目的不是用于短連接, 如在 RabbitTemplate中使用,這是因?yàn)樵谶B接前,它要調(diào)用
REST API. 此外,對于發(fā)布操作來說,隊(duì)列是未知的,不管如何, 消息會發(fā)布到所有集群成員中,因此查找節(jié)點(diǎn)的邏輯幾乎沒有什么意義。
這里有一個樣例配置,使用了Spring Boot的RabbitProperties來配置工廠:
@Autowired
private RabbitProperties props;
private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };
private final String[] nodes = { "rabbit@host1", "rabbit@host2" };
@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public ConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris, this.nodes,
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
注意,三個參數(shù)是 addresses
, adminUris
和 nodes的數(shù)組
. 當(dāng)一個容器試圖連接一個隊(duì)列時,它們是有位置性的,它決定了哪個節(jié)點(diǎn)上的隊(duì)列是mastered,并以同樣數(shù)組位置來連接其地址.
發(fā)布者確認(rèn)和返回
確認(rèn)和返回消息可通過分別設(shè)置CachingConnectionFactory
的 publisherConfirms
和publisherReturns
屬性為ture來完成.
當(dāng)設(shè)置了這些選項(xiàng)時,由工廠創(chuàng)建的通道將包裝在PublisherCallbackChannel中
,這用來方便回調(diào). 當(dāng)獲取到這樣的通道時,client可在channel上注冊一個 PublisherCallbackChannel.Listener
. PublisherCallbackChannel
實(shí)現(xiàn)包含一些邏輯來路由確認(rèn)/返回給適當(dāng)?shù)谋O(jiān)聽器. 這些特性將在下面的章節(jié)中進(jìn)一步解釋.
對于一些更多的背景信息, 可以參考下面的博客:Introducing Publisher Confirms.
記錄通道關(guān)閉事件
1.5版本中引入了允許用戶控制日志級別的機(jī)制.
CachingConnectionFactory
使用默認(rèn)的策略來記錄通道關(guān)閉事件:
- 不記錄通道正常關(guān)閉事件 (200 OK).
- 如果通道是因?yàn)槭〉谋粍拥年?duì)列聲明關(guān)閉的,將記錄為debug級別.
- 如果通道關(guān)閉是因?yàn)?span style="color: #6d180b; font-family: Monaco, Consolas, Courier, 'Lucida Console', monospace; background-color: #f2f2f2;">basic.consume因專用消費(fèi)者條件而拒絕引起的,將被記錄為INFO級別.
- 所有其它的事件將記錄為ERROR級別.
要修改此行為,需要在CachingConnectionFactory的closeExceptionLogger屬性中注入一個自定義的ConditionalExceptionLogger.
也可參考the section called “Consumer Failure Events”.
運(yùn)行時緩存屬性
從1.6版本開始, CachingConnectionFactory
通過getCacheProperties()方法提供了緩存統(tǒng)計(jì). 這些統(tǒng)計(jì)數(shù)據(jù)可用來在生產(chǎn)環(huán)境中優(yōu)化緩存.例如, 最高水位標(biāo)記可用來確定是否需要加大緩存.如果它等于緩存大小,你也許應(yīng)該考慮進(jìn)一步加大.
Table 3.1. CacheMode.CHANNEL的緩存屬性
Property | Meaning |
---|
channelCacheSize | 當(dāng)前配置的允許空閑的最大通道數(shù)量. |
localPort | 連接的本地端口(如果可用的話). 在可以在RabbitMQ 管理界面中關(guān)聯(lián) connections/channels. |
idleChannelsTx | 當(dāng)前空閑(緩存的)的事務(wù)通道的數(shù)目. |
idleChannelsNotTx | 當(dāng)前空閑(緩存的)的非事務(wù)通道的數(shù)目. |
idleChannelsTxHighWater | 同時空閑(緩存的)的事務(wù)通道的最大數(shù)目 |
idleChannelsNotTxHighWater | 同時空閑(緩存的)的非事務(wù)通道的最大數(shù)目. |
Table 3.2. CacheMode.CONNECTION的緩存屬性
Property | Meaning |
---|
openConnections | 表示連接到brokers上連接對象的數(shù)目. |
channelCacheSize | 當(dāng)前允許空閑的最大通道數(shù)目 |
connectionCacheSize | 當(dāng)前允許空閑的最大連接數(shù)目. |
idleConnections | 當(dāng)前空閑的連接數(shù)目. |
idleConnectionsHighWater | 目前已經(jīng)空閑的最大連接數(shù)目. |
idleChannelsTx:<localPort> | 在當(dāng)前連接上目前空閑的事務(wù)通道的數(shù)目. 屬性名的localPort部分可用來在RabbitMQ 管理界面中關(guān)聯(lián)connections/channels. |
idleChannelsNotTx:<localPort> | 在當(dāng)前連接上目前空閑和非事務(wù)通道的數(shù)目.屬性名的localPort部分可用來在RabbitMQ管理界面中關(guān)聯(lián)connections/channels |
idleChannelsTxHighWater:
<localPort> | 已同時空閑的事務(wù)通道的最大數(shù)目. 屬性名的 localPort部分可用來在RabbitMQ管理界面中關(guān)聯(lián)connections/channels. |
idleChannelsNotTxHighWater:
<localPort> | 憶同時空閑的非事務(wù)通道的最大數(shù)目.屬性名的localPort部分可用來RabbitMQ管理界面中關(guān)聯(lián)connections/channels. |
cacheMode
屬性 (包含CHANNEL
或 CONNECTION
).
Figure 3.1. JVisualVM Example

3.1.3 添加自定義Client 連接屬性
CachingConnectionFactory
現(xiàn)在允許你訪問底層連接工廠,例如, 設(shè)置自定義client 屬性:
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");
當(dāng)在RabbitMQ管理界面中查看連接時,將會看到這些屬性.
3.1.4 AmqpTemplate
介紹
像其它Spring Framework提供的高級抽象一樣, Spring AMQP 提供了扮演核心角色的模板. 定義了主要操作的接口稱為AmqpTemplate
. 這些操作包含了發(fā)送和接收消息的一般行為.換句話說,它們不是針對某個特定實(shí)現(xiàn)的,從其名稱"AMQP"就可看出.另一方面,接口的實(shí)現(xiàn)會盡量作為AMQP協(xié)議的實(shí)現(xiàn).不像JMS,它只是接口級別的API實(shí)現(xiàn), AMQP是一個線路級協(xié)議.協(xié)議的實(shí)現(xiàn)可提供它們自己的client libraries, 因此模板接口的實(shí)現(xiàn)都依賴特定的client library.目前,只有一個實(shí)現(xiàn):RabbitTemplate
. 在下面的例子中,你會經(jīng)??吹?AmqpTemplate",但當(dāng)你查看配置例子或者任何實(shí)例化或調(diào)用setter方法的代碼時,你都會看到實(shí)現(xiàn)類型(如."RabbitTemplate").
正如上面所提到的, AmqpTemplate
接口定義了所有發(fā)送和接收消息的基本操作. 我們將分別在以下兩個部分探索消息發(fā)送和接收。
也可參考the section called “AsyncRabbitTemplate”.
從1.3版本開始, 你可為RabbitTemplate
配置使用 RetryTemplate
來幫助處理broker連接的問題. 參考spring-retry 項(xiàng)目來了解全部信息;下面就是一個例子,它使用指數(shù)回退策略(exponential back off policy)和默認(rèn)的 SimpleRetryPolicy
(向調(diào)用者拋出異常前,會做三次嘗試).
使用XML命名空間:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval"value="10000" />
</bean>
</property>
</bean>
使用 @Configuration
:
@Bean
public AmqpTemplate rabbitTemplate();
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
從1.4版本開始,除了retryTemplate
屬性外,RabbitTemplate 上也支持recoveryCallback
選項(xiàng). 它可用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T>recoveryCallback)第二個參數(shù)
.
RecoveryCallback
會有一些限制,因?yàn)樵趓etry context只包含lastThrowable
字段.在更復(fù)雜的情況下,你應(yīng)該使用外部RetryTemplate,這樣你就可以通過上下文屬性傳遞更多信息給
RecoveryCallback
.
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Overridepublic Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在這種情況下,你不需要在RabbitTemplate中注入RetryTemplate
.
發(fā)布者確認(rèn)和返回
AmqpTemplate的RabbitTemplate
實(shí)現(xiàn)支持發(fā)布者確認(rèn)和返回.
對于返回消息,模板的 mandatory
屬性必須設(shè)置為true
, 或者對于特定消息,其 mandatory-expression
必須評估為true
.
此功能需要將CachingConnectionFactory
的publisherReturns
屬性設(shè)置為true (參考 the section called “Publisher Confirms and Returns”).
返回是通過注冊在RabbitTemplate.ReturnCallback(通過調(diào)用setReturnCallback(ReturnCallback callback))來返回給客戶端的
. 回調(diào)必須實(shí)現(xiàn)下面的方法:
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
每個RabbitTemplate只支持一個ReturnCallback
.也可參考the section called “Reply Timeout”.
對于發(fā)布者確認(rèn)(又名發(fā)布者應(yīng)答), 模板需要將 CachingConnectionFactory
中的publisherConfirms
屬性設(shè)置為true.
確認(rèn)是通過注冊在RabbitTemplate.ConfirmCallback(通過調(diào)用setConfirmCallback(ConfirmCallback callback))
發(fā)送給client的. 回調(diào)必須實(shí)現(xiàn)下面的方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
對象是在發(fā)送原始消息的時候,由client提供的. ack
為true 表示確認(rèn),為false時,表示不確認(rèn)(nack). 對于nack
, cause可能會包含nack的原因(如果生成nack時,它可用的話).
一個例子是當(dāng)發(fā)送消息到一個不存在的交換器時.在那種情況下,broker會關(guān)閉通道; 關(guān)閉的原因會包含在cause中
. cause
是1.4版本中加入的.
RabbitTemplate中只支持一個ConfirmCallback
.
當(dāng)rabbit模板完成發(fā)送操作時,會關(guān)閉通道; 這可以排除當(dāng)連接工廠緩存滿時(緩存中還有空間,通道沒有物理關(guān)閉,返回/確認(rèn)正常處理)確認(rèn)和返回的接待問題.
當(dāng)緩存滿了的時候, 框架會延遲5秒來關(guān)閉,以為接收確認(rèn)/返回消息留有時間.當(dāng)使用確認(rèn)時,通道會在收到最后一個確認(rèn)時關(guān)閉.
當(dāng)使用返回時,通道會保持5秒的打開狀態(tài).一般建議將連接工廠的channelCacheSize
設(shè)為足夠大,這樣發(fā)布消息的通道就會返回到緩存中,而不是被關(guān)閉.
你可以使用RabbitMQ管理插件來監(jiān)控通道的使用情況;如果你看到通道打開/關(guān)閉的非常迅速,那么你必須考慮加大緩存,從而減少服務(wù)器的開銷.
Messaging 集成
從1.4版本開始, 構(gòu)建于RabbitTemplate上的RabbitMessagingTemplate提供了與
Spring Framework消息抽象的集成(如.org.springframework.messaging.Message)
.
This allows you to create the message to send in generic manner.
驗(yàn)證 User Id
從1.6版本開始,模板支持user-id-expression
(當(dāng)使用Java配置時,為userIdExpression
). 如果發(fā)送消息,user id屬性的值將在評估表達(dá)式后進(jìn)行設(shè)置.評價(jià)的根對象是要發(fā)送的消息。
例子:
<rabbit:template...user-id-expression="'guest'" />
<rabbit:template...user-id-expression="@myConnectionFactory.username" />
第一個示例是一個文本表達(dá)式;第二個例子將獲取上下文中連接工廠bean的username
屬性.
3.1.5 發(fā)送消息
介紹
當(dāng)發(fā)送消息時,可使用下面的任何一種方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我們將使用上面列出的最后一個方法來討論,因?yàn)樗鼘?shí)際是最清晰的.它允許在運(yùn)行時提供一個AMQP Exchange 名稱和路由鍵(routing key).最后一個參數(shù)是負(fù)責(zé)初建創(chuàng)建Message實(shí)例的回調(diào).使用此方法來發(fā)送消息的示例如下:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
new Message("12.34".getBytes(), someProperties));
如果你打算使用模板實(shí)例來多次(或多次)向同一個交換器發(fā)送消息時,"exchange" 可設(shè)置在模板自已身上.在這種情況中,可以使用上面列出的第二個方法. 下面的例子在功能上等價(jià)于前面那個:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果在模塊上設(shè)置"exchange"和"routingKey"屬性,那么方法就只接受Message
參數(shù):
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
關(guān)于交換器和路由鍵更好的想法是明確的參數(shù)將總是會覆蓋模板默認(rèn)值.事實(shí)上, 即使你不在模板上明確設(shè)置這些屬性, 總是有默認(rèn)值的地方. 在兩種情況中,默認(rèn)值是空字符串,這是合情合理的.
就路由鍵而言,它并不總是首先需要的 (如. Fanout 交換器). 此外,綁定的交換器上的隊(duì)列可能會使用空字符串. 這些在模板的路由鍵中都是合法的.
就交換器名稱而言,空字符串也是常常使用的,因?yàn)锳MQP規(guī)范定義了無名稱的"默認(rèn)交換器".
由于所有隊(duì)列可使用它們的隊(duì)列名稱作為路由鍵自動綁定到默認(rèn)交換器上(它是Direct交換器e) ,上面的第二個方法可通過默認(rèn)的交換器將簡單的點(diǎn)對點(diǎn)消息傳遞到任何隊(duì)列.
只需要簡單的將隊(duì)列名稱作為路由鍵-在運(yùn)行時提供方法參數(shù):
RabbitTemplate template = new RabbitTemplate(); // 使用默認(rèn)的無名交換器
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,如果你喜歡創(chuàng)建一個模板用于主要或?qū)iT向一個隊(duì)列發(fā)送消息, 以下是完全合理的:
RabbitTemplate template = new RabbitTemplate(); // 使用默認(rèn)無名交換器
template.setRoutingKey("queue.helloWorld"); // 但我們總是向此隊(duì)列發(fā)送消息
template.send(new Message("Hello World".getBytes(), someProperties));
Message Builder API
從1.3版本開始,通過 MessageBuilder
和 MessagePropertiesBuilder提供了消息構(gòu)建API
; 它們提供了更加方便地創(chuàng)建消息和消息屬性的方法:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
或
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
每個MessageProperies上定義的屬性都可以被設(shè)置. 其它方法包括setHeader(String key, String value)
,removeHeader(String key)
, removeHeaders()
, 和copyProperties(MessageProperties properties)
.
每個屬性方法都有一個set*IfAbsent()
變種. 在默認(rèn)的初始值存在的情況下, 方法名為set*IfAbsentOrDefault()
.
提供了五個靜態(tài)方法來創(chuàng)建初始message builder:
public static MessageBuilder withBody(byte[] body) 
public static MessageBuilder withClonedBody(byte[] body) 
public static MessageBuilder withBody(byte[] body, int from, int to) 
public static MessageBuilder fromMessage(Message message) 
public static MessageBuilder fromClonedMessage(Message message) 

| builder創(chuàng)建的消息body是參數(shù)的直接引用. |

| builder創(chuàng)建的消息body是包含拷貝原字節(jié)數(shù)組的新數(shù)組. |

| build創(chuàng)建的消息body是包含原字節(jié)數(shù)組范圍的新數(shù)組.查看Arrays.copyOfRange() 來了解更多信息. |

| builder創(chuàng)建的消息body是原body參數(shù)的直接引用. 參數(shù)的屬性將拷貝到新MessageProperties對象中 . |

| builer創(chuàng)建的消息body包含參數(shù)body的新數(shù)組.參數(shù)的屬性將拷貝到新的MessageProperties 對象中. |
public static MessagePropertiesBuilder newInstance() 
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 

| 新消息屬性將使用默認(rèn)值進(jìn)行初始化 |

| builder會使用提供的properties對象進(jìn)行初始化,build() 方法也會返回參數(shù)properties對象. |

| 參數(shù)的屬性會拷貝到新的MessageProperties對象中 . |
在AmqpTemplate的RabbitTemplate
實(shí)現(xiàn)中, 每個send()
方法的重載版本都接受一個額外的CorrelationData
對象.
當(dāng)啟用了發(fā)布者確認(rèn)時,此對象會在3.1.4, “AmqpTemplate”的回調(diào)中返回.這允許發(fā)送者使用確認(rèn)(ack或nack)來關(guān)聯(lián)發(fā)送的消息.
發(fā)布者返回
當(dāng)模板的mandatory
屬性為true時,返回消息將由 Section 3.1.4, “AmqpTemplate”描述的回調(diào)來返回.
從1.4版本開始,RabbitTemplate
支持 SpEL mandatoryExpression
屬性,它將對每個請求消息進(jìn)行評估,作為根評估對象來解析成布爾值. Bean引用,如"@myBean.isMandatory(#root)"
可用在此表達(dá)式中.
發(fā)布者返回內(nèi)部也可用于RabbitTemplate
的發(fā)送和接收操作中. 參考the section called “Reply Timeout” 來了解更多信息.
批量
從1.4.2版本開始,引入了BatchingRabbitTemplate
.它是RabbitTemplate
的子類,覆蓋了send
方法,此方法可根據(jù)BatchingStrategy來批量發(fā)送消息
; 只有當(dāng)一個批次完成時才會向RabbitMQ發(fā)送消息。
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
警告
成批的數(shù)據(jù)是保存在內(nèi)存中的,如果出現(xiàn)系統(tǒng)故障,未發(fā)送的消息將會丟失.
這里提供了一個 SimpleBatchingStrategy
.它支持將消息發(fā)送到單個 exchange/routing key.它有下面的屬性:
batchSize
- 發(fā)送前一個批次中消息的數(shù)量bufferLimit
- 批量消息的最大大小;如果超過了此值,它會取代batchSize
, 并導(dǎo)致要發(fā)送的部分批處理timeout
- 當(dāng)沒有新的活動添加到消息批處理時之后,將發(fā)送部分批處理的時間(a time after which a partial batch will be sent when there is no new activity adding messages to the batch)
SimpleBatchingStrategy
通過在每個消息的前面嵌入4字節(jié)二進(jìn)制長度來格式化批次消息. 這是通過設(shè)置springBatchFormat消息屬性為lengthHeader4向接收系統(tǒng)傳達(dá)的.
重要
批量消息自動由監(jiān)聽器容器來分批(de-batched)(使用springBatchFormat
消息頭).拒絕批量消息中的任何一個會將導(dǎo)致拒絕整個批次消息.
3.1.6 接收消息
介紹
Message 接收總是比發(fā)送稍顯復(fù)雜.有兩種方式來接收Message
. 最簡單的選擇是在輪詢方法調(diào)用中一次只接收一個消息. 更復(fù)雜的更常見的方法是注冊一個偵聽器,按需異步的接收消息。
在下面兩個子章節(jié)中,我們將看到這兩種方法的示例.
Polling Consumer
AmqpTemplate
自身可用來輪詢消息接收.默認(rèn)情況下,如果沒有可用消息,將會立即返回 null
;它是無阻塞的.
從1.5版本開始,你可以設(shè)置receiveTimeout
,以毫秒為單位, receive方法會阻塞設(shè)定的時間來等待消息.小于0的值則意味著無限期阻塞 (或者至少要等到與broker的連接丟失).
1.6版本引入了receive
方法的變種,以允許在每個調(diào)用上都可設(shè)置超時時間.
警告
由于接收操作會為每個消息創(chuàng)建一個新的QueueingConsumer
,這種技術(shù)并不適用于大容量環(huán)境,可考慮使用異步消費(fèi)者,或?qū)?/span>receiveTimeout
設(shè)為0來應(yīng)對這種情況.
這里有四個簡單可用的receive 方法.同發(fā)送方的交換器一樣, 有一種方法需要直接在模板本身上設(shè)置的默認(rèn)隊(duì)列屬性, 還有一種方法需要在運(yùn)行接受隊(duì)列參數(shù).
版本1.6 引入了接受timeoutMillis
的變種,基于每個請求重寫了receiveTimeout
方法.
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
與發(fā)送消息的情況類似, AmqpTemplate
有一些便利的方法來接收POJOs 而非Message
實(shí)例, 其實(shí)現(xiàn)可提供一種方法來定制MessageConverter
以用于創(chuàng)建返回的Object
:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Message receiveAndConvert(long timeoutMillis) throws AmqpException;
Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
類似于sendAndReceive
方法,從1.3版本開始, AmqpTemplate
有多個便利的receiveAndReply
方法同步接收,處理,以及回應(yīng)消息:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate
實(shí)現(xiàn)會負(fù)責(zé)receive 和 reply 階段.在大多數(shù)情況下,如果有必要,你只需要提供ReceiveAndReplyCallback
的實(shí)現(xiàn)來為收到的消息執(zhí)行某些業(yè)務(wù)邏輯或?yàn)槭盏降南?gòu)建回應(yīng)對象.
注意,ReceiveAndReplyCallback
可能返回null
. 在這種情況下,將不會發(fā)送回應(yīng),receiveAndReply
的工作類似于receive
方法. 這允許相同的隊(duì)列用于消息的混合物,其中一些可能不需要答復(fù)。
自動消息(請求和應(yīng)答)轉(zhuǎn)換只能適應(yīng)于提供的回調(diào)不是ReceiveAndReplyMessageCallback 實(shí)例的情況下- 它提供了一個原始的消息交換合同。
ReplyToAddressCallback
只在這種情況中有用,需要根據(jù)收到的信息通過自定義邏輯來決定replyTo
地址,并在ReceiveAndReplyCallback中進(jìn)行回應(yīng)的情況
. 默認(rèn)情況下,請求消息中的 replyTo
信息用來路由回復(fù).
下面是一個基于POJO的接收和回復(fù)…
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}
異步消費(fèi)者
消息監(jiān)聽器
對于異步消息接收, 會涉及到一個專用組件(不是AmqpTemplate
).此組件可作為消息消費(fèi)回調(diào)的容器.
稍后,我們會講解這個容器和它的屬性,但首先讓我們來看一下回調(diào),因?yàn)檫@里是你的應(yīng)用程序代碼與消息系統(tǒng)集成的地方. MessageListener
接口:
public interface MessageListener {
void onMessage(Message message);
}
如果出于任何理由,你的回調(diào)邏輯需要依賴于AMQP Channel實(shí)例,那么你可以使用ChannelAwareMessageListener
. 它看起來是很相似的,但多了一個額外的參數(shù):
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
MessageListenerAdapter
如果您希望在應(yīng)用程序邏輯和消息API之間保持嚴(yán)格的分離,則可以依賴于框架所提供的適配器實(shí)現(xiàn)。
這是通常被稱為“消息驅(qū)動的POJO”支持。當(dāng)使用適配器時,只需要提供一個適配器本身應(yīng)該調(diào)用的實(shí)例引用即可。
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
你也可以繼承適配器,并實(shí)現(xiàn)getListenerMethodName()
方法(基于消息來動態(tài)選擇不同的方法). 這個方法有兩個參數(shù):originalMessage
和extractedMessage
, 后者是轉(zhuǎn)換后的結(jié)果.默認(rèn)情況下,需要配置SimpleMessageConverter
;
參考the section called “SimpleMessageConverter” 來了解更多信息以及其它轉(zhuǎn)換器的信息.
從1.4.2開始,原始消息包含consumerQueue
和 consumerTag
屬性,這些屬性可用來確定消息是從那個隊(duì)列中收到的.
從1.5版本開始,你可以配置消費(fèi)者queue/tag到方法名稱的映射(map)以動態(tài)選擇要調(diào)用的方法.如果map中無條目,我們將退回到默認(rèn)監(jiān)聽器方法.
容器
你已經(jīng)看過了消息監(jiān)聽回調(diào)上的各種各樣的選項(xiàng),現(xiàn)在我們將注意力轉(zhuǎn)向容器. 基本上,容器處理主動(active)的職責(zé),這樣監(jiān)聽器回調(diào)可以保持被動(passive). 容器是“生命周期”組件的一個例子。
它提供了啟動和停止的方法.當(dāng)配置容器時,你本質(zhì)上縮短了AMQP Queue和 MessageListener
實(shí)例之間的距離.你必須提供一個ConnectionFactory
的引用,隊(duì)列名稱或隊(duì)列實(shí)例.
下面是使用默認(rèn)實(shí)現(xiàn)SimpleMessageListenerContainer
的最基礎(chǔ)的例子:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作為一個主動組件, 最常見的是使用bean定義來創(chuàng)建監(jiān)聽器容器,這樣它就可以簡單地運(yùn)行于后臺.這可以通過XML來完成:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
或者你可以@Configuration 風(fēng)格:
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
returnnew MessageListener() {
publicvoid onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
從RabbitMQ Version 3.2開始, broker支持消費(fèi)者優(yōu)先級了(參考 Using Consumer Priorities with RabbitMQ).
這可以通過在消費(fèi)者設(shè)置x-priority
參數(shù)來啟用.
SimpleMessageListenerContainer
現(xiàn)在支持設(shè)置消費(fèi)者參數(shù):
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
為了方便,命名空間在listener元素上提供了priority
屬性:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
從1.3版本開始,容器監(jiān)聽的隊(duì)列可在運(yùn)行時進(jìn)行修改,參考 Section 3.1.18, “Listener Container Queues”.
auto-delete 隊(duì)列
當(dāng)容器配置為監(jiān)聽auto-delete
隊(duì)列或隊(duì)列有x-expires
選項(xiàng)或者broker配置了Time-To-Live 策略,隊(duì)列將在容器停止時(最后的消費(fèi)者退出時)由broker進(jìn)行刪除.
在1.3版本之前,容器會因隊(duì)列缺失而不能重啟; 當(dāng)連接關(guān)閉/打開時,RabbitAdmin
只能自動重新聲明隊(duì)列.
從1.3版本開始, 在啟動時,容器會使用RabbitAdmin
來重新聲明缺失的隊(duì)列.
您也可以使用條件聲明(the section called “Conditional Declaration”) 與auto-startup="false"
來管理隊(duì)列的延遲聲明,直到容器啟動.
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo"queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />
在這種情況下,隊(duì)列和交換器是由 containerAdmin
來聲明的,auto-startup="false"
因此在上下文初始化期間不會聲明元素.同樣,出于同樣原因,容器也不會啟動.當(dāng)容器隨后啟動時,它會使用containerAdmin引用來聲明元素
.
批量消息
批量消息會自動地通過監(jiān)聽器容器 (使用springBatchFormat
消息頭)來解批(de-batched). 拒絕批量消息中的任何一個都將導(dǎo)致整批消息被拒絕. 參考the section called “Batching” 來了解更多關(guān)于批量消息的詳情.
消費(fèi)者失敗事件
從1.5版本開始,無論時候,當(dāng)監(jiān)聽器(消費(fèi)者)經(jīng)歷某種失敗時,SimpleMessageListenerContainer
會發(fā)布應(yīng)用程序事件. 事件ListenerContainerConsumerFailedEvent
有下面的屬性:
container
- 消費(fèi)者經(jīng)歷問題的監(jiān)聽容器.reason
- 失敗的文本原因。fatal
- 一個表示失敗是否是致命的boolean值;對于非致命異常,容器會根據(jù)retryInterval值嘗試重新啟動消費(fèi)者.throwable
-捕捉到的Throwable
.
這些事件能通過實(shí)現(xiàn)ApplicationListener<ListenerContainerConsumerFailedEvent>來消費(fèi)
.
當(dāng) concurrentConsumers
大于1時,系統(tǒng)級事件(如連接失敗)將發(fā)布到所有消費(fèi)者.
如果消費(fèi)者因隊(duì)列是專有使用而失敗了,默認(rèn)情況下,在發(fā)布事件的時候,也會發(fā)出WARN
日志. 要改變?nèi)罩拘袨?需要在SimpleMessageListenerContainer的exclusiveConsumerExceptionLogger屬性中提供自定義的ConditionalExceptionLogger
.
也可參考the section called “Logging Channel Close Events”.
致命錯誤都記錄在ERROR級別中,這是不可修改的。
posted on 2016-08-13 12:38
胡小軍 閱讀(6251)
評論(0) 編輯 收藏 所屬分類:
RabbitMQ