<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Terry.Li-彬

    虛其心,可解天下之問;專其心,可治天下之學;靜其心,可悟天下之理;恒其心,可成天下之業(yè)。

      BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
      143 隨筆 :: 344 文章 :: 130 評論 :: 0 Trackbacks
    1、介紹
    本章介紹在Jboss中使用Java Messageing Service (JMS). 不是JMS指南,而是JBoss和JMS如何一起使用,如果你想看JMS的介紹,請參考 JMS Specification 或 JMS turorial.
    最近隨著JBoss版本不斷更新,對JMS支持越來越成熟,也造成JBoss各個版本之間的不同。在這里我主要介紹JBoss3.0.X版本。
    本章給出的例子比較簡單,并指出了如何使用JMS的原理。所有的例子可以通過ant build file 來建立。
    為了能建立和運行此例子。我們使用兩種方式來進行:一是使用Ant命令,二是使用JAR和JAVA基本命令。必須有下面的環(huán)境變量:
    w JAVA_HOME 安裝JDK1.4的目錄。
    w JBOSS_DIST 安裝JBoss的目錄。
    你必須安裝JDK, 如果使用Ant必須安裝 Ant。可以參考我前面文檔的介紹。
    2、概述
    1) 什么是JMS
    JMS是Java API, 允許應用程序來建立、接收和讀取消息。程序依靠這些API, 在運行時需要一個JMS實現(xiàn)接口,來提供管理和控制,這被稱為JMS provider, 現(xiàn)在有幾種不同的JMS Provider; 在JBoss中的叫做JbossMQ。
    2) JMS 和J2EE
    JMS是在EJB和J2EE框架開發(fā)之前進行開發(fā)的,所以在JMS說明書中沒有涉及到EJB或J2EE。
    EJB 和J2EE第一代版本中也沒有涉及到JMS,一直到EJB1.1,在生成一個可用Beand的容器provider中JMS也不是必須的API。在J2EE1.2中JMS接口是必需的情況,但并不是非得要包含一個JMS Provider;在EJB2.0和J2EE1.3中又進行改變,應用服務器包含了一個JMS Provider,自從J2EE1。3需要EJB2.0,增加了以下兩個JMS特性:
    w 一種新Bean類型定義, 也就是消息驅(qū)動Beam (MDB), 這種bean做為JMS消息監(jiān)聽者,可以異步地處理JMS消息。
    w JMS處理作為資源,來自一個Bean 的JMD 發(fā)布(發(fā)送)必須能和其他bean的全局事務環(huán)境共享。這個需要把JMS認為是一個容器管理資源,象JDBC的連接。
    3) JMS 和JBoss
    JBoss從2.0版本以后都支持JMS。 在2.1中增加了MDB,從2.4版本開始JMS作為一個事務資源。
    JBoss中JMS的體系結(jié)構(gòu)如下:
    w JMS Provider, 叫做JbossMQ 。 是JBoss實現(xiàn)JMS 1.0.2規(guī)范的一部分,包括可選部分,象ASF(Application Service Facvility)。 JbossMQ處理和普遍JMS一樣:建立 queues (隊列)或topic(標題),持久性等。
    w MDB (Message Driven Beans),
    w 資源適配器。

    3、JMS Provider
    JBoss有它自己的JMS Provider 叫做JbossMQ。 適合與JMS 1.0.2 的JMS Provider,并且能用在所有平常的JMS程序中。為了清楚在JBoss中JMS是如何工作的,首先要清楚在JMS中涉及到的概念和術(shù)語,最好的辦法是閱讀JMS規(guī)范,下面給出了簡單的JMS介紹。
    1) JMS的簡單介紹
    當你發(fā)送一個消息,你不能直接發(fā)送到對此消息感興趣的接受者。而是你發(fā)送到一個目的地。對此消息感興趣的接受者必須連接到目的地,得到此消息或在目的地設置訂閱。
    在JMS中有兩種域:topics 和queues 。
    w 一個消息發(fā)送到一個topics ,可以有多個客戶端。用topic發(fā)布允許一對多,或多對多通訊通道。消息的產(chǎn)生者被叫做publisher, 消息接受者叫做subscriber。
    w queue 是另外一種方式,僅僅允許一個消息傳送給一個客戶。一個發(fā)送者將消息放在消息隊列中,接受者從隊列中抽取并得到消息,消息就會在隊列中消失。第一個接受者抽取并得到消息后,其他人就不能在得到它。
    為了能發(fā)送和接收消息,必須得到一個JMS連接。該連接是使用JMS Provider得到連接的,在得到連接之后,建立一個會話(Session)。然后再建立publisher/sender 來發(fā)送消息或subscriber/receiver來接收消息。
    運行時,如果使用topic 那么publisher 或subscriber 通過一個topic來關(guān)聯(lián),如果使用queue ,則sender 或receiver通過queue來關(guān)聯(lián)起來。
    通常,在JMS框架中運轉(zhuǎn)的方法如下:
    (1) 得到一個JNDI初始化上下文(Context);
    (2) 根據(jù)上下文來查找一個連接工廠TopicConnectFactory/ QueueConnectionFactory (有兩種連接工廠,根據(jù)是topic/queue來使用相應的類型);
    (3) 從連接工廠得到一個連接(Connect 有兩種[TopicConnection/ QueueConnection]);
    (4) 通過連接來建立一個會話(Session);
    (5) 查找目的地(Topic/ Queue);
    (6) 根據(jù)會話以及目的地來建立消息制造者(TopicPublisher/QueueSender)和消費者(TopicSubscriber/ QueueReceiver).
    為了得到一個連接和得到一個目的地(用來關(guān)聯(lián)publisher/sender 或subscriber/receiver),必須用provider-specific參數(shù)。
    通過JNDI來查找相應的連接工廠或目的地,JNDI適合于任何JMS Provider。但是查找用的名字是provider使用的。因此,在你使用的JMS Provider(其中包括JBossMQ),必須學會如何進行指定。JMS Provider中的任何設置,象連接特性,用到目的地等,在用到的Provider都有明確描述。
    2) 配置
    當使用一個JMS Provider時,有三個Provider-specific因素:
    w 得到一個JNDI初始化上下文
    w 用到的連接工廠的名字。
    w 對目的地的管理和命名協(xié)定。
    JBoss同它的JNDI一起執(zhí)行。為了簡單的JMS client, 配置和查找初始化上下文,同實現(xiàn)其他J2EE客戶端一樣。
    JMS-specific 來約束JBoss 的JMS provider (JBossMQ)。JbossMQ是通過xml 文件jbossmq-service.xml進行配置的,該文件放在在serverdefaultdeploy下。
    在xml文件中最基本的三件事情:
    w 增加一個新的目的地
    w 配置用戶
    w 獲得可用連接工廠的名字。
    (1) 增加新的目的地
    w 在目的地的xml文件在jboss 3.x中是jbossmq-destinations-service.xml(server/../deploy)。在文件中已經(jīng)存在幾個缺省的目的地,所以你比較容易明白怎樣增加到文件中。在例子中你需要一個topic目的地 spool,所以增加下面的語句到jbossmq-destinations-service.xml中。這種方式是長久存在的,不隨著JBoss服務器關(guān)閉而消失。
    <mbean code="org.jboss.mq.server.jmx.Topic"
    name="jboss.mq.destination:service=Topic,name=spool">
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    </mbean>
    w 另外一種方法是可以通過JMX HTML管理界面。通過http://localhost:8080/jmx-console 來訪問。在jboss.mq 下查找service=DestinationManager 的連接。然后在createTopic()或createQueue()來建立,這種方法建立的目的地是臨時性的,隨著服務器開始存在,當當JBoss 服務器重新啟動時,動態(tài)建立的目的地將會不存在。在JbossMQ中所有目的地都有一個目的地類型的前綴。對于topic前綴是topic ,對于queues前綴是queue。例如查找一個testTopic目的地,需要查找名字為“topic/testTopic”。
    在此種方法中有createTopic()或createQueue()分別有兩種方法:一是有兩個參數(shù),二是有一個參數(shù)的。兩個參數(shù)分別是:建立的目的地名稱和JNDI名稱。一個參數(shù)的只是目的地名稱,對于JNDI名稱默認是:[目的地類型(topic/queue) ]/目的地名稱。
    在這里我們使用的是第一種方法。直接修改jbossmq-destinations-service.xml文件。
    (2) 管理用戶
    在JMS中可能關(guān)聯(lián)一個連接和一個用戶,不幸的是沒有明確的方法來限制訪問JbossMQ或訪問特殊的目的地到一個給定的用戶。為了給大部分角色,在JbossMQ中不需要建立用戶,除非想有一個持久topic訂閱者。在這個例子中,用戶是必須的。
    用戶可以直接在文件jbossmq-state.xml(server/../conf)中添加。同樣也可以使用JMX HTML管理界面來增加(jboss.mq->service=StateManager->addUser())。
    <User>
    <Name>jacky</Name>
    <Password>jacky</Password>
    <Id>DurableSubscriberExample</Id>
    </User>>

    (3) 連接工廠
    JBossMQ 包括為topic和queue幾個不同的連接工廠,每個連接工廠有自己特性。當通過JNDI來查找一個連接工廠時,需要知道此連接工廠的名稱。所有可用連接工廠和它們的屬性,名稱都會在文件jbossmq-service.xml中。
    有三種類型連接工廠,依賴的通訊協(xié)議如下:
    OIL
    快速雙向scoket通訊協(xié)議。它是缺省的。
    UIL
    超過一個socket協(xié)議,可以使用在通過防火墻訪問,或者當客戶端不能正確的查找到服務器的IP地址。
    RMI
    最早的協(xié)議,是穩(wěn)定的,但是比較慢。
    JVM
    在JBoss 2.4之后增加的一個新的連接工廠類型。不需要用socket,當客戶端和JbossMQ使用同樣虛擬機時,可以使用。
    在JBoss2.4.1以后版本中,對于topic- 和 queue-目的地,連接工廠使用同樣的名字。下表列出了在JBoss中JMS連接工廠:
    目的地類型 JNDI名字 連接工廠類型
    Topic/Queue java:/ConnectionFactory JVM
    Topic/Queue java:/XAConnectionFactory JVM支持XA事務
    Topic/Queue RMIConnectionFactory RMI
    Topic/Queue RMIXAConnectionFactory RMI支持XA事務
    Topic/Queue ConnectionFactory OIL
    Topic/Queue XAConnectionFactory OIL支持XA事務
    Topic/Queue UILConnectionFactory UIL
    Topic/Queue UILXAConnectionFactory UIL支持XA事務

    3) JBoss中高級JMS配置
    在上邊段落主要描述了和JbossMQ一起實行的基本配置工作。在本段來描述JMS其他配置。
    (1) JMS持久性管理
    JMS持久性管理(PM)負責存儲消息,并且將消息標記為持久,如果服務器發(fā)生故障時,能保證消息不會丟失,并允許恢復持久性消息。持久性JMS消息可以使用不同的方法來完成。每個方法有自己優(yōu)點和缺陷:
    PM 名字 優(yōu)點 缺點
    File 比較穩(wěn)定 速度慢
    Rollinglogged 速度比較快 此方法比較新,有些地方需要完善
    JDBC 對于穩(wěn)定性和可量測性比較好 必須有JDBC
    Logged 速度快 Log files grow without bound, memory management problems during recovery
    在JBoss中缺省的持久性消息管理是File持久性管理。可以改變它,但必須對于一個JMS
    Server有且僅有一個持久性管理配置。所以你在JBoss管理界面的jboss.mq ? >
    service=PersistenceManager 只是看到一個。
    持久性管理的配置文件是jbossmq-service.xml。在server..deploy下。
    為了讓大家了解持久性管理的各種方法,我下面來逐個介紹如何配置。
    w File持久性管理
    File持久性管理的概念就是為每一個持久性消息建立一個文件。消息持久性方法不是全部都能使用,但它是比較穩(wěn)定的。
    File持久性管理在JBoss發(fā)布時,作為一個缺省的持久性管理。如果你打開jbossmq-service.xml文件,你會看到下面的XML:
    <mbean code="org.jboss.mq.pm.file.PersistenceManager"
    name="jboss.mq:service=PersistenceManager">
    <attribute name="DataDirectory">db/jbossmq/file</attribute>
    <depends optional-attribute-name="MessageCache">jboss.mq:service=MessageCache</depends>
    </mbean>
    當設置Mbean配置時,F(xiàn)ile持久性管理允許你指定下面的屬性:
    DataDircetory 是存放持久性消息的路徑,會把生成的數(shù)據(jù)文件放在此目錄下。

    w 設置Rollinglogged持久性管理
    Rollinglogged持久性管理是比較新的一種持久性消息管理方法,因為它使用日志文件來持續(xù)多個消息,所以當建立一個文件時,不需要許多的I/O。
    定義Rollinglogged持久性管理:
    <mbean code="org.jboss.mq.pm.rollinglogged.PersistenceManager"
    name="jboss.mq:service=PersistenceManager">
    <attribute name="DataDirectory">db/jbossmq/file</attribute>
    <depends optional-attribute-name="MessageCache">jboss.mq:service=MessageCache</depends>
    </mbean>
    Rollinglogged持久性管理中DataDirctory 存放持久性消息的路徑,會把生成的數(shù)據(jù)文件放在此目錄下。

    w 設置JDBC持久性管理
    JDBC持久性管理使用數(shù)據(jù)庫表來存儲消息。需要一個JBoss配置的數(shù)據(jù)源來訪問數(shù)據(jù)庫。具體內(nèi)容參考jbossmq-service.xml文件中的內(nèi)容。
    w 設置Logged持久性管理
    Logged持久性管理是比較早的一個,在JBoss2.4.1以后版本中不在建議使用。現(xiàn)在有其他更好的辦法。
    4、舉例說明
    當我們清楚了以后內(nèi)容后,現(xiàn)在我們來用JBoss實現(xiàn)一個例子來加深對JBoss和JMS的了解。
    在上面敘述中,我們知道明確使用JMS provider有三個基本的事情要做:配置JNDI初始化上下文,連接工廠的名字和使用目的地的名字。
    當編寫產(chǎn)品的最好的事情是不受provider-specific 影響,使代碼能在不同的JMS provider之間容易移植。在此這個例子沒有聚焦在開發(fā)產(chǎn)品上,而是解釋如何使用JbossMQ來工作。
    1) 初始化上下文
    w 配置JNDI的一個方法是通過屬性文件jndi.properties。在這個文件中使用正確的值,并且把它所在的路徑包含到classpath中,它比較容獲得正確初始化上下文。
    jndi.properties文件的內(nèi)容如下:
    java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
    java.naming.provider.url=localhost:1099
    java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
    把該文件放置的路徑成為你的classpath的一部分。如果你使用這種方法,在初始化上下文時,代碼比較簡單: Context context = new IntialContext();1
    w 在某些情景下,可能需要手工配置JNDI;例如當運行的類文件中環(huán)境已經(jīng)配置了一個初始化上下文,但不是你想用的上下文時,需要手工來配置一個上下文。設置在哈希表中的幾個屬性值,并且使用此哈希表來實例化一個上下文。定義語法:
    Hashtable props = new Hashtable();
    props.put(Context.INITIAL_CONTEXT_FACTORY,
    "org.jnp.interfaces.NamingContextFactory");
    props.put(Context.PROVIDER_URL, "localhost:1099");
    props.put("java.naming.rmi.security.manager", "yes");
    props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");

    2) 查找連接工廠
    自有了上下文后,需要查找一個連接工廠。為了查找它,使用一個可用的名字。查找連接工廠的代碼如下:
    對于一個topic目的地
    TopicConnectionFactory topicFactory = (TopicConnectionFactory) context.lookup (“ConnectionFactory”)
    Queue 目的地:
    QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup (“ConnectionFactory”)
    3) 建立連接和會話
    在我們有了連接工廠后,建立一個連接,在此連接中建立一個會話。
    對于topic代碼如下:
    //建立一個連接
    topicConnection = topicFactory.createTopicConnection();
    //建立一個會話
    topicSession = topicConnection.createTopicSession(false, //不需要事務
    Session.AUTO_ACKNOLEDGE //自動接收消息的收條。
    );
    對于queue代碼如下:
    //建立一個連接
    queueConnection = queueFactory.createQueueConnection();
    //建立一個會話
    queueSession = queueConnection .createQueueSession(false, //不需要事務
    Session.AUTO_ACKNOLEDGE //自動接收消息的收條。
    );
    一個會話建立時,配置是否調(diào)用事務
    在事務Session中,當事務被提交后,自動接收,如果事務回滾,所有的被消費的消息將會被重新發(fā)送。
    在非事務Session中,如果沒有調(diào)用事務處理,消息傳遞的方式有三種:
    Session.AUTO_ACKNOWLEDGE :當客戶機調(diào)用的receive方法成功返回,或當MessageListenser 成功處理了消息,session將會自動接收消息的收條。
    Session.CLIENT_ACKNOWLEDGE :客戶機通過調(diào)用消息的acknowledge方法來接收消息。接收發(fā)生在session層。接收到一個被消費的消息時,將自動接收該session已經(jīng)消費的所有消息。例如:如果消息的消費者消費了10條消息,然后接收15個被傳遞的消息,則前面的10個消息的收據(jù)都會在這15個消息中被接收。
    Session.DUPS_ACKNOWLEDGE :指示session緩慢接收消息。

    4) 查找目的地
    現(xiàn)在我們來介紹建立publishes/sends 或subscribles/receives消息。
    下面的代碼列出來查找一個目的地:
    對于topic 查找一個testTopic目的地
    Topic topic = (Topic) context.lookup(“topic/testTopic”);

    對于queue 查找一個testQueue目的地
    Queue queue= (Queue) context.lookup(“queue/testQueue”);
    注意:JbossM的前綴topic/ (queue/)通常被放在topic (queue)名字前面。
    在JMS中,當客戶機扮演每種角色,象對于topic來將的publisher ,subscriber 或?qū)τ趒ueue來將的sender, receiver, 都有自己不同類繼承和不同函數(shù)。
    5) 建立一個消息制造者Message Producer (topic publisher/ queue sender)
    消息制造者是一個由session創(chuàng)建的對象,主要工作是發(fā)送消息到目的地。
    對于一個topic,需要通過TopicSession來創(chuàng)建一個TopicPublisher。代碼如下:
    TopicPublisher topicPublisher = TopicSession.createPublisher(topic);

    對于一個queue,需要通過QueueSession來創(chuàng)建一個QueueSender。代碼如下:
    QueuePublisher queuePublisher = queueSession.createSender(queue);
    6) 消息發(fā)送
    建立一個TestMessage并且publish 它, 代碼:
    TextMessage message = topicSession.createTestMessage();
    message.setText(msg);
    topicPublishe.publish(topic, message);
    建立一個TestMessage并且send它, 代碼:
    TextMessage message = queueSession.createTestMessage();
    message.setText(msg);
    queueSender.send(queue, message);

    7) 下面是一個完成的topic publisher 代碼,文件名HelloPublisher.java :
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicConnection;
    import javax.jms.TopicSession;
    import javax.jms.TopicPublisher;
    import javax.jms.Topic;
    import javax.jms.TextMessage;
    import javax.jms.Session;
    import javax.jms.JMSException;
    import java.util.Hashtable;
    public class HelloPublisher {

    TopicConnection topicConnection;
    TopicSession topicSession;
    TopicPublisher topicPublisher;
    Topic topic;

    public HelloPublisher(String factoryJNDI, String topicJNDI)
    throws JMSException, NamingException {
    Hashtable props=new Hashtable();
    props.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
    props.put(Context.PROVIDER_URL, "localhost:1099");
    props.put("java.naming.rmi.security.manager", "yes");
    props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
    Context context = new InitialContext(props);
    TopicConnectionFactory topicFactory =
    (TopicConnectionFactory)context.lookup(factoryJNDI);
    topicConnection = topicFactory.createTopicConnection();
    topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

    topic = (Topic)context.lookup(topicJNDI);

    topicPublisher = topicSession.createPublisher(topic);

    }

    public void publish(String msg) throws JMSException {

    TextMessage message = topicSession.createTextMessage();
    message.setText(msg);
    topicPublisher.publish(topic, message);
    }

    public void close() throws JMSException {
    topicSession.close();
    topicConnection.close();
    }

    public static void main(String[] args) {
    try {
    HelloPublisher publisher = new HelloPublisher(
    "ConnectionFactory", "topic/testTopic");
    for (int i = 1; i < 11; i++) {
    String msg = "Hello World no. " + i;
    System.out.println("Publishing message: " + msg);
    publisher.publish(msg);
    }
    publisher.close();
    } catch(Exception ex) {
    System.err.println(
    "An exception occurred while testing HelloPublisher25: " + ex);
    ex.printStackTrace();
    }
    }
    }
    我們知道,使用JMS不僅能發(fā)送(send)/發(fā)布(publish)消息,也能獲得(send)/發(fā)布(publish)的消息。在時間方式有良種方法來做:
    w 同步(Synchronously):需要手工的去得到消息,為了得到一個消息客戶機調(diào)用方法得到消息,直到消息到達或在規(guī)定的時間內(nèi)沒有到達而超時。我們在例子中沒有說明這部分,大家可以實驗一下。
    w 異步(Asynchronously):你需要定義一個消息監(jiān)聽器(MessageListener),實現(xiàn)該接口。當消息達到時,JMS provider通過調(diào)用該對象的 onMessage方法來傳遞消息。
    從原則來將,topic和queue都是異步的,但是在這兩種目的地中有不同的類和方法。首先,必須定義一個MessageListener接口。
    8) 建立一個MessageListener
    在建立了你需要的subscriber/receiver,并且登記了監(jiān)聽器后。就可以調(diào)用連接的start方法得到JMS provider 發(fā)送到的消息了。如果在登記監(jiān)聽器之前調(diào)用start方法,很可能會丟失消息。
    public void onMessage(Message m) {
    try {
    String msg = ((TextMessage)m).getText();
    System.out.println("HelloSubscriber got message: " + msg);
    } catch(JMSException ex) {
    System.err.println("Could not get text message: " + ex);
    ex.printStackTrace();
    }
    }
    9) 建立消息消費者
    對于topic來將:
    //建立一個訂閱者
    topicSubscriber = topicSession.createSubscriber(topic);
    //設置消息監(jiān)聽器,
    topicSubscriber.setMessageListener(this)
    //連接開始
    topicConnection.start();
    對于queue來將:
    //建立一個訂閱者
    queueReceiver = queueSession.createReceiver(queue);
    //設置消息監(jiān)聽器,
    queueReceiver .setMessageListener(this)
    //連接開始
    queueConnection.start();
    10) 完整的代碼,放在文件HelloSubscriber.java中,如下:
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicConnection;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.jms.Topic;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.Session;
    import javax.jms.MessageListener;
    import javax.jms.JMSException;

    public class HelloSubscriber implements MessageListener {
    TopicConnection topicConnection;
    TopicSession topicSession;
    TopicSubscriber topicSubscriber;
    Topic topic;
    public HelloSubscriber(String factoryJNDI, String topicJNDI)
    throws JMSException, NamingException
    {
    Context context = new InitialContext();
    TopicConnectionFactory topicFactory =
    (TopicConnectionFactory)context.lookup(factoryJNDI);
    topicConnection = topicFactory.createTopicConnection();
    topicSession = topicConnection.createTopicSession(
    false, Session.AUTO_ACKNOWLEDGE);
    topic = (Topic)context.lookup(topicJNDI);
    topicSubscriber = topicSession.createSubscriber(topic);
    topicSubscriber.setMessageListener(this);
    System.out.println(
    "HelloSubscriber subscribed to topic: " + topicJNDI);
    topicConnection.start();
    }
    public void onMessage(Message m) {
    try {
    String msg = ((TextMessage)m).getText();
    System.out.println("HelloSubscriber got message: " + msg);
    } catch(JMSException ex) {
    System.err.println("Could not get text message: " + ex);
    ex.printStackTrace();
    }
    }
    public void close() throws JMSException {
    topicSession.close();
    topicConnection.close();
    }
    public static void main(String[] args) {
    try {
    HelloSubscriber subscriber = new HelloSubscriber(
    "TopicConnectionFactory",
    "topic/testTopic");
    } catch(Exception ex) {
    System.err.println(
    "An exception occurred while testing HelloSubscriber: " + ex);
    ex.printStackTrace();
    }
    }
    }

    11) 編輯、運行程序
    直接使用命令(java)
    w 開啟命令操作符。設置classpath :
    set classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.jar;C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientlog4j.jar;.
    w 首先運行訂閱消息端:java HelloSubscriber
    w 再開啟另外一個命令窗口設置classpath :
    set classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.jar;C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientlog4j.jar;.
    w 運行發(fā)布消息端:java HelloPublisher
    5、補充
    在最后我們解釋JBoss-specific特性:如何用代碼來管理目的地。JBoss各個版本可能不同,但是差別不大。我使用的是jboss3.0.6。
    實現(xiàn)這個目的有兩種不同的方法,依賴于是否代碼是在和JBoss同樣的虛擬機還是獨立獨立的。它們都包括調(diào)用一個通過service=DestinationManager 登記的JMX Bean。這個Mbean 有四個方法來管理目的地:createTopic(),createQueue(),destroyTopic(),destroyQueue()。
    在代碼中實現(xiàn)管理目的地在影射怎樣調(diào)用MBean有不同的地方。如果程序虛擬機和Mbean服務器一樣,可以直接調(diào)用。
    建立一個topic 目的地的代碼如下:
    MBeanServer server = (MBeanServer)
    MBeanServerFactory.findMBeanServer(null).iterator().next();
    server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"),
    method,
    new Object[] { “myTopic” },
    new String[] { "java.lang.String" });

    如果程序和Mbean服務器的虛擬機不同,需要通過一個JMX adapter。一個JMX adapter是一個HTML GUI。用程序通過URL來調(diào)用Mbean。代碼如下:
    import java.io.InputStream;
    import java.net.URL;
    import java.net.HttpURLConnection;
    import javax.management.MBeanServerFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;
    import javax.jms.Topic;
    import javax.jms.Queue;
    public class DestinationHelper {
    static final String HOST = "localhost";
    static final int PORT = 8080;
    static final String BASE_URL_ARG = "/jmx-console/HtmlAdaptor?";
    public static void createDestination(Class type, String name)
    throws Exception
    {
    String method = null;
    if (type == Topic.class) { method = "createTopic"; }
    else if (type == Queue.class) { method = "createQueue";}
    invoke(method, name);
    }

    public static void destroyDestination(Class type, String name)
    throws Exception
    {
    String method = null;
    if (type == Topic.class) { method = "destroyTopic"; }
    else if (type == Queue.class) { method = "destroyQueue";}
    invoke(method, name);
    }

    protected static void invoke(String method, String destName)
    throws Exception
    {
    try {
    MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).iterator().next();
    invokeViaMBean(method, destName);
    }catch(Exception ex) { invokeViaUrl(method, destName);}
    }
    protected static void invokeViaUrl(String method, String destName)
    throws Exception
    {
    String action = "action=invokeOp&methodIndex=6&name=jboss.mq%3Aservice%3DDestinationManager&arg0=" + destName;
    String arg = BASE_URL_ARG + action;
    URL url = new URL("http", HOST, PORT, arg);
    HttpURLConnection con = (HttpURLConnection)url.openConnection();
    con.connect();

    InputStream is = con.getInputStream();
    java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream();
    byte[] buff = new byte[1024];
    for(;;) {
    int size = is.read( buff );
    if (size == -1 ) { break; }
    os.write(buff, 0, size);
    }
    os.flush();

    if (con.getResponseCode() != HttpURLConnection.HTTP_OK ) {
    throw new Exception ("Could not invoke url: " + con.getResponseMessage() );
    } else {
    System.out.println("Invoked URL: " + method + " for destination " + destName + "got resonse: " + os.toString());
    }
    }
    protected static void invokeViaMBean(String method, String destName)
    throws Exception
    {
    MBeanServer server = (MBeanServer)MBeanServerFactory.findMBeanServer(null).iterator().next();
    server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"),
    method,
    new Object[] { destName },
    new String[] { "java.lang.String" });
    }
    public static void main(String[] args) {
    try {
    if (args.length >0){
    destroyDestination(Topic.class,"myCreated");
    }else {
    createDestination(Topic.class,"myCreated");
    }
    }catch(Exception ex) {
    System.out.println("Error in administering destination: " + ex);
    ex.printStackTrace();
    }
    }

    }
    編輯命令:
    javac -classpath C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx.jar;. DestinationHelper.java
    運行命令
    java -classpath C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx.jar;. DestinationHelper
    當運行完后查看http://localhost:8080/jmx-console下面的jboss.mq.destination中有name=myCreated,service=Topic
    表明你建立成功。當JBoss關(guān)閉重新啟動時。該目的地不會在存在
    posted on 2008-06-28 20:22 禮物 閱讀(905) 評論(0)  編輯  收藏 所屬分類: java
    主站蜘蛛池模板: 亚洲国产高清在线精品一区| 美女的胸又黄又www网站免费| 91成人免费在线视频| 亚洲国产一区二区三区在线观看 | 日本不卡高清中文字幕免费| 青青草97国产精品免费观看| 亚洲日本中文字幕区| 日本一区二区三区日本免费| 日本高清免费观看| 亚洲高清国产拍精品熟女| 国产精品亚洲аv无码播放| 成年男女免费视频网站| 久久国产免费观看精品| 亚洲gay片在线gv网站| 国产V亚洲V天堂无码| 日本黄色免费观看| 99久久人妻精品免费二区| 国产亚洲人成在线影院| 亚洲精品熟女国产| 亚洲午夜精品第一区二区8050| 91久久成人免费| 中文字幕在线免费播放| 亚洲精品中文字幕| 亚洲黄色免费网址| 亚洲一区二区三区自拍公司| 日韩免费a级在线观看| 国产成人精品久久免费动漫| 9久热这里只有精品免费| 337P日本欧洲亚洲大胆精品| 亚洲精品资源在线| 日韩亚洲欧洲在线com91tv| 国产男女猛烈无遮挡免费视频网站| 88xx成人永久免费观看| 国产精品免费久久| 日韩色日韩视频亚洲网站| 亚洲国产精品一区二区久| 亚洲av日韩av激情亚洲| 伊人久久精品亚洲午夜| 国产男女猛烈无遮挡免费视频 | 久久久久久亚洲精品中文字幕| 亚洲AV中文无码乱人伦|