http://news.ccidnet.com/pub/article/c1060_a59896_p2.html
提綱:
一、JMS基本概念
1.1 P2P通信
1.2 Pub/Sub通信
二、JMS消息
三、JMS P2P編程
3.1 使用JMS QueueConnection對(duì)象 3.2 處理回退事件
3.3 關(guān)閉JMS對(duì)象
3.4 接收消息
3.5 消息驅(qū)動(dòng)的Bean
3.6 消息持久化
3.7 消息選擇器
四、JMS Pub/Sub編程
五、二階段提交的事務(wù)
━━━━━━━━━━━━━━━━━━━━━━━━━━
EJB 2.0和J2EE 1.3規(guī)范開始提供對(duì)Java消息服務(wù)(JMS)的支持。在J2EE 1.3加入JMS之前,J2EE環(huán)境中的組件通過RMI-IIOP協(xié)議通信,J2EE是一個(gè)完全同步的平臺(tái)。由于在J2EE 1.3規(guī)范中引入了JMS,J2EE環(huán)境開始具備一項(xiàng)極其重要的功能--異步通信。
● 說明:RMI-IIOP是Java遠(yuǎn)程方法調(diào)用(RMI,Remote Method Invocation)的一個(gè)兼容CORBA的版本,CORBA是Common Object Request Broker Architecture的縮寫,即公用對(duì)象請(qǐng)求代理(調(diào)度)體系結(jié)構(gòu)。RMI-IIOP通過CORBA平臺(tái)和語言中立的通信協(xié)議發(fā)送RMI消息。
為了支持異步通信,J2EE 1.3規(guī)范還引入了一種新的EJB類型:消息驅(qū)動(dòng)的Bean,即Message Driven Bean,簡(jiǎn)稱MDB。如前所述,在JMS之前,J2EE原來是一個(gè)建立在Java RMI-IIOP通信協(xié)議基礎(chǔ)上的同步環(huán)境,但MDB卻具有接收異步消息的能力。
異步通信使得企業(yè)應(yīng)用能夠建立在一種全新的通信機(jī)制之上,它具有如下重要優(yōu)點(diǎn):
■ 異步通信程序不直接交換信息。由于這個(gè)原因,通信的任意一方能夠在對(duì)方停止響應(yīng)時(shí)繼續(xù)自己的操作,例如當(dāng)接收消息的一方不能響應(yīng)時(shí),發(fā)送方仍能夠發(fā)出消息。
■ 異步通信有助于提高可靠性。支持異步通信的中間件軟件提供了有保證的(可靠的)消息遞送機(jī)制,即使整個(gè)環(huán)境出現(xiàn)問題也能夠保證消息傳遞到目的地。Internet是一種不可靠的通信媒體,對(duì)于通過Internet通信的應(yīng)用來說,異步通信的這一特點(diǎn)非常富有吸引力。
■ 發(fā)送異步消息的程序不會(huì)在等待應(yīng)答的時(shí)候被鎖定,從而極大地有利于提高性能。
JMS本身不是一個(gè)通信軟件,而是一個(gè)標(biāo)準(zhǔn)的應(yīng)用編程接口(API),用來建立廠商中立的異步通信機(jī)制。從這個(gè)意義上說,JMS類似于JDBC和JNDI,例如就JDBC來說,它要求有一個(gè)底層的數(shù)據(jù)庫(kù)提供者,JMS則要求有一個(gè)支持JMS標(biāo)準(zhǔn)的底層異步通信中間件提供者,一般稱為面向消息的中間件(Message-Oriented Middleware,MOM)。
MOM是一種支持應(yīng)用程序通過交換消息實(shí)現(xiàn)異步通信的技術(shù)。在某種程度上,異步通信有點(diǎn)象是人們通過email進(jìn)行通信;類似地,同步通信的程序就象是人們通過電話進(jìn)行通信。在異步通信過程中,程序的結(jié)合方式是寬松的,換句話說,異步通信的程序并不直接相互聯(lián)系,而是通過稱為隊(duì)列(Queue)或主題(Topic)的虛擬通道聯(lián)系。
同時(shí),異步通信還意味著消息通過一個(gè)中轉(zhuǎn)站,按照存儲(chǔ)-轉(zhuǎn)發(fā)的方式傳遞,即使通信的接收方當(dāng)時(shí)并未運(yùn)行,另一方(發(fā)送方)的程序也能夠順利發(fā)出消息,一旦接收方的程序后來開始運(yùn)行,消息就會(huì)傳遞給它。
JMS通信主要的優(yōu)點(diǎn)是提供了一個(gè)環(huán)境,在這個(gè)環(huán)境中各個(gè)程序通過標(biāo)準(zhǔn)的API進(jìn)行通信,開發(fā)者不必再面對(duì)各種不同操作系統(tǒng)、數(shù)據(jù)表示方式、底層協(xié)議所帶來的復(fù)雜性。
本文討論了JMS編程的基本概念以及兩種JMS通信方式:第一種是端對(duì)端通信(Point-to-Point,P2P)方式,第二種是出版/訂閱(Publish/Subscribe,Pub/Sub)方式,另外還涵蓋了JMS消息結(jié)構(gòu)、JMS主要對(duì)象、MDB、JMS和WebSphere編程、消息持久化以及JMS事務(wù)支持方面的問題。
一、JMS基本概念 鑒于JMS是一種比較新的技術(shù),所以本文將首先詳細(xì)介紹JMS編程的一般概念,然后再探討WSAD 5.0環(huán)境下的JMS開發(fā)。如前所示,JMS程序本身并不直接相互通信,消息被發(fā)送給一個(gè)臨時(shí)中轉(zhuǎn)站--隊(duì)列或主題。隊(duì)列和主題都是能夠收集消息的中轉(zhuǎn)對(duì)象,但兩者支持的消息傳遞機(jī)制又有所不同,分別對(duì)應(yīng)兩種不同的通信方式--P2P和Pub/Sub。
1.1 P2P通信 P2P消息傳遞又可以按照推(Push)和拉(Pull)兩種方式運(yùn)作。在P2P拉方式中,程序通過稱為隊(duì)列的虛擬通道通信:在通信會(huì)話的發(fā)送方,發(fā)送程序把一個(gè)消息"放入"隊(duì)列,在接收方,接收程序定期掃描隊(duì)列,尋找它希望接收和處理的消息。和推方式相比,拉方式的消息傳遞效率較低,因?yàn)樗枰芏鴱?fù)始地檢查消息是否到達(dá),這個(gè)過程會(huì)消耗一定的資源。另外必須注意的一點(diǎn)是,當(dāng)接收方發(fā)現(xiàn)一個(gè)需要處理的消息時(shí),它就會(huì)"提取"消息,從效果上看等于從隊(duì)列刪除了消息。
因此,即使有多個(gè)接收程序在處理同一個(gè)隊(duì)列,對(duì)于某一特定的消息來說,總是只有一個(gè)接收者。JMS程序可以使用多個(gè)隊(duì)列,每一個(gè)隊(duì)列可以由多個(gè)程序處理,但是只有一個(gè)程序才會(huì)收到某個(gè)特定的消息。
在P2P推方式的消息傳遞中,發(fā)送程序的工作原理也一樣,它把消息發(fā)送到隊(duì)列,但接收程序的工作原理有所不同。接收程序?qū)崿F(xiàn)了一個(gè)Listener接口,包括實(shí)現(xiàn)了該接口中的onMessage回調(diào)方法,在J2EE環(huán)境中監(jiān)聽隊(duì)列接收消息的任務(wù)交給了容器,每當(dāng)一個(gè)新的消息達(dá)到隊(duì)列,容器就調(diào)用onMessage方法,將消息作為參數(shù)傳遞給onMessage方法。
P2P通信最重要的特點(diǎn)(不管是推還是拉)是:每一個(gè)消息總是只由一個(gè)程序接收。一般而言,P2P程序在通信過程中參與的活動(dòng)較多,例如,發(fā)送程序可以向接收程序指出應(yīng)答消息應(yīng)當(dāng)放入哪一個(gè)隊(duì)列,它還可以要求返回一個(gè)確認(rèn)或報(bào)告消息。
1.2 Pub/Sub通信 在Pub/Sub通信方式中,程序之間通過一個(gè)主題(Topic)實(shí)現(xiàn)通信,用主題作為通信媒介要求有Pub/Sub代理程序的支持(稍后詳細(xì)介紹)。在消息發(fā)送方,生產(chǎn)消息的程序向主題發(fā)送消息;在接收方,消息的消費(fèi)程序向感興趣的主題訂閱消息。當(dāng)一個(gè)消息到達(dá)主題,所有向該主題訂閱的消費(fèi)程序都會(huì)通過onMessage方法的參數(shù)收到消息。
這是一種推式的消息傳遞機(jī)制。可以設(shè)想,會(huì)有一個(gè)以上的消費(fèi)程序收到同一消息的副本。相比之下,程序在Pub/Sub通信過程中參與的活動(dòng)較少,當(dāng)生產(chǎn)者程序向某個(gè)特定的隊(duì)列發(fā)送消息,它不知道到底會(huì)有多少程序接收到該消息(可能有多個(gè),可能沒有)。通過訂閱主題實(shí)現(xiàn)的通信是一種比較靈活的通信方式,主題的訂閱者可以動(dòng)態(tài)地改變,卻不需要改動(dòng)底層的通信機(jī)制,而且它對(duì)整個(gè)通信過程完全透明。
Pub/Sub方式的通信要求有Pub/Sub代理的支持。Pub/Sub代理是一種協(xié)調(diào)、控制消息傳遞過程,保證消息傳遞到接收方的程序。P2P通信方式中程序利用一個(gè)隊(duì)列作為通信的中轉(zhuǎn)站,相比之下,Pub/Sub通信方式中程序直接交互的是特殊的代理隊(duì)列,這就是我們要在WebSphere MQ常規(guī)安裝方式之上再裝一個(gè)MA0C的原因(只有用WebSphere MQ作為JMS提供者時(shí)才必須如此。要求使用MQ 5.3.1或更高的版本),MA0C就是一個(gè)支持Pub/Sub方式通信的代理軟件。
QueueConnectionFactory和TopicConnectionFactory對(duì)象是創(chuàng)建相應(yīng)的QueueConnection對(duì)象和TopicConnection對(duì)象的JMS工廠類對(duì)象,JMS程序正是通過QueueConnection對(duì)象和TopicConnection對(duì)象連接到底層的MOM技術(shù)。
二、JMS消息 基于JMS的程序通過交換JMS消息實(shí)現(xiàn)通信,JMS消息由三部分構(gòu)成:消息頭,消息屬性(可選的),消息正文。消息頭有多個(gè)域構(gòu)成,包含了消息遞送信息和元數(shù)據(jù)。
消息屬性部分包含一些標(biāo)準(zhǔn)的以及應(yīng)用特定的域,消息選擇器依據(jù)這些信息來過濾收到的消息。JMS定義了一組標(biāo)準(zhǔn)的屬性,要求MOM提供者支持(如表1所示)。消息正文部分包含應(yīng)用特定的數(shù)據(jù)(也就是要求遞送到目標(biāo)位置的內(nèi)容)。
表1 JMS標(biāo)準(zhǔn)消息屬性 可選的屬性包括JMSXUserID、JMSXAppID、JMSXProducerTXID、ConsumerTXID、JMSXRcvTimestamp、JMSXDeliveryCount以及JMSXState。消息頭為JMS消息中間件提供了描述性信息,諸如消息遞送的目標(biāo)、消息的創(chuàng)建者、保留消息的時(shí)間,等等,如表2所示。
表2 JMS消息頭的域
*在所有這些消息類型中,TextMessage是最常用的,它不僅簡(jiǎn)單,而且能夠用來傳遞XML數(shù)據(jù)。
JMS支持多種消息正文類型,包括:
■ textMessage:消息正文包含一個(gè)java.lang.String對(duì)象。這是最簡(jiǎn)單的消息格式,但可以用來傳遞XML文檔。
■ ObjectMessage:消息正文中包含了一個(gè)串行化之后的Java對(duì)象,這類Java對(duì)象必須是可串行化的。
■ MapMessage:消息正文包含一系列"名字-值"形式的數(shù)據(jù)元素,通常用來傳送一些按鍵-值形式編碼的數(shù)據(jù)。設(shè)置數(shù)據(jù)元素可以用setInt、setFloat、setString等方法;在接收方,提取數(shù)據(jù)元素使用相應(yīng)的getInt、getFloat、getString等方法。
■ BytesMessage:消息正文包含一個(gè)字節(jié)數(shù)組。如果需要發(fā)送應(yīng)用生成的原始數(shù)據(jù),通常采用這一消息類型。
■ StreamMessage:消息正文包含一個(gè)Java基本數(shù)據(jù)類型(int,char,double,等等)的流。接收方讀取數(shù)據(jù)的次序跟發(fā)送方寫入數(shù)據(jù)的次序一樣,讀寫消息元素分別使用readInt和writeInt、readString和writeString之類的方法。
JMS消息對(duì)象可以用JMS會(huì)話對(duì)象(參見本文"使用JMS QueueConnection對(duì)象"部分)創(chuàng)建。下面的例子顯示了如何創(chuàng)建各類消息對(duì)象:
TextMessage textMsg = session.createTextMessage();
MapMessage mapMsg = session.createMapMessage();
ObjectMessage objectMsg = session.createObjectMessage();
BytesMessage byteMsg = session.createBytesMessage(); |
消息對(duì)象提供了設(shè)置、提取各個(gè)消息頭域的方法,下面是幾個(gè)設(shè)置和提取JMS消息頭域的例子:
// 獲取和設(shè)置消息ID
String messageID = testMsg.getJMSMessageID();
testMsg.setJMSCorrelationID(messageID);
// 獲取和設(shè)置消息優(yōu)先級(jí)
int messagePriority = mapMsg.getJMSPriority();
mapMsg.setJMSPriority(1); |
另外,消息對(duì)象還為標(biāo)準(zhǔn)屬性和應(yīng)用特有的屬性提供了類似的設(shè)置和提取方法。下面的例子顯示了如何設(shè)置、提取JMS消息標(biāo)準(zhǔn)屬性和應(yīng)用特有屬性域:
int groupSeq = objectMsg.getIntProperty("JMSGroupSeq");
objectMsg.setStringProperty("myName", "孫悟空"); |
JMS為讀寫消息正文內(nèi)容提供了許多方法。下面略舉數(shù)例,說明如何操作不同類型的消息正文:
// Text Message
TextMessage textMsg = session.createTextMessage();
textMsg.setText("消息內(nèi)容文本");
// Map Message
MapMessage mapMsg = session.createMapMessage();
mapMsg.setInt(BookCatalogNumber, 100);
mapMsg.setString(BookTitle, "書籍題目");
mapMsg.setLong(BookCost, 50.00);
String bookTitle = mapMsg.getString("BookTitle");
// Object Message
ObjectMessage objectMsg = session.createObjectMessage();
Book book = new Book("WinSocks 2.0");
objectMsg.setObject(book); |
三、JMS P2P編程 在JMS P2P通信方式中,發(fā)送程序?qū)⑾⒎湃胍粋€(gè)隊(duì)列,根據(jù)通信要求,發(fā)送程序可以要求一個(gè)應(yīng)答信息(請(qǐng)求-應(yīng)答模式),也可以不要求立即獲得應(yīng)答(發(fā)送-遺忘模式)。如果需要應(yīng)答信息,發(fā)送程序通過消息頭的JMSReplayTo域向消息的接收程序聲明應(yīng)答信息應(yīng)當(dāng)放入哪一個(gè)本地隊(duì)列。
在請(qǐng)求-應(yīng)答模式中,發(fā)送程序可以按照兩種方式操作。一種稱為偽同步方式,發(fā)送程序在等待應(yīng)答消息時(shí)會(huì)被阻塞;另一種是異步方式,發(fā)送程序發(fā)送消息后不被阻塞,可以照常執(zhí)行其他處理任務(wù),它可以在以后適當(dāng)?shù)臅r(shí)機(jī)檢查隊(duì)列,查看有沒有它希望得到的應(yīng)答信息。下面的代碼片斷顯示了JMS程序發(fā)送消息的過程。
// 發(fā)送JMS消息
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.naming.Context;
public class MyJMSSender {
private String requestMessage;
private String messageID;
private int requestRetCode = 1;
private QueueConnectionFactory queueConnectionFactory = null;
private Queue requestQueue = null;
private Queue responseQueue = null;
private QueueConnection queueConnection = null;
private QueueSession queueSession = null;
private QueueSender queueSender = null;
private TextMessage textMsg = null;
// 其他代碼...
public int processOutputMessages(String myMessage) {
// 查找管理對(duì)象
try {
InitialContext initContext = new InitialContext();
Context env = (Context) initContext.lookup
("java:comp/env");
queueConnectionFactory = (QueueConnectionFactory)
env.lookup("tlQCF");
requestQueue = (Queue) env.lookup("tlReqQueue");
responseQueue = (Queue) env.lookup("tlResQueue");
queueConnection = queueConnectionFactory.
createQueueConnection();
queueConnection.start();
queueSession = queueConnection.createQueueSession
(true, 0);
queueSender = queueSession.createSender(requestQueue);
textMsg = queueSession.createTextMessage();
textMsg.setText(myMessage);
textMsg.setJMSReplyTo(responseQueue);
// 處理消息的代碼邏輯...
queueSender.send(textMsg);
queueConnection.stop();
queueConnection.close();
queueConnection = null;
} catch (Exception e) {
// 異常處理代碼...
}
return requestRetCode = 0;
}
} |
下面來分析一下這段代碼。JMS程序的第一個(gè)任務(wù)是找到JNDI名稱上下文的位置。對(duì)于WSAD開發(fā)環(huán)境來說,如果JMS程序?qū)儆贘2EE項(xiàng)目,則JNDI名稱空間的位置由WSAD測(cè)試服務(wù)器管理,運(yùn)行時(shí)環(huán)境能夠自動(dòng)獲知這些信息。在這種情況下,我們只要調(diào)用InitialContext類默認(rèn)的構(gòu)造函數(shù)創(chuàng)建一個(gè)實(shí)例就可以了,即:
InitialContext initContext = new InitialContext(); |
對(duì)于WSAD環(huán)境之外的程序,或者程序不是使用WSAD JNDI名稱空間,例如使用LDAP(輕量級(jí)目錄訪問協(xié)議),程序?qū)ふ襃NDI名稱的操作稍微復(fù)雜一點(diǎn),必須在一個(gè)Properties或Hashtable對(duì)象中設(shè)定INITIAL_CONTEXT_FACTORY和PROVIDER_URL,然后將該P(yáng)roperties對(duì)象或Hashtable對(duì)象作為參數(shù)調(diào)用InitialContext的構(gòu)造函數(shù)。下面我們來看幾個(gè)創(chuàng)建InitialContext對(duì)象的例子,第一個(gè)例子顯示了運(yùn)行在WSAD之外的程序如何找到WSAD InitialContext對(duì)象。
// 例一:運(yùn)行在WSAD之外的程序?qū)ふ襑SAD InitialContext對(duì)象
// 說明:將localhost替換為JNDI服務(wù)運(yùn)行的服務(wù)器名稱
Properties props = new Properties();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"com.ibm.websphere.naming.WsnInitialContextFactory");
props.put(Context.PROVIDER_URL, "iiop://localhost/");
InitialContext initialContext = InitialContext(props);
// 例二:下面的例子顯示了如何找到基于文件的JNDI InitialContext
Hashtable hashTab = new Hashtable ();
hashTab.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.fscontext.RefFSContextFactory");
hashTab.put(Context.PROVIDER_URL, "file://c:/temp");
InitialContext initialContext = InitialContext(hashTab);
// 例三:下面的例子顯示了如何找到基于LDAP的JNDI InitialContext
Hashtable hashTab = new Hashtable ();
hashTab.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.ldap.LdapCtxFactory");
hashTab.put(Context.PROVIDER_URL,
"file://server.company.com/o=provider_name, c=us");
InitialContext initialContext = InitialContext(hashTab); |
獲得InitialContext對(duì)象之后,下一步是要查找java:comp/env子上下文(Subcontext),例如:
Context env = (Context) initContext.lookup("java:comp/env"; |
java:comp/env是J2EE規(guī)范推薦的保存環(huán)境變量的JNDI名稱子上下文。在這個(gè)子上下文中,JMS程序需要找到幾個(gè)由JMS管理的對(duì)象,包括QueueConnectionFactory對(duì)象、Queue對(duì)象等。
下面的代碼尋找由JMS管理的幾個(gè)對(duì)象,這些對(duì)象是JMS程序執(zhí)行操作所必需的。
queueConnectionFactory = (QueueConnectionFactory) env.lookup("QCF");
requestQueue = (Queue) env.lookup("requestQueue"); |
接下來,用QueueConnectionFactory對(duì)象構(gòu)造出QueueConnection對(duì)象。
queueConnection = queueConnectionFactory.createQueueConnection(); |
3.1 使用JMS QueueConnection對(duì)象 JMS QueueConnection提供了一個(gè)通向底層MOM(就本文而言,它是指WebSphere MQ隊(duì)列管理器)的連接,按照這種方式創(chuàng)建的連接使用默認(rèn)的Java綁定傳輸端口來連接到本地的隊(duì)列管理器。
對(duì)于MQ客戶程序來說(運(yùn)行在不帶本地隊(duì)列管理器的機(jī)器上的MQ程序),QueueConnectionFactory對(duì)象需要稍作調(diào)整,以便使用客戶端傳輸端口:
QueueConn.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); |
另外,剛剛創(chuàng)建的QueueConnection對(duì)象總是處于停止?fàn)顟B(tài),需要調(diào)用"queueConnection.start();"將它啟動(dòng)。
建立連接之后,用QueueConnection對(duì)象的createQueueSession方法來創(chuàng)建一次會(huì)話。必須注意的是,QueueSession對(duì)象有一個(gè)單線程的上下文,不是線程安全的,因此會(huì)話對(duì)象本身以及在會(huì)話對(duì)象基礎(chǔ)上創(chuàng)建的對(duì)象都不是線程安全的,在多線程環(huán)境中必須加以保護(hù)。createQueueSession方法有兩個(gè)參數(shù),構(gòu)造一個(gè)QueueSession對(duì)象的語句類如:
queueSession = queueConnection.createQueueSession
(false, Session.AUTO_ACKNOWLEDGE); |
createQueueSession方法的第一個(gè)參數(shù)是boolean類型,它指定了JMS事務(wù)類型--也就是說,當(dāng)前的隊(duì)列會(huì)話是事務(wù)化的(true)還是非事務(wù)化的(false)。JMS事務(wù)類型主要用于控制消息傳遞機(jī)制,不要將它與EJB的事務(wù)類型(NotSupported,Required,等等)混淆了,EJB的事務(wù)類型設(shè)定的是EJB模塊本身的事務(wù)上下文。createQueueSession方法的第二個(gè)參數(shù)是一個(gè)整數(shù),指定了確認(rèn)模式,也就是決定了如何向服務(wù)器證實(shí)消息已經(jīng)傳到。
如果隊(duì)列會(huì)話是事務(wù)化的(調(diào)用createQueueSession方法的第一個(gè)參數(shù)是true),第二個(gè)參數(shù)的值將被忽略,因?yàn)樘峤皇聞?wù)時(shí)應(yīng)答總是自動(dòng)執(zhí)行的。如果由于某種原因事務(wù)被回退,則不會(huì)有應(yīng)答出現(xiàn),視同消息尚未遞送,JMS服務(wù)器將嘗試再次發(fā)送消息。如果有多個(gè)消息參與到同一會(huì)話上下文,它們被作為一個(gè)組處理,確認(rèn)最后一個(gè)消息也就自動(dòng)確認(rèn)了此前所有尚未確認(rèn)的消息。如果發(fā)生回退,情況也相似,整個(gè)消息組將被視為尚未遞送,JMS服務(wù)器將試圖再次遞送這些消息。
下面說明一下底層的工作機(jī)制。當(dāng)發(fā)送程序發(fā)出一個(gè)消息,JMS服務(wù)器接收該消息;如果消息是持久性的,服務(wù)器先將消息寫入磁盤,然后確認(rèn)該消息。自此之后,JMS服務(wù)器負(fù)責(zé)把消息發(fā)送到目的地,除非它從客戶程序收到了確認(rèn)信息,否則不會(huì)從臨時(shí)存儲(chǔ)位置刪除消息。對(duì)于非持久性的消息,收到消息并保存到內(nèi)存之后確認(rèn)信息就立即發(fā)出了。
如果隊(duì)列會(huì)話是非事務(wù)化的(調(diào)用createQueueSession方法的第一個(gè)參數(shù)是false),則應(yīng)答模式由第二個(gè)參數(shù)決定。第二個(gè)參數(shù)的可能取值包括:AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,以及CLIENT_ACKNOWLEDGE。
■ 對(duì)于非事務(wù)化的會(huì)話來說,AUTO_ACKNOWLEDGE確認(rèn)模式是最常用的確認(rèn)模式。對(duì)于事務(wù)化的會(huì)話,系統(tǒng)總是假定使用AUTO_ACKNOWLEDGE確認(rèn)模式。
■ DUPS_OK_ACKNOWLEDGE模式是一種"懶惰的"確認(rèn)方式。可以想到,這種模式可能導(dǎo)致消息提供者傳遞的一些重復(fù)消息出錯(cuò)。這種確認(rèn)模式只用于程序可以容忍重復(fù)消息存在的情況。
■ 在CLIENT_ACKNOWLEDGE模式中,消息的傳遞通過調(diào)用消息對(duì)象的acknowledge方法獲得確認(rèn)。
在AUTO_ACKNOWLEDGE模式中,消息的確認(rèn)通常在事務(wù)結(jié)束處完成。CLIENT_ACKNOWLEDGE使得應(yīng)用程序能夠加快這一過程,只要處理過程一結(jié)束就立即予以確認(rèn),恰好在整個(gè)事務(wù)結(jié)束之前。當(dāng)程序正在處理多個(gè)消息時(shí),這種確認(rèn)模式也是非常有用的,因?yàn)樵谶@種模式中程序可以在收到所有必需的消息后立即予以確認(rèn)。
對(duì)于非事務(wù)化的會(huì)話,一旦把消息放入了隊(duì)列,所有接收程序立即能夠看到該消息,且不能回退。對(duì)于事務(wù)化的會(huì)話,JMS事務(wù)化上下文把多個(gè)消息組織成一個(gè)工作單元,消息的發(fā)送和接收都是成組執(zhí)行。事務(wù)化上下文會(huì)保存事務(wù)執(zhí)行期間產(chǎn)生的所有消息,但不會(huì)把消息立即發(fā)送到目的地。
只有當(dāng)事務(wù)化的隊(duì)列會(huì)話執(zhí)行提交時(shí),保存的消息才會(huì)作為一個(gè)整體發(fā)送給目的地,這時(shí)接收程序才可以看到這些消息。如果在事務(wù)化隊(duì)列會(huì)話期間出現(xiàn)錯(cuò)誤,在錯(cuò)誤出現(xiàn)之前已經(jīng)成功處理的消息也會(huì)被撤銷。定義成事務(wù)化的隊(duì)列會(huì)話總是有一個(gè)當(dāng)前事務(wù),不需要顯式地用代碼來開始一個(gè)事務(wù)。當(dāng)然,事務(wù)化的環(huán)境總是存在一定的代價(jià)--事務(wù)化會(huì)話總是要比非事務(wù)化會(huì)話慢。
● 注意:JMS隊(duì)列會(huì)話的事務(wù)化是一個(gè)概念,實(shí)現(xiàn)了JMS邏輯的Java方法的事務(wù)屬性是另一個(gè)概念,不要將兩者混淆了。TX_REQUIRED屬性表示的是方法在一個(gè)事務(wù)上下文之內(nèi)運(yùn)行,從而確保數(shù)據(jù)庫(kù)更新和隊(duì)列中消息的處理作為一個(gè)不可分割的工作單元執(zhí)行(要么都成功提交,要么都回退)。順便說明一下,在容器管理的事務(wù)環(huán)境中,一個(gè)全局性的兩階段提交(Two-Phase Commit)事務(wù)上下文會(huì)被激活(參見本文后面的詳細(xì)說明),這時(shí)參與全局性事務(wù)的數(shù)據(jù)源應(yīng)當(dāng)用XA版的數(shù)據(jù)庫(kù)驅(qū)動(dòng)程序構(gòu)建。
相比之下,在createQueueSession方法的第一個(gè)參數(shù)中指定true建立的是JMS事務(wù)上下文:多個(gè)消息被視為一個(gè)工作單元。在消息的接收方,執(zhí)行queueSession.commit()之前,即使已經(jīng)收到了多個(gè)消息也不會(huì)給出確認(rèn)信息;一旦queueSession.commit()執(zhí)行,它就確認(rèn)收到了在此之前尚未提交的所有消息;消息發(fā)送方的情況也相似。
3.2 處理回退事件 如前所述,如果事務(wù)異常終止,收到的消息會(huì)被發(fā)回到原來的隊(duì)列。接收消息的程序下一次再來處理該隊(duì)列時(shí),它還會(huì)再次得到同一個(gè)消息,而且這一次事務(wù)很有可能還是要失敗,必須再次把消息發(fā)送回輸入隊(duì)列--如此不斷重復(fù),就形成了無限循環(huán)的情況。
為防止這種情況出現(xiàn),我們可以設(shè)置監(jiān)聽端口上的Max_retry計(jì)數(shù)器,超過Max_retry計(jì)數(shù)器規(guī)定的值,接收程序就不會(huì)再收到該消息;或者對(duì)于推式的會(huì)話,消息不會(huì)再被傳遞。另外,在推式會(huì)話中,重新傳遞的事務(wù)會(huì)被設(shè)置JMSRedelivered標(biāo)記,程序可以調(diào)用消息對(duì)象的getJMSRedelivered方法來檢查該標(biāo)記。
消息通過QueueSender JMS對(duì)象發(fā)送,QueueSender對(duì)象利用QueueSession對(duì)象的createSender方法創(chuàng)建,每一個(gè)隊(duì)列都要?jiǎng)?chuàng)建一個(gè)QueueSender對(duì)象:
queueSender = queueSession.createSender(requestQueue); |
接下來創(chuàng)建一個(gè)消息(TextMessage類型),根據(jù)myMessage字符串的值設(shè)置消息的內(nèi)容,myMessage變量作為輸入?yún)?shù)傳遞給queueSession.createTextMessag方法。
textMsg = queueSession.createTextMessage(myMessage); |
指定接收隊(duì)列,告訴接收消息的程序要把應(yīng)答放入哪一個(gè)隊(duì)列:
textMsg.setJMSReplyTo(responseQueue); |
最后,用Sender對(duì)象發(fā)送消息,然后停止并關(guān)閉連接。
queueSender.send(textMsg);
queueConnection.stop();
queueConnection.close(); |
發(fā)出消息之后,可以調(diào)用消息對(duì)象的getJMSMessageID方法獲得JMS賦予消息的ID(即提取JMSMessageID消息頭域),以后就可以通過這一ID尋找應(yīng)答消息:
String messageID = message.getJMSMessageID(); |
如果有JMS連接池,釋放后的會(huì)話不會(huì)被拆除,而是被返回給連接池以供重用。
3.3 關(guān)閉JMS對(duì)象 垃圾收集器不一定會(huì)及時(shí)關(guān)閉JMS對(duì)象,如果程序要?jiǎng)?chuàng)建大量短期生存的對(duì)象,可能會(huì)引起問題,至少會(huì)浪費(fèi)大量寶貴的資源,所以顯式地釋放所有不用的資源是很重要的。
if (queueConn != null) {
queueConn.stop();
queueConn.close();
queueConn = null;
} |
關(guān)閉隊(duì)列連接對(duì)象將自動(dòng)關(guān)閉所有利用該連接對(duì)象創(chuàng)建的對(duì)象。但個(gè)別JMS提供者例外,這時(shí)請(qǐng)按照下面代碼所示的次序關(guān)閉所有JMS對(duì)象。
// 關(guān)閉JMS對(duì)象
if (queueReceiver != null) {
queueReceiver.close();
queueReceiver = null;
}
if (queueSender != null) {
queueSender.close();
queueSender = null;
}
if (queueSession != null) {
queueSession.close();
queueSession = null;
}
if (queueConn != null) {
queueConn.stop();
queueConn.close();
queueConn = null;
} |
3.4 接收消息 消息接收方的處理邏輯也和發(fā)送方的相似。消息由JMS QueueReceiver對(duì)象接收,QueueReceiver對(duì)象建立在為特定隊(duì)列創(chuàng)建的QueueSession對(duì)象的基礎(chǔ)上。差別在于QueueReceiver接收消息的方式--QueueReceiver對(duì)象能夠按照偽同步或異步方式接收消息。下面的代碼顯示了如何用偽同步方式接收消息。
// 偽同步方式接收消息
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.Exception;
import javax.naming.InitialContext;
public class MyJMSReceiver {
private String responseMessage;
private String messageID;
private int replyRetCode = 1;
private QueueConnectionFactory queueConnectionFactory = null;
private Queue inputQueue = null;
private QueueConnection queueConnection = null;
private QueueSession queueSession = null;
private QueueReceiver queueReceiver = null;
private TextMessage textMsg = null;
public void processIncomingMessages() {
// 查找管理對(duì)象
InitialContext initContext = new InitialContext();
Context env = (Context) initContext.lookup("java:comp/env");
queueConnectionFactory = (QueueConnectionFactory)
env.lookup("tlQCF");
inputQueue = (Queue) env.lookup("tlQueue");
queueConnection = queueConnectionFactory.
createQueueConnection();
queueConnection.start();
queueSession=queueConnection.createQueueSession(true, 0);
queueReceiver = queueSession.createReceiver(inputQueue);
// 等一秒鐘,看看是否有消息到達(dá)
TextMessage inputMessage = queueReceiver.receive(1000);
// 其他處理代碼...
queueConnection.stop();
queueConnection.close();
}
} |
下面分析一下上面的代碼。消息由QueueReceiver對(duì)象執(zhí)行receive方法接收。receive方法有一個(gè)參數(shù),它指定了接收消息的等待時(shí)間(以毫秒計(jì))。在上面的代碼中,QueueReceiver對(duì)象在一秒之后超出期限,解除鎖定,把控制返回給程序。如果調(diào)用receive方法時(shí)指定了等待時(shí)間,QueueReceiver對(duì)象在指定的時(shí)間內(nèi)被鎖定并等待消息,如果超出了等待時(shí)間仍無消息到達(dá),QueueReceiver對(duì)象超時(shí)并解除鎖定,把控制返回給程序。
接收消息的方法還有一個(gè)"不等待"的版本,使用這個(gè)方法時(shí)QueueReceiver對(duì)象檢查是否有消息之后立即返回,將控制交還給程序。下面是一個(gè)例子:
TextMessage message = queueReceiver.receiveNoWait(); |
如果調(diào)用receive方法時(shí)不指定參數(shù),QueueReceiver對(duì)象會(huì)無限期地等待消息。采用這種用法時(shí)應(yīng)當(dāng)非常小心,因?yàn)槌绦驎?huì)被無限期地鎖定。下面是一個(gè)無限期等待消息的例子:
TextMessage message = queueReceiver.receive(); |
不管等待期限的參數(shù)設(shè)置了多少,這都屬于拉式消息傳遞,如前所述,這種消息傳遞方式的效率較低。不僅如此,這種方法還不適合于J2EE EJB層,不能用于EJB組件之內(nèi),原因稍后就會(huì)說明。不過,這種處理方式適合在Servlet、JSP和普通Java JMS應(yīng)用中使用。
接收消息的第二種辦法是異步接收。用異步接收方式時(shí),QueueReceiver對(duì)象必須用setMessageListener(class_name)方法注冊(cè)一個(gè)MessageListener類,其中class_name參數(shù)可以是任何實(shí)現(xiàn)了onMessage接口方法的類。在下面這個(gè)例子中,onMessage方法由同一個(gè)類實(shí)現(xiàn)(為簡(jiǎn)單計(jì),這里沒有給出try/catch塊的代碼)。
● 注意:接下來的消息接收方式不適用于EJB組件,這些代碼僅適用于Servlet、JSP和普通的Java JMS應(yīng)用程序。
// 消息監(jiān)聽類的例子
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.Queue;
import javax.jms.Exception;
import javax.naming.InitialContext;
public class MyListenerClass implements javax.jms.MessageListener {
private int responseRetCode = 1;
private boolean loopFlag = true;
private QueueConnectionFactory queueConnectionFactory = null;
private Queue responseQueue = null;
private QueueConnection queueConnection = null;
private QueueSession queueSession = null;
private QueueSender queueSender = null;
public void prepareEnvironment(String myMessage) {
// 查找管理對(duì)象
InitialContext initContext = new InitialContext();
Context env = (Context) initContext.lookup("java:comp/env");
queueConnectionFactory = (QueueConnectionFactory)
env.lookup("tlQCF");
responseQueue = (Queue) env.lookup("tlResQueue");
queueConnection = queueConnectionFactory.
createQueueConnection();
queueSession = queueConnection.createQueueSession
(true, 0);
queueReceiver = queueSession.createReceiver
(responseQueue);
queueReceiver.setMessageListener(this)
queueConnection.start();
}
public void onMessage(Message message) {
// 希望收到一個(gè)文本消息...
if (message instanceof TextMessage) {
String responseText =
"確認(rèn)消息傳遞:" + ((TextMessage) message).getText();
// 當(dāng)收到一個(gè)以@字符開頭的消息時(shí),循環(huán)結(jié)束,
// MessageListener終止
if (responseText.charAt(0) == '@') {
loopFlag = 1; // 結(jié)束處理;
} else {t
// 繼續(xù)處理消息
// 本例中我們知道應(yīng)答消息的隊(duì)列,并非真的要用到
//message.getJMSReplyTo
// 這只是一個(gè)如何獲取應(yīng)答消息隊(duì)列的例子
Destination replyToQueue=message.getJMSReplyTo();
// 設(shè)置應(yīng)答消息
TextMessage responseMessage =
responseSession.createTextMessage(responseText);
// 使CorrelationID等于消息ID,
//這樣客戶程序就能將應(yīng)答消息和原來的請(qǐng)求
// 消息對(duì)應(yīng)起來
messageID = message.getJMSMessageID();
responseMessage.setJMSCorrelationID(messageID);
// 設(shè)置消息的目的地
responseMessage.setJMSDestination(
replyToQueue)
queueSender.send(
responseMessage);
}
}
}
// 保持監(jiān)聽器活動(dòng)
while (loopFlag) {
// 將控制轉(zhuǎn)移到其他任務(wù)(休眠2秒)
System.out.println("正處于監(jiān)聽器循環(huán)之內(nèi)...");
Thread.currentThread().sleep(2000);
}
// 當(dāng)loopFlag域設(shè)置成flase時(shí),結(jié)束處理過程
queueConn.stop();
queueConnection.close();
} |
注冊(cè)一個(gè)MessageListener對(duì)象時(shí),一個(gè)實(shí)現(xiàn)了MessageListener邏輯的新線程被創(chuàng)建。我們要保持這個(gè)線程處于活動(dòng)狀態(tài),因此使用了一個(gè)while循環(huán),首先讓線程休眠一定的時(shí)間(這里是2秒),將處理器的控制權(quán)轉(zhuǎn)讓給其他任務(wù)。當(dāng)線程被喚醒時(shí),它檢查隊(duì)列,然后再次進(jìn)入休眠狀態(tài)。當(dāng)一個(gè)消息到達(dá)當(dāng)前注冊(cè)的MessageListener對(duì)象所監(jiān)視的隊(duì)列時(shí),JMS調(diào)用MessageListener對(duì)象的onMessage(message)方法,將消息作為參數(shù)傳遞給onMessage方法。
這是一種推式的消息接收方式,程序效率較高,但仍不適合在EJB組件之內(nèi)使用。下面將探討為什么這些接收消息的方式都不能用于EJB組件之內(nèi),然后給出解決辦法。雖然這部分內(nèi)容放入了P2P通信方式中討論,但其基本思路同樣適用于Pub/Sub通信方式。
3.5 消息驅(qū)動(dòng)的Bean 在前文討論JMS消息接收處理邏輯的過程中,我們看到的代碼僅僅適用于Servlet、JSP以及普通的Java應(yīng)用程序,但不適用于EJB,因?yàn)樵贘MS的接收端使用EJB存在一些技術(shù)問題。一般地,JMS程序的交互模式分兩種:
■ 發(fā)送-遺忘:JMS客戶程序發(fā)出消息,但不需要應(yīng)答。從性能的角度來看,這是最理想的處理模式,發(fā)送程序不需要等待對(duì)方響應(yīng)請(qǐng)求,可以繼續(xù)執(zhí)行自己的任務(wù)。
■ 同步請(qǐng)求-答復(fù):JMS客戶程序發(fā)出消息并等待答復(fù)。在JMS中,這種交互方式通過執(zhí)行一個(gè)偽同步的接收方法(前文已經(jīng)討論)實(shí)現(xiàn)。然而,這里可能出現(xiàn)問題。如果EJB模塊在一個(gè)事務(wù)上下文之內(nèi)執(zhí)行(通常總是如此),單一事務(wù)之內(nèi)無法執(zhí)行請(qǐng)求-答復(fù)處理,這是因?yàn)榘l(fā)送者發(fā)出一個(gè)消息之后,只有當(dāng)發(fā)送者提交了事務(wù),接收者才能收到消息。因此,在單一事務(wù)之內(nèi)是不可能得到應(yīng)答的,因?yàn)樵谝粋€(gè)未提交的事務(wù)上下文之內(nèi)接收程序永遠(yuǎn)收不到消息,當(dāng)然也不可能作出應(yīng)答了。解決的辦法是請(qǐng)求-答復(fù)必須通過兩個(gè)不同的事務(wù)執(zhí)行。
對(duì)于EJB來說,在JMS通信的接收端還可能出現(xiàn)另一個(gè)EJB特有的問題。在異步通信中,何時(shí)能夠獲得應(yīng)答是不可預(yù)料的,異步通信的主要優(yōu)點(diǎn)之一就是發(fā)送方能夠在發(fā)出消息之后繼續(xù)當(dāng)前的工作,即使接收方當(dāng)時(shí)根本不能接收消息也一樣,但請(qǐng)求-答復(fù)模式卻隱含地假定了EJB組件(假定是一個(gè)會(huì)話Bean)應(yīng)該在發(fā)出消息之后等待答復(fù)。J2EE實(shí)際上是一個(gè)基于組件技術(shù)的事務(wù)處理環(huán)境,它的設(shè)計(jì)目標(biāo)是處理大量短期生存的任務(wù),卻不是針對(duì)那些可能會(huì)被長(zhǎng)期阻塞來等待應(yīng)答的任務(wù)。
為了解決這個(gè)問題,Sun在EJB 2.0規(guī)范中加入了一種新的EJB類型,即MDB。MDB是專門為JMS異步消息環(huán)境中(接收端)EJB組件面臨的問題而設(shè)計(jì)的,其設(shè)計(jì)思路是把監(jiān)聽消息是否到達(dá)的任務(wù)從EJB組件移到容器,由于這個(gè)原因,MDB組件總是在容器的控制之下運(yùn)行,容器代替MDB監(jiān)聽著特定的隊(duì)列或主題,當(dāng)消息到達(dá)該隊(duì)列或者主題,容器就激活MDB,調(diào)用MDB的onMessage方法,將收到的消息作為參數(shù)傳遞給onMessage方法。
MDB是一種
異步組件,它的工作方式和其他同步工作的EJB組件(會(huì)話Bean,實(shí)體Bean)不同,MDB沒有Remote接口和Home接口,客戶程序不能直接激活MDB,MDB只能通過收到的消息激活。MDB的另一個(gè)重要特點(diǎn)是它對(duì)事務(wù)和安全上下文的處理方式與眾不同,
MDB已經(jīng)徹底脫離了客戶程序,因此也不使用客戶程序的事務(wù)和安全上下文。
發(fā)送JMS消息的遠(yuǎn)程客戶程序完全有可能運(yùn)行在不同的環(huán)境之中--非J2EE的環(huán)境,例如普通的Java應(yīng)用程序,可能根本沒有任何安全或事務(wù)上下文。因此,發(fā)送方的事務(wù)和安全上下文永遠(yuǎn)不會(huì)延續(xù)到接收方的MDB組件。由于MDB永遠(yuǎn)不會(huì)由客戶程序直接激活,所以它們也永遠(yuǎn)不可能在客戶程序的事務(wù)上下文之內(nèi)運(yùn)行。由于這個(gè)原因,下面這些事務(wù)屬性對(duì)于MDB來說沒有任何意義--Supports,RequiresNew,Mandatory,以及None,因?yàn)檫@些事務(wù)屬性隱含地意味著延用客戶程序的事務(wù)上下文。
MDB可以使用的事務(wù)屬性只有兩個(gè):NotSupported和Required。如果一個(gè)MDB組件有NotSupported事務(wù)屬性,則它的消息處理過程不屬于任何事務(wù)上下文。
就象其他類型的EJB一樣,MDB可能參與到兩種類型的事務(wù):Bean管理的事務(wù),容器管理的事務(wù)。MDB組件的所有方法中,只有onMessage方法可以參與到事務(wù)上下文。如果讓MDB參與到Bean管理的事務(wù)上下文,則表示允許MDB在onMessage方法之內(nèi)開始和結(jié)束一個(gè)事務(wù)。
這種策略的問題在于,收到的消息總是位于onMessage方法之內(nèi)開始的事務(wù)之外(要讓消息參與其中已經(jīng)太遲了)。在這種情況下,如果由于某種原因必須回退,則消息必須手工處理。
如果讓MDB參與到容器管理的事務(wù)上下文,情況就完全不同了。
只要設(shè)置了Required事務(wù)屬性,容器就可以在收到消息時(shí)開始一個(gè)事務(wù),這樣,消息就成為事務(wù)的一部分,當(dāng)事務(wù)成功提交時(shí),或者當(dāng)事務(wù)被回退而消息被返回到發(fā)送隊(duì)列時(shí),都可以獲得明確的確認(rèn)信息。
事務(wù)可能在兩種情形下回退:第一種情形是程序顯式地調(diào)用
setRollBackOnly方法,第二種情形是在onMessage方法之內(nèi)拋出一個(gè)
系統(tǒng)異常(注意,拋出應(yīng)用程序異常不會(huì)觸發(fā)回退動(dòng)作)。當(dāng)事務(wù)回退時(shí),消息就被返回到原來的隊(duì)列,監(jiān)聽器將再次發(fā)送消息要求處理。一般地,這種情況會(huì)引起無限循環(huán),最后導(dǎo)致應(yīng)用運(yùn)行不正常。
Max_retries屬性可以用來控制監(jiān)聽器再次提取重發(fā)消息的次數(shù)(這個(gè)屬性在配置監(jiān)聽器端口的時(shí)候設(shè)置),超過Max_retries規(guī)定的限制值之后,監(jiān)聽器將停止處理該消息(顯然,這算不上最理想的處理方式)。
如果用WebSphere MQ作為JMS提供者,還有一種更好的解決辦法。
我們可以配置WebSphere MQ,使其嘗試一定的次數(shù)之后停止發(fā)送同一消息,并將消息放入一個(gè)特定的錯(cuò)誤隊(duì)列或Dead.Letter.Queue。記住,MDB是無狀態(tài)的組件,也就是說它不會(huì)在兩次不同的方法調(diào)用之間維持任何狀態(tài)信息(就象Web服務(wù)器處理HTTP請(qǐng)求的情況一樣),同時(shí),由于客戶程序永遠(yuǎn)不會(huì)直接調(diào)用MDB組件,所以
MDB組件不能識(shí)別任何特定的客戶程序,在這個(gè)意義上,可以認(rèn)為MDB組件是"匿名"運(yùn)行的。所有MDB組件都必須實(shí)現(xiàn)javax.ejb.MessageDrivenBean接口和javax.jms.MessageListener接口。
除了onMessage方法之外,MDB還有其他幾個(gè)
回調(diào)方法--由容器調(diào)用的方法:
■ ejbCreate方法:容器調(diào)用該方法來創(chuàng)建MDB的實(shí)例。在ejbCreate方法中可以放入一些初始化的邏輯。
■ setMessageDrivenContext方法:當(dāng)Bean第一次被加入到MDB Bean的緩沖池時(shí)容器將調(diào)用該方法。setMessageDrivenContext方法通常用來捕獲MessageDrivenContext,并將setMessageDrivenContext保存到類里面的變量,例如:
public void setMessageDrivenContext(java.ejb.
MessageDrivenContext mdbContext) {
messageDrivenContext = mdbContext;
} |
■ ejbRemove方法:容器把Bean從緩沖池移出并拆除時(shí)會(huì)調(diào)用該方法。一般地,ejbRemove方法會(huì)執(zhí)行一些清理操作。
一般而言,在onMessage方法之內(nèi)執(zhí)行業(yè)務(wù)邏輯是不推薦的,業(yè)務(wù)方面的操作最好委派給其他EJB組件執(zhí)行。
MDB容器會(huì)自動(dòng)控制好并發(fā)處理多個(gè)消息的情形。每一個(gè)MDB實(shí)例處理一個(gè)消息,在onMessage方法把控制返回給容器之前,不會(huì)要求MDB同時(shí)處理另一個(gè)消息。如果有多個(gè)消息必須并行處理,容器會(huì)激活同一MDB的多個(gè)實(shí)例。
從WebSphere 5.0開始,開發(fā)環(huán)境(WSAD 5.0)和運(yùn)行環(huán)境(WAS 5.0)都開始全面支持MDB。
下面的代碼片斷顯示了一個(gè)MDB的概念框架。
// MDB概念框架
// package 聲明略
import javax.jms.Message;
import javax.jms.MapMessage;
import javax.naming.InitialContext;
import java.util.*;
public class LibraryNotificationBean
implements javax.ejb.MessageDrivenBean,
javax.jms.MessageListener {
MessageDrivenContext messageDrivenContext;
Context jndiContext;
public void setMessageDrivenContext(MessageDrivenContext
msgDrivenContext) {
messageDrivenContext = msgDrivenContext;
try {
jndiContext = new InitialContext();
} catch (NamingException ne) {
throw new EJBException(ne);
}
}
public void ejbCreate() {
}
public void onMessage(Message notifyMsg) {
try {
MapMessage notifyMessage = (MapMessage) notifyMsg;
String bookTitle = (String) notifyMessage.
getString("BookTitle");
String bookAuthor = (String) notifyMessage.
getString("BookAuthor");
String bookCatalogNumber =
(String) notifyMessage.getString
("bookCatalogNumber");
Integer bookQuantity =
(Integer) notifyMessage.getInteger("BookQuantity");
// 處理消息...(調(diào)用其他EJB組件)
} catch (Exception e) {
throw new EJBException(e);
}
}
public void ejbRemove() {
try {
jndiContext.close();
jndiContext = null;
} catch (NamingException ne) {
// 異常處理
}
}
} |
3.6 消息持久化 消息可以是持久性的,也可以是非持久性的,持久性的消息和非持久性的消息可以放入同一個(gè)隊(duì)列。持久性的消息會(huì)被寫入磁盤,即使系統(tǒng)出現(xiàn)故障也可以恢復(fù)。當(dāng)然,正象其他場(chǎng)合的持久化操作一樣,消息的持久化也會(huì)增加一定的開銷,持久性消息大約要慢7%。控制消息持久化的辦法之一是定義隊(duì)列時(shí)設(shè)置其屬性,如果沒有顯式地設(shè)置持久化屬性,系統(tǒng)將采用默認(rèn)值。另外,JMS應(yīng)用程序本身也可以定義持久化屬性:
■ PERSISTENCE(QDEF):繼承隊(duì)列的默認(rèn)持久化屬性值。
■ PERSISTENCE(PERS):持久性的消息。
■ PERSISTENCE(NON):非持久性的消息。
消息的持久性還可以通過消息屬性頭
JMSDeliveryMode來調(diào)節(jié),JMSDeliveryMode可以取值DeliveryMode.PERSISTENT或DeliveryMode.NON_PERSISTENT,前者表示消息必須持久化,后者表示消息不需持久化。在事務(wù)化會(huì)話中處理的消息總是持久性的。
3.7 消息選擇器 JMS提供了從隊(duì)列選取一個(gè)消息子集的機(jī)制,能夠過濾掉所有不滿足選擇條件的消息。選擇消息的條件不僅可以引用消息頭的域,還可以引用消息屬性的域。下面是如何利用這一功能的例子:
QueueReceiver queueReceiver =
queueSession.createReceiver(requestQueue,
"BookTitle = 'Windows 2000'");
QueueBrowser queueBrowser = queueSession.createBrowser
(requestQueue, "BookTitle = 'Windows 2000'
AND BookAuthor = 'Robert Lee'"); |
注意,字符串(例如'Windows 2000')被一個(gè)雙引號(hào)之內(nèi)的單引號(hào)對(duì)包圍。
四、JMS Pub/Sub編程 Pub/Sub通信方式的編程與P2P通信編程相似,兩者最主要的差別是消息發(fā)送的目的地對(duì)象。在Pub/Sub通信方式中,發(fā)布消息的目的地和消費(fèi)消息的消息源是一個(gè)稱為主題(Topic)的JMS對(duì)象。主題對(duì)象的作用就象是一個(gè)虛擬通道,其中封裝了一個(gè)Pub/Sub的目的地(Destination)對(duì)象。
在P2P通信方式中,(發(fā)送到隊(duì)列的)消息只能由一個(gè)消息消費(fèi)者接收;但在Pub/Sub通信方式中,消息生產(chǎn)者發(fā)布到主題的消息可以分發(fā)到多個(gè)消息消費(fèi)者,而且,消息的生產(chǎn)者和消費(fèi)者之間的結(jié)合是如此寬松,以至于
生產(chǎn)者根本不必了解任何有關(guān)消息消費(fèi)者的情況,消息生產(chǎn)者和消費(fèi)者都只要知道一個(gè)公共的目的地(也就是雙方"交談"的主題)。
由于這個(gè)原因,消息的生產(chǎn)者通常稱為出版者,消息的消費(fèi)者則相應(yīng)地稱為訂閱者。出版者為某一主題發(fā)布的所有消息都會(huì)傳遞到該主題的所有訂閱者,訂閱者將收到它訂閱的主題上的所有消息,每一個(gè)訂閱者都會(huì)收到一個(gè)消息的副本。訂閱可以是
耐久性的(Durable)或非耐久性(Nondurable)。非耐久性的訂閱者只能收到訂閱之后才發(fā)布的消息。
耐久性訂閱者則不同,它可以中斷之后再連接,仍能收到在它斷線期間發(fā)布的消息。Put/Sub通信方式中耐久性連接(在某種程度上)類似于P2P通信中的持久性消息/隊(duì)列。出版者和訂閱者永遠(yuǎn)不會(huì)直接通信,Pub/Sub代理的作用就象是一個(gè)信息發(fā)布中心,把所有消息發(fā)布給它們的訂閱者。
● 注意:從WebSphere MQ 5.2開始,加裝了MA88和MA0C擴(kuò)展的WebSphere MQ可以當(dāng)作JMS Pub/Sub通信代理。此外,WebSphere MQ Integrator也能夠作為一個(gè)代理用。從MQ 5.3開始,MA88成了MQ基本軟件包的一部分,因此只要在MQ 5.3的基礎(chǔ)上安裝MA0C就可以了。在MQ JMS環(huán)境中,要讓Pub/Sub通信順利運(yùn)行,運(yùn)行Pub/Sub代理的隊(duì)列管理器上必須創(chuàng)建一組系統(tǒng)隊(duì)列。
MQ JMS MA0C擴(kuò)展模塊提供了一個(gè)工具來構(gòu)造所有必需的Pub/Sub系統(tǒng)隊(duì)列,這個(gè)工具就是MQJMS_PSQ.mqsc,位于
\java\bin目錄之下。要構(gòu)造Pub/Sub通信方式所需的系統(tǒng)隊(duì)列,只需從上述目錄執(zhí)行命令:runmqsc < MQJMS_PSQ.mqsc。
多個(gè)主題可以組織成一種樹形的層次結(jié)構(gòu),樹形結(jié)構(gòu)中主題的名稱之間用一個(gè)斜杠(/)分隔--例如,Books/UnixBooks/SolarisBooks。如果要訂閱一個(gè)以上的主題,可以在指定主題名字的時(shí)候使用通配符,例如,"Books/#"就是一個(gè)使用通配符的例子。
下面給出了一個(gè)JMS Pub/Sub通信的例子(為簡(jiǎn)單計(jì),這里省略了try/catch代碼塊)。在這個(gè)例子中,Books/ UnixBooks/SolarisBooks主題的訂閱者將收到所有發(fā)布到SolarisBooks的消息,Books/#主題的訂閱者將收到所有有關(guān)Books的消息(包括發(fā)布到UnixBooks和SolarisBooks主題的消息)。
// JMS Pub/Sub通信
import javsx.jms.*;
import javax.naming.*;
import javax.ejb.*;
public class PublisherExample implements javax.ejb.SessionBean {
private TopicConnectionFactory topicConnFactory = null;
private TopicConnection topicConnection = null;
private TopicPublisher topicPublisher = null;
private TopicSession topicSession = null;
private SessionContext sessionContext = null;
public void setSessionContext(SessionContext ctx) {
sessionContext = cts;
}
public void ejbCreate() throws CreateException {
InitialContext initContext = new InitialContext();
// 從JNDI查找主題的連接工廠
topicConnFactory =(TopicConnectionFactory)
initContext.lookup("java:comp/env/TCF");
// 從JNDI查找主題
Topic unixBooksTopic =
(Topic) initContext.lookup("java:comp/env/UnixBooks");
Topic javaBooksTopic =
(Topic) initContext.lookup("java:comp/env/JavaBooks");
Topic linuxBooksTopic =
(Topic) initContext.lookup("java:comp/env/LinuxBooks");
Topic windowsBooksTopic =
(Topic) initContext.lookup("java:comp/env/WindowsBooks");
Topic allBooksTopic =
(Topic) initContext.lookup("java:comp/env/AllBooks");
// 創(chuàng)建連接
topicConnection = topicConnFactory.createTopicConnection();
topicConn.start();
// 創(chuàng)建會(huì)話
topicSession =
topicConn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
}
public void publishMessage(String workMessage,
String topicToPublish) {
// 創(chuàng)建一個(gè)消息
TextMessage message = topicSession.createTextMessage
(workMessage); // 創(chuàng)建出版者,發(fā)布消息
if ((topicToPublish.toLowerCase()).equals("java")) {
TopicPublisher javaBooksPublisher =
topicSession.createPublisher(javaBooksTopic);
javaBooksPublisher.publish(message);
}
if ((topicToPublish.toLowerCase()).equals("unix")) {
TopicPublisher unixBooksPublisher =
topicSession.createPublisher(unixBooksTopic);
J2EE Enterprise Messaging 475 unixBooksPublisher.
publish(message);
}
if ((topicToPublish.toLowerCase()).equals("linux")) {
TopicPublisher linuxBooksPublisher =
topicSession.createPublisher(linuxBooksTopic);
linuxBooksPublisher.publish(message);
}
if ((topicToPublish.toLowerCase()).equals("windows")) {
TopicPublisher windowsBooksPublisher =
topicSession.createPublisher(windowsBooksTopic);
windowsBooksPublisher.publish(message);
}
TopicPublisher allBooksPublisher =
topicSession.createPublisher(allBooksTopic);
allBooksPublisher.publish(message);
}
public void ejbActivate() {
}
public void ejbPassivate() {
}
public void ejbRemove() {
// 清理工作...
if (javaBooksPublisher != null) {
javaBooksPublisher.close();
javaBooksPublisher = null;
}
if (unixBooksPublisher != null) {
unixBooksPublisher.close();
Chapter 9 476 unixBooksPublisher = null;
}
if (linuxBooksPublisher != null) {
linuxBooksPublisher.close();
linuxBooksPublisher = null;
}
if (windowsBooksPublisher != null) {
windowsBooksPublisher.close();
windowsBooksPublisher = null;
}
if (allBooksPublisher != null) {
allBooksPublisher.close();
allBooksPublisher = null;
}
if (topicSession != null) {
topicSession.close();
topicSession = null;
}
if (topicConnection != null) {
topicConnection.stop();
topicConnection.close();
topicConnection = null;
}
} |
這段代碼比較簡(jiǎn)單,想來不需要多余的解釋了。唯一值得一提的地方是如何將一個(gè)消息發(fā)布到不同的主題:對(duì)于每一個(gè)特定的主題,分別創(chuàng)建一個(gè)對(duì)應(yīng)的出版者,然后用它將消息發(fā)布到主題。
如果一個(gè)MDB組件只負(fù)責(zé)接收消息,把所有其他的消息處理操作都委托給專用業(yè)務(wù)組件(這意味著MDB之內(nèi)不包含消息發(fā)送或發(fā)布邏輯),MDB的代碼就相當(dāng)于P2P通信方式中的處理代碼,使用MDB唯一的改變之處是將監(jiān)聽端口從監(jiān)聽一個(gè)隊(duì)列改為監(jiān)聽一個(gè)主題。有興趣的讀者可以自己試驗(yàn)一下雙重監(jiān)聽的實(shí)現(xiàn)方式。
五、二階段提交的事務(wù)
在企業(yè)級(jí)處理任務(wù)中,為了保證JMS或非JMS代碼處理業(yè)務(wù)邏輯過程的完整性(對(duì)于一個(gè)單元的工作,要么成功提交所有的處理步驟,要么全部回退所有處理步驟),操作一般要在一個(gè)事務(wù)上下文之內(nèi)進(jìn)行。除了將消息放入隊(duì)列的過程之外,如果還要向數(shù)據(jù)庫(kù)插入記錄(二階段提交,要么全部成功,要么全部失敗),事務(wù)上下文的重要性尤其突出。
為了支持二階段提交,JMS規(guī)范定義了下列XA版的JMS對(duì)象:XAConnectionFactory、XAQueueConnectionFactory、XASession、XAQueueSession、XATopicConnectionFactory、XATopicConnection,以及XATopicSession。另外,凡是參與全局事務(wù)的所有資源均應(yīng)該使用其XA版。特別地,對(duì)于JDBC資源,必須使用JDBC XADataSource。最后一點(diǎn)是,全局事務(wù)由JTA TransactionManager控制。下面的代碼顯示了建立全局事務(wù)所需的步驟。
// 配置全局事務(wù)
// 從JNDI名稱空間獲取一個(gè)JTA TransactionManager
TransactionManager globalTxnManager =
jndiContext.lookup("java:comp/env/txt/txnmgr");
// 啟動(dòng)全局事務(wù)
globalTxnManager.begin();
// 獲取事務(wù)對(duì)象
Transaction globalTxn = globalTxnManager.getTransaction();
// 獲取XA數(shù)據(jù)資源
XADatasource xaDatasource = jndiContext.lookup
("java:comp/env/jdbc/datasource");
// 獲取一個(gè)連接
XAConnection jdbcXAConn = xaDatasource.getConnection();
// 從XA連接獲取XAResource
XAResource jdbcXAResource = jdbcXAConn.getXAResource();
// 在全局事務(wù)中"征募"XAResource
globalTxn.enlist(jdbcXAResource);
// 獲取XA隊(duì)列連接
XAQueueConnectionFactory xaQueueConnectionFactory =
JndiContext.lookup("java:comp/env/jms/xaQCF")
XAQueueConnection xaQueueConnection =
XaQueueConnectionFactory.createXAQueueConnection();
// 獲取XA隊(duì)列會(huì)話
XAQueueSession xaQueueSession = xaQueueConnection.
createXAQueueSession();
// 從會(huì)話獲取XA資源
XAResource jmsXAResource = xaQueueSession.getXAResource();
// 在全局事務(wù)中"征募"該XAResource
globalTxn.enlist(jmsXAResource);
// 其他處理工作...
// 提交全局事務(wù)
globalTxn.commit(); |
總結(jié):本文介紹了JMS及其在WSAD環(huán)境下的編程,探討了JMS異步通信的主要優(yōu)點(diǎn),以及兩種通信方式(P2P和Pub/Sub)、MDB、JMS事務(wù)、二階段提交全局事務(wù)等方面的概念。希望本文的介紹對(duì)你有所幫助。