1 JMS
??? 在介紹ActiveMQ之前,首先簡要介紹一下JMS規范。
1.1 JMS的基本構件
1.1.1 連接工廠
??? 連接工廠是客戶用來創建連接的對象,例如ActiveMQ提供的ActiveMQConnectionFactory。
1.1.2 連接
??? JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連接。
1.1.3 會話
??? JMS Session是生產和消費消息的一個單線程上下文。會話用于創建消息生產者(producer)、消息消費者(consumer)和消息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組發送和接收被組合到了一個原子操作中。
1.1.4 目的地
??? 目的地是客戶用來指定它生產的消息的目標和它消費的消息的來源的對象。JMS1.0.2規范中定義了兩種消息傳遞域:點對點(PTP)消息傳遞域和發布/訂閱消息傳遞域。
點對點消息傳遞域的特點如下:
- 每個消息只能有一個消費者。
- 消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發送消息的時候是否處于運行狀態,它都可以提取消息。
?? 發布/訂閱消息傳遞域的特點如下:
- 每個消息可以有多個消費者。
- 生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱之后發布的消息。JMS規范允許客戶創建持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處于激活狀態時發送的消息。
? 在點對點消息傳遞域中,目的地被成為隊列(queue);在發布/訂閱消息傳遞域中,目的地被成為主題(topic)。
?
1.1.5 消息生產者
??? 消息生產者是由會話創建的一個對象,用于把消息發送到一個目的地。
1.1.6 消息消費者
??? 消息消費者是由會話創建的一個對象,它用于接收發送到目的地的消息。消息的消費可以采用以下兩種方法之一:
- 同步消費。通過調用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達。
- 異步消費??蛻艨梢詾橄M者注冊一個消息監聽器,以定義在消息到達時所采取的動作。
1.1.7 消息
??? JMS消息由以下三部分組成:
- 消息頭。每個消息頭字段都有相應的getter和setter方法。
- 消息屬性。如果需要除消息頭字段以外的值,那么可以使用消息屬性。
- 消息體。JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。
1.2 JMS的可靠性機制
1.2.1 確認
??? JMS消息只有在被確認之后,才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。
??? 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息何時被確認取決于創建會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:
- Session.AUTO_ACKNOWLEDGE。當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。
- Session.CLIENT_ACKNOWLEDGE。
客戶通過消息的acknowledge方法確認消息。需要注意的是,在這種模式中,確認是在會話層上進行:確認一個被消費的消息將自動確認所有已被會話消
費的消息。例如,如果一個消息消費者消費了10個消息,然后確認第5個消息,那么所有10個消息都被確認。
- Session.DUPS_ACKNOWLEDGE。
該選擇只是會話遲鈍第確認消息的提交。如果JMS provider失敗,那么可能會導致一些重復的消息。如果是重復的消息,那么JMS
provider必須把消息頭的JMSRedelivered字段設置為true。
1.2.2 持久性
??? JMS 支持以下兩種消息提交模式:
- PERSISTENT。指示JMS provider持久保存消息,以保證消息不會因為JMS provider的失敗而丟失。
- NON_PERSISTENT。不要求JMS provider持久保存消息。
1.2.3 優先級
??? 可以使用消息優先級來指示JMS provider首先提交緊急的消息。優先級分10個級別,從0(最低)到9(最高)。如果不指定優先級,默認級別是4。需要注意的是,JMS provider并不一定保證按照優先級的順序提交消息。
1.2.4 消息過期
??? 可以設置消息在一定時間后過期,默認是永不過期。
1.2.5 臨時目的地
??? 可以通過會話上的createTemporaryQueue方法和createTemporaryTopic方法來創建臨時目的地。它們的存在時間只限于創建它們的連接所保持的時間。只有創建該臨時目的地的連接上的消息消費者才能夠從臨時目的地中提取消息。
1.2.6 持久訂閱
??? 首先消息生產者必須使用PERSISTENT提交消息??蛻艨梢酝ㄟ^會話上的createDurableSubscriber方法來創建一個持久訂閱,該方法的第一個參數必須是一個topic。第二個參數是訂閱的名稱。
???
JMS
provider會存儲發布到持久訂閱對應的topic上的消息。如果最初創建持久訂閱的客戶或者任何其它客戶使用相同的連接工廠和連接的客戶ID、相同
的主題和相同的訂閱名再次調用會話上的createDurableSubscriber方法,那么該持久訂閱就會被激活。JMS
provider會象客戶發送客戶處于非激活狀態時所發布的消息。
??? 持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在創建之后會一直保留,直到應用程序調用會話上的unsubscribe方法。
1.2.7 本地事務
???
在一個JMS客戶端,可以使用本地事務來組合消息的發送和接收。JMS
Session接口提供了commit和rollback方法。事務提交意味著生產的所有消息被發送,消費的所有消息被確認;事務回滾意味著生產的所有消
息被銷毀,消費的所有消息被恢復并重新提交,除非它們已經過期。
??? 事務性的會話總是牽涉到事務處理中,commit或rollback方法一旦被調用,一個事務就結束了,而另一個事務被開始。關閉事務性會話將回滾其中的事務。
需要注意的是,如果使用請求/回復機制,即發送一個消息,同時希望在同一個事務中等待接收該消息的回復,那么程序將被掛起,因為知道事務提交,發送操作才會真正執行。
??? 需要注意的還有一個,消息的生產和消費不能包含在同一個事務中。
1.3 JMS 規范的變遷
??? JMS的最新版本的是1.1。它和同1.0.2版本之間最大的差別是,JMS1.1通過統一的消息傳遞域簡化了消息傳遞。這不僅簡化了JMS API,也有利于開發人員靈活選擇消息傳遞域,同時也有助于程序的重用和維護。
以下是不同消息傳遞域的相應接口:
JMS 公共
|
點對點域
|
發布/訂閱域
|
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
Queue |
Topic |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver |
TopicSubscriber |
?
2 ActiveMQ
2.1 Broker
2.1.1 Running Broker
??? ActiveMQ5.0 的二進制發布包中bin目錄中包含一個名為activemq的腳本,直接運行這個腳本就可以啟動一個broker。
??? 此外也可以通過Broker Configuration URI或Broker XBean URI對broker進行配置,以下是一些命令行參數的例子:
Example
|
Description
|
activemq
|
Runs a broker using the default 'xbean:activemq.xml' as the broker configuration file.
|
activemq xbean:myconfig.xml
|
Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath.
|
activemq xbean:file:./conf/broker1.xml
|
Runs
a broker using the file broker1.xml as the broker configuration file
that is located in the relative file path ./conf/broker1.xml
|
activemq xbean:file:C:/ActiveMQ/conf/broker2.xml
|
Runs
a broker using the file broker2.xml as the broker configuration file
that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml
|
activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true
|
Runs a broker with two transport connectors and JMX enabled.
|
activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false
|
Runs a broker with 1 transport connector and 1 network connector with persistence disabled.
|
?
2.1.2 Embedded Broker
??? 可以通過在應用程序中以編碼的方式啟動broker,例如:
-
BrokerService?broker?=?
new
?BrokerService();??
-
broker.addConnector("tcp://localhost:61616");??
-
broker.start();??
??? 如果需要啟動多個broker,那么需要為broker設置一個名字。例如:
- BrokerService?broker?=?new?BrokerService();??
- broker.setName("fred");??
- broker.addConnector("tcp://localhost:61616");??
- broker.start();??
??? 如果希望在同一個JVM內訪問這個broker,那么可以使用VM Transport,URI是:vm://brokerName。關于更多的broker屬性,可以參考Apache的官方文檔。
??? 此外,也可以通過BrokerFactory來創建broker,例如:
- BrokerService?broker?=?BrokerFactory.createBroker(new?URI(someURI));??
??? someURI的可選值如下:
URI scheme | Example | Description |
xbean: | xbean:activemq.xml | Searches
the classpath for an XML document with the given URI (activemq.xml in
this case) which will then be used as the Xml Configuration |
file: | file:foo/bar/activemq.xml | Loads the given file (in this example foo/bar/activemq.xml) as the Xml Configuration |
broker: | broker:tcp://localhost:61616 | Uses the Broker Configuration URI to configure the broker |
?? 當使用XBean的配置方式的時候,需要指定一個xml配置文件,例如:
- BrokerService?broker?=?BrokerFactory.createBroker(new?URI("xbean:com/test/activemq.xml"));??
??? 使用Spring的配置方式如下:
- <bean?id="broker"?class="org.apache.activemq.xbean.BrokerFactoryBean">??
- ??<property?name="config"?value="classpath:org/apache/activemq/xbean/activemq.xml"?/>??
- ??<property?name="start"?value="true"?/>??
- </bean>??
?
2.1.3 Monitoring Broker
2.1.3.1 JMX
??? 在使用JMX監控broker之前,首先要啟用broker的JMX監控功能,例如在配置文件中設置useJmx="true",如下:
- <broker?useJmx="true"?brokerName="broker1>??
- ??<managementContext>??
- ?????<managementContext?createConnector="true"/>??
- ??</managementContext>??
- ??...??
- </broker>??
???
接下來運行JDK自帶的jconsole。在運行了jconsole后,它會彈出對話框來選擇需要連接到的agent。如果是在啟動broker的主機上
運行jconsole,那么ActiveMQ broker會出現在jconsole的Local
標簽中。如果要連接到遠程的broker,那么可以在Advanced標簽中指定JMX URL,以下是一個連接到本機的JMX URL:
??? service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
??
在jconsole的MBeans標簽中,可以查看詳細信息,也可以執行相應的operation。需要注意的是,在jconsole連接到broker
的時候,并不需要輸入用戶名和密碼,如果這存在潛在的安全問題,那么就需要為JMX
Connector配置密碼保護(需要使用1.5以上版本的JDK)。??
?? 首先要禁止ActiveMQ創建自己的connector,例如:
- <broker?xmlns="http://activemq.org/config/1.0"?brokerName="localhost"useJmx="true">??
- ??<managementContext>??
- ?????<managementContext?createConnector="false"/>??
- ??</managementContext>??
- </broker>??
??? 然后在ActiveMQ的conf目錄下創建一個訪問控制文件和密碼文件,如下:
conf/jmx.access:
# The "monitorRole" role has readonly access.
# The "controlRole" role has readwrite access.
monitorRole readonly
controlRole readwrite
?
conf/jmx.password:
# The "monitorRole" role has password "abc123".
# The "controlRole" role has password "abcd1234".
monitorRole abc123
controlRole abcd1234
?
?? 然后修改ActiveMQ的bin目錄下activemq的啟動腳本,查找包含"SUNJMX="的一行如下:
REM
set SUNJMX=-Dcom.sun.management.jmxremote.port=1616
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
??? 把它替換成
set
SUNJMX=-Dcom.sun.management.jmxremote.port=1616
-Dcom.sun.management.jmxremote.authenticate=true
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password
-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access
???
最后重啟ActiveMQ和jconsole,這時候需要強制login。如果在啟動activemq的過程中出現以下錯誤,那么需要為這個文件增加訪問
控制。Windows平臺上的具體解決方法請參考如下網址:http://java.sun.com/j2se/1.5.0/docs/guide
/management/security-windows.html
Error: Password file read access must be restricted: D:\apache-activemq-5.0.0\bin\../conf/jmx.password
?
2.1.3.2 Web Console
??? Web Console被集成到了ActiveMQ的二進制發布包中,因此缺省訪問http://localhost:8161/admin即可訪問Web Console。
??? 在配置文件中,可以通過修改nioConnector的port屬性來修改Web console的缺省端口:
- <jetty?xmlns="http://mortbay.com/schemas/jetty/1.0">??
- ??<connectors>??
- ????<nioConnector?port="8161"?/>??
- ??</connectors>??
- ??...??
- </jetty>??
???
出于安全性或者可靠性的考慮,Web Console
可以被部署到不同于ActiveMQ的進程中。例如把activemq-web-console.war部署到一個單獨的web容器中
(Tomcat,Jetty等)。在ActiveMQ5.0的二進制發布包中不包含activemq-web-console.war,因此需要下載
ActiveMQ的源碼,然后進入到${activemq.base}/src/activemq-web-console目錄中執行mvn
instanll。如果一切正常,那么缺省會在${activemq.base}/src/activemq-web-console/target目錄
中生成activemq-web-console-5.0.0.war。然后將activemq-web-console-5.0.0.war拷貝到
Tomcat的webapps目錄中,并重命名成activemq-web-console.war。
?? 需要注意的是,要將activemq-all-5.0.0.jar拷貝到WEB-INF\lib目錄中(可能還需要拷貝jms.jar)。還要為Tomcat設置以下五個系統屬性(修改catalina.bat文件):
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.type="properties"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jms.url="tcp://localhost:61616"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.url="service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.role=""
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.password=""
??
如果JMX沒有配置密碼保護,那么webconsole.jmx.role和webconsole.jmx.password設置成""即可。如果
broker被配置成了Master/Slave模式,那么可以配置成使用failover transport,例如:
-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)
??
順便說一下,由于webconsole.type 屬性是properties,因此實際上起作用的Web
Console的配置文件是WEB-INF/
webconsole-properties.xml。最后啟動被監控的ActiveMQ,訪問http://localhost:8080
/activemq-web-console/,查看顯示是否正常。
?
2.1.3.3 Advisory Message
??? ActiveMQ 支持Advisory Messages,它允許你通過標準的JMS 消息來監控系統。目前的Advisory Messages支持:
- consumers, producers and connections starting and stopping
- temporary destinations being created and destroyed
- messages expiring on topics and queues
- brokers sending messages to destinations with no consumers.
- connections starting and stopping
?
Advisory Messages可以被想象成某種的管理通道,通過它你可以得到關于JMS
provider、producers、consumers和destinations的信息。Advisory
topics都使用ActiveMQ.Advisory.這個前綴,以下是目前支持的topics: ?
?? Client based advisories
Advisory Topics | Description |
ActiveMQ.Advisory.Connection | Connection start & stop messages |
ActiveMQ.Advisory.Producer.Queue | Producer start & stop messages on a Queue |
ActiveMQ.Advisory.Producer.Topic | Producer start & stop messages on a Topic |
ActiveMQ.Advisory.Consumer.Queue | Consumer start & stop messages on a Queue
|
ActiveMQ.Advisory.Consumer.Topic | Consumer start & stop messages on a Topic |
?
??? 在消費者啟動/停止的Advisory Messages的消息頭中有個consumerCount屬性,他用來指明目前desination上活躍的consumer的數量。
???? Destination and Message based advisories
Advisory Topics | Description |
ActiveMQ.Advisory.Queue | Queue create & destroy |
ActiveMQ.Advisory.Topic | Topic create & destroy |
ActiveMQ.Advisory.TempQueue | Temporary Queue create & destroy |
ActiveMQ.Advisory.TempTopic | Temporary Topic create & destroy |
ActiveMQ.Advisory.Expired.Queue | Expired messages on a Queue |
ActiveMQ.Advisory.Expired.Topic | Expired messages on a Topic |
ActiveMQ.Advisory.NoConsumer.Queue | No consumer is available to process messages being sent on a Queue |
ActiveMQ.Advisory.NoConsumer.Topic | No consumer is available to process messages being sent on a Topic |
??
以上的這些destnations都可以用來作為前綴,在其后面追加其它的重要信息,例如topic、queue、clientID、
producderID和consumerID等。這令你可以利用Wildcards 和 Selectors 來過濾Advisory
Messages(關于Wildcard和Selector會在稍后介紹)。
??
例如,如果你希望訂閱FOO.BAR這個queue上Consumer的start/stop的消息,那么可以訂閱
ActiveMQ.Advisory.Consumer.Queue.FOO.BAR;如果希望訂閱所有queue上的start/stop消息,那么可
以訂閱ActiveMQ.Advisory.Consumer.Queue.>;如果希望訂閱所有queue或者topic上的
start/stop消息,那么可以訂閱ActiveMQ.Advisory.Consumer. >。
??? org.apache.activemq.advisory.AdvisorySupport類上有如下的helper methods,用來在程序中得到advisory destination objects。
- AdvisorySupport.getConsumerAdvisoryTopic()??
- AdvisorySupport.getProducerAdvisoryTopic()??
- AdvisorySupport.getDestinationAdvisoryTopic()??
- AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()??
- AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()??
- AdvisorySupport.getNoTopicConsumersAdvisoryTopic()??
- AdvisorySupport.getNoQueueConsumersAdvisoryTopic()??
?? 以下是段使用Advisory Messages的程序代碼:
- Destination?advisoryDestination?=?AdvisorySupport.getProducerAdvisoryTopic(destination)??
- MessageConsumer?consumer?=?session.createConsumer(advisoryDestination);??
- consumer.setMessageListener(this);??
- ...??
- public?void?onMessage(Message?msg){??
- ????if?(msg?instanceof?ActiveMQMessage){??
- ????????try?{??
- ?????????????ActiveMQMessage?aMsg?=??(ActiveMQMessage)msg;??
- ?????????????ProducerInfo?prod?=?(ProducerInfo)?aMsg.getDataStructure();??
- ????????}?catch?(JMSException?e)?{??
- ????????????log.error("Failed?to?process?message:?"?+?msg);??
- ????????}??
- ????}??
- }??
?
2.1.3.4 Command Agent
??? 在介紹Command Agent前首先簡要介紹一下XMPP(Jabber)協議,XMPP是一種基于XML的即時通信協議,它由Jabber軟件基金會開發。在配置文件中通過增加transportConnector來支持XMPP協議:
- <broker?xmlns="http://activemq.org/config/1.0">??
- ??<transportConnectors>??
- ?????...??
- ?????<transportConnector?name="xmpp"?????uri="xmpp://localhost:61222"/>??
- ??</transportConnectors>??
- </broker>??
??? ActiveMQ提供了ActiveMQ messages和XMPP之間的雙向橋接:
- 如果客戶加入了一個聊天室,那么這個聊天室的名字會被映射到一個JMS topic。
- 嘗試在聊天室內發送消息會導致一個JMS消息被發送到這個topic。
- 呆在一個聊天室中意味著這將保持一個對相應JMS topic的訂閱。因此發送到這個topic的JMS消息也會被發送到聊天室。
?? 推薦XMPP客戶端Spark(http://www.igniterealtime.org/)。
?? 從4.2版本起,ActiveMQ支持Command Agent。在配置文件中,通過設置commandAgent來啟用Command Agent:
- <beans>??
- ??<broker?useJmx="true"?xmlns="http://activemq.org/config/1.0">??
- ????...??
- ??</broker>??
- ??<commandAgent?xmlns="http://activemq.org/config/1.0"/>??
- </beans>??
???
啟用了Command Agent的broker上會有一個來自Command Agent的連接,它同時訂閱topic:
ActiveMQ.Agent。在你啟動XMPP客戶端,加入到ActiveMQ.Agent聊天室后,就可以同broker進行交談了。通過在XMPP
客戶端中鍵入help,可以得到幫助信息。
??? 需要注意的是,ActiveMQ5.0版本有個小bug,如果broker沒有采用缺省的用戶名和密碼,那么Command Agent便無法正常啟動。Apache官方文檔說,此bug已經被修正,預定在5.2.0版本上體現。修改方式如下:
- <commandAgent?xmlns="http://activemq.org/config/1.0"?brokerUser="user"?brokerPassword="passward"/>??
?
2.1.3.5 Visualization plugin
??? ActiveMQ支持以broker插件的形式生成DOT文件(可以用agrviewer來查看),以圖表的方式描述connections、sessions、producers、consumers、destinations等信息。配置方式如下:?
- <broker?xmlns="http://activemq.org/config/1.0"?brokerName="localhost"?useJmx="true">??
- ??????...??
- ??????<plugins>??
- ??????????<connectionDotFilePlugin??file="connection.dot"/>??
- ??????????<destinationDotFilePlugin?file="destination.dot"/>??
- ??????</plugins>??
- </broker>??
??
需要注意的是,筆者認為ActiveMQ5.0版本的Visualization
Plugin尚不穩定,存在諸多問題。例如:如果使用connectionDotFilePlugin,那么brokerName必須是
localhost;如果使用destinationDotFilePlugin可能會導致ArrayStoreException。