<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    瘋狂

    STANDING ON THE SHOULDERS OF GIANTS
    posts - 481, comments - 486, trackbacks - 0, articles - 1
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    ActiveMQ

    Posted on 2010-01-29 10:14 瘋狂 閱讀(2090) 評論(0)  編輯  收藏 所屬分類: spring

    企業中各項目中相互協作的時候可能用得到消息通知機制。比如有東西更新了,可以通知做索引。

    在 Java 里有 JMS 的多個實現。其中 apache 下的 ActiveMQ 就是不錯的選擇。還有一個比較熱的是 RabbitMQ (是 erlang 語言實現的)。這里示例下使用 ActiveMQ

    用 ActiveMQ 最好還是了解下 JMS

    JMS 公共 點對點域 發布/訂閱域
    ConnectionFactory QueueConnectionFactory TopicConnectionFactory
    Connection QueueConnection TopicConnection
    Destination Queue Topic
    Session QueueSession TopicSession
    MessageProducer QueueSender TopicPublisher
    MessageConsumer QueueReceiver TopicSubscriber

    JMS 定義了兩種方式:Quere(點對點);Topic(發布/訂閱)。

    ConnectionFactory 是連接工廠,負責創建Connection。

    Connection 負責創建 Session。

    Session 創建 MessageProducer(用來發消息) 和 MessageConsumer(用來接收消息)。

    Destination 是消息的目的地。

    詳細的可以網上找些 JMS 規范(有中文版)。

    下載 apache-activemq-5.3.0。http://activemq.apache.org/download.html,解壓,然后雙擊 bin/activemq.bat。運行后,可以在 http://localhost:8161/admin 觀察。也有 demo, http://localhost:8161/demo。把 activemq-all-5.3.0.jar 加入 classpath。

    Jms 發送 代碼:

    1. public static void main(String[] args) throws Exception {   
    2.     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
    3.   
    4.     Connection connection = connectionFactory.createConnection();   
    5.     connection.start();   
    6.   
    7.     Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
    8.     Destination destination = session.createQueue("my-queue");   
    9.   
    10.     MessageProducer producer = session.createProducer(destination);   
    11.     for(int i=0; i<3; i++) {   
    12.         MapMessage message = session.createMapMessage();   
    13.         message.setLong("count"new Date().getTime());   
    14.         Thread.sleep(1000);   
    15.         //通過消息生產者發出消息   
    16.         producer.send(message);   
    17.     }   
    18.     session.commit();   
    19.     session.close();   
    20.     connection.close();   
    21. }  

    Jms 接收代碼:

    1. public static void main(String[] args) throws Exception {   
    2.     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
    3.   
    4.     Connection connection = connectionFactory.createConnection();   
    5.     connection.start();   
    6.   
    7.     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
    8.     Destination destination = session.createQueue("my-queue");   
    9.   
    10.     MessageConsumer consumer = session.createConsumer(destination);   
    11.     /*//listener 方式  
    12.     consumer.setMessageListener(new MessageListener() {  
    13.  
    14.         public void onMessage(Message msg) {  
    15.             MapMessage message = (MapMessage) msg;  
    16.             //TODO something....  
    17.             System.out.println("收到消息:" + new Date(message.getLong("count")));  
    18.             session.commit();  
    19.         }  
    20.  
    21.     });  
    22.     Thread.sleep(30000);  
    23.     */  
    24.     int i=0;   
    25.     while(i<3) {   
    26.         i++;   
    27.         MapMessage message = (MapMessage) consumer.receive();   
    28.         session.commit();   
    29.   
    30.         //TODO something....   
    31.         System.out.println("收到消息:" + new Date(message.getLong("count")));   
    32.     }   
    33.   
    34.     session.close();   
    35.     connection.close();   
    36. }  

    啟動 JmsReceiver 和 JmsSender 可以在看輸出三條時間信息。當然 Jms 還指定有其它格式的數據,如 TextMessage

    結合 Spring 的 JmsTemplate 方便用:

    xml:

    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    3.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  
    4.   
    5. <!-- 在非 web / ejb 容器中使用 pool 時,要手動 stop,spring 不會為你執行 destroy-method 的方法   
    6.     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  
    7.         <property name="connectionFactory">  
    8.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
    9.                 <property name="brokerURL" value="tcp://localhost:61616" />  
    10.             </bean>  
    11.         </property>  
    12.     </bean>  
    13. -->  
    14.     <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
    15.         <property name="brokerURL" value="tcp://localhost:61616" />  
    16.     </bean>  
    17.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
    18.         <property name="connectionFactory" ref="jmsFactory" />  
    19.         <property name="defaultDestination" ref="destination" />  
    20.         <property name="messageConverter">  
    21.             <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />  
    22.         </property>  
    23.     </bean>  
    24.   
    25.     <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">  
    26.         <constructor-arg index="0" value="my-queue" />  
    27.     </bean>  
    28.   
    29. </beans>  

    sender:

    1. public static void main(String[] args) {   
    2.     ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");   
    3.   
    4.     JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");   
    5.   
    6.     jmsTemplate.send(new MessageCreator() {   
    7.   
    8.         public Message createMessage(Session session) throws JMSException {   
    9.             MapMessage mm = session.createMapMessage();   
    10.             mm.setLong("count"new Date().getTime());   
    11.             return mm;   
    12.         }   
    13.   
    14.     });   
    15. }  

    receiver:

    1. public static void main(String[] args) {   
    2.     ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");   
    3.   
    4.     JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");   
    5.     while(true) {   
    6.         Map<String, Object> mm =  (Map<String, Object>) jmsTemplate.receiveAndConvert();   
    7.         System.out.println("收到消息:" + new Date((Long)mm.get("count")));   
    8.     }   
    9. }  

    注意:直接用 Jms 接口時接收了消息后要提交一下,否則下次啟動接收者時還可以收到舊數據。有了 JmsTemplate 就不用自己提交 session.commit() 了。如果使用了 PooledConnectionFactory 要把 apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar 加到 classpath

    主站蜘蛛池模板: 亚洲色自偷自拍另类小说| 亚洲国产av美女网站| 久久国产乱子伦精品免费不卡| 亚洲成人免费电影| 免费在线观看视频a| 今天免费中文字幕视频| 99999久久久久久亚洲| 亚洲精品成人片在线观看| 67pao强力打造高清免费| 激情婷婷成人亚洲综合| 亚洲国产精品久久久久网站 | 亚洲一二成人精品区| 日韩一区二区a片免费观看| 九九综合VA免费看| 亚洲乱码一二三四区乱码| 久久久久亚洲精品无码网址 | 久久久久亚洲av无码尤物| 久久电影网午夜鲁丝片免费| 国产无限免费观看黄网站| 亚洲人精品亚洲人成在线| 亚洲色大成网站WWW久久九九| 成人免费男女视频网站慢动作| 国产性生大片免费观看性| 亚洲中文字幕精品久久| 亚洲成色在线综合网站| 免费国产a国产片高清网站| 亚洲免费黄色网址| 免费播放在线日本感人片| 亚洲AV香蕉一区区二区三区| 亚洲精品中文字幕无码AV| 国产aⅴ无码专区亚洲av麻豆 | 亚洲大尺度无码无码专线一区| 亚洲AV电影院在线观看| 亚洲成AV人在线观看网址| 性生交片免费无码看人| 日本免费人成视频在线观看| a一级毛片免费高清在线| 久久久久亚洲精品无码网址色欲| 亚洲国产精品久久人人爱| 亚洲卡一卡2卡三卡4卡无卡三| 亚洲人成影院在线无码按摩店|