<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    廉頗老矣,尚能飯否

    java:從技術到管理

    常用鏈接

    統計

    最新評論

    Apache ActiveMQ學習筆記【mq的方式有兩種:點到點和發布/訂閱】

    .簡介ActiveMQ

    ActiveMQ 是最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現

    .下載ActiveMQ

    首先去http://activemq.apache.org/download.html 下載穩定版本4.1.0release

    Download Here

    Description ==Download Link ==PGP Signature file of download

    Binary for Windows== apache-activemq-4.1.0-incubator.zip== incubator-activemq-4.1.0.zip.asc

    Binary-for-Unix/Linux/Cygwin==apache-activemq-4.1.0-incubator.tar.gz==incubator-activemq-4.1.0.tar.gz.as 

    解壓后目錄如下:

    +bin (windows下面的bat和unix/linux下面的sh)

    +conf (activeMQ配置目錄,包含最基本的activeMQ配置文件)

    +data (默認是空的)

    +docs (index,replease版本里面沒有文檔,-.-b不知道為啥不帶)

    +example (幾個例子

    +lib (activemMQ使用到的lib)

    -apache-activemq-4.1-incubator.jar (ActiveMQ的binary)

    -LICENSE.txt

    -NOTICE.txt

    -README.txt

    -user-guide.html

    .啟動ActiveMQ

    可以使用bin\activemq.bat(activemq) 啟動,如果一切順利,你就會看見類似下面的信息(此處解壓到D盤根目錄下).幾個小提示

    1. 這個僅僅是最基礎的ActiveMQ的配置,很多地方都沒有配置因此不要直接使用這個配置用于生產系統

    2. 有的時候由于端口被占用,導致ActiveMQ錯誤,ActiveMQ可能需要以下端口1099(JMX),61616(默認的TransportConnector)

    3. 如果沒有物理網卡,或者MS的LoopBackAdpater Multicast會報一個錯誤

    .監控ActiveMQ

    啟動JDK自帶的java控制臺查看程序查看客戶端(如C:\jdk1.6.0_07\bin\jconsole.exe)

    遠程進程:127.0.0.1:1099

    . 測試ActiveMQ

    由于ActiveMQ是一個獨立的jms provider,所以我們不需要其他任何第三方服務器就可以馬上做我們的測試了.編譯example目錄下面的程序ProducerTool/ConsumerTool 是JMS參考里面提到的典型應用,Producer產生消息,Consumer消費消息,而且這個例子還可以加入參數幫助你測試剛才啟動的本地ActiveMQ或者是遠程的ActiveMQ

    ProducerTool [url] broker的地址,默認的是tcp://localhost:61616

    [true|flase] 是否使用topic,默認是false

    [subject] subject的名字,默認是TOOL.DEFAULT

    [durabl] 是否持久化消息,默認是false

    [messagecount] 發送消息數量,默認是10

    [messagesize] 消息長度,默認是255

    [clientID] durable為true的時候,需要配置clientID

    [timeToLive] 消息存活時間

    [sleepTime] 發送消息中間的休眠時間

    [transacte] 是否采用事務

    ConsumerTool [url] broker的地址,默認的是tcp://localhost:61616

    [true|flase] 是否使用topic,默認是false

    [subject] subject的名字,默認是TOOL.DEFAULT

    [durabl] 是否持久化消息,默認是false

    [maxiumMessages] 接受最大消息數量,0表示不限制

    [clientID] durable為true的時候,需要配置clientID

    [transacte] 是否采用事務

    [sleepTime] 接受消息中間的休眠時間,默認是0,onMeesage方法不休眠

    [receiveTimeOut] 接受超時

    .使用應用程序發送點到點消息隊列

    TestSender類

    package test;

    import javax.jms.DeliveryMode;

    import javax.jms.JMSException;

    import javax.jms.Message;

    import javax.jms.Session;

    import org.springframework.context.ApplicationContext;

    import org.springframework.context.support.FileSystemXmlApplicationContext;

    import org.springframework.jms.core.JmsTemplate;

    import org.springframework.jms.core.MessageCreator;

    public class TestSender {

        public static void main(String[] args) {

            ApplicationContext ctx = new FileSystemXmlApplicationContext("conf/applicationContext.xml");

            JmsTemplate template = (JmsTemplate) ctx.getBean("myJmsTemplate");

            template.setDeliveryMode(DeliveryMode.PERSISTENT);

            template.send(new MessageCreator() {

                public Message createMessage(Session session) throws JMSException {

                    Message message = session.createTextMessage();

                    message.setStringProperty("name", "wangwu");

                    message.setStringProperty("password", "ww");

                    System.out.println("send success");

                    return message;

                }

            });

        }

    }

    applicationContext.xml配置文件

    <beans xmlns="http://www.springframework.org/schema/beans"

        xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

      http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"

            destroy-method="stop">

            <property name="connectionFactory">

                <bean class="org.apache.activemq.ActiveMQConnectionFactory">

                    <property name="brokerURL" value="tcp://127.0.0.1:61616" />

                </bean>

            </property>

        </bean>

        <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">

            <property name="defaultDestinationName" value="Hello.Queue" />

            <property name="connectionFactory">

                <ref local="jmsFactory" />

            </property>

        </bean>

    </beans>

    . 使用應用程序接受點到點消息隊列

    ExampleListener類

    package test;

    import javax.jms.JMSException;

    import javax.jms.MapMessage;

    import javax.jms.Message;

    import javax.jms.ObjectMessage;

    import javax.jms.Session;

    import javax.jms.StreamMessage;

    import javax.jms.TextMessage;

     

    import org.springframework.jms.listener.SessionAwareMessageListener;

    public class ExampleListener implements SessionAwareMessageListener {

        public void onMessage(Message message, Session session) throws JMSException {

            if(message instanceof TextMessage) {

                System.out.println("TextMessage begin");

                System.out.println("name = " + message.getStringProperty("name"));

                System.out.println("password = " +message.getStringProperty("password"));

            }

            if (message instanceof ObjectMessage) {

                System.out.println("ObjectMessage");

            } else if (message instanceof TextMessage) {

                System.out.println("TextMessage");

            } else if (message instanceof StreamMessage) {

                System.out.println("StreamMessage");

            } else if (message instanceof MapMessage) {

                System.out.println("MapMessage");

            }

        }

    }

    TestReceiver類

    package test;

    import javax.jms.JMSException;

    import org.springframework.context.support.FileSystemXmlApplicationContext;

    public class TestReceiver {

        public static void main(String[] args) throws JMSException {

            new FileSystemXmlApplicationContext("conf/context.xml");

        }

    }

    context.xml配置文件.

    <beans xmlns="http://www.springframework.org/schema/beans"

        xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

      http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

        <bean id="ExampleListener" class="test.ExampleListener" />

        <bean id="destinationa" class="org.apache.activemq.command.ActiveMQQueue">

            <constructor-arg>

                <value>Hello.Queue</value>

            </constructor-arg>

        </bean>

        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"

            destroy-method="stop">

            <property name="connectionFactory">

                <bean class="org.apache.activemq.ActiveMQConnectionFactory">

                    <property name="brokerURL" value="tcp://127.0.0.1:61616" />

                </bean>

            </property>

        </bean>

        <bean id="listenerContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">

            <property name="concurrentConsumers" value="1" />

            <property name="connectionFactory" ref="jmsFactory" />

            <property name="destination" ref="destinationa" />

            <property name="messageListener" ref="ExampleListener" />

        </bean>

    </beans>

    . 使用WEB程序發送點到點消息隊列

    TestWebSender類

    package com.jl.material.adapter.impl;

    import javax.jms.JMSException;

    import com.jl.framework.jms.JmsManager;

    import com.jl.framework.jms.JmsManagerFactory;

    import com.jl.framework.jms.MessageCreateable;

    import com.jl.framework.jms.MessageCreatorDefault;

    import com.jl.framework.jms.util.JmsUtil;

    import com.jl.material.util.MaterialConstants;

    public class TestWebSender{

        public void sender(String name, String password) {

            MessageCreateable mc = new MessageCreatorDefault();       

            mc.setStringProperty("name", name);

            mc.setStringProperty("password", password);       

            JmsManagerFactory jmsManagerFactory = new JmsManagerFactory();

            JmsManager jmsTXManager = jmsManagerFactory.createJmsManager(MaterialConstants.MATERIAL_MODULE_NAME);

            try {

                jmsTXManager.send(JmsUtil.getDestinationFromConfig("quality_synVerifyBatch_queue_M2Q"), mc);

                jmsTXManager.commit();

            } catch (JMSException e) {

                jmsTXManager.rollback();

                throw new RuntimeException(e);

            }

        }

    }

    . 使用WEB程序接受點到點消息隊列

    TestWebReceiver類

    package com.jl.material.adapter.impl;

    import javax.jms.MapMessage;

    import javax.jms.ObjectMessage;

    import javax.jms.StreamMessage;

    import javax.jms.TextMessage;

    import com.jl.framework.jms.util.support.JMSCallbackable;

    public class TestWebReceiver implements JMSCallbackable {

        public void mdCallback(Object TextMessage_textMessage) {}

        public void logJMSMessageInfo(String arg0) {}

        public void mdCallback(ObjectMessage arg0) throws Exception {}

        public void mdCallback(TextMessage textMessage) throws Exception {

            String name = textMessage.getStringProperty("name");

            String password = textMessage.getStringProperty("password");

            System.out.println("name = " + name);

            System.out.println("password = " + password);

        }

        public void mdCallback(StreamMessage arg0) throws Exception {}

        public void mdCallback(MapMessage arg0) throws Exception {}

    }

    test_JMS_Spring_Listener.xml配置文件

    <?xml version="1.0" encoding="UTF-8"?>

    <beans>

        <bean id="test_testWebReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">

           <property name="jmsCallbackable">

               <bean class="com.jl.material.adapter.impl.TestWebReceiver "/>

           </property>

        </bean>

        <bean id="test_testWebReceiver _queue" class="org.apache.activemq.command.ActiveMQQueue">

           <constructor-arg>

               <value>${quality_assayVerifyBatch_queue_Q2M}</value>

           </constructor-arg>

        </bean>

     

        <bean id="test_listenerContainerA"

           class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">

            <property name="concurrentConsumers" value="1" />

           <property name="connectionFactory" ref="jmsRecieveFactory" />

           <property name="destination" ref=" test_testWebReceiver _queue ">

           <property name="messageListener" ref=" test_testWebReceiver " />

           <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>

        </bean>   

    </beans>

    jms.properties資源文件

    jms.sendip=tcp://10.1.1.40:61616,tcp://activemq:61616

    jms.listenip=tcp://10.1.1.40:61616,tcp://activemq:61616

    quality_assayVerifyBatch_queue_Q2M=

    Quality_Assay_VerifyBatch_Q2M.Queue

    context.xml配置文件

    <?xml version='1.0' encoding='utf-8'?>

    <Context path="/identity" reloadable="false">

        <Environment name="jms/jms.sendip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />

        <Environment name="jms/jms.listenip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />

    </Context>

    十. 使用發布/訂閱方式
    使用該方式的發布方基本與點到點方式一樣,區別只在隊列名的后綴從 .Queue 變成了 .Topic
    區別主要在接收方配置文件
    <bean id="pound_removeBindingInfoReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
        <property name="jmsCallbackable">
            <bean class="com.jl.pound.adapter.impl.RemoveBindingInfoReceiver">
             <property name="initialInfomationService" ref="initialInfomationService" />
             <property name="productPoundServiceFacade" ref="productPoundServiceFacade" />
            </bean>
        </property>
     </bean>

     <bean id="pound_removeBindingInfomation_topic" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg><value>${CraftPound_RemoveBindingInfomation}</value></constructor-arg>
     </bean>

     <bean id="pound_listenerContainerB"
         class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
          <property name="concurrentConsumers" value="1"/>
          <property name="connectionFactory" ref="jmsRecieveFactory" />
          <property name="destination" ref="pound_removeBindingInfomation_topic" />
          <property name="messageListener" ref="pound_removeBindingInfoReceiver" />
          <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
          <property name="subscriptionDurable" value="true"></property>
          <property name="pubSubDomain" value="true"></property>
          <!-- <property name="clientId" value="100554"></property> -->
          <property name="clientId" ref="generateClientIdentityCode1"></property>
     </bean>

     <bean id="generateClientIdentityCode1"
       class="com.jl.pound.security.GenerateClientIdentityCode">
       <property name="prefix" value="1"></property>
     </bean>

    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.FactoryBean;

    import com.**.util.NetworkUtils;

    public class GenerateClientIdentityCode implements FactoryBean {

        /**
         * LOGGER for sample classify delegate
         */
        private static final Logger LOGGER = Logger.getLogger(GenerateClientIdentityCode.class);
       
        /**
         * prefix
         */
        private String prefix;

     /**
      * @return the prefix
      */
     public String getPrefix() {
      return prefix;
     }

     /**
      * @param prefix the prefix to set
      */
     public void setPrefix(String prefix) {
      this.prefix = prefix;
     }
     
        public String generateIdentityCode() {
            String mac = NetworkUtils.getMACAddress();
            mac = prefix + mac;
            LOGGER.info("this client identity code is:" + mac);
            return mac;
        }

     public Object getObject() throws Exception {
      return generateIdentityCode();
     }

     public Class getObjectType() {
      return String.class;
     }

     public boolean isSingleton() {
      return false;
     }
    }

    在JMS中,Topic實現publish和subscribe語義。一條消息被publish時,它將發到所有感興趣的訂閱者,所以零到多個subscriber將接收到消息的一個拷貝。但是在消息代理接收到消息時,只有激活訂閱的subscriber能夠獲得消息的一個拷貝。

    JMS Queue執行load balancer語義。一條消息僅能被一個consumer收到。如果在message發送的時候沒有可用的consumer,那么它將被保存一直到能處理該message的consumer可用。如果一個consumer收到一條message后卻不響應它,那么這條消息將被轉到另一個consumer那兒。一個Queue可以有很多consumer,并且在多個可用的consumer中負載均衡

    可以使用queue方式發送注冊郵件 好友動態數據等

    <一>表說明:
    當在啟動ActiveMQ時,先判斷表是否存在,如果不存在,將去創建表,如下:
    (1)ACTIVEMQ_ACKS:持久訂閱者列表
    1.CONTAINER:類型://主題
    如:topic://basicInfo.topic
    2.SUB_DEST:應該是描述,與1內容相同
    3.CLIENT_ID:持久訂閱者的標志ID,必須唯一
    4.SUB_NAME:持久訂閱者的名稱.(durableSubscriptionName)
    5.SELECTOR:消息選擇器,consumer可以選擇自己想要的
    6.LAST_ACKED_ID:最后一次確認ID,這個字段存的該該訂閱者最后一次收到的消息的ID

    (2)ACTIVEMQ_LOCK:進行數據訪問的排斥鎖
    1.ID:值為1
    2.TIME:時間
    3.BROKER_NAME:broker的名稱
       這個表似為集群使用,但現在ActiveMQ并不能共享數據庫.

    (3)ACTIVEMQ_MSGS:存儲Queue和Topic消息的表
    1.ID:消息的ID
    2.CONTAINER: 類型://主題
    如:queue://my.queue
    Topic://basicInfo.topic
    3.MSGID_PROD:發送消息者的標志
    MSGID_PROD =ID:[computerName][…..]
    注意computerName,不要使用中文,消息對象中會存儲這個部分,解析connectID時會出現Bad String錯誤.
    4.MSGID_SEQ:還不知用處
    5.EXPIRATION:到期時間.
    6.MSG:消息本身,Blob類型.
    可以在JmsTemplate發送配置中,加上<property name=”timeToLive” value=”432000000”/>,5天的生命期,如果消息一直沒有被處理,消息會被刪除,但是表中會存在CONTAINER為queue://ActiveMQ.DLQ的記錄.也就是說,相當于將過期的消息發給了一個ActiveMQ自定義的刪除隊列..

    <二>關于ActiveMQ的持久訂閱消息刪除操作
    1.主題消息只有一條,所有訂閱了這個消息的持久訂閱者都要收到消息,只有所有訂閱者收到消息并確認(Acknowledge)之后.才會刪除.
    說明:ActiveMQ支持批量(optimizeAcknowledge為true)確認,以提高性能
    2.ActiveMQ執行刪除Topic消息的cleanup()操作的時間間隔為5 minutes..



    柳德才
    13691193654
    18942949207
    QQ:422157370
    liudecai_zan@126.com
    湖北-武漢-江夏-廟山

    posted on 2009-04-08 11:18 liudecai_zan@126.com 閱讀(18465) 評論(3)  編輯  收藏 所屬分類: 程序人生

    評論

    # re: Apache ActiveMQ學習筆記【mq的方式有兩種:點到點和發布/訂閱】 2011-07-01 11:16 easy518網址導航

    http://www.easy518.com  回復  更多評論   

    # re: Apache ActiveMQ學習筆記【mq的方式有兩種:點到點和發布/訂閱】 2012-08-17 11:40 geek

    好文章  回復  更多評論   

    # re: Apache ActiveMQ學習筆記【mq的方式有兩種:點到點和發布/訂閱】 2016-08-03 13:32 zcf

    大神 ActiveMQ支不支持大消息拆分呢,求解釋  回復  更多評論   

    主站蜘蛛池模板: 久久午夜夜伦鲁鲁片无码免费| 黄色毛片视频免费| 日韩精品内射视频免费观看| 亚洲精品国产va在线观看蜜芽| 久久亚洲精品高潮综合色a片| 毛片在线免费视频| 国产成人免费高清激情明星| 亚洲AV无码成人网站久久精品大 | 国产精品亚洲A∨天堂不卡| 国产精品1024在线永久免费| 亚洲日韩在线观看| 中文字幕在线观看免费| 亚洲区小说区图片区QVOD| 中国国产高清免费av片| 亚洲AV无码久久精品色欲| 暖暖日本免费中文字幕| 久久亚洲AV无码精品色午夜麻豆| 亚洲中文字幕无码av永久| 在线播放高清国语自产拍免费 | 99久久这里只精品国产免费| 亚洲老熟女五十路老熟女bbw| 四虎影视在线永久免费看黄| www免费插插视频| 亚洲av综合av一区| 欧洲乱码伦视频免费| 国产AV无码专区亚洲AV麻豆丫| 亚洲国产成人久久笫一页| 久久精品国产影库免费看| 亚洲乱码在线视频| 五月婷婷亚洲综合| 七色永久性tv网站免费看| 亚洲剧场午夜在线观看| 国产伦精品一区二区三区免费下载| WWW国产成人免费观看视频| 色拍自拍亚洲综合图区| 午夜一区二区免费视频| 一本久久免费视频| 亚洲永久中文字幕在线| 免费一级毛片在级播放| 美丽姑娘免费观看在线观看中文版| 亚洲午夜精品一区二区麻豆|