JMS是一種應用于異步消息傳遞的標準API,作為Java平臺的一部分,JMS可以允許不同應用、不同模塊之間實現可靠、異步數據通信。
一些概念
JMS provider
An implementation of the JMS interface for a Message Oriented Middleware (MOM). Providers are implemented as either a Java JMS implementation or an adapter to a non-Java MOM.
JMS client
An application or process that produces and/or receives messages.
JMS producer/publisher
A JMS client that creates and sends messages.
JMS consumer/subscriber
A JMS client that receives messages.
JMS message
An object that contains the data being transferred between JMS clients.
JMS queue
A staging area that contains messages that have been sent and are waiting to be read. Note that, contrary to what the name queue suggests, messages don't have to be delivered in the order sent. A JMS queue only guarantees that each message is processed only once.
JMS topic
A distribution mechanism for publishing messages that are delivered to multiple subscribers.
在JMS中,支持兩種消息模型,點對點(Point-to-point)和發布-訂閱(Publish and subscribe),這兩種模式分別對應于JMS中的兩種消息目標(Message Destination):隊列及主題。
在點對點模型中,每個消息都有一個發送者和一個接收者,消息中介(broker)收到發送者的消息,會將消息放入隊列中,而接收者請求并接收隊列中的一條消息后,這條消息就會從隊列中刪除。消息隊列中的每條消息只能投遞給一個接收者,但并不意味著只能使用一個接收者從隊列中取消息,根據業務需要,可以使用多個接收者同時從隊列中請求消息,分擔處理壓力。但是需要注意的是,單個接收者收到的消息是按照發送順序的,多個接收者因為多線程的關系,并不能保證收到的消息一定是原序的。
在發布-訂閱模式中,消息會發送給一個主題,但是與點對點模式不同的是消息不再只被投遞給一個接收者,而是所有此主題的訂閱者都會收到該消息。
JMS消息類型
在JMS1.1規范中,定義了五種消息類型,分別為:
1.StreamMessage :消息體是 Java 流,寫入和讀出都是順序的
2.MapMessage :消息體包含 key-value 對, key 為 String , value 為基本類型,可以通過迭代器訪問
3.TextMessage :消息體是 String
4.ObjectMessage :消息體是可序列化的 Java 對象
5.BytesMessage :消息體是字節數組
可以通過 message.clearBody() 來清除消息體;但在消費端,消息體是只讀的,針對消息的寫操作都會拋出 MessageNotWritableException 異常
JMS消息頭所有消息的消息頭都具體相同的字段,用于 JMS Client 以及 JMS Provider 對它們進行區別以及進行消息路由
1.JMSDestination
消息發送的目的地(隊列或主題);創建消息時可以設置 JMSDestination ,但是在發送完成時其值會更新為發送方所指定的 JMSDestination ,也就是說發送前該字段會被忽略;當消息被消費時,該字段的值與在它被發送時被設置的值是相同的
以下所有示例均基于ActiveMQ
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建2個目的地
Destination destination = session.createQueue("JMS.DEMO");
Destination destination2 = session.createQueue("JMS.DEMO2");
// 創建生產者
MessageProducer publisher = session.createProducer(destination);
// 設置傳輸模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// 創建消息
TextMessage message = session.createTextMessage("Test Message");
// 設置消息的目的地為destination2
message.setJMSDestination(destination2);
// 發送消息
publisher.send(message);
System.out.println(message.getJMSDestination());
代碼中,通過 message.setJMSDestination(destination2); 設置了 message 的 JMSDestination 消息頭屬性值,我們再看看其輸出結果
queue://JMS.DEMO
通過這個例子可以看出,雖然在發送前設置了消息的目的地,但是發送后消息的目的地被重置了
2.JMSDeliveryMode
指明消息的傳輸模式,有兩種:
DeliveryMode.PERSISTENT :保證消息僅傳一次, JMS Provider 服務停止后消息不會丟失;
DeliveryMode.NON_PERSISTENT :消息最多傳一次,消息會因 JMS Provider 停止后丟失;
同 JMSDestination 一樣,在發送前設置的會被忽略
看下面的例子
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地
Destination destination = session.createQueue("JMS.DEMO");
// 創建生產者
MessageProducer publisher = session.createProducer(destination);
// 設置傳輸模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// 發送PERSISTENT消息
publisher.send(session.createTextMessage("PERSISTENT MESSAGE"));
// 設置傳輸模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 發送PERSISTENT消息
publisher.send(session.createTextMessage("NON_PERSISTENT MESSAGE"));
例子中分別發送了一條 PERSISTENT 的消息和一條 NON_PERSISTENT 的消息;當 Active MQ 重啟后,啟動消費端,收到的消息如下
PERSISTENT MESSAGE
該例子說明,在 JMS Provider 重啟后, NON_PERSISTENT 消息丟失了,而 PERSISTENT 消息能正常被消費者消費
3.JMSMessageID
由 JMS Provider 指定的消息的唯一標識符;同上面的字段一樣,在發送前設置的會被忽略,在發送完成時,由 JMS Provider 重置該字段
4.JMSReplyTo
發送端在發送消息時,可以指定該屬性(為一個 JMSDestination ),表示期望收到客戶端的響應;是否響應由消費端決定
如下面的例子:
發送端:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地
Destination destination = session.createQueue("JMS.DEMO");
Destination destination2 = session.createQueue("JMS.DEMO3");
// 創建生產者
MessageProducer publisher = session.createProducer(destination);
// 設置傳輸模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// 創建消息
TextMessage message = session.createTextMessage("Test Message");
message.setJMSReplyTo(destination2);
// 發送消息
publisher.send(message);
接收端(可以根據情況決定是否需要回復)
public void onMessage(Message message) {
try {
System.out.println("Receive message: " + message);
if (message.getJMSReplyTo() != null) {
session.createProducer(message.getJMSReplyTo()).send(session.createTextMessage("This is a reply to"
+ message.getJMSReplyTo()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
5.JMSRedelivered
當消費者收到帶有 JMSRedelivered 的消息頭時,表明該消息在過去傳輸過但沒有被確認
JMS Provider 必須對該字段進行設置,當為 true 時即告知消費者該消息是重傳的,消費者需要自行處理重復的消息
6.JMSExpiration
消息的過期時間,其值為當前時間加上存活時間(毫秒);當存活時間設置為 0 時,該字段的值也被設置為 0 ,表示永不過期;
消費端在一般情況下都不會接收到過期的消息,但 JMS Provider 并不保證這一點;
下面的例子說明了如何設置消息的過期時間
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地
Destination destination = session.createQueue("JMS.DEMO");
// 創建生產者
MessageProducer publisher = session.createProducer(destination);
// 設置傳輸模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// 創建消息
TextMessage message = session.createTextMessage("Test Message");
// 發送消息
publisher.setTimeToLive(5000);
publisher.send(message);
7.JMSPriority
消息的優先級, 0 代表最低優先級, 9 代表最高優先級;一般 0~4 為普通優先級, 5~9 為加快優先級
JMS 規范里并沒有要求 JMS Provider 嚴格按這個優先級來實現,但是盡可能實現加快優先級消息的傳輸在普通消息的前面
同 JMSDestination 一樣,該字段在發送前被忽略,在發送完成時重置
消息屬性除了前面提到的消息頭以外, JMS 消息還提供了對“屬性值對”的支持,以對消息頭進行擴展;消息屬性主要用于消息選擇器 (message selector 詳見下文 )
1.屬性名
屬性名必須服務消息選擇器的命名規則2.屬性值
可以是基本類型及其對象類型以及 Map 、 List 和 String
下面的例子中,消息帶 HashMap 的屬性
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地
Destination destination = session.createQueue("JMS.DEMO");
// 創建生產者
MessageProducer publisher = session.createProducer(destination);
// 設置傳輸模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// 創建消息
TextMessage message = session.createTextMessage("Test Message");
// 發送消息
message.setObjectProperty("myProp", new HashMap() {
{
this.put("key1", "value1");
this.put("key2", "value2");
}
});
publisher.send(message);
3.清除屬性
JMS 不能清除單個屬性,但可以通過 Message.clearProperties() 方法清除所有消息屬性
JMS實現(Provider implementations)要使用JMS,必須要有相應的實現來管理session以及隊列,從Java EE1.4開始,所有的Java EE應用服務器必須包含一個JMS實現。
以下是一些JMS實現:
Apache ActiveMQ
Apache Qpid, using AMQP
BEA Weblogic (part of the Fusion Middleware suite) and Oracle AQ from Oracle
EMS from TIBCO
FFMQ, GNU LGPL licensed
JBoss Messaging and HornetQ from JBoss
JORAM, from the OW2 Consortium
Open Message Queue, from Sun Microsystems
OpenJMS, from The OpenJMS Group
RabbitMQ, using AMQP
Solace JMS from Solace Systems
SonicMQ from Progress Software
StormMQ, using AMQP
SwiftMQ
Tervela
Ultra Messaging from 29 West (acquired by Informatica)
webMethods from Software AG
WebSphere Application Server from IBM, which provides an inbuilt default messaging provider known as the Service Integration Bus (SIBus), or which can connect to WebSphere MQ as a JMS provider [5]
WebSphere MQ (formerly MQSeries) from IBM
Ref from:
1.http://en.wikipedia.org/wiki/Java_Message_Service
2.Spring in Action
3.http://goldendoc.iteye.com/blog/1155647