<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Terry.Li-彬

    虛其心,可解天下之問;專其心,可治天下之學;靜其心,可悟天下之理;恒其心,可成天下之業。

      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      143 隨筆 :: 344 文章 :: 130 評論 :: 0 Trackbacks

    2.6.7 Wildcards
    ??? Wildcards用來支持聯合的名字分層體系(federated name hierarchies)。它不是JMS規范的一部分,而是ActiveMQ的擴展。ActiveMQ支持以下三種wildcards:

    • "." 用于作為路徑上名字間的分隔符。
    • "*" 用于匹配路徑上的任何名字。
    • ">" 用于遞歸地匹配任何以這個名字開始的destination。

    ?? 作為一種組織事件和訂閱感興趣那部分信息的一種方法,這個概念在金融市場領域已經流行了一段時間了。設想你有以下兩個destination:

    • PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股價)
    • PRICE.STOCK.NYSE.SUNW (SUN在紐約證券交易所的股價)

    ?? 訂閱者可以明確地指定destination的名字來訂閱消息,或者它也可以使用wildcards來定義一個分層的模式來匹配它希望訂閱的destination。例如:

    Subscription Meaning
    PRICE.> Any price for any product on any exchange
    PRICE.STOCK.> Any price for a stock on any exchange
    PRICE.STOCK.NASDAQ.* Any stock price on NASDAQ
    PRICE.STOCK.*.IBM Any IBM stock price on any exchange

    ?

    2.6.8 Async Sends
    ??? ActiveMQ支持以同步(sync)方式或者異步(async)方式向broker發送消息。 使用何種方式對send方法的延遲有巨大的影響。對于生產者來說,既然延遲是決定吞吐量的重要因素,那么使用異步發送方式會極大地提高系統的性能。
    ??? ActiveMQ缺省使用異步傳輸方式。但是按照JMS規范,當在事務外發送持久化消息的時候,ActiveMQ會強制使用同步發送方式。在這種情況下, 每一次發送都是同步的,而且阻塞到收到broker的應答。這個應答保證了broker已經成功地將消息持久化,而且不會丟失。但是這樣作也嚴重地影響了 性能。
    ??? 如果你的系統可以容忍少量的消息丟失,那么可以在事務外發送持久消息的時候,選擇使用異步方式。以下是幾種不同的配置方式:

    Java代碼
    1. cf?=? new ?ActiveMQConnectionFactory( "tcp://locahost:61616?jms.useAsyncSend=true" );??
    2. ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);??
    3. ((ActiveMQConnection)connection).setUseAsyncSend(true);??

    ?

    2.6.9 Dispatch Policies
    2.6.9.1 Round Robin Dispatch Policy
    ??? 在2.6.4小節介紹過ActiveMQ的prefetch機制,ActiveMQ的缺省參數是針對處理大量消息時的高性能和高吞吐量而設置的。所以缺省 的prefetch參數比較大,而且缺省的dispatch policies會嘗試盡可能快的填滿prefetch緩沖。然而在有些情況下,例如只有少量的消息而且單個消息的處理時間比較長,那么在缺省的 prefetch和dispatch policies下,這些少量的消息總是傾向于被分發到個別的consumer上。這樣就會因為負載的不均衡分配而導致處理時間的增加。
    ??? Round robin dispatch policy會嘗試平均分發消息,以下是ActiveMQ配置文件的一個例子:

    Xml代碼
    1. < destinationPolicy > ??
    2. ??<policyMap>??
    3. ????<policyEntries>??
    4. ??????<policyEntry?topic="FOO.>">??
    5. ????????<dispatchPolicy>??
    6. ??????????<roundRobinDispatchPolicy?/>??
    7. ????????</dispatchPolicy>??
    8. ??????</policyEntry>??
    9. ????</policyEntries>??
    10. ??</policyMap>??
    11. </ destinationPolicy > ??

    ?

    2.6.9.2 Strict Order Dispatch Policy
    ??? 有時候需要保證不同的topic consumer以相同的順序接收消息。通常ActiveMQ會保證topic consumer以相同的順序接收來自同一個producer的消息。然而,由于多線程和異步處理,不同的topic consumer可能會以不同的順序接收來自不同producer的消息。例如有兩個producer,分別是P和Q。差不多是同一時間內,P發送了 P1、P2和P3三個消息;Q發送了Q1和Q2兩個消息。兩個不同的consumer可能會以以下順序接收到消息:

    ?? consumer1: P1 P2 Q1 P3 Q2
    ??? consumer2: P1 Q1 Q2 P2 P3
    ??? Strict order dispatch policy 會保證每個topic consumer會以相同的順序接收消息,代價是性能上的損失。以下是采用了strict order dispatch policy后,兩個不同的consumer可能以以下的順序接收消息:
    ??? consumer1: P1 P2 Q1 P3 Q2
    ??? consumer2: P1 P2 Q1 P3 Q2

    ?? 以下是ActiveMQ配置文件的一個例子:

    Xml代碼
    1. < destinationPolicy > ??
    2. ??<policyMap>??
    3. ????<policyEntries>??
    4. ??????<policyEntry?topic=""FOO.>">??
    5. ????????<dispatchPolicy>??
    6. ??????????<strictOrderDispatchPolicy?/>??
    7. ????????</dispatchPolicy>??
    8. ??????</policyEntry>??
    9. ????</policyEntries>??
    10. ??</policyMap>??
    11. </ destinationPolicy > ??

    ?

    2.6.10 Message Cursors
    ??? 當producer發送的持久化消息到達broker之后,broker首先會把它保存在持久存儲中。接下來,如果發現當前有活躍的consumer,而 且這個consumer消費消息的速度能跟上producer生產消息的速度,那么ActiveMQ會直接把消息傳遞給broker內部跟這個 consumer關聯的dispatch queue;如果當前沒有活躍的consumer或者consumer消費消息的速度跟不上producer生產消息的速度,那么ActiveMQ會使用 Pending Message Cursors保存對消息的引用。在需要的時候,Pending Message Cursors把消息引用傳遞給broker內部跟這個consumer關聯的dispatch queue。以下是兩種Pending Message Cursors:

    • VM Cursor。在內存中保存消息的引用。
    • File Cursor。首先在內存中保存消息的引用,如果內存使用量達到上限,那么會把消息引用保存到臨時文件中。

    ?? 在缺省情況下,ActiveMQ 5.0根據使用的Message Store來決定使用何種類型的Message Cursors,但是你可以根據destination來配置Message Cursors。

    ??? 對于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有 vmDurableCursor 和 fileDurableSubscriberCursor。以下是ActiveMQ配置文件的一個例子:
    Xml代碼
    1. <destinationPolicy>??
    2. ??<policyMap>??
    3. ????<policyEntries>??
    4. ??????<policyEntry?topic="org.apache.>">??
    5. ????????<pendingSubscriberPolicy>??
    6. ??????????<vmCursor?/>??
    7. ????????</pendingSubscriberPolicy>??
    8. ????????<PendingDurableSubscriberMessageStoragePolicy>??
    9. ??????????<vmDurableCursor/>??
    10. ????????</PendingDurableSubscriberMessageStoragePolicy>??
    11. ??????</policyEntry>??
    12. ????</policyEntries>??
    13. ??</policyMap>??
    14. </destinationPolicy>??

    ?? 對于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。以下是ActiveMQ配置文件的一個例子:

    Xml代碼
    1. <destinationPolicy>??
    2. ??<policyMap>??
    3. ????<policyEntries>??
    4. ??????<policyEntry?queue="org.apache.>">??
    5. ????????<pendingQueuePolicy>??
    6. ??????????<vmQueueCursor?/>??
    7. ????????</pendingQueuePolicy>??
    8. ??????</policyEntry>??
    9. ????</policyEntries>??
    10. ??</policyMap>??
    11. </destinationPolicy>??

    ?

    2.6.11 Optimized Acknowledgement
    ??? ActiveMQ缺省支持批量確認消息。由于批量確認會提高性能,因此這是缺省的確認方式。如果希望在應用程序中禁止經過優化的確認方式,那么可以采用如下方法:

    Java代碼
    1. cf?=?new?ActiveMQConnectionFactory?("tcp://locahost:61616?jms.optimizeAcknowledge=false");??
    2. ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(false);??
    3. ((ActiveMQConnection)connection).setOptimizeAcknowledge(false);??

    ?

    2.6.12 Producer Flow Control
    ??? 同步發送消息的producer會自動使用producer flow control ;對于異步發送消息的producer,要使用producer flow control,你先要為connection配置一個ProducerWindowSize參數,如下:

    Java代碼
    1. ((ActiveMQConnectionFactory)cf).setProducerWindowSize(1024000);??
    ??? ProducerWindowSize是producer在發送消息的過程中,收到broker對于之前發送消息的確認之前, 能夠發送消息的最大字節數。你也可以禁用producer flow control,以下是ActiveMQ配置文件的一個例子:
    Java代碼
    1. <destinationPolicy>??
    2. ??<policyMap>??
    3. ????<policyEntries>??
    4. ??????<policyEntry?topic="FOO.>"?producerFlowControl="false">??
    5. ????????<dispatchPolicy>??
    6. ??????????<strictOrderDispatchPolicy/>??
    7. ????????</dispatchPolicy>??
    8. ??????</policyEntry>??
    9. ????</policyEntries>??
    10. ??</policyMap>??
    11. </destinationPolicy>??

    ?

    2.6.13 Message Transformation
    ??? 有時候需要在JMS provider內部進行message的轉換。從4.2版本起,ActiveMQ 提供了一個MessageTransformer 接口用于進行消息轉換,如下:

    Java代碼
    1. public?interface?MessageTransformer?{??
    2. ????Message?producerTransform(Session?session,?MessageProducer?producer,?Message?message)?throws?JMSException;??
    3. ????Message?consumerTransform(Session?session,?MessageConsumer?consumer,?Message?message)throws?JMSException;??
    4. }??
    ??? 通過在以下對象上通過調用setTransformer方法來設置MessageTransformer:
    • ActiveMQConnectionFactory
    • ActiveMQConnection
    • ActiveMQSession
    • ActiveMQMessageConsumer
    • ActiveMQMessageProducer

    ?? MessageTransformer接口支持:

    • 在消息被發送到JMS provider的消息總線前進行轉換。通過producerTransform方法。
    • 在消息到達消息總線后,但是在consumer接收到消息前進行轉換。通過consumerTransform方法。

    ?? 以下是個簡單的例子:??

    ?

    Java代碼
    1. public?class?SimpleMessage?implements?Serializable?{??
    2. ????//??
    3. ????private?static?final?long?serialVersionUID?=?2251041841871975105L;??
    4. ??????
    5. ????//??
    6. ????private?String?id;??
    7. ????private?String?text;??
    8. ??????
    9. ????public?String?getId()?{??
    10. ????????return?id;??
    11. ????}??
    12. ????public?void?setId(String?id)?{??
    13. ????????this.id?=?id;??
    14. ????}??
    15. ????public?String?getText()?{??
    16. ????????return?text;??
    17. ????}??
    18. ????public?void?setText(String?text)?{??
    19. ????????this.text?=?text;??
    20. ????}??
    21. }??
    ??? 在producer內發送ObjectMessage,如下:
    Java代碼
    1. SimpleMessage?sm?=?new?SimpleMessage();??
    2. sm.setId("1");??
    3. sm.setText("this?is?a?sample?message");??
    4. ObjectMessage?message?=?session.createObjectMessage();??
    5. message.setObject(sm);??
    6. producer.send(message);??

    ?? 在consumer的session上設置一個MessageTransformer用于將ObjectMessage轉換成TextMessage,如下:

    Java代碼
    1. ((ActiveMQSession)session).setTransformer(new?MessageTransformer()?{??
    2. public?Message?consumerTransform(Session?session,?MessageConsumer?consumer,?Message?message)?throws?JMSException?{??
    3. ????ObjectMessage?om?=?(ObjectMessage)message;??
    4. ????XStream?xstream?=?new?XStream();??
    5. ????xstream.alias("simple?message",?SimpleMessage.class);??
    6. ????String?xml?=?xstream.toXML(om.getObject());??
    7. ????return?session.createTextMessage(xml);??
    8. }??
    9. ??
    10. public?Message?producerTransform(Session?session,?MessageProducer?consumer,?Message?message)?throws?JMSException?{??
    11. ????return?null;??
    12. }??
    13. });
    posted on 2010-09-01 22:29 禮物 閱讀(1045) 評論(0)  編輯  收藏 所屬分類: ActiveMQ
    主站蜘蛛池模板: 色婷婷六月亚洲综合香蕉| 无码国产精品一区二区免费16| 亚洲另类激情专区小说图片| 中文字幕a∨在线乱码免费看| 亚洲狠狠狠一区二区三区| 国产精品美女自在线观看免费 | 亚洲国产一区二区a毛片| 台湾一级毛片永久免费 | 野花高清在线电影观看免费视频| 免费精品国自产拍在线播放| 亚洲av永久无码制服河南实里| 成年性羞羞视频免费观看无限| 东北美女野外bbwbbw免费| 亚洲精品第一综合99久久| 亚洲欧洲日产国码无码网站| 在线观看免费宅男视频| 国产va在线观看免费| 羞羞漫画登录页面免费| 亚洲精彩视频在线观看| 国产精品亚洲综合一区| 一个人免费观看视频www| 国产午夜无码精品免费看| 相泽南亚洲一区二区在线播放| 久久久久亚洲精品日久生情| 亚洲精品麻豆av| 成年女人免费v片| 91精品全国免费观看含羞草| 69国产精品视频免费| 九一在线完整视频免费观看| 国产精品亚洲专区在线观看| 亚洲91av视频| 亚洲中文久久精品无码ww16| 日韩精品免费电影| 69式国产真人免费视频| 国产精品区免费视频| 91av免费在线视频| 成人久久久观看免费毛片| 亚洲精品又粗又大又爽A片| 亚洲啪啪免费视频| 亚洲精品福利网泷泽萝拉| 亚洲精品亚洲人成人网|