<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    瘋狂

    STANDING ON THE SHOULDERS OF GIANTS
    posts - 481, comments - 486, trackbacks - 0, articles - 1
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    ActiveMQ

    Posted on 2010-01-29 10:14 瘋狂 閱讀(2090) 評論(0)  編輯  收藏 所屬分類: spring

    企業中各項目中相互協作的時候可能用得到消息通知機制。比如有東西更新了,可以通知做索引。

    在 Java 里有 JMS 的多個實現。其中 apache 下的 ActiveMQ 就是不錯的選擇。還有一個比較熱的是 RabbitMQ (是 erlang 語言實現的)。這里示例下使用 ActiveMQ

    用 ActiveMQ 最好還是了解下 JMS

    JMS 公共 點對點域 發布/訂閱域
    ConnectionFactory QueueConnectionFactory TopicConnectionFactory
    Connection QueueConnection TopicConnection
    Destination Queue Topic
    Session QueueSession TopicSession
    MessageProducer QueueSender TopicPublisher
    MessageConsumer QueueReceiver TopicSubscriber

    JMS 定義了兩種方式:Quere(點對點);Topic(發布/訂閱)。

    ConnectionFactory 是連接工廠,負責創建Connection。

    Connection 負責創建 Session。

    Session 創建 MessageProducer(用來發消息) 和 MessageConsumer(用來接收消息)。

    Destination 是消息的目的地。

    詳細的可以網上找些 JMS 規范(有中文版)。

    下載 apache-activemq-5.3.0。http://activemq.apache.org/download.html,解壓,然后雙擊 bin/activemq.bat。運行后,可以在 http://localhost:8161/admin 觀察。也有 demo, http://localhost:8161/demo。把 activemq-all-5.3.0.jar 加入 classpath。

    Jms 發送 代碼:

    1. public static void main(String[] args) throws Exception {   
    2.     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
    3.   
    4.     Connection connection = connectionFactory.createConnection();   
    5.     connection.start();   
    6.   
    7.     Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
    8.     Destination destination = session.createQueue("my-queue");   
    9.   
    10.     MessageProducer producer = session.createProducer(destination);   
    11.     for(int i=0; i<3; i++) {   
    12.         MapMessage message = session.createMapMessage();   
    13.         message.setLong("count"new Date().getTime());   
    14.         Thread.sleep(1000);   
    15.         //通過消息生產者發出消息   
    16.         producer.send(message);   
    17.     }   
    18.     session.commit();   
    19.     session.close();   
    20.     connection.close();   
    21. }  

    Jms 接收代碼:

    1. public static void main(String[] args) throws Exception {   
    2.     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
    3.   
    4.     Connection connection = connectionFactory.createConnection();   
    5.     connection.start();   
    6.   
    7.     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
    8.     Destination destination = session.createQueue("my-queue");   
    9.   
    10.     MessageConsumer consumer = session.createConsumer(destination);   
    11.     /*//listener 方式  
    12.     consumer.setMessageListener(new MessageListener() {  
    13.  
    14.         public void onMessage(Message msg) {  
    15.             MapMessage message = (MapMessage) msg;  
    16.             //TODO something....  
    17.             System.out.println("收到消息:" + new Date(message.getLong("count")));  
    18.             session.commit();  
    19.         }  
    20.  
    21.     });  
    22.     Thread.sleep(30000);  
    23.     */  
    24.     int i=0;   
    25.     while(i<3) {   
    26.         i++;   
    27.         MapMessage message = (MapMessage) consumer.receive();   
    28.         session.commit();   
    29.   
    30.         //TODO something....   
    31.         System.out.println("收到消息:" + new Date(message.getLong("count")));   
    32.     }   
    33.   
    34.     session.close();   
    35.     connection.close();   
    36. }  

    啟動 JmsReceiver 和 JmsSender 可以在看輸出三條時間信息。當然 Jms 還指定有其它格式的數據,如 TextMessage

    結合 Spring 的 JmsTemplate 方便用:

    xml:

    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    3.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  
    4.   
    5. <!-- 在非 web / ejb 容器中使用 pool 時,要手動 stop,spring 不會為你執行 destroy-method 的方法   
    6.     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  
    7.         <property name="connectionFactory">  
    8.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
    9.                 <property name="brokerURL" value="tcp://localhost:61616" />  
    10.             </bean>  
    11.         </property>  
    12.     </bean>  
    13. -->  
    14.     <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
    15.         <property name="brokerURL" value="tcp://localhost:61616" />  
    16.     </bean>  
    17.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
    18.         <property name="connectionFactory" ref="jmsFactory" />  
    19.         <property name="defaultDestination" ref="destination" />  
    20.         <property name="messageConverter">  
    21.             <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />  
    22.         </property>  
    23.     </bean>  
    24.   
    25.     <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">  
    26.         <constructor-arg index="0" value="my-queue" />  
    27.     </bean>  
    28.   
    29. </beans>  

    sender:

    1. public static void main(String[] args) {   
    2.     ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");   
    3.   
    4.     JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");   
    5.   
    6.     jmsTemplate.send(new MessageCreator() {   
    7.   
    8.         public Message createMessage(Session session) throws JMSException {   
    9.             MapMessage mm = session.createMapMessage();   
    10.             mm.setLong("count"new Date().getTime());   
    11.             return mm;   
    12.         }   
    13.   
    14.     });   
    15. }  

    receiver:

    1. public static void main(String[] args) {   
    2.     ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");   
    3.   
    4.     JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");   
    5.     while(true) {   
    6.         Map<String, Object> mm =  (Map<String, Object>) jmsTemplate.receiveAndConvert();   
    7.         System.out.println("收到消息:" + new Date((Long)mm.get("count")));   
    8.     }   
    9. }  

    注意:直接用 Jms 接口時接收了消息后要提交一下,否則下次啟動接收者時還可以收到舊數據。有了 JmsTemplate 就不用自己提交 session.commit() 了。如果使用了 PooledConnectionFactory 要把 apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar 加到 classpath

    主站蜘蛛池模板: 日韩精品亚洲专区在线观看| 精品亚洲成a人在线观看| 亚洲乱码日产精品a级毛片久久| 4444www免费看| 三上悠亚在线观看免费| 色妞www精品视频免费看| 亚洲性线免费观看视频成熟| 亚洲成人动漫在线| 国产亚洲精品久久久久秋霞 | 久久精品亚洲乱码伦伦中文| 成人免费视频一区二区三区| 中文字幕免费在线看线人| 91免费福利视频| 国产精品黄页免费高清在线观看| 国产成人精品亚洲| 亚洲精品第一国产综合亚AV| 亚洲国产精品成人久久久| 1区1区3区4区产品亚洲| 亚洲成a人片在线观看中文动漫| 国产亚洲无线码一区二区| 中文字幕亚洲日韩无线码| 亚洲精品无码专区久久同性男| 日本高清免费中文字幕不卡| 成人毛片免费观看视频大全| 黄色片在线免费观看 | 亚洲日本乱码一区二区在线二产线 | 亚洲AV成人精品日韩一区18p| 在线免费观看国产视频| 欧洲美熟女乱又伦免费视频| 成人毛片免费观看视频| 成人免费视频软件网站| 浮力影院第一页小视频国产在线观看免费| 四虎成人精品一区二区免费网站| 女人张开腿给人桶免费视频| 日韩精品视频免费观看| 国产一级一片免费播放i| 免费在线观看一级毛片| 亚洲欧洲日本在线| 怡红院亚洲怡红院首页| 久久久久久久综合日本亚洲| 亚洲国产精品成人精品无码区|