<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    少年阿賓

    那些青春的歲月

      BlogJava :: 首頁 :: 聯(lián)系 :: 聚合  :: 管理
      500 Posts :: 0 Stories :: 135 Comments :: 0 Trackbacks

    企業(yè)中各項(xiàng)目中相互協(xié)作的時(shí)候可能用得到消息通知機(jī)制。比如有東西更新了,可以通知做索引。

    在 Java 里有 JMS 的多個(gè)實(shí)現(xiàn)。其中 apache 下的 ActiveMQ 就是不錯(cuò)的選擇。ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。這里示例下使用 ActiveMQ

    用 ActiveMQ 最好還是了解下 JMS

    JMS 公共 點(diǎn)對(duì)點(diǎn)域 發(fā)布/訂閱域
    ConnectionFactory QueueConnectionFactory TopicConnectionFactory
    Connection QueueConnection TopicConnection
    Destination Queue Topic
    Session QueueSession TopicSession
    MessageProducer QueueSender TopicPublisher
    MessageConsumer QueueReceiver TopicSubscriber

    JMS 定義了兩種方式:Quere(點(diǎn)對(duì)點(diǎn));Topic(發(fā)布/訂閱)。

    ConnectionFactory 是連接工廠,負(fù)責(zé)創(chuàng)建Connection。

    Connection 負(fù)責(zé)創(chuàng)建 Session。

    Session 創(chuàng)建 MessageProducer(用來發(fā)消息) 和 MessageConsumer(用來接收消息)。

    Destination 是消息的目的地。

    詳細(xì)的可以網(wǎng)上找些 JMS 規(guī)范(有中文版)。

    下載 apache-activemq-5.3.0。http://activemq.apache.org/download.html ,解壓,然后雙擊 bin/activemq.bat。運(yùn)行后,可以在 http://localhost:8161/admin 觀察。也有 demo, http://localhost:8161/demo 。把 activemq-all-5.3.0.jar 加入 classpath。

    Jms 發(fā)送 代碼:

    public static void main(String[] args) throws Exception {   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
      
        Connection connection = connectionFactory.createConnection();   
        connection.start();   
      
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
        Destination destination = session.createQueue("my-queue");   
      
        MessageProducer producer = session.createProducer(destination);   
        for(int i=0; i<3; i++) {   
            MapMessage message = session.createMapMessage();   
            message.setLong("count", new Date().getTime());   
            Thread.sleep(1000);   
            //通過消息生產(chǎn)者發(fā)出消息   
            producer.send(message);   
        }   
        session.commit();   
        session.close();   
        connection.close();   
    }



    Jms 接收代碼:


    public static void main(String[] args) throws Exception {   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
      
        Connection connection = connectionFactory.createConnection();   
        connection.start();   
      
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
        Destination destination = session.createQueue("my-queue");   
      
        MessageConsumer consumer = session.createConsumer(destination);   
        int i=0;   
        while(i<3) {   
            i++;   
            MapMessage message = (MapMessage) consumer.receive();   
            session.commit();   
      
            //TODO something....   
            System.out.println("收到消息:" + new Date(message.getLong("count")));   
        }   
      
        session.close();   
        connection.close();   
    }



    JMS五種消息的發(fā)送/接收的例子

    轉(zhuǎn)自:http://chenjumin.javaeye.com/blog/687124  

    1、消息發(fā)送

    //連接工廠  
    ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
            ActiveMQConnection.DEFAULT_USER,  
            ActiveMQConnection.DEFAULT_PASSWORD,  
            "tcp://localhost:61616");  
     
    //連接到JMS提供者  
    Connection conn = connFactory.createConnection();  
    conn.start();  
     
    //事務(wù)性會(huì)話,自動(dòng)確認(rèn)消息  
    Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
     
    //消息的目的地  
    Destination destination = session.createQueue("queue.hello");  
     
    //消息生產(chǎn)者  
    MessageProducer producer = session.createProducer(destination);  
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化  
     
     
    //文本消息  
    TextMessage textMessage = session.createTextMessage("文本消息");  
    producer.send(textMessage);  
     
    //鍵值對(duì)消息  
    MapMessage mapMessage = session.createMapMessage();  
    mapMessage.setLong("age", new Long(32));  
    mapMessage.setDouble("sarray", new Double(5867.15));  
    mapMessage.setString("username", "鍵值對(duì)消息");  
    producer.send(mapMessage);  
     
    //流消息  
    StreamMessage streamMessage = session.createStreamMessage();  
    streamMessage.writeString("streamMessage流消息");  
    streamMessage.writeLong(55);  
    producer.send(streamMessage);  
     
    //字節(jié)消息  
    String s = "BytesMessage字節(jié)消息";  
    BytesMessage bytesMessage = session.createBytesMessage();  
    bytesMessage.writeBytes(s.getBytes());  
    producer.send(bytesMessage);  
     
    //對(duì)象消息  
    User user = new User("cjm", "對(duì)象消息"); //User對(duì)象必須實(shí)現(xiàn)Serializable接口  
    ObjectMessage objectMessage = session.createObjectMessage();  
    objectMessage.setObject(user);  
    producer.send(objectMessage);  
     
     
    session.commit(); //在事務(wù)性會(huì)話中,只有commit之后,消息才會(huì)真正到達(dá)目的地  
    producer.close();  
    session.close();  
    conn.close(); 



    2、消息接收:通過消息監(jiān)聽器的方式接收消息


    public class Receiver implements MessageListener{  
        private boolean stop = false;  
          
        public void execute() throws Exception {  
            //連接工廠  
            ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                    ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD,  
                    "tcp://localhost:61616");  
              
            //連接到JMS提供者  
            Connection conn = connFactory.createConnection();  
            conn.start();  
              
            //事務(wù)性會(huì)話,自動(dòng)確認(rèn)消息  
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
              
            //消息的來源地  
            Destination destination = session.createQueue("queue.hello");  
              
            //消息消費(fèi)者  
            MessageConsumer consumer = session.createConsumer(destination);  
            consumer.setMessageListener(this);  
              
            //等待接收消息  
            while(!stop){  
                Thread.sleep(5000);  
            }  
              
            session.commit();  
              
            consumer.close();  
            session.close();  
            conn.close();  
        }  
     
        public void onMessage(Message m) {  
            try{  
                if(m instanceof TextMessage){ //接收文本消息  
                    TextMessage message = (TextMessage)m;  
                    System.out.println(message.getText());  
                }else if(m instanceof MapMessage){ //接收鍵值對(duì)消息  
                    MapMessage message = (MapMessage)m;  
                    System.out.println(message.getLong("age"));  
                    System.out.println(message.getDouble("sarray"));  
                    System.out.println(message.getString("username"));  
                }else if(m instanceof StreamMessage){ //接收流消息  
                    StreamMessage message = (StreamMessage)m;  
                    System.out.println(message.readString());  
                    System.out.println(message.readLong());  
                }else if(m instanceof BytesMessage){ //接收字節(jié)消息  
                    byte[] b = new byte[1024];  
                    int len = -1;  
                    BytesMessage message = (BytesMessage)m;  
                    while((len=message.readBytes(b))!=-1){  
                        System.out.println(new String(b, 0, len));  
                    }  
                }else if(m instanceof ObjectMessage){ //接收對(duì)象消息  
                    ObjectMessage message = (ObjectMessage)m;  
                    User user = (User)message.getObject();  
                    System.out.println(user.getUsername() + " _ " + user.getPassword());  
                }else{  
                    System.out.println(m);  
                }  
                  
                stop = true;  
            }catch(JMSException e){  
                stop = true;  
                e.printStackTrace();  
            }  
        }  





    http://blog.csdn.net/caihaijiang/article/details/5903296
    posted on 2012-08-02 14:59 abin 閱讀(1760) 評(píng)論(0)  編輯  收藏 所屬分類: ActiveMQ

    只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 久久乐国产综合亚洲精品| 亚洲中文久久精品无码| 成年女人18级毛片毛片免费| 亚洲高清日韩精品第一区| 亚洲一区精品无码| 亚洲精品国产精品乱码不99| 国产精品V亚洲精品V日韩精品| 亚洲日韩VA无码中文字幕 | 亚洲精品无码人妻无码 | 又大又粗又爽a级毛片免费看| 免费国产高清视频| 天堂亚洲免费视频| 亚洲国产成人爱av在线播放| 亚洲真人日本在线| 精品亚洲综合久久中文字幕| 亚洲av网址在线观看| 1区1区3区4区产品亚洲| 亚洲成人福利网站| 亚洲色大成网站www| 青娱乐在线免费观看视频| 一级特黄录像免费播放中文版| 两个人www免费高清视频| 久9久9精品免费观看| 国产成人精品免费视| 午夜免费福利网站| 亚洲狠狠爱综合影院婷婷| 日韩亚洲变态另类中文| 亚洲国产高清人在线| 亚洲人成综合在线播放| 亚洲乱亚洲乱妇24p| 草久免费在线观看网站| 91在线免费观看| 91成人免费观看| 天天摸天天碰成人免费视频| 免费人成视频x8x8入口| 国产亚洲精品美女久久久| 亚洲图片中文字幕| 老司机福利在线免费观看| 久久免费精品一区二区| 动漫黄网站免费永久在线观看| 哒哒哒免费视频观看在线www|