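Main classes:
1 JmsTemplate: used to send and receive messages. It requires a ConnectionFactory.
Listener-based:
2 DefaultMessageListenerContainer and SimpleMessageListenerContainer: both containers can create multiple sessions and consumers for a destination and hand each received message to the listener's method, so messages are processed on multiple threads. In DefaultMessageListenerContainer each invoker thread polls for messages in a loop (while (true)) using blocking receive calls. SimpleMessageListenerContainer instead registers the listener through the standard JMS MessageConsumer.setMessageListener() API and relies on the provider's delivery threads.
3 MessageListenerAdapter: essentially a special listener that delegates to your own handler method and can apply message conversion along the way, for example JSON, XML, or String to bean.
Of the two containers, DefaultMessageListenerContainer can add sessions (and consumers) dynamically at runtime; SimpleMessageListenerContainer cannot.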
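To make the adapter idea in item 3 concrete, here is a minimal plain-Java sketch of "convert first, then delegate". It does not use Spring: ConvertingListener, User, AdapterSketch and the "id:name" payload format are all hypothetical names invented for illustration. As far as I know, the real MessageListenerAdapter does the conversion through a MessageConverter and calls the handler method reflectively, which this sketch does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical bean that the raw payload is converted into.
final class User {
    final int id;
    final String name;

    User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical stand-in for a listener adapter: converts the raw payload,
// then hands the converted object to the user's handler.
final class ConvertingListener<T> {
    private final Function<String, T> converter;
    private final Consumer<T> handler;

    ConvertingListener(Function<String, T> converter, Consumer<T> handler) {
        this.converter = converter;
        this.handler = handler;
    }

    void onMessage(String rawPayload) {
        handler.accept(converter.apply(rawPayload));
    }
}

final class AdapterSketch {
    // Parses "id:name" into a User; stands in for a JSON or XML converter.
    static User parseUser(String raw) {
        String[] parts = raw.split(":", 2);
        return new User(Integer.parseInt(parts[0]), parts[1]);
    }

    // Feeds two raw payloads through the adapter and records what the handler saw.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        ConvertingListener<User> listener = new ConvertingListener<User>(
                AdapterSketch::parseUser,
                u -> seen.add(u.id + "=" + u.name));
        listener.onMessage("7:alice");
        listener.onMessage("8:bob");
        return seen;
    }
}
```

The handler only ever sees User objects, so the business code stays free of any message-format parsing.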
----
The message listening flow in the source code is as follows, using DefaultMessageListenerContainer as the example:
----
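1 Initialization: create the consumers
(The doInitialize() and initializeConsumers() methods shown here appear to come from SimpleMessageListenerContainer, which creates a fixed set of Session/MessageConsumer pairs up front. DefaultMessageListenerContainer's own doInitialize() instead schedules one AsyncMessageListenerInvoker per concurrent consumer.)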
/**
 * Creates the specified number of concurrent consumers,
 * in the form of a JMS Session plus associated MessageConsumer.
 * @see #createListenerConsumer
 */
protected void doInitialize() throws JMSException {
    establishSharedConnection();
    initializeConsumers();
}

protected void initializeConsumers() throws JMSException {
    // Register Sessions and MessageConsumers.
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            this.sessions = new HashSet<Session>(this.concurrentConsumers);
            this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);
            Connection con = getSharedConnection();
            for (int i = 0; i < this.concurrentConsumers; i++) {
                Session session = createSession(con);
                MessageConsumer consumer = createListenerConsumer(session);
                this.sessions.add(session);
                this.consumers.add(consumer);
            }
        }
    }
}
2 Start the consumer threads
/**
 * Start the shared Connection, if any, and notify all invoker tasks.
 * @throws JMSException if thrown by JMS API methods
 * @see #startSharedConnection
 */
protected void doStart() throws JMSException {
    // Lazily establish a shared Connection, if necessary.
    if (sharedConnectionEnabled()) {
        establishSharedConnection();
    }
    // Reschedule paused tasks, if any.
    synchronized (this.lifecycleMonitor) {
        this.running = true;
        this.lifecycleMonitor.notifyAll();
        resumePausedTasks();
    }
    // Start the shared Connection, if any.
    if (sharedConnectionEnabled()) {
        startSharedConnection();
    }
}
3 Message consumption
The resumePausedTasks method reschedules the paused invoker tasks, which then begin polling for messages:
protected void resumePausedTasks() {
    synchronized (this.lifecycleMonitor) {
        if (!this.pausedTasks.isEmpty()) { // all invoker tasks start out in the paused state
            for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) {
                Object task = it.next();
                try {
                    doRescheduleTask(task);
                    it.remove();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Resumed paused task: " + task);
                    }
                }
                catch (RuntimeException ex) {
                    logRejectedTask(task, ex);
                    // Keep the task in paused mode...
                }
            }
        }
    }
}
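The park-then-resume pattern above can be sketched in plain Java as follows. This is a simplified illustration, not Spring's code: PausableScheduler and its methods are hypothetical names, and the tasks are run inline for brevity, whereas the real container (as far as I can tell) hands each rescheduled task to its task executor.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the container's lifecycle handling.
final class PausableScheduler {
    private final Object lifecycleMonitor = new Object();
    private final Set<Runnable> pausedTasks = new LinkedHashSet<>();
    private boolean running = false;

    // Runs the task immediately if started; otherwise parks it as paused.
    // Returns true when the task was run right away.
    boolean schedule(Runnable task) {
        synchronized (lifecycleMonitor) {
            if (running) {
                task.run(); // assumption: run inline instead of using an executor
                return true;
            }
            pausedTasks.add(task);
            return false;
        }
    }

    // Marks the scheduler as running and resumes every parked task.
    void start() {
        synchronized (lifecycleMonitor) {
            running = true;
            for (Iterator<Runnable> it = pausedTasks.iterator(); it.hasNext();) {
                Runnable task = it.next();
                try {
                    task.run();
                    it.remove();
                } catch (RuntimeException ex) {
                    // Keep the task paused so a later start() can retry it.
                }
            }
        }
    }

    int pausedCount() {
        synchronized (lifecycleMonitor) {
            return pausedTasks.size();
        }
    }

    // Schedules two tasks before start(), then starts and records the order of events.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PausableScheduler scheduler = new PausableScheduler();
        scheduler.schedule(() -> log.add("invoker-1"));
        scheduler.schedule(() -> log.add("invoker-2"));
        log.add("paused=" + scheduler.pausedCount());
        scheduler.start();
        log.add("paused=" + scheduler.pausedCount());
        return log;
    }
}
```

Tasks scheduled before start() do nothing until the scheduler is running, which mirrors why the invokers only begin consuming after doStart().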
Each thread runs an AsyncMessageListenerInvoker, whose run method does the message handling.
It first executes:
private boolean invokeListener() throws JMSException {
    initResourcesIfNecessary();
    boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
    this.lastMessageSucceeded = true;
    return messageReceived;
}
Then:
Message message = receiveMessage(consumerToUse);
This is the blocking call that finally fetches the message:
/**
 * Receive a message from the given consumer.
 * @param consumer the MessageConsumer to use
 * @return the Message, or <code>null</code> if none
 * @throws JMSException if thrown by JMS methods
 */
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
    return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
}
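The polling loop around this receive call can be imitated with a BlockingQueue standing in for the MessageConsumer. This is only a sketch under that assumption: PollingInvoker and its fields are hypothetical names, and the loop is far simpler than the real invoker (no transactions, no recovery, no scaling).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical polling invoker; the queue plays the role of a JMS MessageConsumer.
final class PollingInvoker implements Runnable {
    private final BlockingQueue<String> queue;
    private final long receiveTimeoutMillis;
    private final List<String> handled;
    private volatile boolean active = true;

    PollingInvoker(BlockingQueue<String> queue, long receiveTimeoutMillis, List<String> handled) {
        this.queue = queue;
        this.receiveTimeoutMillis = receiveTimeoutMillis;
        this.handled = handled;
    }

    // Negative timeout: block until a message arrives. Otherwise wait at most the timeout.
    // Unlike JMS receive(0), poll(0, ...) returns immediately rather than blocking forever.
    String receiveMessage() throws InterruptedException {
        return receiveTimeoutMillis < 0
                ? queue.take()
                : queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            while (active) {
                String message = receiveMessage();
                if (message != null) {
                    handled.add(message); // stand-in for invoking the listener
                }
                // null means the timeout elapsed; the loop re-checks 'active'.
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        active = false;
    }

    // Preloads two messages, lets one invoker thread drain them, then stops it.
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = Collections.synchronizedList(new ArrayList<String>());
        queue.add("a");
        queue.add("b");
        PollingInvoker invoker = new PollingInvoker(queue, 50, handled);
        Thread thread = new Thread(invoker);
        thread.start();
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (handled.size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            invoker.stop();
            thread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
        return new ArrayList<>(handled);
    }
}
```

A finite timeout is what lets the loop notice stop() promptly; with a negative timeout the thread would stay blocked in take() until a message arrives or it is interrupted.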
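4 Invoking our custom listener:
The listener can implement either SessionAwareMessageListener or MessageListener. The difference is that the former receives both the session and the message, so you can use the session for callback operations such as sending a reply; the latter receives only the message.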
protected void invokeListener(Session session, Message message) throws JMSException {
    Object listener = getMessageListener();
    if (listener instanceof SessionAwareMessageListener) {
        doInvokeListener((SessionAwareMessageListener) listener, session, message);
    }
    else if (listener instanceof MessageListener) {
        doInvokeListener((MessageListener) listener, message);
    }
    else if (listener != null) {
        throw new IllegalArgumentException(
                "Only MessageListener and SessionAwareMessageListener supported: " + listener);
    }
    else {
        throw new IllegalStateException("No message listener specified - see property 'messageListener'");
    }
}
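The same type-based dispatch can be tried out without a JMS provider. In the sketch below, PlainListener, SessionAwareListener, FakeSession and ListenerDispatcher are hypothetical stand-ins for the real JMS and Spring types, and messages are plain strings; it only illustrates why a session-aware listener is handy for replies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical counterpart of MessageListener: sees only the message.
interface PlainListener {
    void onMessage(String message);
}

// Hypothetical counterpart of SessionAwareMessageListener: also gets the session.
interface SessionAwareListener {
    void onMessage(String message, FakeSession session);
}

// Hypothetical session that just records what was sent through it.
final class FakeSession {
    final List<String> sent = new ArrayList<>();

    void send(String reply) {
        sent.add(reply);
    }
}

final class ListenerDispatcher {
    // Checks the session-aware type first, then falls back to the plain listener.
    static void invokeListener(Object listener, FakeSession session, String message) {
        if (listener instanceof SessionAwareListener) {
            ((SessionAwareListener) listener).onMessage(message, session);
        } else if (listener instanceof PlainListener) {
            ((PlainListener) listener).onMessage(message);
        } else if (listener != null) {
            throw new IllegalArgumentException("Unsupported listener type: " + listener);
        } else {
            throw new IllegalStateException("No listener specified");
        }
    }

    // Dispatches one message to each kind of listener and collects the effects.
    static List<String> demo() {
        FakeSession session = new FakeSession();
        List<String> received = new ArrayList<>();
        PlainListener plain = msg -> received.add("plain:" + msg);
        SessionAwareListener replying = (msg, s) -> s.send("ack:" + msg);
        invokeListener(plain, session, "hello");
        invokeListener(replying, session, "hello");
        List<String> out = new ArrayList<>(received);
        out.addAll(session.sent);
        return out;
    }
}
```

The plain listener can only consume the message, while the session-aware one can answer through the same session that delivered it.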