<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    瘋狂

    STANDING ON THE SHOULDERS OF GIANTS
    posts - 481, comments - 486, trackbacks - 0, articles - 1
      BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

    Spring和 jms

    Posted on 2013-04-11 11:30 瘋狂 閱讀(3648) 評(píng)論(0)  編輯  收藏 所屬分類: spring

    主要的幾個(gè)類說明:

    1 JmsTemplate 用于發(fā)送和接受消息。需要消息工廠參數(shù)。

    基于監(jiān)聽:

    2 DefaultMessageListenerContainerSimpleMessageListenerContainer這兩個(gè)容器可以創(chuàng)建多個(gè)session和消費(fèi)者來對(duì)每個(gè)隊(duì)列進(jìn)行消息處理并條用消息監(jiān)聽類的方法進(jìn)行處理。并通過多線程進(jìn)行處理。每個(gè)線程通過輪訓(xùn)的方式(while(true))去進(jìn)行消息接收處理。

    3 MessageListenerAdapter 它相當(dāng)于一個(gè)特殊的自定義監(jiān)聽器,里面可以調(diào)用一些消息格式的裝換工具。例如jsonxmlstringbean的轉(zhuǎn)換等等。

     

    其中DefaultMessageListenerContainer接受動(dòng)態(tài)添加session。而后者不支持。

    ----

    具體消息監(jiān)聽處理源碼過程如下:以DefaultMessageListenerContainer為例:

    ----

    1 初始化建立消費(fèi)者線程:

    /**

         * Creates the specified number of concurrent consumers,

         * in the form of a JMS Session plus associated MessageConsumer.

         * @see #createListenerConsumer

         */

        protected void doInitialize() throws JMSException {

            establishSharedConnection();

            initializeConsumers();

        }

    protected void initializeConsumers() throws JMSException {

            // Register Sessions and MessageConsumers.

            synchronized (this.consumersMonitor) {

                if (this.consumers == null) {

                    this.sessions = new HashSet<Session>(this.concurrentConsumers);

                    this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);

                    Connection con = getSharedConnection();

                    for (int i = 0; i < this.concurrentConsumers; i++) {

                        Session session = createSession(con);

                        MessageConsumer consumer = createListenerConsumer(session);

                        this.sessions.add(session);

                        this.consumers.add(consumer);

                    }

                }

            }

        }

    2 啟動(dòng)消費(fèi)者線程

    /**

         * Start the shared Connection, if any, and notify all invoker tasks.

         * @throws JMSException if thrown by JMS API methods

         * @see #startSharedConnection

         */

        protected void doStart() throws JMSException {

            // Lazily establish a shared Connection, if necessary.

            if (sharedConnectionEnabled()) {

                establishSharedConnection();

            }

     

            // Reschedule paused tasks, if any.

            synchronized (this.lifecycleMonitor) {

                this.running = true;

                this.lifecycleMonitor.notifyAll();

                resumePausedTasks();

            }

     

            // Start the shared Connection, if any.

            if (sharedConnectionEnabled()) {

                startSharedConnection();

            }

        }

    3消息消費(fèi)

    其中的resumePausedTasks方法會(huì)進(jìn)行輪訓(xùn)處理:

    protected void resumePausedTasks() {

            synchronized (this.lifecycleMonitor) {

                if (!this.pausedTasks.isEmpty()) {//所有的線程一開始會(huì)被置為暫停

                    for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) {

                        Object task = it.next();

                        try {

                            doRescheduleTask(task);

                            it.remove();

                            if (logger.isDebugEnabled()) {

                                logger.debug("Resumed paused task: " + task);

                            }

                        }

                        catch (RuntimeException ex) {

                            logRejectedTask(task, ex);

                            // Keep the task in paused mode...

                        }

                    }

                }

            }

    每個(gè)線程是一個(gè) AsyncMessageListenerInvoker,通過它的run方法來進(jìn)行消息處理:

    先執(zhí)行:

        private boolean invokeListener() throws JMSException {

                initResourcesIfNecessary();

                boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);

                this.lastMessageSucceeded = true;

                return messageReceived;

            }

    接著:

    Message message = receiveMessage(consumerToUse);

     

    這就是最終的阻塞方法去獲取消息

    /**

         * Receive a message from the given consumer.

         * @param consumer the MessageConsumer to use

         * @return the Message, or <code>null</code> if none

         * @throws JMSException if thrown by JMS methods

         */

        protected Message receiveMessage(MessageConsumer consumer) throws JMSException {

            return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));

        }

    4調(diào)用我們自定義的監(jiān)聽器:

    其中監(jiān)聽器可以實(shí)現(xiàn)SessionAwareMessageListenerMessageListener接口,不一樣的地方在于前者會(huì)把sessionmessage參數(shù)給你,你可以進(jìn)行一個(gè)回調(diào)操作。后者只有消息內(nèi)容message參數(shù)。

    protected void invokeListener(Session session, Message message) throws JMSException {

            Object listener = getMessageListener();

            if (listener instanceof SessionAwareMessageListener) {

                doInvokeListener((SessionAwareMessageListener) listener, session, message);

            }

            else if (listener instanceof MessageListener) {

                doInvokeListener((MessageListener) listener, message);

            }

            else if (listener != null) {

                throw new IllegalArgumentException(

                        "Only MessageListener and SessionAwareMessageListener supported: " + listener);

            }

            else {

                throw new IllegalStateException("No message listener specified - see property 'messageListener'");

            }

        }

    主站蜘蛛池模板: 免费看内射乌克兰女| 亚洲中文字幕无码av永久| jizz中国免费| 亚洲日本在线观看视频| 在线播放免费人成视频网站| 亚洲国产成人精品无码久久久久久综合| 国产精品亚洲lv粉色| 国产一区二区三区无码免费| 国产成人综合亚洲| 亚洲男女内射在线播放| 东北美女野外bbwbbw免费| 亚洲AV无码久久寂寞少妇| 99视频在线精品免费| 亚洲乱码在线视频| 国产精品二区三区免费播放心 | 日韩在线视频免费| 久久综合亚洲色HEZYO国产| 国产成人免费ā片在线观看老同学 | 国产免费网站看v片在线| 亚洲一区二区成人| 99精品国产免费久久久久久下载 | 久草福利资源网站免费| 亚洲av综合av一区| 永久免费AV无码国产网站| 色五月五月丁香亚洲综合网| 亚洲毛片网址在线观看中文字幕| 九九热久久免费视频| 亚洲网站在线免费观看| 日韩视频免费在线| 91在线免费视频| 亚洲国产精品久久丫| 免费在线观看亚洲| 日韩人妻一区二区三区免费| 亚洲小说图区综合在线| 精品国产日韩亚洲一区| 69式互添免费视频| 五级黄18以上免费看| 亚洲最新在线视频| 亚洲精品国产成人影院| h片在线免费观看| 一区二区三区免费在线观看|