http://news.ccidnet.com/pub/article/c1060_a59896_p2.html
提綱:
一、JMS基本概念
1.1 P2P通信
1.2 Pub/Sub通信
二、JMS消息
三、JMS P2P編程
3.1 使用JMS QueueConnection對象 3.2 處理回退事件
3.3 關閉JMS對象
3.4 接收消息
3.5 消息驅動的Bean
3.6 消息持久化
3.7 消息選擇器
四、JMS Pub/Sub編程
五、二階段提交的事務
━━━━━━━━━━━━━━━━━━━━━━━━━━
EJB 2.0和J2EE 1.3規范開始提供對Java消息服務(JMS)的支持。在J2EE 1.3加入JMS之前,J2EE環境中的組件通過RMI-IIOP協議通信,J2EE是一個完全同步的平臺。由于在J2EE 1.3規范中引入了JMS,J2EE環境開始具備一項極其重要的功能--異步通信。
● 說明:RMI-IIOP是Java遠程方法調用(RMI,Remote Method Invocation)的一個兼容CORBA的版本,CORBA是Common Object Request Broker Architecture的縮寫,即公用對象請求代理(調度)體系結構。RMI-IIOP通過CORBA平臺和語言中立的通信協議發送RMI消息。
為了支持異步通信,J2EE 1.3規范還引入了一種新的EJB類型:消息驅動的Bean,即Message Driven Bean,簡稱MDB。如前所述,在JMS之前,J2EE原來是一個建立在Java RMI-IIOP通信協議基礎上的同步環境,但MDB卻具有接收異步消息的能力。
異步通信使得企業應用能夠建立在一種全新的通信機制之上,它具有如下重要優點:
■ 異步通信程序不直接交換信息。由于這個原因,通信的任意一方能夠在對方停止響應時繼續自己的操作,例如當接收消息的一方不能響應時,發送方仍能夠發出消息。
■ 異步通信有助于提高可靠性。支持異步通信的中間件軟件提供了有保證的(可靠的)消息遞送機制,即使整個環境出現問題也能夠保證消息傳遞到目的地。Internet是一種不可靠的通信媒體,對于通過Internet通信的應用來說,異步通信的這一特點非常富有吸引力。
■ 發送異步消息的程序不會在等待應答的時候被鎖定,從而極大地有利于提高性能。
JMS本身不是一個通信軟件,而是一個標準的應用編程接口(API),用來建立廠商中立的異步通信機制。從這個意義上說,JMS類似于JDBC和JNDI,例如就JDBC來說,它要求有一個底層的數據庫提供者,JMS則要求有一個支持JMS標準的底層異步通信中間件提供者,一般稱為面向消息的中間件(Message-Oriented Middleware,MOM)。
MOM是一種支持應用程序通過交換消息實現異步通信的技術。在某種程度上,異步通信有點象是人們通過email進行通信;類似地,同步通信的程序就象是人們通過電話進行通信。在異步通信過程中,程序的結合方式是寬松的,換句話說,異步通信的程序并不直接相互聯系,而是通過稱為隊列(Queue)或主題(Topic)的虛擬通道聯系。
同時,異步通信還意味著消息通過一個中轉站,按照存儲-轉發的方式傳遞,即使通信的接收方當時并未運行,另一方(發送方)的程序也能夠順利發出消息,一旦接收方的程序后來開始運行,消息就會傳遞給它。
JMS通信主要的優點是提供了一個環境,在這個環境中各個程序通過標準的API進行通信,開發者不必再面對各種不同操作系統、數據表示方式、底層協議所帶來的復雜性。
本文討論了JMS編程的基本概念以及兩種JMS通信方式:第一種是端對端通信(Point-to-Point,P2P)方式,第二種是出版/訂閱(Publish/Subscribe,Pub/Sub)方式,另外還涵蓋了JMS消息結構、JMS主要對象、MDB、JMS和WebSphere編程、消息持久化以及JMS事務支持方面的問題。
一、JMS基本概念 鑒于JMS是一種比較新的技術,所以本文將首先詳細介紹JMS編程的一般概念,然后再探討WSAD 5.0環境下的JMS開發。如前所示,JMS程序本身并不直接相互通信,消息被發送給一個臨時中轉站--隊列或主題。隊列和主題都是能夠收集消息的中轉對象,但兩者支持的消息傳遞機制又有所不同,分別對應兩種不同的通信方式--P2P和Pub/Sub。
1.1 P2P通信 P2P消息傳遞又可以按照推(Push)和拉(Pull)兩種方式運作。在P2P拉方式中,程序通過稱為隊列的虛擬通道通信:在通信會話的發送方,發送程序把一個消息"放入"隊列,在接收方,接收程序定期掃描隊列,尋找它希望接收和處理的消息。和推方式相比,拉方式的消息傳遞效率較低,因為它需要周而復始地檢查消息是否到達,這個過程會消耗一定的資源。另外必須注意的一點是,當接收方發現一個需要處理的消息時,它就會"提取"消息,從效果上看等于從隊列刪除了消息。
因此,即使有多個接收程序在處理同一個隊列,對于某一特定的消息來說,總是只有一個接收者。JMS程序可以使用多個隊列,每一個隊列可以由多個程序處理,但是只有一個程序才會收到某個特定的消息。
在P2P推方式的消息傳遞中,發送程序的工作原理也一樣,它把消息發送到隊列,但接收程序的工作原理有所不同。接收程序實現了一個Listener接口,包括實現了該接口中的onMessage回調方法,在J2EE環境中監聽隊列接收消息的任務交給了容器,每當一個新的消息達到隊列,容器就調用onMessage方法,將消息作為參數傳遞給onMessage方法。
P2P通信最重要的特點(不管是推還是拉)是:每一個消息總是只由一個程序接收。一般而言,P2P程序在通信過程中參與的活動較多,例如,發送程序可以向接收程序指出應答消息應當放入哪一個隊列,它還可以要求返回一個確認或報告消息。
1.2 Pub/Sub通信 在Pub/Sub通信方式中,程序之間通過一個主題(Topic)實現通信,用主題作為通信媒介要求有Pub/Sub代理程序的支持(稍后詳細介紹)。在消息發送方,生產消息的程序向主題發送消息;在接收方,消息的消費程序向感興趣的主題訂閱消息。當一個消息到達主題,所有向該主題訂閱的消費程序都會通過onMessage方法的參數收到消息。
這是一種推式的消息傳遞機制。可以設想,會有一個以上的消費程序收到同一消息的副本。相比之下,程序在Pub/Sub通信過程中參與的活動較少,當生產者程序向某個特定的隊列發送消息,它不知道到底會有多少程序接收到該消息(可能有多個,可能沒有)。通過訂閱主題實現的通信是一種比較靈活的通信方式,主題的訂閱者可以動態地改變,卻不需要改動底層的通信機制,而且它對整個通信過程完全透明。
Pub/Sub方式的通信要求有Pub/Sub代理的支持。Pub/Sub代理是一種協調、控制消息傳遞過程,保證消息傳遞到接收方的程序。P2P通信方式中程序利用一個隊列作為通信的中轉站,相比之下,Pub/Sub通信方式中程序直接交互的是特殊的代理隊列,這就是我們要在WebSphere MQ常規安裝方式之上再裝一個MA0C的原因(只有用WebSphere MQ作為JMS提供者時才必須如此。要求使用MQ 5.3.1或更高的版本),MA0C就是一個支持Pub/Sub方式通信的代理軟件。
QueueConnectionFactory和TopicConnectionFactory對象是創建相應的QueueConnection對象和TopicConnection對象的JMS工廠類對象,JMS程序正是通過QueueConnection對象和TopicConnection對象連接到底層的MOM技術。
二、JMS消息 基于JMS的程序通過交換JMS消息實現通信,JMS消息由三部分構成:消息頭,消息屬性(可選的),消息正文。消息頭有多個域構成,包含了消息遞送信息和元數據。
消息屬性部分包含一些標準的以及應用特定的域,消息選擇器依據這些信息來過濾收到的消息。JMS定義了一組標準的屬性,要求MOM提供者支持(如表1所示)。消息正文部分包含應用特定的數據(也就是要求遞送到目標位置的內容)。
表1 JMS標準消息屬性 可選的屬性包括JMSXUserID、JMSXAppID、JMSXProducerTXID、ConsumerTXID、JMSXRcvTimestamp、JMSXDeliveryCount以及JMSXState。消息頭為JMS消息中間件提供了描述性信息,諸如消息遞送的目標、消息的創建者、保留消息的時間,等等,如表2所示。
表2 JMS消息頭的域
*在所有這些消息類型中,TextMessage是最常用的,它不僅簡單,而且能夠用來傳遞XML數據。
JMS支持多種消息正文類型,包括:
■ textMessage:消息正文包含一個java.lang.String對象。這是最簡單的消息格式,但可以用來傳遞XML文檔。
■ ObjectMessage:消息正文中包含了一個串行化之后的Java對象,這類Java對象必須是可串行化的。
■ MapMessage:消息正文包含一系列"名字-值"形式的數據元素,通常用來傳送一些按鍵-值形式編碼的數據。設置數據元素可以用setInt、setFloat、setString等方法;在接收方,提取數據元素使用相應的getInt、getFloat、getString等方法。
■ BytesMessage:消息正文包含一個字節數組。如果需要發送應用生成的原始數據,通常采用這一消息類型。
■ StreamMessage:消息正文包含一個Java基本數據類型(int,char,double,等等)的流。接收方讀取數據的次序跟發送方寫入數據的次序一樣,讀寫消息元素分別使用readInt和writeInt、readString和writeString之類的方法。
JMS消息對象可以用JMS會話對象(參見本文"使用JMS QueueConnection對象"部分)創建。下面的例子顯示了如何創建各類消息對象:
TextMessage textMsg = session.createTextMessage();
MapMessage mapMsg = session.createMapMessage();
ObjectMessage objectMsg = session.createObjectMessage();
BytesMessage byteMsg = session.createBytesMessage(); |
消息對象提供了設置、提取各個消息頭域的方法,下面是幾個設置和提取JMS消息頭域的例子:
// 獲取和設置消息ID
String messageID = testMsg.getJMSMessageID();
testMsg.setJMSCorrelationID(messageID);
// 獲取和設置消息優先級
int messagePriority = mapMsg.getJMSPriority();
mapMsg.setJMSPriority(1); |
另外,消息對象還為標準屬性和應用特有的屬性提供了類似的設置和提取方法。下面的例子顯示了如何設置、提取JMS消息標準屬性和應用特有屬性域:
int groupSeq = objectMsg.getIntProperty("JMSGroupSeq");
objectMsg.setStringProperty("myName", "孫悟空"); |
JMS為讀寫消息正文內容提供了許多方法。下面略舉數例,說明如何操作不同類型的消息正文:
// Text Message
TextMessage textMsg = session.createTextMessage();
textMsg.setText("消息內容文本");
// Map Message
MapMessage mapMsg = session.createMapMessage();
mapMsg.setInt(BookCatalogNumber, 100);
mapMsg.setString(BookTitle, "書籍題目");
mapMsg.setLong(BookCost, 50.00);
String bookTitle = mapMsg.getString("BookTitle");
// Object Message
ObjectMessage objectMsg = session.createObjectMessage();
Book book = new Book("WinSocks 2.0");
objectMsg.setObject(book); |
三、JMS P2P編程 在JMS P2P通信方式中,發送程序將消息放入一個隊列,根據通信要求,發送程序可以要求一個應答信息(請求-應答模式),也可以不要求立即獲得應答(發送-遺忘模式)。如果需要應答信息,發送程序通過消息頭的JMSReplayTo域向消息的接收程序聲明應答信息應當放入哪一個本地隊列。
在請求-應答模式中,發送程序可以按照兩種方式操作。一種稱為偽同步方式,發送程序在等待應答消息時會被阻塞;另一種是異步方式,發送程序發送消息后不被阻塞,可以照常執行其他處理任務,它可以在以后適當的時機檢查隊列,查看有沒有它希望得到的應答信息。下面的代碼片斷顯示了JMS程序發送消息的過程。
// 發送JMS消息
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.naming.Context;
public class MyJMSSender {
private String requestMessage;
private String messageID;
private int requestRetCode = 1;
private QueueConnectionFactory queueConnectionFactory = null;
private Queue requestQueue = null;
private Queue responseQueue = null;
private QueueConnection queueConnection = null;
private QueueSession queueSession = null;
private QueueSender queueSender = null;
private TextMessage textMsg = null;
// 其他代碼...
public int processOutputMessages(String myMessage) {
// 查找管理對象
try {
InitialContext initContext = new InitialContext();
Context env = (Context) initContext.lookup
("java:comp/env");
queueConnectionFactory = (QueueConnectionFactory)
env.lookup("tlQCF");
requestQueue = (Queue) env.lookup("tlReqQueue");
responseQueue = (Queue) env.lookup("tlResQueue");
queueConnection = queueConnectionFactory.
createQueueConnection();
queueConnection.start();
queueSession = queueConnection.createQueueSession
(true, 0);
queueSender = queueSession.createSender(requestQueue);
textMsg = queueSession.createTextMessage();
textMsg.setText(myMessage);
textMsg.setJMSReplyTo(responseQueue);
// 處理消息的代碼邏輯...
queueSender.send(textMsg);
queueConnection.stop();
queueConnection.close();
queueConnection = null;
} catch (Exception e) {
// 異常處理代碼...
}
return requestRetCode = 0;
}
} |
下面來分析一下這段代碼。JMS程序的第一個任務是找到JNDI名稱上下文的位置。對于WSAD開發環境來說,如果JMS程序屬于J2EE項目,則JNDI名稱空間的位置由WSAD測試服務器管理,運行時環境能夠自動獲知這些信息。在這種情況下,我們只要調用InitialContext類默認的構造函數創建一個實例就可以了,即:
InitialContext initContext = new InitialContext(); |
對于WSAD環境之外的程序,或者程序不是使用WSAD JNDI名稱空間,例如使用LDAP(輕量級目錄訪問協議),程序尋找JNDI名稱的操作稍微復雜一點,必須在一個Properties或Hashtable對象中設定INITIAL_CONTEXT_FACTORY和PROVIDER_URL,然后將該Properties對象或Hashtable對象作為參數調用InitialContext的構造函數。下面我們來看幾個創建InitialContext對象的例子,第一個例子顯示了運行在WSAD之外的程序如何找到WSAD InitialContext對象。
// 例一:運行在WSAD之外的程序尋找WSAD InitialContext對象
// 說明:將localhost替換為JNDI服務運行的服務器名稱
Properties props = new Properties();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"com.ibm.websphere.naming.WsnInitialContextFactory");
props.put(Context.PROVIDER_URL, "iiop://localhost/");
InitialContext initialContext = InitialContext(props);
// 例二:下面的例子顯示了如何找到基于文件的JNDI InitialContext
Hashtable hashTab = new Hashtable ();
hashTab.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.fscontext.RefFSContextFactory");
hashTab.put(Context.PROVIDER_URL, "file://c:/temp");
InitialContext initialContext = InitialContext(hashTab);
// 例三:下面的例子顯示了如何找到基于LDAP的JNDI InitialContext
Hashtable hashTab = new Hashtable ();
hashTab.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.ldap.LdapCtxFactory");
hashTab.put(Context.PROVIDER_URL,
"file://server.company.com/o=provider_name, c=us");
InitialContext initialContext = InitialContext(hashTab); |
獲得InitialContext對象之后,下一步是要查找java:comp/env子上下文(Subcontext),例如:
Context env = (Context) initContext.lookup("java:comp/env"; |
java:comp/env是J2EE規范推薦的保存環境變量的JNDI名稱子上下文。在這個子上下文中,JMS程序需要找到幾個由JMS管理的對象,包括QueueConnectionFactory對象、Queue對象等。
下面的代碼尋找由JMS管理的幾個對象,這些對象是JMS程序執行操作所必需的。
queueConnectionFactory = (QueueConnectionFactory) env.lookup("QCF");
requestQueue = (Queue) env.lookup("requestQueue"); |
接下來,用QueueConnectionFactory對象構造出QueueConnection對象。
queueConnection = queueConnectionFactory.createQueueConnection(); |
3.1 使用JMS QueueConnection對象 JMS QueueConnection提供了一個通向底層MOM(就本文而言,它是指WebSphere MQ隊列管理器)的連接,按照這種方式創建的連接使用默認的Java綁定傳輸端口來連接到本地的隊列管理器。
對于MQ客戶程序來說(運行在不帶本地隊列管理器的機器上的MQ程序),QueueConnectionFactory對象需要稍作調整,以便使用客戶端傳輸端口:
QueueConn.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); |
另外,剛剛創建的QueueConnection對象總是處于停止狀態,需要調用"queueConnection.start();"將它啟動。
建立連接之后,用QueueConnection對象的createQueueSession方法來創建一次會話。必須注意的是,QueueSession對象有一個單線程的上下文,不是線程安全的,因此會話對象本身以及在會話對象基礎上創建的對象都不是線程安全的,在多線程環境中必須加以保護。createQueueSession方法有兩個參數,構造一個QueueSession對象的語句類如:
queueSession = queueConnection.createQueueSession
(false, Session.AUTO_ACKNOWLEDGE); |
createQueueSession方法的第一個參數是boolean類型,它指定了JMS事務類型--也就是說,當前的隊列會話是事務化的(true)還是非事務化的(false)。JMS事務類型主要用于控制消息傳遞機制,不要將它與EJB的事務類型(NotSupported,Required,等等)混淆了,EJB的事務類型設定的是EJB模塊本身的事務上下文。createQueueSession方法的第二個參數是一個整數,指定了確認模式,也就是決定了如何向服務器證實消息已經傳到。
如果隊列會話是事務化的(調用createQueueSession方法的第一個參數是true),第二個參數的值將被忽略,因為提交事務時應答總是自動執行的。如果由于某種原因事務被回退,則不會有應答出現,視同消息尚未遞送,JMS服務器將嘗試再次發送消息。如果有多個消息參與到同一會話上下文,它們被作為一個組處理,確認最后一個消息也就自動確認了此前所有尚未確認的消息。如果發生回退,情況也相似,整個消息組將被視為尚未遞送,JMS服務器將試圖再次遞送這些消息。
下面說明一下底層的工作機制。當發送程序發出一個消息,JMS服務器接收該消息;如果消息是持久性的,服務器先將消息寫入磁盤,然后確認該消息。自此之后,JMS服務器負責把消息發送到目的地,除非它從客戶程序收到了確認信息,否則不會從臨時存儲位置刪除消息。對于非持久性的消息,收到消息并保存到內存之后確認信息就立即發出了。
如果隊列會話是非事務化的(調用createQueueSession方法的第一個參數是false),則應答模式由第二個參數決定。第二個參數的可能取值包括:AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,以及CLIENT_ACKNOWLEDGE。
■ 對于非事務化的會話來說,AUTO_ACKNOWLEDGE確認模式是最常用的確認模式。對于事務化的會話,系統總是假定使用AUTO_ACKNOWLEDGE確認模式。
■ DUPS_OK_ACKNOWLEDGE模式是一種"懶惰的"確認方式。可以想到,這種模式可能導致消息提供者傳遞的一些重復消息出錯。這種確認模式只用于程序可以容忍重復消息存在的情況。
■ 在CLIENT_ACKNOWLEDGE模式中,消息的傳遞通過調用消息對象的acknowledge方法獲得確認。
在AUTO_ACKNOWLEDGE模式中,消息的確認通常在事務結束處完成。CLIENT_ACKNOWLEDGE使得應用程序能夠加快這一過程,只要處理過程一結束就立即予以確認,恰好在整個事務結束之前。當程序正在處理多個消息時,這種確認模式也是非常有用的,因為在這種模式中程序可以在收到所有必需的消息后立即予以確認。
對于非事務化的會話,一旦把消息放入了隊列,所有接收程序立即能夠看到該消息,且不能回退。對于事務化的會話,JMS事務化上下文把多個消息組織成一個工作單元,消息的發送和接收都是成組執行。事務化上下文會保存事務執行期間產生的所有消息,但不會把消息立即發送到目的地。
只有當事務化的隊列會話執行提交時,保存的消息才會作為一個整體發送給目的地,這時接收程序才可以看到這些消息。如果在事務化隊列會話期間出現錯誤,在錯誤出現之前已經成功處理的消息也會被撤銷。定義成事務化的隊列會話總是有一個當前事務,不需要顯式地用代碼來開始一個事務。當然,事務化的環境總是存在一定的代價--事務化會話總是要比非事務化會話慢。
● 注意:JMS隊列會話的事務化是一個概念,實現了JMS邏輯的Java方法的事務屬性是另一個概念,不要將兩者混淆了。TX_REQUIRED屬性表示的是方法在一個事務上下文之內運行,從而確保數據庫更新和隊列中消息的處理作為一個不可分割的工作單元執行(要么都成功提交,要么都回退)。順便說明一下,在容器管理的事務環境中,一個全局性的兩階段提交(Two-Phase Commit)事務上下文會被激活(參見本文后面的詳細說明),這時參與全局性事務的數據源應當用XA版的數據庫驅動程序構建。
相比之下,在createQueueSession方法的第一個參數中指定true建立的是JMS事務上下文:多個消息被視為一個工作單元。在消息的接收方,執行queueSession.commit()之前,即使已經收到了多個消息也不會給出確認信息;一旦queueSession.commit()執行,它就確認收到了在此之前尚未提交的所有消息;消息發送方的情況也相似。
3.2 處理回退事件 如前所述,如果事務異常終止,收到的消息會被發回到原來的隊列。接收消息的程序下一次再來處理該隊列時,它還會再次得到同一個消息,而且這一次事務很有可能還是要失敗,必須再次把消息發送回輸入隊列--如此不斷重復,就形成了無限循環的情況。
為防止這種情況出現,我們可以設置監聽端口上的Max_retry計數器,超過Max_retry計數器規定的值,接收程序就不會再收到該消息;或者對于推式的會話,消息不會再被傳遞。另外,在推式會話中,重新傳遞的事務會被設置JMSRedelivered標記,程序可以調用消息對象的getJMSRedelivered方法來檢查該標記。
消息通過QueueSender JMS對象發送,QueueSender對象利用QueueSession對象的createSender方法創建,每一個隊列都要創建一個QueueSender對象:
queueSender = queueSession.createSender(requestQueue); |
接下來創建一個消息(TextMessage類型),根據myMessage字符串的值設置消息的內容,myMessage變量作為輸入參數傳遞給queueSession.createTextMessag方法。
textMsg = queueSession.createTextMessage(myMessage); |
指定接收隊列,告訴接收消息的程序要把應答放入哪一個隊列:
textMsg.setJMSReplyTo(responseQueue); |
最后,用Sender對象發送消息,然后停止并關閉連接。
queueSender.send(textMsg);
queueConnection.stop();
queueConnection.close(); |
發出消息之后,可以調用消息對象的getJMSMessageID方法獲得JMS賦予消息的ID(即提取JMSMessageID消息頭域),以后就可以通過這一ID尋找應答消息:
String messageID = message.getJMSMessageID(); |
如果有JMS連接池,釋放后的會話不會被拆除,而是被返回給連接池以供重用。
3.3 關閉JMS對象 垃圾收集器不一定會及時關閉JMS對象,如果程序要創建大量短期生存的對象,可能會引起問題,至少會浪費大量寶貴的資源,所以顯式地釋放所有不用的資源是很重要的。
if (queueConn != null) {
queueConn.stop();
queueConn.close();
queueConn = null;
} |
關閉隊列連接對象將自動關閉所有利用該連接對象創建的對象。但個別JMS提供者例外,這時請按照下面代碼所示的次序關閉所有JMS對象。
// 關閉JMS對象
if (queueReceiver != null) {
queueReceiver.close();
queueReceiver = null;
}
if (queueSender != null) {
queueSender.close();
queueSender = null;
}
if (queueSession != null) {
queueSession.close();
queueSession = null;
}
if (queueConn != null) {
queueConn.stop();
queueConn.close();
queueConn = null;
} |
3.4 接收消息 消息接收方的處理邏輯也和發送方的相似。消息由JMS QueueReceiver對象接收,QueueReceiver對象建立在為特定隊列創建的QueueSession對象的基礎上。差別在于QueueReceiver接收消息的方式--QueueReceiver對象能夠按照偽同步或異步方式接收消息。下面的代碼顯示了如何用偽同步方式接收消息。
// 偽同步方式接收消息
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.Exception;
import javax.naming.InitialContext;
public class MyJMSReceiver {
private String responseMessage;
private String messageID;
private int replyRetCode = 1;
private QueueConnectionFactory queueConnectionFactory = null;
private Queue inputQueue = null;
private QueueConnection queueConnection = null;
private QueueSession queueSession = null;
private QueueReceiver queueReceiver = null;
private TextMessage textMsg = null;
public void processIncomingMessages() {
// 查找管理對象
InitialContext initContext = new InitialContext();
Context env = (Context) initContext.lookup("java:comp/env");
queueConnectionFactory = (QueueConnectionFactory)
env.lookup("tlQCF");
inputQueue = (Queue) env.lookup("tlQueue");
queueConnection = queueConnectionFactory.
createQueueConnection();
queueConnection.start();
queueSession=queueConnection.createQueueSession(true, 0);
queueReceiver = queueSession.createReceiver(inputQueue);
// 等一秒鐘,看看是否有消息到達
TextMessage inputMessage = queueReceiver.receive(1000);
// 其他處理代碼...
queueConnection.stop();
queueConnection.close();
}
} |
下面分析一下上面的代碼。消息由QueueReceiver對象執行receive方法接收。receive方法有一個參數,它指定了接收消息的等待時間(以毫秒計)。在上面的代碼中,QueueReceiver對象在一秒之后超出期限,解除鎖定,把控制返回給程序。如果調用receive方法時指定了等待時間,QueueReceiver對象在指定的時間內被鎖定并等待消息,如果超出了等待時間仍無消息到達,QueueReceiver對象超時并解除鎖定,把控制返回給程序。
接收消息的方法還有一個"不等待"的版本,使用這個方法時QueueReceiver對象檢查是否有消息之后立即返回,將控制交還給程序。下面是一個例子:
TextMessage message = queueReceiver.receiveNoWait(); |
如果調用receive方法時不指定參數,QueueReceiver對象會無限期地等待消息。采用這種用法時應當非常小心,因為程序會被無限期地鎖定。下面是一個無限期等待消息的例子:
TextMessage message = queueReceiver.receive(); |
不管等待期限的參數設置了多少,這都屬于拉式消息傳遞,如前所述,這種消息傳遞方式的效率較低。不僅如此,這種方法還不適合于J2EE EJB層,不能用于EJB組件之內,原因稍后就會說明。不過,這種處理方式適合在Servlet、JSP和普通Java JMS應用中使用。
接收消息的第二種辦法是異步接收。用異步接收方式時,QueueReceiver對象必須用setMessageListener(class_name)方法注冊一個MessageListener類,其中class_name參數可以是任何實現了onMessage接口方法的類。在下面這個例子中,onMessage方法由同一個類實現(為簡單計,這里沒有給出try/catch塊的代碼)。
● 注意:接下來的消息接收方式不適用于EJB組件,這些代碼僅適用于Servlet、JSP和普通的Java JMS應用程序。
// 消息監聽類的例子
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.Queue;
import javax.jms.Exception;
import javax.naming.InitialContext;
public class MyListenerClass implements javax.jms.MessageListener {
private int responseRetCode = 1;
private boolean loopFlag = true;
private QueueConnectionFactory queueConnectionFactory = null;
private Queue responseQueue = null;
private QueueConnection queueConnection = null;
private QueueSession queueSession = null;
private QueueSender queueSender = null;
public void prepareEnvironment(String myMessage) {
// 查找管理對象
InitialContext initContext = new InitialContext();
Context env = (Context) initContext.lookup("java:comp/env");
queueConnectionFactory = (QueueConnectionFactory)
env.lookup("tlQCF");
responseQueue = (Queue) env.lookup("tlResQueue");
queueConnection = queueConnectionFactory.
createQueueConnection();
queueSession = queueConnection.createQueueSession
(true, 0);
queueReceiver = queueSession.createReceiver
(responseQueue);
queueReceiver.setMessageListener(this)
queueConnection.start();
}
public void onMessage(Message message) {
// 希望收到一個文本消息...
if (message instanceof TextMessage) {
String responseText =
"確認消息傳遞:" + ((TextMessage) message).getText();
// 當收到一個以@字符開頭的消息時,循環結束,
// MessageListener終止
if (responseText.charAt(0) == '@') {
loopFlag = 1; // 結束處理;
} else {t
// 繼續處理消息
// 本例中我們知道應答消息的隊列,并非真的要用到
//message.getJMSReplyTo
// 這只是一個如何獲取應答消息隊列的例子
Destination replyToQueue=message.getJMSReplyTo();
// 設置應答消息
TextMessage responseMessage =
responseSession.createTextMessage(responseText);
// 使CorrelationID等于消息ID,
//這樣客戶程序就能將應答消息和原來的請求
// 消息對應起來
messageID = message.getJMSMessageID();
responseMessage.setJMSCorrelationID(messageID);
// 設置消息的目的地
responseMessage.setJMSDestination(
replyToQueue)
queueSender.send(
responseMessage);
}
}
}
// 保持監聽器活動
while (loopFlag) {
// 將控制轉移到其他任務(休眠2秒)
System.out.println("正處于監聽器循環之內...");
Thread.currentThread().sleep(2000);
}
// 當loopFlag域設置成flase時,結束處理過程
queueConn.stop();
queueConnection.close();
} |
注冊一個MessageListener對象時,一個實現了MessageListener邏輯的新線程被創建。我們要保持這個線程處于活動狀態,因此使用了一個while循環,首先讓線程休眠一定的時間(這里是2秒),將處理器的控制權轉讓給其他任務。當線程被喚醒時,它檢查隊列,然后再次進入休眠狀態。當一個消息到達當前注冊的MessageListener對象所監視的隊列時,JMS調用MessageListener對象的onMessage(message)方法,將消息作為參數傳遞給onMessage方法。
這是一種推式的消息接收方式,程序效率較高,但仍不適合在EJB組件之內使用。下面將探討為什么這些接收消息的方式都不能用于EJB組件之內,然后給出解決辦法。雖然這部分內容放入了P2P通信方式中討論,但其基本思路同樣適用于Pub/Sub通信方式。
3.5 消息驅動的Bean 在前文討論JMS消息接收處理邏輯的過程中,我們看到的代碼僅僅適用于Servlet、JSP以及普通的Java應用程序,但不適用于EJB,因為在JMS的接收端使用EJB存在一些技術問題。一般地,JMS程序的交互模式分兩種:
■ 發送-遺忘:JMS客戶程序發出消息,但不需要應答。從性能的角度來看,這是最理想的處理模式,發送程序不需要等待對方響應請求,可以繼續執行自己的任務。
■ 同步請求-答復:JMS客戶程序發出消息并等待答復。在JMS中,這種交互方式通過執行一個偽同步的接收方法(前文已經討論)實現。然而,這里可能出現問題。如果EJB模塊在一個事務上下文之內執行(通常總是如此),單一事務之內無法執行請求-答復處理,這是因為發送者發出一個消息之后,只有當發送者提交了事務,接收者才能收到消息。因此,在單一事務之內是不可能得到應答的,因為在一個未提交的事務上下文之內接收程序永遠收不到消息,當然也不可能作出應答了。解決的辦法是請求-答復必須通過兩個不同的事務執行。
對于EJB來說,在JMS通信的接收端還可能出現另一個EJB特有的問題。在異步通信中,何時能夠獲得應答是不可預料的,異步通信的主要優點之一就是發送方能夠在發出消息之后繼續當前的工作,即使接收方當時根本不能接收消息也一樣,但請求-答復模式卻隱含地假定了EJB組件(假定是一個會話Bean)應該在發出消息之后等待答復。J2EE實際上是一個基于組件技術的事務處理環境,它的設計目標是處理大量短期生存的任務,卻不是針對那些可能會被長期阻塞來等待應答的任務。
為了解決這個問題,Sun在EJB 2.0規范中加入了一種新的EJB類型,即MDB。MDB是專門為JMS異步消息環境中(接收端)EJB組件面臨的問題而設計的,其設計思路是把監聽消息是否到達的任務從EJB組件移到容器,由于這個原因,MDB組件總是在容器的控制之下運行,容器代替MDB監聽著特定的隊列或主題,當消息到達該隊列或者主題,容器就激活MDB,調用MDB的onMessage方法,將收到的消息作為參數傳遞給onMessage方法。
MDB是一種
異步組件,它的工作方式和其他同步工作的EJB組件(會話Bean,實體Bean)不同,MDB沒有Remote接口和Home接口,客戶程序不能直接激活MDB,MDB只能通過收到的消息激活。MDB的另一個重要特點是它對事務和安全上下文的處理方式與眾不同,
MDB已經徹底脫離了客戶程序,因此也不使用客戶程序的事務和安全上下文。
發送JMS消息的遠程客戶程序完全有可能運行在不同的環境之中--非J2EE的環境,例如普通的Java應用程序,可能根本沒有任何安全或事務上下文。因此,發送方的事務和安全上下文永遠不會延續到接收方的MDB組件。由于MDB永遠不會由客戶程序直接激活,所以它們也永遠不可能在客戶程序的事務上下文之內運行。由于這個原因,下面這些事務屬性對于MDB來說沒有任何意義--Supports,RequiresNew,Mandatory,以及None,因為這些事務屬性隱含地意味著延用客戶程序的事務上下文。
MDB可以使用的事務屬性只有兩個:NotSupported和Required。如果一個MDB組件有NotSupported事務屬性,則它的消息處理過程不屬于任何事務上下文。
就象其他類型的EJB一樣,MDB可能參與到兩種類型的事務:Bean管理的事務,容器管理的事務。MDB組件的所有方法中,只有onMessage方法可以參與到事務上下文。如果讓MDB參與到Bean管理的事務上下文,則表示允許MDB在onMessage方法之內開始和結束一個事務。
這種策略的問題在于,收到的消息總是位于onMessage方法之內開始的事務之外(要讓消息參與其中已經太遲了)。在這種情況下,如果由于某種原因必須回退,則消息必須手工處理。
如果讓MDB參與到容器管理的事務上下文,情況就完全不同了。
只要設置了Required事務屬性,容器就可以在收到消息時開始一個事務,這樣,消息就成為事務的一部分,當事務成功提交時,或者當事務被回退而消息被返回到發送隊列時,都可以獲得明確的確認信息。
事務可能在兩種情形下回退:第一種情形是程序顯式地調用
setRollBackOnly方法,第二種情形是在onMessage方法之內拋出一個
系統異常(注意,拋出應用程序異常不會觸發回退動作)。當事務回退時,消息就被返回到原來的隊列,監聽器將再次發送消息要求處理。一般地,這種情況會引起無限循環,最后導致應用運行不正常。
Max_retries屬性可以用來控制監聽器再次提取重發消息的次數(這個屬性在配置監聽器端口的時候設置),超過Max_retries規定的限制值之后,監聽器將停止處理該消息(顯然,這算不上最理想的處理方式)。
如果用WebSphere MQ作為JMS提供者,還有一種更好的解決辦法。
我們可以配置WebSphere MQ,使其嘗試一定的次數之后停止發送同一消息,并將消息放入一個特定的錯誤隊列或Dead.Letter.Queue。記住,MDB是無狀態的組件,也就是說它不會在兩次不同的方法調用之間維持任何狀態信息(就象Web服務器處理HTTP請求的情況一樣),同時,由于客戶程序永遠不會直接調用MDB組件,所以
MDB組件不能識別任何特定的客戶程序,在這個意義上,可以認為MDB組件是"匿名"運行的。所有MDB組件都必須實現javax.ejb.MessageDrivenBean接口和javax.jms.MessageListener接口。
除了onMessage方法之外,MDB還有其他幾個
回調方法--由容器調用的方法:
■ ejbCreate方法:容器調用該方法來創建MDB的實例。在ejbCreate方法中可以放入一些初始化的邏輯。
■ setMessageDrivenContext方法:當Bean第一次被加入到MDB Bean的緩沖池時容器將調用該方法。setMessageDrivenContext方法通常用來捕獲MessageDrivenContext,并將setMessageDrivenContext保存到類里面的變量,例如:
public void setMessageDrivenContext(java.ejb.
MessageDrivenContext mdbContext) {
messageDrivenContext = mdbContext;
} |
■ ejbRemove方法:容器把Bean從緩沖池移出并拆除時會調用該方法。一般地,ejbRemove方法會執行一些清理操作。
一般而言,在onMessage方法之內執行業務邏輯是不推薦的,業務方面的操作最好委派給其他EJB組件執行。
MDB容器會自動控制好并發處理多個消息的情形。每一個MDB實例處理一個消息,在onMessage方法把控制返回給容器之前,不會要求MDB同時處理另一個消息。如果有多個消息必須并行處理,容器會激活同一MDB的多個實例。
從WebSphere 5.0開始,開發環境(WSAD 5.0)和運行環境(WAS 5.0)都開始全面支持MDB。
下面的代碼片斷顯示了一個MDB的概念框架。
// MDB概念框架
// package 聲明略
import javax.jms.Message;
import javax.jms.MapMessage;
import javax.naming.InitialContext;
import java.util.*;
public class LibraryNotificationBean
implements javax.ejb.MessageDrivenBean,
javax.jms.MessageListener {
MessageDrivenContext messageDrivenContext;
Context jndiContext;
public void setMessageDrivenContext(MessageDrivenContext
msgDrivenContext) {
messageDrivenContext = msgDrivenContext;
try {
jndiContext = new InitialContext();
} catch (NamingException ne) {
throw new EJBException(ne);
}
}
public void ejbCreate() {
}
public void onMessage(Message notifyMsg) {
try {
MapMessage notifyMessage = (MapMessage) notifyMsg;
String bookTitle = (String) notifyMessage.
getString("BookTitle");
String bookAuthor = (String) notifyMessage.
getString("BookAuthor");
String bookCatalogNumber =
(String) notifyMessage.getString
("bookCatalogNumber");
Integer bookQuantity =
(Integer) notifyMessage.getInteger("BookQuantity");
// 處理消息...(調用其他EJB組件)
} catch (Exception e) {
throw new EJBException(e);
}
}
public void ejbRemove() {
try {
jndiContext.close();
jndiContext = null;
} catch (NamingException ne) {
// 異常處理
}
}
} |
3.6 消息持久化 消息可以是持久性的,也可以是非持久性的,持久性的消息和非持久性的消息可以放入同一個隊列。持久性的消息會被寫入磁盤,即使系統出現故障也可以恢復。當然,正象其他場合的持久化操作一樣,消息的持久化也會增加一定的開銷,持久性消息大約要慢7%。控制消息持久化的辦法之一是定義隊列時設置其屬性,如果沒有顯式地設置持久化屬性,系統將采用默認值。另外,JMS應用程序本身也可以定義持久化屬性:
■ PERSISTENCE(QDEF):繼承隊列的默認持久化屬性值。
■ PERSISTENCE(PERS):持久性的消息。
■ PERSISTENCE(NON):非持久性的消息。
消息的持久性還可以通過消息屬性頭
JMSDeliveryMode來調節,JMSDeliveryMode可以取值DeliveryMode.PERSISTENT或DeliveryMode.NON_PERSISTENT,前者表示消息必須持久化,后者表示消息不需持久化。在事務化會話中處理的消息總是持久性的。
3.7 消息選擇器 JMS提供了從隊列選取一個消息子集的機制,能夠過濾掉所有不滿足選擇條件的消息。選擇消息的條件不僅可以引用消息頭的域,還可以引用消息屬性的域。下面是如何利用這一功能的例子:
QueueReceiver queueReceiver =
queueSession.createReceiver(requestQueue,
"BookTitle = 'Windows 2000'");
QueueBrowser queueBrowser = queueSession.createBrowser
(requestQueue, "BookTitle = 'Windows 2000'
AND BookAuthor = 'Robert Lee'"); |
注意,字符串(例如'Windows 2000')被一個雙引號之內的單引號對包圍。
四、JMS Pub/Sub編程 Pub/Sub通信方式的編程與P2P通信編程相似,兩者最主要的差別是消息發送的目的地對象。在Pub/Sub通信方式中,發布消息的目的地和消費消息的消息源是一個稱為主題(Topic)的JMS對象。主題對象的作用就象是一個虛擬通道,其中封裝了一個Pub/Sub的目的地(Destination)對象。
在P2P通信方式中,(發送到隊列的)消息只能由一個消息消費者接收;但在Pub/Sub通信方式中,消息生產者發布到主題的消息可以分發到多個消息消費者,而且,消息的生產者和消費者之間的結合是如此寬松,以至于
生產者根本不必了解任何有關消息消費者的情況,消息生產者和消費者都只要知道一個公共的目的地(也就是雙方"交談"的主題)。
由于這個原因,消息的生產者通常稱為出版者,消息的消費者則相應地稱為訂閱者。出版者為某一主題發布的所有消息都會傳遞到該主題的所有訂閱者,訂閱者將收到它訂閱的主題上的所有消息,每一個訂閱者都會收到一個消息的副本。訂閱可以是
耐久性的(Durable)或非耐久性(Nondurable)。非耐久性的訂閱者只能收到訂閱之后才發布的消息。
耐久性訂閱者則不同,它可以中斷之后再連接,仍能收到在它斷線期間發布的消息。Put/Sub通信方式中耐久性連接(在某種程度上)類似于P2P通信中的持久性消息/隊列。出版者和訂閱者永遠不會直接通信,Pub/Sub代理的作用就象是一個信息發布中心,把所有消息發布給它們的訂閱者。
● 注意:從WebSphere MQ 5.2開始,加裝了MA88和MA0C擴展的WebSphere MQ可以當作JMS Pub/Sub通信代理。此外,WebSphere MQ Integrator也能夠作為一個代理用。從MQ 5.3開始,MA88成了MQ基本軟件包的一部分,因此只要在MQ 5.3的基礎上安裝MA0C就可以了。在MQ JMS環境中,要讓Pub/Sub通信順利運行,運行Pub/Sub代理的隊列管理器上必須創建一組系統隊列。
MQ JMS MA0C擴展模塊提供了一個工具來構造所有必需的Pub/Sub系統隊列,這個工具就是MQJMS_PSQ.mqsc,位于
\java\bin目錄之下。要構造Pub/Sub通信方式所需的系統隊列,只需從上述目錄執行命令:runmqsc < MQJMS_PSQ.mqsc。
多個主題可以組織成一種樹形的層次結構,樹形結構中主題的名稱之間用一個斜杠(/)分隔--例如,Books/UnixBooks/SolarisBooks。如果要訂閱一個以上的主題,可以在指定主題名字的時候使用通配符,例如,"Books/#"就是一個使用通配符的例子。
下面給出了一個JMS Pub/Sub通信的例子(為簡單計,這里省略了try/catch代碼塊)。在這個例子中,Books/ UnixBooks/SolarisBooks主題的訂閱者將收到所有發布到SolarisBooks的消息,Books/#主題的訂閱者將收到所有有關Books的消息(包括發布到UnixBooks和SolarisBooks主題的消息)。
// JMS Pub/Sub通信
import javsx.jms.*;
import javax.naming.*;
import javax.ejb.*;
public class PublisherExample implements javax.ejb.SessionBean {
private TopicConnectionFactory topicConnFactory = null;
private TopicConnection topicConnection = null;
private TopicPublisher topicPublisher = null;
private TopicSession topicSession = null;
private SessionContext sessionContext = null;
public void setSessionContext(SessionContext ctx) {
sessionContext = cts;
}
public void ejbCreate() throws CreateException {
InitialContext initContext = new InitialContext();
// 從JNDI查找主題的連接工廠
topicConnFactory =(TopicConnectionFactory)
initContext.lookup("java:comp/env/TCF");
// 從JNDI查找主題
Topic unixBooksTopic =
(Topic) initContext.lookup("java:comp/env/UnixBooks");
Topic javaBooksTopic =
(Topic) initContext.lookup("java:comp/env/JavaBooks");
Topic linuxBooksTopic =
(Topic) initContext.lookup("java:comp/env/LinuxBooks");
Topic windowsBooksTopic =
(Topic) initContext.lookup("java:comp/env/WindowsBooks");
Topic allBooksTopic =
(Topic) initContext.lookup("java:comp/env/AllBooks");
// 創建連接
topicConnection = topicConnFactory.createTopicConnection();
topicConn.start();
// 創建會話
topicSession =
topicConn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
}
public void publishMessage(String workMessage,
String topicToPublish) {
// 創建一個消息
TextMessage message = topicSession.createTextMessage
(workMessage); // 創建出版者,發布消息
if ((topicToPublish.toLowerCase()).equals("java")) {
TopicPublisher javaBooksPublisher =
topicSession.createPublisher(javaBooksTopic);
javaBooksPublisher.publish(message);
}
if ((topicToPublish.toLowerCase()).equals("unix")) {
TopicPublisher unixBooksPublisher =
topicSession.createPublisher(unixBooksTopic);
J2EE Enterprise Messaging 475 unixBooksPublisher.
publish(message);
}
if ((topicToPublish.toLowerCase()).equals("linux")) {
TopicPublisher linuxBooksPublisher =
topicSession.createPublisher(linuxBooksTopic);
linuxBooksPublisher.publish(message);
}
if ((topicToPublish.toLowerCase()).equals("windows")) {
TopicPublisher windowsBooksPublisher =
topicSession.createPublisher(windowsBooksTopic);
windowsBooksPublisher.publish(message);
}
TopicPublisher allBooksPublisher =
topicSession.createPublisher(allBooksTopic);
allBooksPublisher.publish(message);
}
public void ejbActivate() {
}
public void ejbPassivate() {
}
public void ejbRemove() {
// 清理工作...
if (javaBooksPublisher != null) {
javaBooksPublisher.close();
javaBooksPublisher = null;
}
if (unixBooksPublisher != null) {
unixBooksPublisher.close();
Chapter 9 476 unixBooksPublisher = null;
}
if (linuxBooksPublisher != null) {
linuxBooksPublisher.close();
linuxBooksPublisher = null;
}
if (windowsBooksPublisher != null) {
windowsBooksPublisher.close();
windowsBooksPublisher = null;
}
if (allBooksPublisher != null) {
allBooksPublisher.close();
allBooksPublisher = null;
}
if (topicSession != null) {
topicSession.close();
topicSession = null;
}
if (topicConnection != null) {
topicConnection.stop();
topicConnection.close();
topicConnection = null;
}
} |
這段代碼比較簡單,想來不需要多余的解釋了。唯一值得一提的地方是如何將一個消息發布到不同的主題:對于每一個特定的主題,分別創建一個對應的出版者,然后用它將消息發布到主題。
如果一個MDB組件只負責接收消息,把所有其他的消息處理操作都委托給專用業務組件(這意味著MDB之內不包含消息發送或發布邏輯),MDB的代碼就相當于P2P通信方式中的處理代碼,使用MDB唯一的改變之處是將監聽端口從監聽一個隊列改為監聽一個主題。有興趣的讀者可以自己試驗一下雙重監聽的實現方式。
五、二階段提交的事務
在企業級處理任務中,為了保證JMS或非JMS代碼處理業務邏輯過程的完整性(對于一個單元的工作,要么成功提交所有的處理步驟,要么全部回退所有處理步驟),操作一般要在一個事務上下文之內進行。除了將消息放入隊列的過程之外,如果還要向數據庫插入記錄(二階段提交,要么全部成功,要么全部失敗),事務上下文的重要性尤其突出。
為了支持二階段提交,JMS規范定義了下列XA版的JMS對象:XAConnectionFactory、XAQueueConnectionFactory、XASession、XAQueueSession、XATopicConnectionFactory、XATopicConnection,以及XATopicSession。另外,凡是參與全局事務的所有資源均應該使用其XA版。特別地,對于JDBC資源,必須使用JDBC XADataSource。最后一點是,全局事務由JTA TransactionManager控制。下面的代碼顯示了建立全局事務所需的步驟。
// 配置全局事務
// 從JNDI名稱空間獲取一個JTA TransactionManager
TransactionManager globalTxnManager =
jndiContext.lookup("java:comp/env/txt/txnmgr");
// 啟動全局事務
globalTxnManager.begin();
// 獲取事務對象
Transaction globalTxn = globalTxnManager.getTransaction();
// 獲取XA數據資源
XADatasource xaDatasource = jndiContext.lookup
("java:comp/env/jdbc/datasource");
// 獲取一個連接
XAConnection jdbcXAConn = xaDatasource.getConnection();
// 從XA連接獲取XAResource
XAResource jdbcXAResource = jdbcXAConn.getXAResource();
// 在全局事務中"征募"XAResource
globalTxn.enlist(jdbcXAResource);
// 獲取XA隊列連接
XAQueueConnectionFactory xaQueueConnectionFactory =
JndiContext.lookup("java:comp/env/jms/xaQCF")
XAQueueConnection xaQueueConnection =
XaQueueConnectionFactory.createXAQueueConnection();
// 獲取XA隊列會話
XAQueueSession xaQueueSession = xaQueueConnection.
createXAQueueSession();
// 從會話獲取XA資源
XAResource jmsXAResource = xaQueueSession.getXAResource();
// 在全局事務中"征募"該XAResource
globalTxn.enlist(jmsXAResource);
// 其他處理工作...
// 提交全局事務
globalTxn.commit(); |
總結:本文介紹了JMS及其在WSAD環境下的編程,探討了JMS異步通信的主要優點,以及兩種通信方式(P2P和Pub/Sub)、MDB、JMS事務、二階段提交全局事務等方面的概念。希望本文的介紹對你有所幫助。