<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    kapok

    垃圾桶,嘿嘿,我藏的這么深你們還能找到啊,真牛!

      BlogJava :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
      455 隨筆 :: 0 文章 :: 76 評(píng)論 :: 0 Trackbacks
    http://news.ccidnet.com/pub/article/c1060_a59896_p2.html
    提綱:


    一、JMS基本概念

      1.1 P2P通信

      1.2 Pub/Sub通信

    二、JMS消息

    三、JMS P2P編程

      3.1 使用JMS QueueConnection對(duì)象


            3.2 處理回退事件

      3.3 關(guān)閉JMS對(duì)象

      3.4 接收消息

      3.5 消息驅(qū)動(dòng)的Bean

      3.6 消息持久化

      3.7 消息選擇器

    四、JMS Pub/Sub編程

    五、二階段提交的事務(wù)

    ━━━━━━━━━━━━━━━━━━━━━━━━━━

      EJB 2.0和J2EE 1.3規(guī)范開始提供對(duì)Java消息服務(wù)(JMS)的支持。在J2EE 1.3加入JMS之前,J2EE環(huán)境中的組件通過RMI-IIOP協(xié)議通信,J2EE是一個(gè)完全同步的平臺(tái)。由于在J2EE 1.3規(guī)范中引入了JMS,J2EE環(huán)境開始具備一項(xiàng)極其重要的功能--異步通信。

      ● 說明:RMI-IIOP是Java遠(yuǎn)程方法調(diào)用(RMI,Remote Method Invocation)的一個(gè)兼容CORBA的版本,CORBA是Common Object Request Broker Architecture的縮寫,即公用對(duì)象請(qǐng)求代理(調(diào)度)體系結(jié)構(gòu)。RMI-IIOP通過CORBA平臺(tái)和語言中立的通信協(xié)議發(fā)送RMI消息。

      為了支持異步通信,J2EE 1.3規(guī)范還引入了一種新的EJB類型:消息驅(qū)動(dòng)的Bean,即Message Driven Bean,簡(jiǎn)稱MDB。如前所述,在JMS之前,J2EE原來是一個(gè)建立在Java RMI-IIOP通信協(xié)議基礎(chǔ)上的同步環(huán)境,但MDB卻具有接收異步消息的能力。

      異步通信使得企業(yè)應(yīng)用能夠建立在一種全新的通信機(jī)制之上,它具有如下重要優(yōu)點(diǎn):

      ■ 異步通信程序不直接交換信息。由于這個(gè)原因,通信的任意一方能夠在對(duì)方停止響應(yīng)時(shí)繼續(xù)自己的操作,例如當(dāng)接收消息的一方不能響應(yīng)時(shí),發(fā)送方仍能夠發(fā)出消息。

      ■ 異步通信有助于提高可靠性。支持異步通信的中間件軟件提供了有保證的(可靠的)消息遞送機(jī)制,即使整個(gè)環(huán)境出現(xiàn)問題也能夠保證消息傳遞到目的地。Internet是一種不可靠的通信媒體,對(duì)于通過Internet通信的應(yīng)用來說,異步通信的這一特點(diǎn)非常富有吸引力。

      ■ 發(fā)送異步消息的程序不會(huì)在等待應(yīng)答的時(shí)候被鎖定,從而極大地有利于提高性能。

      JMS本身不是一個(gè)通信軟件,而是一個(gè)標(biāo)準(zhǔn)的應(yīng)用編程接口(API),用來建立廠商中立的異步通信機(jī)制。從這個(gè)意義上說,JMS類似于JDBC和JNDI,例如就JDBC來說,它要求有一個(gè)底層的數(shù)據(jù)庫(kù)提供者,JMS則要求有一個(gè)支持JMS標(biāo)準(zhǔn)的底層異步通信中間件提供者,一般稱為面向消息的中間件(Message-Oriented Middleware,MOM)。

      MOM是一種支持應(yīng)用程序通過交換消息實(shí)現(xiàn)異步通信的技術(shù)。在某種程度上,異步通信有點(diǎn)象是人們通過email進(jìn)行通信;類似地,同步通信的程序就象是人們通過電話進(jìn)行通信。在異步通信過程中,程序的結(jié)合方式是寬松的,換句話說,異步通信的程序并不直接相互聯(lián)系,而是通過稱為隊(duì)列(Queue)或主題(Topic)的虛擬通道聯(lián)系。

      同時(shí),異步通信還意味著消息通過一個(gè)中轉(zhuǎn)站,按照存儲(chǔ)-轉(zhuǎn)發(fā)的方式傳遞,即使通信的接收方當(dāng)時(shí)并未運(yùn)行,另一方(發(fā)送方)的程序也能夠順利發(fā)出消息,一旦接收方的程序后來開始運(yùn)行,消息就會(huì)傳遞給它。

      JMS通信主要的優(yōu)點(diǎn)是提供了一個(gè)環(huán)境,在這個(gè)環(huán)境中各個(gè)程序通過標(biāo)準(zhǔn)的API進(jìn)行通信,開發(fā)者不必再面對(duì)各種不同操作系統(tǒng)、數(shù)據(jù)表示方式、底層協(xié)議所帶來的復(fù)雜性。

      本文討論了JMS編程的基本概念以及兩種JMS通信方式:第一種是端對(duì)端通信(Point-to-Point,P2P)方式,第二種是出版/訂閱(Publish/Subscribe,Pub/Sub)方式,另外還涵蓋了JMS消息結(jié)構(gòu)、JMS主要對(duì)象、MDB、JMS和WebSphere編程、消息持久化以及JMS事務(wù)支持方面的問題。

      一、JMS基本概念

      鑒于JMS是一種比較新的技術(shù),所以本文將首先詳細(xì)介紹JMS編程的一般概念,然后再探討WSAD 5.0環(huán)境下的JMS開發(fā)。如前所示,JMS程序本身并不直接相互通信,消息被發(fā)送給一個(gè)臨時(shí)中轉(zhuǎn)站--隊(duì)列或主題。隊(duì)列和主題都是能夠收集消息的中轉(zhuǎn)對(duì)象,但兩者支持的消息傳遞機(jī)制又有所不同,分別對(duì)應(yīng)兩種不同的通信方式--P2P和Pub/Sub。

      1.1 P2P通信

      P2P消息傳遞又可以按照推(Push)和拉(Pull)兩種方式運(yùn)作。在P2P拉方式中,程序通過稱為隊(duì)列的虛擬通道通信:在通信會(huì)話的發(fā)送方,發(fā)送程序把一個(gè)消息"放入"隊(duì)列,在接收方,接收程序定期掃描隊(duì)列,尋找它希望接收和處理的消息。和推方式相比,拉方式的消息傳遞效率較低,因?yàn)樗枰芏鴱?fù)始地檢查消息是否到達(dá),這個(gè)過程會(huì)消耗一定的資源。另外必須注意的一點(diǎn)是,當(dāng)接收方發(fā)現(xiàn)一個(gè)需要處理的消息時(shí),它就會(huì)"提取"消息,從效果上看等于從隊(duì)列刪除了消息。

      因此,即使有多個(gè)接收程序在處理同一個(gè)隊(duì)列,對(duì)于某一特定的消息來說,總是只有一個(gè)接收者。JMS程序可以使用多個(gè)隊(duì)列,每一個(gè)隊(duì)列可以由多個(gè)程序處理,但是只有一個(gè)程序才會(huì)收到某個(gè)特定的消息。

      在P2P推方式的消息傳遞中,發(fā)送程序的工作原理也一樣,它把消息發(fā)送到隊(duì)列,但接收程序的工作原理有所不同。接收程序?qū)崿F(xiàn)了一個(gè)Listener接口,包括實(shí)現(xiàn)了該接口中的onMessage回調(diào)方法,在J2EE環(huán)境中監(jiān)聽隊(duì)列接收消息的任務(wù)交給了容器,每當(dāng)一個(gè)新的消息達(dá)到隊(duì)列,容器就調(diào)用onMessage方法,將消息作為參數(shù)傳遞給onMessage方法。

      P2P通信最重要的特點(diǎn)(不管是推還是拉)是:每一個(gè)消息總是只由一個(gè)程序接收。一般而言,P2P程序在通信過程中參與的活動(dòng)較多,例如,發(fā)送程序可以向接收程序指出應(yīng)答消息應(yīng)當(dāng)放入哪一個(gè)隊(duì)列,它還可以要求返回一個(gè)確認(rèn)或報(bào)告消息。

      1.2 Pub/Sub通信

      在Pub/Sub通信方式中,程序之間通過一個(gè)主題(Topic)實(shí)現(xiàn)通信,用主題作為通信媒介要求有Pub/Sub代理程序的支持(稍后詳細(xì)介紹)。在消息發(fā)送方,生產(chǎn)消息的程序向主題發(fā)送消息;在接收方,消息的消費(fèi)程序向感興趣的主題訂閱消息。當(dāng)一個(gè)消息到達(dá)主題,所有向該主題訂閱的消費(fèi)程序都會(huì)通過onMessage方法的參數(shù)收到消息。

      這是一種推式的消息傳遞機(jī)制。可以設(shè)想,會(huì)有一個(gè)以上的消費(fèi)程序收到同一消息的副本。相比之下,程序在Pub/Sub通信過程中參與的活動(dòng)較少,當(dāng)生產(chǎn)者程序向某個(gè)特定的隊(duì)列發(fā)送消息,它不知道到底會(huì)有多少程序接收到該消息(可能有多個(gè),可能沒有)。通過訂閱主題實(shí)現(xiàn)的通信是一種比較靈活的通信方式,主題的訂閱者可以動(dòng)態(tài)地改變,卻不需要改動(dòng)底層的通信機(jī)制,而且它對(duì)整個(gè)通信過程完全透明。

      Pub/Sub方式的通信要求有Pub/Sub代理的支持。Pub/Sub代理是一種協(xié)調(diào)、控制消息傳遞過程,保證消息傳遞到接收方的程序。P2P通信方式中程序利用一個(gè)隊(duì)列作為通信的中轉(zhuǎn)站,相比之下,Pub/Sub通信方式中程序直接交互的是特殊的代理隊(duì)列,這就是我們要在WebSphere MQ常規(guī)安裝方式之上再裝一個(gè)MA0C的原因(只有用WebSphere MQ作為JMS提供者時(shí)才必須如此。要求使用MQ 5.3.1或更高的版本),MA0C就是一個(gè)支持Pub/Sub方式通信的代理軟件。

      QueueConnectionFactory和TopicConnectionFactory對(duì)象是創(chuàng)建相應(yīng)的QueueConnection對(duì)象和TopicConnection對(duì)象的JMS工廠類對(duì)象,JMS程序正是通過QueueConnection對(duì)象和TopicConnection對(duì)象連接到底層的MOM技術(shù)。

      二、JMS消息

      基于JMS的程序通過交換JMS消息實(shí)現(xiàn)通信,JMS消息由三部分構(gòu)成:消息頭,消息屬性(可選的),消息正文。消息頭有多個(gè)域構(gòu)成,包含了消息遞送信息和元數(shù)據(jù)。

      消息屬性部分包含一些標(biāo)準(zhǔn)的以及應(yīng)用特定的域,消息選擇器依據(jù)這些信息來過濾收到的消息。JMS定義了一組標(biāo)準(zhǔn)的屬性,要求MOM提供者支持(如表1所示)。消息正文部分包含應(yīng)用特定的數(shù)據(jù)(也就是要求遞送到目標(biāo)位置的內(nèi)容)。



    表1 JMS標(biāo)準(zhǔn)消息屬性


      可選的屬性包括JMSXUserID、JMSXAppID、JMSXProducerTXID、ConsumerTXID、JMSXRcvTimestamp、JMSXDeliveryCount以及JMSXState。消息頭為JMS消息中間件提供了描述性信息,諸如消息遞送的目標(biāo)、消息的創(chuàng)建者、保留消息的時(shí)間,等等,如表2所示。



    表2 JMS消息頭的域


      *在所有這些消息類型中,TextMessage是最常用的,它不僅簡(jiǎn)單,而且能夠用來傳遞XML數(shù)據(jù)。  

      JMS支持多種消息正文類型,包括:

      ■ textMessage:消息正文包含一個(gè)java.lang.String對(duì)象。這是最簡(jiǎn)單的消息格式,但可以用來傳遞XML文檔。

      ■ ObjectMessage:消息正文中包含了一個(gè)串行化之后的Java對(duì)象,這類Java對(duì)象必須是可串行化的。

      ■ MapMessage:消息正文包含一系列"名字-值"形式的數(shù)據(jù)元素,通常用來傳送一些按鍵-值形式編碼的數(shù)據(jù)。設(shè)置數(shù)據(jù)元素可以用setInt、setFloat、setString等方法;在接收方,提取數(shù)據(jù)元素使用相應(yīng)的getInt、getFloat、getString等方法。

      ■ BytesMessage:消息正文包含一個(gè)字節(jié)數(shù)組。如果需要發(fā)送應(yīng)用生成的原始數(shù)據(jù),通常采用這一消息類型。

      ■ StreamMessage:消息正文包含一個(gè)Java基本數(shù)據(jù)類型(int,char,double,等等)的流。接收方讀取數(shù)據(jù)的次序跟發(fā)送方寫入數(shù)據(jù)的次序一樣,讀寫消息元素分別使用readInt和writeInt、readString和writeString之類的方法。

      JMS消息對(duì)象可以用JMS會(huì)話對(duì)象(參見本文"使用JMS QueueConnection對(duì)象"部分)創(chuàng)建。下面的例子顯示了如何創(chuàng)建各類消息對(duì)象:

    TextMessage textMsg = session.createTextMessage();
    MapMessage mapMsg = session.createMapMessage();
    ObjectMessage objectMsg = session.createObjectMessage();
    BytesMessage byteMsg = session.createBytesMessage();


      消息對(duì)象提供了設(shè)置、提取各個(gè)消息頭域的方法,下面是幾個(gè)設(shè)置和提取JMS消息頭域的例子:

    // 獲取和設(shè)置消息ID
    String messageID = testMsg.getJMSMessageID();
    testMsg.setJMSCorrelationID(messageID);
    // 獲取和設(shè)置消息優(yōu)先級(jí)
    int messagePriority = mapMsg.getJMSPriority();
    mapMsg.setJMSPriority(1);


      另外,消息對(duì)象還為標(biāo)準(zhǔn)屬性和應(yīng)用特有的屬性提供了類似的設(shè)置和提取方法。下面的例子顯示了如何設(shè)置、提取JMS消息標(biāo)準(zhǔn)屬性和應(yīng)用特有屬性域:

    int groupSeq = objectMsg.getIntProperty("JMSGroupSeq");
    objectMsg.setStringProperty("myName", "孫悟空");


      JMS為讀寫消息正文內(nèi)容提供了許多方法。下面略舉數(shù)例,說明如何操作不同類型的消息正文:

    // Text Message
    TextMessage textMsg = session.createTextMessage();
    textMsg.setText("消息內(nèi)容文本");
    
    // Map Message
    MapMessage mapMsg = session.createMapMessage();
    mapMsg.setInt(BookCatalogNumber, 100);
    mapMsg.setString(BookTitle, "書籍題目");
    mapMsg.setLong(BookCost, 50.00);
    String bookTitle = mapMsg.getString("BookTitle");
    
    // Object Message
    ObjectMessage objectMsg = session.createObjectMessage();
    Book book = new Book("WinSocks 2.0");
    objectMsg.setObject(book);


    三、JMS P2P編程

      在JMS P2P通信方式中,發(fā)送程序?qū)⑾⒎湃胍粋€(gè)隊(duì)列,根據(jù)通信要求,發(fā)送程序可以要求一個(gè)應(yīng)答信息(請(qǐng)求-應(yīng)答模式),也可以不要求立即獲得應(yīng)答(發(fā)送-遺忘模式)。如果需要應(yīng)答信息,發(fā)送程序通過消息頭的JMSReplayTo域向消息的接收程序聲明應(yīng)答信息應(yīng)當(dāng)放入哪一個(gè)本地隊(duì)列。

    在請(qǐng)求-應(yīng)答模式中,發(fā)送程序可以按照兩種方式操作。一種稱為偽同步方式,發(fā)送程序在等待應(yīng)答消息時(shí)會(huì)被阻塞;另一種是異步方式,發(fā)送程序發(fā)送消息后不被阻塞,可以照常執(zhí)行其他處理任務(wù),它可以在以后適當(dāng)?shù)臅r(shí)機(jī)檢查隊(duì)列,查看有沒有它希望得到的應(yīng)答信息。下面的代碼片斷顯示了JMS程序發(fā)送消息的過程。

    // 發(fā)送JMS消息
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueConnection;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Queue;
    import javax.jms.JMSException;
    import javax.naming.InitialContext;
    import javax.naming.Context;
    
    public class MyJMSSender {
        private String requestMessage;
        private String messageID;
        private int requestRetCode = 1;
        private QueueConnectionFactory queueConnectionFactory = null;
        private Queue requestQueue = null;
        private Queue responseQueue = null;
        private QueueConnection queueConnection = null;
        private QueueSession queueSession = null;
        private QueueSender queueSender = null;
        private TextMessage textMsg = null;
    
        // 其他代碼...
        public int processOutputMessages(String myMessage) {
            // 查找管理對(duì)象
            try {
                InitialContext initContext = new InitialContext();
                Context env = (Context) initContext.lookup
    	    ("java:comp/env");
                queueConnectionFactory = (QueueConnectionFactory)
    	    env.lookup("tlQCF");
                requestQueue = (Queue) env.lookup("tlReqQueue");
                responseQueue = (Queue) env.lookup("tlResQueue");
                queueConnection = queueConnectionFactory.
    	    createQueueConnection();
                queueConnection.start();
                queueSession = queueConnection.createQueueSession
    	    (true, 0);
                queueSender = queueSession.createSender(requestQueue);
                textMsg = queueSession.createTextMessage();
                textMsg.setText(myMessage);
                textMsg.setJMSReplyTo(responseQueue);
                // 處理消息的代碼邏輯...
                queueSender.send(textMsg);
                queueConnection.stop();
                queueConnection.close();
                queueConnection = null;
            } catch (Exception e) {
                // 異常處理代碼...
            }
            return requestRetCode = 0;
        }
    }


      下面來分析一下這段代碼。JMS程序的第一個(gè)任務(wù)是找到JNDI名稱上下文的位置。對(duì)于WSAD開發(fā)環(huán)境來說,如果JMS程序?qū)儆贘2EE項(xiàng)目,則JNDI名稱空間的位置由WSAD測(cè)試服務(wù)器管理,運(yùn)行時(shí)環(huán)境能夠自動(dòng)獲知這些信息。在這種情況下,我們只要調(diào)用InitialContext類默認(rèn)的構(gòu)造函數(shù)創(chuàng)建一個(gè)實(shí)例就可以了,即:

    InitialContext initContext = new InitialContext();


      對(duì)于WSAD環(huán)境之外的程序,或者程序不是使用WSAD JNDI名稱空間,例如使用LDAP(輕量級(jí)目錄訪問協(xié)議),程序?qū)ふ襃NDI名稱的操作稍微復(fù)雜一點(diǎn),必須在一個(gè)Properties或Hashtable對(duì)象中設(shè)定INITIAL_CONTEXT_FACTORY和PROVIDER_URL,然后將該P(yáng)roperties對(duì)象或Hashtable對(duì)象作為參數(shù)調(diào)用InitialContext的構(gòu)造函數(shù)。下面我們來看幾個(gè)創(chuàng)建InitialContext對(duì)象的例子,第一個(gè)例子顯示了運(yùn)行在WSAD之外的程序如何找到WSAD InitialContext對(duì)象。

    // 例一:運(yùn)行在WSAD之外的程序?qū)ふ襑SAD InitialContext對(duì)象
    // 說明:將localhost替換為JNDI服務(wù)運(yùn)行的服務(wù)器名稱
    Properties props = new Properties();
    props.put(Context.INITIAL_CONTEXT_FACTORY, 
        "com.ibm.websphere.naming.WsnInitialContextFactory");
    props.put(Context.PROVIDER_URL, "iiop://localhost/");
    InitialContext initialContext = InitialContext(props);
    
    // 例二:下面的例子顯示了如何找到基于文件的JNDI InitialContext
    Hashtable hashTab = new Hashtable ();
    hashTab.put(Context.INITIAL_CONTEXT_FACTORY,
    "com.sun.jndi.fscontext.RefFSContextFactory");
    hashTab.put(Context.PROVIDER_URL, "file://c:/temp");
    InitialContext initialContext = InitialContext(hashTab);
    
    // 例三:下面的例子顯示了如何找到基于LDAP的JNDI InitialContext
    Hashtable hashTab = new Hashtable ();
    hashTab.put(Context.INITIAL_CONTEXT_FACTORY,
    "com.sun.jndi.ldap.LdapCtxFactory");
    hashTab.put(Context.PROVIDER_URL,
    "file://server.company.com/o=provider_name, c=us");
    InitialContext initialContext = InitialContext(hashTab);


      獲得InitialContext對(duì)象之后,下一步是要查找java:comp/env子上下文(Subcontext),例如:

    Context env = (Context) initContext.lookup("java:comp/env";


    java:comp/env是J2EE規(guī)范推薦的保存環(huán)境變量的JNDI名稱子上下文。在這個(gè)子上下文中,JMS程序需要找到幾個(gè)由JMS管理的對(duì)象,包括QueueConnectionFactory對(duì)象、Queue對(duì)象等。

      下面的代碼尋找由JMS管理的幾個(gè)對(duì)象,這些對(duì)象是JMS程序執(zhí)行操作所必需的。

    queueConnectionFactory = (QueueConnectionFactory) env.lookup("QCF");
    requestQueue = (Queue) env.lookup("requestQueue");


      接下來,用QueueConnectionFactory對(duì)象構(gòu)造出QueueConnection對(duì)象。

    queueConnection = queueConnectionFactory.createQueueConnection();


      3.1 使用JMS QueueConnection對(duì)象

      JMS QueueConnection提供了一個(gè)通向底層MOM(就本文而言,它是指WebSphere MQ隊(duì)列管理器)的連接,按照這種方式創(chuàng)建的連接使用默認(rèn)的Java綁定傳輸端口來連接到本地的隊(duì)列管理器。

      對(duì)于MQ客戶程序來說(運(yùn)行在不帶本地隊(duì)列管理器的機(jī)器上的MQ程序),QueueConnectionFactory對(duì)象需要稍作調(diào)整,以便使用客戶端傳輸端口:

    QueueConn.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);


      另外,剛剛創(chuàng)建的QueueConnection對(duì)象總是處于停止?fàn)顟B(tài),需要調(diào)用"queueConnection.start();"將它啟動(dòng)。

      建立連接之后,用QueueConnection對(duì)象的createQueueSession方法來創(chuàng)建一次會(huì)話。必須注意的是,QueueSession對(duì)象有一個(gè)單線程的上下文,不是線程安全的,因此會(huì)話對(duì)象本身以及在會(huì)話對(duì)象基礎(chǔ)上創(chuàng)建的對(duì)象都不是線程安全的,在多線程環(huán)境中必須加以保護(hù)。createQueueSession方法有兩個(gè)參數(shù),構(gòu)造一個(gè)QueueSession對(duì)象的語句類如:

    queueSession = queueConnection.createQueueSession
    (false, Session.AUTO_ACKNOWLEDGE);


      createQueueSession方法的第一個(gè)參數(shù)是boolean類型,它指定了JMS事務(wù)類型--也就是說,當(dāng)前的隊(duì)列會(huì)話是事務(wù)化的(true)還是非事務(wù)化的(false)。JMS事務(wù)類型主要用于控制消息傳遞機(jī)制,不要將它與EJB的事務(wù)類型(NotSupported,Required,等等)混淆了,EJB的事務(wù)類型設(shè)定的是EJB模塊本身的事務(wù)上下文。createQueueSession方法的第二個(gè)參數(shù)是一個(gè)整數(shù),指定了確認(rèn)模式,也就是決定了如何向服務(wù)器證實(shí)消息已經(jīng)傳到。

      如果隊(duì)列會(huì)話是事務(wù)化的(調(diào)用createQueueSession方法的第一個(gè)參數(shù)是true),第二個(gè)參數(shù)的值將被忽略,因?yàn)樘峤皇聞?wù)時(shí)應(yīng)答總是自動(dòng)執(zhí)行的。如果由于某種原因事務(wù)被回退,則不會(huì)有應(yīng)答出現(xiàn),視同消息尚未遞送,JMS服務(wù)器將嘗試再次發(fā)送消息。如果有多個(gè)消息參與到同一會(huì)話上下文,它們被作為一個(gè)組處理,確認(rèn)最后一個(gè)消息也就自動(dòng)確認(rèn)了此前所有尚未確認(rèn)的消息。如果發(fā)生回退,情況也相似,整個(gè)消息組將被視為尚未遞送,JMS服務(wù)器將試圖再次遞送這些消息。

      下面說明一下底層的工作機(jī)制。當(dāng)發(fā)送程序發(fā)出一個(gè)消息,JMS服務(wù)器接收該消息;如果消息是持久性的,服務(wù)器先將消息寫入磁盤,然后確認(rèn)該消息。自此之后,JMS服務(wù)器負(fù)責(zé)把消息發(fā)送到目的地,除非它從客戶程序收到了確認(rèn)信息,否則不會(huì)從臨時(shí)存儲(chǔ)位置刪除消息。對(duì)于非持久性的消息,收到消息并保存到內(nèi)存之后確認(rèn)信息就立即發(fā)出了。

      如果隊(duì)列會(huì)話是非事務(wù)化的(調(diào)用createQueueSession方法的第一個(gè)參數(shù)是false),則應(yīng)答模式由第二個(gè)參數(shù)決定。第二個(gè)參數(shù)的可能取值包括:AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,以及CLIENT_ACKNOWLEDGE。

      ■ 對(duì)于非事務(wù)化的會(huì)話來說,AUTO_ACKNOWLEDGE確認(rèn)模式是最常用的確認(rèn)模式。對(duì)于事務(wù)化的會(huì)話,系統(tǒng)總是假定使用AUTO_ACKNOWLEDGE確認(rèn)模式。

      ■ DUPS_OK_ACKNOWLEDGE模式是一種"懶惰的"確認(rèn)方式。可以想到,這種模式可能導(dǎo)致消息提供者傳遞的一些重復(fù)消息出錯(cuò)。這種確認(rèn)模式只用于程序可以容忍重復(fù)消息存在的情況。

      ■ 在CLIENT_ACKNOWLEDGE模式中,消息的傳遞通過調(diào)用消息對(duì)象的acknowledge方法獲得確認(rèn)。

      在AUTO_ACKNOWLEDGE模式中,消息的確認(rèn)通常在事務(wù)結(jié)束處完成。CLIENT_ACKNOWLEDGE使得應(yīng)用程序能夠加快這一過程,只要處理過程一結(jié)束就立即予以確認(rèn),恰好在整個(gè)事務(wù)結(jié)束之前。當(dāng)程序正在處理多個(gè)消息時(shí),這種確認(rèn)模式也是非常有用的,因?yàn)樵谶@種模式中程序可以在收到所有必需的消息后立即予以確認(rèn)。

      對(duì)于非事務(wù)化的會(huì)話,一旦把消息放入了隊(duì)列,所有接收程序立即能夠看到該消息,且不能回退。對(duì)于事務(wù)化的會(huì)話,JMS事務(wù)化上下文把多個(gè)消息組織成一個(gè)工作單元,消息的發(fā)送和接收都是成組執(zhí)行。事務(wù)化上下文會(huì)保存事務(wù)執(zhí)行期間產(chǎn)生的所有消息,但不會(huì)把消息立即發(fā)送到目的地。

      只有當(dāng)事務(wù)化的隊(duì)列會(huì)話執(zhí)行提交時(shí),保存的消息才會(huì)作為一個(gè)整體發(fā)送給目的地,這時(shí)接收程序才可以看到這些消息。如果在事務(wù)化隊(duì)列會(huì)話期間出現(xiàn)錯(cuò)誤,在錯(cuò)誤出現(xiàn)之前已經(jīng)成功處理的消息也會(huì)被撤銷。定義成事務(wù)化的隊(duì)列會(huì)話總是有一個(gè)當(dāng)前事務(wù),不需要顯式地用代碼來開始一個(gè)事務(wù)。當(dāng)然,事務(wù)化的環(huán)境總是存在一定的代價(jià)--事務(wù)化會(huì)話總是要比非事務(wù)化會(huì)話慢。

      ● 注意:JMS隊(duì)列會(huì)話的事務(wù)化是一個(gè)概念,實(shí)現(xiàn)了JMS邏輯的Java方法的事務(wù)屬性是另一個(gè)概念,不要將兩者混淆了。TX_REQUIRED屬性表示的是方法在一個(gè)事務(wù)上下文之內(nèi)運(yùn)行,從而確保數(shù)據(jù)庫(kù)更新和隊(duì)列中消息的處理作為一個(gè)不可分割的工作單元執(zhí)行(要么都成功提交,要么都回退)。順便說明一下,在容器管理的事務(wù)環(huán)境中,一個(gè)全局性的兩階段提交(Two-Phase Commit)事務(wù)上下文會(huì)被激活(參見本文后面的詳細(xì)說明),這時(shí)參與全局性事務(wù)的數(shù)據(jù)源應(yīng)當(dāng)用XA版的數(shù)據(jù)庫(kù)驅(qū)動(dòng)程序構(gòu)建。

      相比之下,在createQueueSession方法的第一個(gè)參數(shù)中指定true建立的是JMS事務(wù)上下文:多個(gè)消息被視為一個(gè)工作單元。在消息的接收方,執(zhí)行queueSession.commit()之前,即使已經(jīng)收到了多個(gè)消息也不會(huì)給出確認(rèn)信息;一旦queueSession.commit()執(zhí)行,它就確認(rèn)收到了在此之前尚未提交的所有消息;消息發(fā)送方的情況也相似。

      3.2 處理回退事件

      如前所述,如果事務(wù)異常終止,收到的消息會(huì)被發(fā)回到原來的隊(duì)列。接收消息的程序下一次再來處理該隊(duì)列時(shí),它還會(huì)再次得到同一個(gè)消息,而且這一次事務(wù)很有可能還是要失敗,必須再次把消息發(fā)送回輸入隊(duì)列--如此不斷重復(fù),就形成了無限循環(huán)的情況。

      為防止這種情況出現(xiàn),我們可以設(shè)置監(jiān)聽端口上的Max_retry計(jì)數(shù)器,超過Max_retry計(jì)數(shù)器規(guī)定的值,接收程序就不會(huì)再收到該消息;或者對(duì)于推式的會(huì)話,消息不會(huì)再被傳遞。另外,在推式會(huì)話中,重新傳遞的事務(wù)會(huì)被設(shè)置JMSRedelivered標(biāo)記,程序可以調(diào)用消息對(duì)象的getJMSRedelivered方法來檢查該標(biāo)記。

      消息通過QueueSender JMS對(duì)象發(fā)送,QueueSender對(duì)象利用QueueSession對(duì)象的createSender方法創(chuàng)建,每一個(gè)隊(duì)列都要?jiǎng)?chuàng)建一個(gè)QueueSender對(duì)象:

    queueSender = queueSession.createSender(requestQueue);


      接下來創(chuàng)建一個(gè)消息(TextMessage類型),根據(jù)myMessage字符串的值設(shè)置消息的內(nèi)容,myMessage變量作為輸入?yún)?shù)傳遞給queueSession.createTextMessag方法。

    textMsg = queueSession.createTextMessage(myMessage);


      指定接收隊(duì)列,告訴接收消息的程序要把應(yīng)答放入哪一個(gè)隊(duì)列:

    textMsg.setJMSReplyTo(responseQueue);


      最后,用Sender對(duì)象發(fā)送消息,然后停止并關(guān)閉連接。

    queueSender.send(textMsg);
    queueConnection.stop();
    queueConnection.close();


      發(fā)出消息之后,可以調(diào)用消息對(duì)象的getJMSMessageID方法獲得JMS賦予消息的ID(即提取JMSMessageID消息頭域),以后就可以通過這一ID尋找應(yīng)答消息:

    String messageID = message.getJMSMessageID();


      如果有JMS連接池,釋放后的會(huì)話不會(huì)被拆除,而是被返回給連接池以供重用。

      3.3 關(guān)閉JMS對(duì)象

      垃圾收集器不一定會(huì)及時(shí)關(guān)閉JMS對(duì)象,如果程序要?jiǎng)?chuàng)建大量短期生存的對(duì)象,可能會(huì)引起問題,至少會(huì)浪費(fèi)大量寶貴的資源,所以顯式地釋放所有不用的資源是很重要的。

    if (queueConn != null) {
        queueConn.stop();
        queueConn.close();
        queueConn = null;
    }


      關(guān)閉隊(duì)列連接對(duì)象將自動(dòng)關(guān)閉所有利用該連接對(duì)象創(chuàng)建的對(duì)象。但個(gè)別JMS提供者例外,這時(shí)請(qǐng)按照下面代碼所示的次序關(guān)閉所有JMS對(duì)象。

    // 關(guān)閉JMS對(duì)象
    if (queueReceiver != null) {
        queueReceiver.close();
        queueReceiver = null;
    }
    if (queueSender != null) {
        queueSender.close();
        queueSender = null;
    }
    if (queueSession != null) {
        queueSession.close();
        queueSession = null;
    }
    if (queueConn != null) {
        queueConn.stop();
        queueConn.close();
        queueConn = null;
    }


      3.4 接收消息

      消息接收方的處理邏輯也和發(fā)送方的相似。消息由JMS QueueReceiver對(duì)象接收,QueueReceiver對(duì)象建立在為特定隊(duì)列創(chuàng)建的QueueSession對(duì)象的基礎(chǔ)上。差別在于QueueReceiver接收消息的方式--QueueReceiver對(duì)象能夠按照偽同步或異步方式接收消息。下面的代碼顯示了如何用偽同步方式接收消息。

    // 偽同步方式接收消息
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueConnection;
    import javax.jms.QueueSender;
    import javax.jms.Queue;
    import javax.jms.Exception;
    import javax.naming.InitialContext;
    public class MyJMSReceiver {
        private String responseMessage;
        private String messageID;
        private int replyRetCode = 1;
        private QueueConnectionFactory queueConnectionFactory = null;
        private Queue inputQueue = null;
        private QueueConnection queueConnection = null;
        private QueueSession queueSession = null;
        private QueueReceiver queueReceiver = null;
        private TextMessage textMsg = null;
        public void processIncomingMessages() {
            // 查找管理對(duì)象
            InitialContext initContext = new InitialContext();
            Context env = (Context) initContext.lookup("java:comp/env");
            queueConnectionFactory = (QueueConnectionFactory)
    	env.lookup("tlQCF");
            inputQueue = (Queue) env.lookup("tlQueue");
            queueConnection = queueConnectionFactory.
    	createQueueConnection();
            queueConnection.start();
            queueSession=queueConnection.createQueueSession(true, 0);
            queueReceiver = queueSession.createReceiver(inputQueue);
            // 等一秒鐘,看看是否有消息到達(dá)
            TextMessage inputMessage = queueReceiver.receive(1000);
            // 其他處理代碼...
            queueConnection.stop();
            queueConnection.close();
        }
    }


      下面分析一下上面的代碼。消息由QueueReceiver對(duì)象執(zhí)行receive方法接收。receive方法有一個(gè)參數(shù),它指定了接收消息的等待時(shí)間(以毫秒計(jì))。在上面的代碼中,QueueReceiver對(duì)象在一秒之后超出期限,解除鎖定,把控制返回給程序。如果調(diào)用receive方法時(shí)指定了等待時(shí)間,QueueReceiver對(duì)象在指定的時(shí)間內(nèi)被鎖定并等待消息,如果超出了等待時(shí)間仍無消息到達(dá),QueueReceiver對(duì)象超時(shí)并解除鎖定,把控制返回給程序。

      接收消息的方法還有一個(gè)"不等待"的版本,使用這個(gè)方法時(shí)QueueReceiver對(duì)象檢查是否有消息之后立即返回,將控制交還給程序。下面是一個(gè)例子:

    TextMessage message = queueReceiver.receiveNoWait();


      如果調(diào)用receive方法時(shí)不指定參數(shù),QueueReceiver對(duì)象會(huì)無限期地等待消息。采用這種用法時(shí)應(yīng)當(dāng)非常小心,因?yàn)槌绦驎?huì)被無限期地鎖定。下面是一個(gè)無限期等待消息的例子:

    TextMessage message = queueReceiver.receive();


      不管等待期限的參數(shù)設(shè)置了多少,這都屬于拉式消息傳遞,如前所述,這種消息傳遞方式的效率較低。不僅如此,這種方法還不適合于J2EE EJB層,不能用于EJB組件之內(nèi),原因稍后就會(huì)說明。不過,這種處理方式適合在Servlet、JSP和普通Java JMS應(yīng)用中使用。

      接收消息的第二種辦法是異步接收。用異步接收方式時(shí),QueueReceiver對(duì)象必須用setMessageListener(class_name)方法注冊(cè)一個(gè)MessageListener類,其中class_name參數(shù)可以是任何實(shí)現(xiàn)了onMessage接口方法的類。在下面這個(gè)例子中,onMessage方法由同一個(gè)類實(shí)現(xiàn)(為簡(jiǎn)單計(jì),這里沒有給出try/catch塊的代碼)。

      ● 注意:接下來的消息接收方式不適用于EJB組件,這些代碼僅適用于Servlet、JSP和普通的Java JMS應(yīng)用程序。

    // 消息監(jiān)聽類的例子
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueConnection;
    import javax.jms.QueueReceiver;
    import javax.jms.Queue;
    import javax.jms.Exception;
    import javax.naming.InitialContext;
    public class MyListenerClass implements javax.jms.MessageListener {
        private int responseRetCode = 1;
        private boolean loopFlag = true;
        private QueueConnectionFactory queueConnectionFactory = null;
        private Queue responseQueue = null;
        private QueueConnection queueConnection = null;
        private QueueSession queueSession = null;
        private QueueSender queueSender = null;
        public void prepareEnvironment(String myMessage) {
            // 查找管理對(duì)象
            InitialContext initContext = new InitialContext();
            Context env = (Context) initContext.lookup("java:comp/env");
            queueConnectionFactory = (QueueConnectionFactory)
    	env.lookup("tlQCF");
            responseQueue = (Queue) env.lookup("tlResQueue");
            queueConnection = queueConnectionFactory.
    	createQueueConnection();
            queueSession = queueConnection.createQueueSession
    	(true, 0);
            queueReceiver = queueSession.createReceiver
    	(responseQueue);
            queueReceiver.setMessageListener(this) 
    	queueConnection.start();
        }
    
        public void onMessage(Message message) {
            // 希望收到一個(gè)文本消息...
            if (message instanceof TextMessage) {
                String responseText =
                  "確認(rèn)消息傳遞:" + ((TextMessage) message).getText();
                // 當(dāng)收到一個(gè)以@字符開頭的消息時(shí),循環(huán)結(jié)束,
                // MessageListener終止
                if (responseText.charAt(0) == '@') {
                    loopFlag = 1; // 結(jié)束處理;
                } else {t
                    // 繼續(xù)處理消息
                    // 本例中我們知道應(yīng)答消息的隊(duì)列,并非真的要用到
    		//message.getJMSReplyTo
                    // 這只是一個(gè)如何獲取應(yīng)答消息隊(duì)列的例子
                    Destination replyToQueue=message.getJMSReplyTo();
                    // 設(shè)置應(yīng)答消息
                    TextMessage responseMessage =
                     responseSession.createTextMessage(responseText);
                    // 使CorrelationID等于消息ID,
    		//這樣客戶程序就能將應(yīng)答消息和原來的請(qǐng)求
                    // 消息對(duì)應(yīng)起來
                    messageID = message.getJMSMessageID();
                    responseMessage.setJMSCorrelationID(messageID);
                    // 設(shè)置消息的目的地
                    responseMessage.setJMSDestination(
                        replyToQueue)
                    queueSender.send(
                        responseMessage);
                }
            }
        }
        // 保持監(jiān)聽器活動(dòng)
        while (loopFlag) {
            // 將控制轉(zhuǎn)移到其他任務(wù)(休眠2秒)
            System.out.println("正處于監(jiān)聽器循環(huán)之內(nèi)...");
            Thread.currentThread().sleep(2000);
        }
        // 當(dāng)loopFlag域設(shè)置成flase時(shí),結(jié)束處理過程
        queueConn.stop();
        queueConnection.close();
    }


      注冊(cè)一個(gè)MessageListener對(duì)象時(shí),一個(gè)實(shí)現(xiàn)了MessageListener邏輯的新線程被創(chuàng)建。我們要保持這個(gè)線程處于活動(dòng)狀態(tài),因此使用了一個(gè)while循環(huán),首先讓線程休眠一定的時(shí)間(這里是2秒),將處理器的控制權(quán)轉(zhuǎn)讓給其他任務(wù)。當(dāng)線程被喚醒時(shí),它檢查隊(duì)列,然后再次進(jìn)入休眠狀態(tài)。當(dāng)一個(gè)消息到達(dá)當(dāng)前注冊(cè)的MessageListener對(duì)象所監(jiān)視的隊(duì)列時(shí),JMS調(diào)用MessageListener對(duì)象的onMessage(message)方法,將消息作為參數(shù)傳遞給onMessage方法。

      這是一種推式的消息接收方式,程序效率較高,但仍不適合在EJB組件之內(nèi)使用。下面將探討為什么這些接收消息的方式都不能用于EJB組件之內(nèi),然后給出解決辦法。雖然這部分內(nèi)容放入了P2P通信方式中討論,但其基本思路同樣適用于Pub/Sub通信方式。


    3.5 消息驅(qū)動(dòng)的Bean

      在前文討論JMS消息接收處理邏輯的過程中,我們看到的代碼僅僅適用于Servlet、JSP以及普通的Java應(yīng)用程序,但不適用于EJB,因?yàn)樵贘MS的接收端使用EJB存在一些技術(shù)問題。一般地,JMS程序的交互模式分兩種:

      ■ 發(fā)送-遺忘:JMS客戶程序發(fā)出消息,但不需要應(yīng)答。從性能的角度來看,這是最理想的處理模式,發(fā)送程序不需要等待對(duì)方響應(yīng)請(qǐng)求,可以繼續(xù)執(zhí)行自己的任務(wù)。
    ■ 同步請(qǐng)求-答復(fù):JMS客戶程序發(fā)出消息并等待答復(fù)。在JMS中,這種交互方式通過執(zhí)行一個(gè)偽同步的接收方法(前文已經(jīng)討論)實(shí)現(xiàn)。然而,這里可能出現(xiàn)問題。如果EJB模塊在一個(gè)事務(wù)上下文之內(nèi)執(zhí)行(通常總是如此),單一事務(wù)之內(nèi)無法執(zhí)行請(qǐng)求-答復(fù)處理,這是因?yàn)榘l(fā)送者發(fā)出一個(gè)消息之后,只有當(dāng)發(fā)送者提交了事務(wù),接收者才能收到消息。因此,在單一事務(wù)之內(nèi)是不可能得到應(yīng)答的,因?yàn)樵谝粋€(gè)未提交的事務(wù)上下文之內(nèi)接收程序永遠(yuǎn)收不到消息,當(dāng)然也不可能作出應(yīng)答了。解決的辦法是請(qǐng)求-答復(fù)必須通過兩個(gè)不同的事務(wù)執(zhí)行。

      對(duì)于EJB來說,在JMS通信的接收端還可能出現(xiàn)另一個(gè)EJB特有的問題。在異步通信中,何時(shí)能夠獲得應(yīng)答是不可預(yù)料的,異步通信的主要優(yōu)點(diǎn)之一就是發(fā)送方能夠在發(fā)出消息之后繼續(xù)當(dāng)前的工作,即使接收方當(dāng)時(shí)根本不能接收消息也一樣,但請(qǐng)求-答復(fù)模式卻隱含地假定了EJB組件(假定是一個(gè)會(huì)話Bean)應(yīng)該在發(fā)出消息之后等待答復(fù)。J2EE實(shí)際上是一個(gè)基于組件技術(shù)的事務(wù)處理環(huán)境,它的設(shè)計(jì)目標(biāo)是處理大量短期生存的任務(wù),卻不是針對(duì)那些可能會(huì)被長(zhǎng)期阻塞來等待應(yīng)答的任務(wù)。

      為了解決這個(gè)問題,Sun在EJB 2.0規(guī)范中加入了一種新的EJB類型,即MDB。MDB是專門為JMS異步消息環(huán)境中(接收端)EJB組件面臨的問題而設(shè)計(jì)的,其設(shè)計(jì)思路是把監(jiān)聽消息是否到達(dá)的任務(wù)從EJB組件移到容器,由于這個(gè)原因,MDB組件總是在容器的控制之下運(yùn)行,容器代替MDB監(jiān)聽著特定的隊(duì)列或主題,當(dāng)消息到達(dá)該隊(duì)列或者主題,容器就激活MDB,調(diào)用MDB的onMessage方法,將收到的消息作為參數(shù)傳遞給onMessage方法。

      MDB是一種異步組件,它的工作方式和其他同步工作的EJB組件(會(huì)話Bean,實(shí)體Bean)不同,MDB沒有Remote接口和Home接口,客戶程序不能直接激活MDB,MDB只能通過收到的消息激活。MDB的另一個(gè)重要特點(diǎn)是它對(duì)事務(wù)和安全上下文的處理方式與眾不同,MDB已經(jīng)徹底脫離了客戶程序,因此也不使用客戶程序的事務(wù)和安全上下文。

      發(fā)送JMS消息的遠(yuǎn)程客戶程序完全有可能運(yùn)行在不同的環(huán)境之中--非J2EE的環(huán)境,例如普通的Java應(yīng)用程序,可能根本沒有任何安全或事務(wù)上下文。因此,發(fā)送方的事務(wù)和安全上下文永遠(yuǎn)不會(huì)延續(xù)到接收方的MDB組件。由于MDB永遠(yuǎn)不會(huì)由客戶程序直接激活,所以它們也永遠(yuǎn)不可能在客戶程序的事務(wù)上下文之內(nèi)運(yùn)行。由于這個(gè)原因,下面這些事務(wù)屬性對(duì)于MDB來說沒有任何意義--Supports,RequiresNew,Mandatory,以及None,因?yàn)檫@些事務(wù)屬性隱含地意味著延用客戶程序的事務(wù)上下文。MDB可以使用的事務(wù)屬性只有兩個(gè):NotSupported和Required。如果一個(gè)MDB組件有NotSupported事務(wù)屬性,則它的消息處理過程不屬于任何事務(wù)上下文。

      就象其他類型的EJB一樣,MDB可能參與到兩種類型的事務(wù):Bean管理的事務(wù),容器管理的事務(wù)。MDB組件的所有方法中,只有onMessage方法可以參與到事務(wù)上下文。如果讓MDB參與到Bean管理的事務(wù)上下文,則表示允許MDB在onMessage方法之內(nèi)開始和結(jié)束一個(gè)事務(wù)。這種策略的問題在于,收到的消息總是位于onMessage方法之內(nèi)開始的事務(wù)之外(要讓消息參與其中已經(jīng)太遲了)。在這種情況下,如果由于某種原因必須回退,則消息必須手工處理。

      如果讓MDB參與到容器管理的事務(wù)上下文,情況就完全不同了。只要設(shè)置了Required事務(wù)屬性,容器就可以在收到消息時(shí)開始一個(gè)事務(wù),這樣,消息就成為事務(wù)的一部分,當(dāng)事務(wù)成功提交時(shí),或者當(dāng)事務(wù)被回退而消息被返回到發(fā)送隊(duì)列時(shí),都可以獲得明確的確認(rèn)信息。

      事務(wù)可能在兩種情形下回退:第一種情形是程序顯式地調(diào)用setRollBackOnly方法,第二種情形是在onMessage方法之內(nèi)拋出一個(gè)系統(tǒng)異常(注意,拋出應(yīng)用程序異常不會(huì)觸發(fā)回退動(dòng)作)。當(dāng)事務(wù)回退時(shí),消息就被返回到原來的隊(duì)列,監(jiān)聽器將再次發(fā)送消息要求處理。一般地,這種情況會(huì)引起無限循環(huán),最后導(dǎo)致應(yīng)用運(yùn)行不正常。Max_retries屬性可以用來控制監(jiān)聽器再次提取重發(fā)消息的次數(shù)(這個(gè)屬性在配置監(jiān)聽器端口的時(shí)候設(shè)置),超過Max_retries規(guī)定的限制值之后,監(jiān)聽器將停止處理該消息(顯然,這算不上最理想的處理方式)。

      如果用WebSphere MQ作為JMS提供者,還有一種更好的解決辦法。我們可以配置WebSphere MQ,使其嘗試一定的次數(shù)之后停止發(fā)送同一消息,并將消息放入一個(gè)特定的錯(cuò)誤隊(duì)列或Dead.Letter.Queue。記住,MDB是無狀態(tài)的組件,也就是說它不會(huì)在兩次不同的方法調(diào)用之間維持任何狀態(tài)信息(就象Web服務(wù)器處理HTTP請(qǐng)求的情況一樣),同時(shí),由于客戶程序永遠(yuǎn)不會(huì)直接調(diào)用MDB組件,所以MDB組件不能識(shí)別任何特定的客戶程序,在這個(gè)意義上,可以認(rèn)為MDB組件是"匿名"運(yùn)行的。所有MDB組件都必須實(shí)現(xiàn)javax.ejb.MessageDrivenBean接口和javax.jms.MessageListener接口。

      除了onMessage方法之外,MDB還有其他幾個(gè)回調(diào)方法--由容器調(diào)用的方法:

      ■ ejbCreate方法:容器調(diào)用該方法來創(chuàng)建MDB的實(shí)例。在ejbCreate方法中可以放入一些初始化的邏輯。

      ■ setMessageDrivenContext方法:當(dāng)Bean第一次被加入到MDB Bean的緩沖池時(shí)容器將調(diào)用該方法。setMessageDrivenContext方法通常用來捕獲MessageDrivenContext,并將setMessageDrivenContext保存到類里面的變量,例如:

    public void setMessageDrivenContext(java.ejb.
    MessageDrivenContext mdbContext)  {
        messageDrivenContext = mdbContext;
    }


      ■ ejbRemove方法:容器把Bean從緩沖池移出并拆除時(shí)會(huì)調(diào)用該方法。一般地,ejbRemove方法會(huì)執(zhí)行一些清理操作。

      一般而言,在onMessage方法之內(nèi)執(zhí)行業(yè)務(wù)邏輯是不推薦的,業(yè)務(wù)方面的操作最好委派給其他EJB組件執(zhí)行。

      MDB容器會(huì)自動(dòng)控制好并發(fā)處理多個(gè)消息的情形。每一個(gè)MDB實(shí)例處理一個(gè)消息,在onMessage方法把控制返回給容器之前,不會(huì)要求MDB同時(shí)處理另一個(gè)消息。如果有多個(gè)消息必須并行處理,容器會(huì)激活同一MDB的多個(gè)實(shí)例。

      從WebSphere 5.0開始,開發(fā)環(huán)境(WSAD 5.0)和運(yùn)行環(huán)境(WAS 5.0)都開始全面支持MDB。

      下面的代碼片斷顯示了一個(gè)MDB的概念框架。

    // MDB概念框架
    // package 聲明略 
    import javax.jms.Message;
    import javax.jms.MapMessage;
    import javax.naming.InitialContext;
    import java.util.*;
    public class LibraryNotificationBean
        implements javax.ejb.MessageDrivenBean, 
        javax.jms.MessageListener {
        MessageDrivenContext messageDrivenContext;
        Context jndiContext;
        public void setMessageDrivenContext(MessageDrivenContext
        msgDrivenContext) {
            messageDrivenContext = msgDrivenContext;
            try {
                jndiContext = new InitialContext();
            } catch (NamingException ne) {
                throw new EJBException(ne);
            }
        }
    
        public void ejbCreate() {
        }
        public void onMessage(Message notifyMsg) {
            try {
                MapMessage notifyMessage = (MapMessage) notifyMsg;
                String bookTitle = (String) notifyMessage.
    	     getString("BookTitle");
                String bookAuthor = (String) notifyMessage.
    	     getString("BookAuthor");
                String bookCatalogNumber =
                    (String) notifyMessage.getString
    		 ("bookCatalogNumber");
                Integer bookQuantity =
                 (Integer) notifyMessage.getInteger("BookQuantity");
                // 處理消息...(調(diào)用其他EJB組件)
            } catch (Exception e) {
                throw new EJBException(e);
            }
        }
        public void ejbRemove() {
            try {
                jndiContext.close();
                jndiContext = null;
            } catch (NamingException ne) {
                // 異常處理
            }
        }
    }


      3.6 消息持久化

      消息可以是持久性的,也可以是非持久性的,持久性的消息和非持久性的消息可以放入同一個(gè)隊(duì)列。持久性的消息會(huì)被寫入磁盤,即使系統(tǒng)出現(xiàn)故障也可以恢復(fù)。當(dāng)然,正象其他場(chǎng)合的持久化操作一樣,消息的持久化也會(huì)增加一定的開銷,持久性消息大約要慢7%。控制消息持久化的辦法之一是定義隊(duì)列時(shí)設(shè)置其屬性,如果沒有顯式地設(shè)置持久化屬性,系統(tǒng)將采用默認(rèn)值。另外,JMS應(yīng)用程序本身也可以定義持久化屬性:

      ■ PERSISTENCE(QDEF):繼承隊(duì)列的默認(rèn)持久化屬性值。

      ■ PERSISTENCE(PERS):持久性的消息。

      ■ PERSISTENCE(NON):非持久性的消息。

      消息的持久性還可以通過消息屬性頭JMSDeliveryMode來調(diào)節(jié),JMSDeliveryMode可以取值DeliveryMode.PERSISTENT或DeliveryMode.NON_PERSISTENT,前者表示消息必須持久化,后者表示消息不需持久化。在事務(wù)化會(huì)話中處理的消息總是持久性的。

      3.7 消息選擇器

      JMS提供了從隊(duì)列選取一個(gè)消息子集的機(jī)制,能夠過濾掉所有不滿足選擇條件的消息。選擇消息的條件不僅可以引用消息頭的域,還可以引用消息屬性的域。下面是如何利用這一功能的例子:

    QueueReceiver queueReceiver =
        queueSession.createReceiver(requestQueue, 
          "BookTitle = 'Windows 2000'");
    QueueBrowser queueBrowser = queueSession.createBrowser
    (requestQueue, "BookTitle = 'Windows 2000'
    AND BookAuthor = 'Robert Lee'");


      注意,字符串(例如'Windows 2000')被一個(gè)雙引號(hào)之內(nèi)的單引號(hào)對(duì)包圍。

      四、JMS Pub/Sub編程

      Pub/Sub通信方式的編程與P2P通信編程相似,兩者最主要的差別是消息發(fā)送的目的地對(duì)象。在Pub/Sub通信方式中,發(fā)布消息的目的地和消費(fèi)消息的消息源是一個(gè)稱為主題(Topic)的JMS對(duì)象。主題對(duì)象的作用就象是一個(gè)虛擬通道,其中封裝了一個(gè)Pub/Sub的目的地(Destination)對(duì)象。

      在P2P通信方式中,(發(fā)送到隊(duì)列的)消息只能由一個(gè)消息消費(fèi)者接收;但在Pub/Sub通信方式中,消息生產(chǎn)者發(fā)布到主題的消息可以分發(fā)到多個(gè)消息消費(fèi)者,而且,消息的生產(chǎn)者和消費(fèi)者之間的結(jié)合是如此寬松,以至于生產(chǎn)者根本不必了解任何有關(guān)消息消費(fèi)者的情況,消息生產(chǎn)者和消費(fèi)者都只要知道一個(gè)公共的目的地(也就是雙方"交談"的主題)。

      由于這個(gè)原因,消息的生產(chǎn)者通常稱為出版者,消息的消費(fèi)者則相應(yīng)地稱為訂閱者。出版者為某一主題發(fā)布的所有消息都會(huì)傳遞到該主題的所有訂閱者,訂閱者將收到它訂閱的主題上的所有消息,每一個(gè)訂閱者都會(huì)收到一個(gè)消息的副本。訂閱可以是耐久性的(Durable)或非耐久性(Nondurable)。非耐久性的訂閱者只能收到訂閱之后才發(fā)布的消息。

      耐久性訂閱者則不同,它可以中斷之后再連接,仍能收到在它斷線期間發(fā)布的消息。Put/Sub通信方式中耐久性連接(在某種程度上)類似于P2P通信中的持久性消息/隊(duì)列。出版者和訂閱者永遠(yuǎn)不會(huì)直接通信,Pub/Sub代理的作用就象是一個(gè)信息發(fā)布中心,把所有消息發(fā)布給它們的訂閱者。

      ● 注意:從WebSphere MQ 5.2開始,加裝了MA88和MA0C擴(kuò)展的WebSphere MQ可以當(dāng)作JMS Pub/Sub通信代理。此外,WebSphere MQ Integrator也能夠作為一個(gè)代理用。從MQ 5.3開始,MA88成了MQ基本軟件包的一部分,因此只要在MQ 5.3的基礎(chǔ)上安裝MA0C就可以了。在MQ JMS環(huán)境中,要讓Pub/Sub通信順利運(yùn)行,運(yùn)行Pub/Sub代理的隊(duì)列管理器上必須創(chuàng)建一組系統(tǒng)隊(duì)列。

      MQ JMS MA0C擴(kuò)展模塊提供了一個(gè)工具來構(gòu)造所有必需的Pub/Sub系統(tǒng)隊(duì)列,這個(gè)工具就是MQJMS_PSQ.mqsc,位于\java\bin目錄之下。要構(gòu)造Pub/Sub通信方式所需的系統(tǒng)隊(duì)列,只需從上述目錄執(zhí)行命令:runmqsc < MQJMS_PSQ.mqsc。

      多個(gè)主題可以組織成一種樹形的層次結(jié)構(gòu),樹形結(jié)構(gòu)中主題的名稱之間用一個(gè)斜杠(/)分隔--例如,Books/UnixBooks/SolarisBooks。如果要訂閱一個(gè)以上的主題,可以在指定主題名字的時(shí)候使用通配符,例如,"Books/#"就是一個(gè)使用通配符的例子。

      下面給出了一個(gè)JMS Pub/Sub通信的例子(為簡(jiǎn)單計(jì),這里省略了try/catch代碼塊)。在這個(gè)例子中,Books/ UnixBooks/SolarisBooks主題的訂閱者將收到所有發(fā)布到SolarisBooks的消息,Books/#主題的訂閱者將收到所有有關(guān)Books的消息(包括發(fā)布到UnixBooks和SolarisBooks主題的消息)。

    // JMS Pub/Sub通信
    import javsx.jms.*;
    import javax.naming.*;
    import javax.ejb.*;
    public class PublisherExample implements javax.ejb.SessionBean {
        private TopicConnectionFactory topicConnFactory = null;
        private TopicConnection topicConnection = null;
        private TopicPublisher topicPublisher = null;
        private TopicSession topicSession = null;
        private SessionContext sessionContext = null;
        public void setSessionContext(SessionContext ctx) {
            sessionContext = cts;
        }
        public void ejbCreate() throws CreateException {
            InitialContext initContext = new InitialContext();
            // 從JNDI查找主題的連接工廠
            topicConnFactory =(TopicConnectionFactory) 
    	initContext.lookup("java:comp/env/TCF");
            // 從JNDI查找主題
            Topic unixBooksTopic =
              (Topic) initContext.lookup("java:comp/env/UnixBooks");
            Topic javaBooksTopic =
              (Topic) initContext.lookup("java:comp/env/JavaBooks");
            Topic linuxBooksTopic =
              (Topic) initContext.lookup("java:comp/env/LinuxBooks");
            Topic windowsBooksTopic =
              (Topic) initContext.lookup("java:comp/env/WindowsBooks");
            Topic allBooksTopic =
              (Topic) initContext.lookup("java:comp/env/AllBooks");
            // 創(chuàng)建連接
            topicConnection = topicConnFactory.createTopicConnection();
            topicConn.start();
            // 創(chuàng)建會(huì)話
            topicSession =
                topicConn.createTopicSession(false, 
    	      Session.AUTO_ACKNOWLEDGE);
        }
        public void publishMessage(String workMessage, 
          String topicToPublish) {
            // 創(chuàng)建一個(gè)消息
            TextMessage message = topicSession.createTextMessage
    	 (workMessage);   // 創(chuàng)建出版者,發(fā)布消息
            if ((topicToPublish.toLowerCase()).equals("java")) {
                TopicPublisher javaBooksPublisher =
                    topicSession.createPublisher(javaBooksTopic);
                javaBooksPublisher.publish(message);
            }
            if ((topicToPublish.toLowerCase()).equals("unix")) {
                TopicPublisher unixBooksPublisher =
                    topicSession.createPublisher(unixBooksTopic);
                J2EE Enterprise Messaging 475 unixBooksPublisher.
    	      publish(message);
            }
            if ((topicToPublish.toLowerCase()).equals("linux")) {
                TopicPublisher linuxBooksPublisher =
                    topicSession.createPublisher(linuxBooksTopic);
                linuxBooksPublisher.publish(message);
            }
            if ((topicToPublish.toLowerCase()).equals("windows")) {
                TopicPublisher windowsBooksPublisher =
                    topicSession.createPublisher(windowsBooksTopic);
                windowsBooksPublisher.publish(message);
            }
            TopicPublisher allBooksPublisher =
                topicSession.createPublisher(allBooksTopic);
            allBooksPublisher.publish(message);
        }
        public void ejbActivate() {
        }
        public void ejbPassivate() {
        }
        public void ejbRemove() {
            // 清理工作...
            if (javaBooksPublisher != null) {
                javaBooksPublisher.close();
                javaBooksPublisher = null;
            }
            if (unixBooksPublisher != null) {
                unixBooksPublisher.close();
                Chapter 9 476 unixBooksPublisher = null;
            }
            if (linuxBooksPublisher != null) {
                linuxBooksPublisher.close();
                linuxBooksPublisher = null;
            }
            if (windowsBooksPublisher != null) {
                windowsBooksPublisher.close();
                windowsBooksPublisher = null;
            }
            if (allBooksPublisher != null) {
                allBooksPublisher.close();
                allBooksPublisher = null;
            }
            if (topicSession != null) {
                topicSession.close();
                topicSession = null;
            }
            if (topicConnection != null) {
                topicConnection.stop();
                topicConnection.close();
                topicConnection = null;
            }
        }


      這段代碼比較簡(jiǎn)單,想來不需要多余的解釋了。唯一值得一提的地方是如何將一個(gè)消息發(fā)布到不同的主題:對(duì)于每一個(gè)特定的主題,分別創(chuàng)建一個(gè)對(duì)應(yīng)的出版者,然后用它將消息發(fā)布到主題。

      如果一個(gè)MDB組件只負(fù)責(zé)接收消息,把所有其他的消息處理操作都委托給專用業(yè)務(wù)組件(這意味著MDB之內(nèi)不包含消息發(fā)送或發(fā)布邏輯),MDB的代碼就相當(dāng)于P2P通信方式中的處理代碼,使用MDB唯一的改變之處是將監(jiān)聽端口從監(jiān)聽一個(gè)隊(duì)列改為監(jiān)聽一個(gè)主題。有興趣的讀者可以自己試驗(yàn)一下雙重監(jiān)聽的實(shí)現(xiàn)方式。

      五、二階段提交的事務(wù)

      在企業(yè)級(jí)處理任務(wù)中,為了保證JMS或非JMS代碼處理業(yè)務(wù)邏輯過程的完整性(對(duì)于一個(gè)單元的工作,要么成功提交所有的處理步驟,要么全部回退所有處理步驟),操作一般要在一個(gè)事務(wù)上下文之內(nèi)進(jìn)行。除了將消息放入隊(duì)列的過程之外,如果還要向數(shù)據(jù)庫(kù)插入記錄(二階段提交,要么全部成功,要么全部失敗),事務(wù)上下文的重要性尤其突出。

      為了支持二階段提交,JMS規(guī)范定義了下列XA版的JMS對(duì)象:XAConnectionFactory、XAQueueConnectionFactory、XASession、XAQueueSession、XATopicConnectionFactory、XATopicConnection,以及XATopicSession。另外,凡是參與全局事務(wù)的所有資源均應(yīng)該使用其XA版。特別地,對(duì)于JDBC資源,必須使用JDBC XADataSource。最后一點(diǎn)是,全局事務(wù)由JTA TransactionManager控制。下面的代碼顯示了建立全局事務(wù)所需的步驟。

    // 配置全局事務(wù)
    
    // 從JNDI名稱空間獲取一個(gè)JTA TransactionManager
    TransactionManager globalTxnManager =
        jndiContext.lookup("java:comp/env/txt/txnmgr");
    // 啟動(dòng)全局事務(wù)
    globalTxnManager.begin();
    // 獲取事務(wù)對(duì)象
    Transaction globalTxn = globalTxnManager.getTransaction();
    // 獲取XA數(shù)據(jù)資源
    XADatasource xaDatasource = jndiContext.lookup
    ("java:comp/env/jdbc/datasource");
    
    // 獲取一個(gè)連接
    XAConnection jdbcXAConn = xaDatasource.getConnection();
    // 從XA連接獲取XAResource
    XAResource jdbcXAResource = jdbcXAConn.getXAResource();
    // 在全局事務(wù)中"征募"XAResource
    globalTxn.enlist(jdbcXAResource);
    // 獲取XA隊(duì)列連接
    XAQueueConnectionFactory xaQueueConnectionFactory =
        JndiContext.lookup("java:comp/env/jms/xaQCF")
        XAQueueConnection xaQueueConnection =
            XaQueueConnectionFactory.createXAQueueConnection();
    // 獲取XA隊(duì)列會(huì)話
    XAQueueSession xaQueueSession = xaQueueConnection.
    createXAQueueSession();
    // 從會(huì)話獲取XA資源
    XAResource jmsXAResource = xaQueueSession.getXAResource();
    // 在全局事務(wù)中"征募"該XAResource
    globalTxn.enlist(jmsXAResource);
    // 其他處理工作...
    // 提交全局事務(wù)
    globalTxn.commit();


      總結(jié):本文介紹了JMS及其在WSAD環(huán)境下的編程,探討了JMS異步通信的主要優(yōu)點(diǎn),以及兩種通信方式(P2P和Pub/Sub)、MDB、JMS事務(wù)、二階段提交全局事務(wù)等方面的概念。希望本文的介紹對(duì)你有所幫助。
    posted on 2005-07-09 16:52 笨笨 閱讀(1557) 評(píng)論(1)  編輯  收藏 所屬分類: J2EEALL

    評(píng)論

    # re: WSAD環(huán)境下JMS異步通信全攻略[未登錄] 2007-08-07 16:05 nick
    非常的好,謝謝!!!!  回復(fù)  更多評(píng)論
      

    主站蜘蛛池模板: 成人看的午夜免费毛片| 久久久久久国产a免费观看不卡| 亚洲成a人片7777| 亚洲AV无码精品无码麻豆| 亚洲色无码专区在线观看| 亚洲色大成网站www永久一区| 国产亚洲人成网站在线观看| 五月天婷亚洲天综合网精品偷| 国产免费观看a大片的网站| 免费A级毛片在线播放不收费| 免费a级毛片18以上观看精品| 亚洲精品无码激情AV| 亚洲午夜福利在线观看| 亚洲va久久久噜噜噜久久男同| 亚洲国产综合精品中文第一区| 午夜亚洲AV日韩AV无码大全| 亚洲色图.com| 亚洲熟妇无码一区二区三区| 亚洲av永久无码一区二区三区| 国产成人综合亚洲绿色| www免费黄色网| 国产午夜无码精品免费看| 3344永久在线观看视频免费首页| 99久久99久久精品免费看蜜桃 | 亚洲综合校园春色| 在线a亚洲老鸭窝天堂av高清| 国产AV无码专区亚洲AV蜜芽 | 亚洲综合色一区二区三区小说| 亚洲a∨无码男人的天堂| 亚洲av最新在线观看网址| 国产精品福利在线观看免费不卡| 黄网站免费在线观看| 99视频全部免费精品全部四虎| 四虎免费大片aⅴ入口| 亚洲人成电影在线播放| 久久精品亚洲视频| 国产色在线|亚洲| 人妻18毛片a级毛片免费看| 伊人久久免费视频| 在线观看免费宅男视频| 77777亚洲午夜久久多人|