2.6.7 Wildcards
??? Wildcards用來支持聯合的名字分層體系(federated name hierarchies)。它不是JMS規范的一部分,而是ActiveMQ的擴展。ActiveMQ支持以下三種wildcards:
- "." 用于作為路徑上名字間的分隔符。
- "*" 用于匹配路徑上的任何名字。
- ">" 用于遞歸地匹配任何以這個名字開始的destination。
?? 作為一種組織事件和訂閱感興趣那部分信息的一種方法,這個概念在金融市場領域已經流行了一段時間了。設想你有以下兩個destination:
- PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股價)
- PRICE.STOCK.NYSE.SUNW (SUN在紐約證券交易所的股價)
?? 訂閱者可以明確地指定destination的名字來訂閱消息,或者它也可以使用wildcards來定義一個分層的模式來匹配它希望訂閱的destination。例如:
Subscription
|
Meaning
|
PRICE.> |
Any price for any product on any exchange |
PRICE.STOCK.> |
Any price for a stock on any exchange |
PRICE.STOCK.NASDAQ.* |
Any stock price on NASDAQ |
PRICE.STOCK.*.IBM |
Any IBM stock price on any exchange |
?
2.6.8 Async Sends
??? ActiveMQ支持以同步(sync)方式或者異步(async)方式向broker發送消息。 使用何種方式對send方法的延遲有巨大的影響。對于生產者來說,既然延遲是決定吞吐量的重要因素,那么使用異步發送方式會極大地提高系統的性能。
???
ActiveMQ缺省使用異步傳輸方式。但是按照JMS規范,當在事務外發送持久化消息的時候,ActiveMQ會強制使用同步發送方式。在這種情況下,
每一次發送都是同步的,而且阻塞到收到broker的應答。這個應答保證了broker已經成功地將消息持久化,而且不會丟失。但是這樣作也嚴重地影響了
性能。
??? 如果你的系統可以容忍少量的消息丟失,那么可以在事務外發送持久消息的時候,選擇使用異步方式。以下是幾種不同的配置方式:
-
cf?=?
new
?ActiveMQConnectionFactory(
"tcp://locahost:61616?jms.useAsyncSend=true"
);??
-
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);??
-
((ActiveMQConnection)connection).setUseAsyncSend(true);??
?
2.6.9 Dispatch Policies
2.6.9.1 Round Robin Dispatch Policy
???
在2.6.4小節介紹過ActiveMQ的prefetch機制,ActiveMQ的缺省參數是針對處理大量消息時的高性能和高吞吐量而設置的。所以缺省
的prefetch參數比較大,而且缺省的dispatch
policies會嘗試盡可能快的填滿prefetch緩沖。然而在有些情況下,例如只有少量的消息而且單個消息的處理時間比較長,那么在缺省的
prefetch和dispatch
policies下,這些少量的消息總是傾向于被分發到個別的consumer上。這樣就會因為負載的不均衡分配而導致處理時間的增加。
??? Round robin dispatch policy會嘗試平均分發消息,以下是ActiveMQ配置文件的一個例子:
-
<
destinationPolicy
>
??
-
??<policyMap>??
-
????<policyEntries>??
-
??????<policyEntry?topic="FOO.>">??
-
????????<dispatchPolicy>??
-
??????????<roundRobinDispatchPolicy?/>??
-
????????</dispatchPolicy>??
-
??????</policyEntry>??
-
????</policyEntries>??
-
??</policyMap>??
-
</
destinationPolicy
>
??
?
2.6.9.2 Strict Order Dispatch Policy
???
有時候需要保證不同的topic consumer以相同的順序接收消息。通常ActiveMQ會保證topic
consumer以相同的順序接收來自同一個producer的消息。然而,由于多線程和異步處理,不同的topic
consumer可能會以不同的順序接收來自不同producer的消息。例如有兩個producer,分別是P和Q。差不多是同一時間內,P發送了
P1、P2和P3三個消息;Q發送了Q1和Q2兩個消息。兩個不同的consumer可能會以以下順序接收到消息:
?? consumer1: P1 P2 Q1 P3 Q2
??? consumer2: P1 Q1 Q2 P2 P3
???
Strict order dispatch policy 會保證每個topic
consumer會以相同的順序接收消息,代價是性能上的損失。以下是采用了strict order dispatch
policy后,兩個不同的consumer可能以以下的順序接收消息:
??? consumer1: P1 P2 Q1 P3 Q2
??? consumer2: P1 P2 Q1 P3 Q2
?? 以下是ActiveMQ配置文件的一個例子:
-
<
destinationPolicy
>
??
-
??<policyMap>??
-
????<policyEntries>??
-
??????<policyEntry?topic=""FOO.>">??
-
????????<dispatchPolicy>??
-
??????????<strictOrderDispatchPolicy?/>??
-
????????</dispatchPolicy>??
-
??????</policyEntry>??
-
????</policyEntries>??
-
??</policyMap>??
-
</
destinationPolicy
>
??
?
2.6.10 Message Cursors
???
當producer發送的持久化消息到達broker之后,broker首先會把它保存在持久存儲中。接下來,如果發現當前有活躍的consumer,而
且這個consumer消費消息的速度能跟上producer生產消息的速度,那么ActiveMQ會直接把消息傳遞給broker內部跟這個
consumer關聯的dispatch
queue;如果當前沒有活躍的consumer或者consumer消費消息的速度跟不上producer生產消息的速度,那么ActiveMQ會使用
Pending Message Cursors保存對消息的引用。在需要的時候,Pending Message
Cursors把消息引用傳遞給broker內部跟這個consumer關聯的dispatch queue。以下是兩種Pending Message
Cursors:
- VM Cursor。在內存中保存消息的引用。
- File Cursor。首先在內存中保存消息的引用,如果內存使用量達到上限,那么會把消息引用保存到臨時文件中。
?? 在缺省情況下,ActiveMQ 5.0根據使用的Message Store來決定使用何種類型的Message Cursors,但是你可以根據destination來配置Message Cursors。
???
對于topic,可以使用的pendingSubscriberPolicy
有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有
vmDurableCursor 和 fileDurableSubscriberCursor。以下是ActiveMQ配置文件的一個例子:
- <destinationPolicy>??
- ??<policyMap>??
- ????<policyEntries>??
- ??????<policyEntry?topic="org.apache.>">??
- ????????<pendingSubscriberPolicy>??
- ??????????<vmCursor?/>??
- ????????</pendingSubscriberPolicy>??
- ????????<PendingDurableSubscriberMessageStoragePolicy>??
- ??????????<vmDurableCursor/>??
- ????????</PendingDurableSubscriberMessageStoragePolicy>??
- ??????</policyEntry>??
- ????</policyEntries>??
- ??</policyMap>??
- </destinationPolicy>??
?? 對于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。以下是ActiveMQ配置文件的一個例子:
- <destinationPolicy>??
- ??<policyMap>??
- ????<policyEntries>??
- ??????<policyEntry?queue="org.apache.>">??
- ????????<pendingQueuePolicy>??
- ??????????<vmQueueCursor?/>??
- ????????</pendingQueuePolicy>??
- ??????</policyEntry>??
- ????</policyEntries>??
- ??</policyMap>??
- </destinationPolicy>??
?
2.6.11 Optimized Acknowledgement
??? ActiveMQ缺省支持批量確認消息。由于批量確認會提高性能,因此這是缺省的確認方式。如果希望在應用程序中禁止經過優化的確認方式,那么可以采用如下方法:
- cf?=?new?ActiveMQConnectionFactory?("tcp://locahost:61616?jms.optimizeAcknowledge=false");??
- ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(false);??
- ((ActiveMQConnection)connection).setOptimizeAcknowledge(false);??
?
2.6.12 Producer Flow Control
???
同步發送消息的producer會自動使用producer flow control
;對于異步發送消息的producer,要使用producer flow
control,你先要為connection配置一個ProducerWindowSize參數,如下:
- ((ActiveMQConnectionFactory)cf).setProducerWindowSize(1024000);??
??? ProducerWindowSize是producer在發送消息的過程中,收到broker對于之前發送消息的確認之前, 能夠發送消息的最大字節數。你也可以禁用producer flow control,以下是ActiveMQ配置文件的一個例子:
- <destinationPolicy>??
- ??<policyMap>??
- ????<policyEntries>??
- ??????<policyEntry?topic="FOO.>"?producerFlowControl="false">??
- ????????<dispatchPolicy>??
- ??????????<strictOrderDispatchPolicy/>??
- ????????</dispatchPolicy>??
- ??????</policyEntry>??
- ????</policyEntries>??
- ??</policyMap>??
- </destinationPolicy>??
?
2.6.13 Message Transformation
??? 有時候需要在JMS provider內部進行message的轉換。從4.2版本起,ActiveMQ 提供了一個MessageTransformer 接口用于進行消息轉換,如下:
- public?interface?MessageTransformer?{??
- ????Message?producerTransform(Session?session,?MessageProducer?producer,?Message?message)?throws?JMSException;??
- ????Message?consumerTransform(Session?session,?MessageConsumer?consumer,?Message?message)throws?JMSException;??
- }??
??? 通過在以下對象上通過調用setTransformer方法來設置MessageTransformer:
- ActiveMQConnectionFactory
- ActiveMQConnection
- ActiveMQSession
- ActiveMQMessageConsumer
- ActiveMQMessageProducer
?? MessageTransformer接口支持:
- 在消息被發送到JMS provider的消息總線前進行轉換。通過producerTransform方法。
- 在消息到達消息總線后,但是在consumer接收到消息前進行轉換。通過consumerTransform方法。
?? 以下是個簡單的例子:??
?
- public?class?SimpleMessage?implements?Serializable?{??
- ??????
- ????private?static?final?long?serialVersionUID?=?2251041841871975105L;??
- ??????
- ??????
- ????private?String?id;??
- ????private?String?text;??
- ??????
- ????public?String?getId()?{??
- ????????return?id;??
- ????}??
- ????public?void?setId(String?id)?{??
- ????????this.id?=?id;??
- ????}??
- ????public?String?getText()?{??
- ????????return?text;??
- ????}??
- ????public?void?setText(String?text)?{??
- ????????this.text?=?text;??
- ????}??
- }??
??? 在producer內發送ObjectMessage,如下:
- SimpleMessage?sm?=?new?SimpleMessage();??
- sm.setId("1");??
- sm.setText("this?is?a?sample?message");??
- ObjectMessage?message?=?session.createObjectMessage();??
- message.setObject(sm);??
- producer.send(message);??
?? 在consumer的session上設置一個MessageTransformer用于將ObjectMessage轉換成TextMessage,如下:
- ((ActiveMQSession)session).setTransformer(new?MessageTransformer()?{??
- public?Message?consumerTransform(Session?session,?MessageConsumer?consumer,?Message?message)?throws?JMSException?{??
- ????ObjectMessage?om?=?(ObjectMessage)message;??
- ????XStream?xstream?=?new?XStream();??
- ????xstream.alias("simple?message",?SimpleMessage.class);??
- ????String?xml?=?xstream.toXML(om.getObject());??
- ????return?session.createTextMessage(xml);??
- }??
- ??
- public?Message?producerTransform(Session?session,?MessageProducer?consumer,?Message?message)?throws?JMSException?{??
- ????return?null;??
- }??
- });