1 DestinationResolver
??? DestinationResolver接口的作用是將指定的目的地名解析為目的地實例。其定義如下:
-
public
?
interface
?DestinationResolver?{??
-
????Destination?resolveDestinationName(Session?session,?String?destinationName,???
-
????????boolean?pubSubDomain)?throws?JMSException;??
-
}??
??? 參數(shù)pubSubDomain用于指定是使用“發(fā)布/訂閱”模式(解析后的目的地是Topic),還是使用“點對點”模式(解析后的目的地是Queue)。
?
??? CachingDestinationResolver接口繼承了DestinationResolver,增加了緩存的功能,其接口定義如下:
-
public
?
interface
?CachingDestinationResolver?
extends
?DestinationResolver?{??
-
????void?removeFromCache(String?destinationName);??
-
????void?clearCache();??
-
}??
??? 在目的地失效的時候,removeFromCache方法會被調(diào)用;在JMS provider失效的時候,clearCache方法會被調(diào)用。
1.1 DynamicDestinationResolver
??? DynamicDestinationResolver實現(xiàn)了DestinationResolver接口。根據(jù)指定的目的地名,DynamicDestinationResolver會動態(tài)創(chuàng)建目的地實例。針對JMS1.1規(guī)范,它采用如下方法創(chuàng)建目的地:
-
session.createTopic(topicName)??
-
session.createQueue(queueName);??
1.2 JndiDestinationResolver
??? JndiDestinationResolver繼承自JndiLocatorSupport,
同時實現(xiàn)了CachingDestinationResolver接口。如果在JMS
provider中配置了靜態(tài)目的地,那么JndiDestinationResolver通過JNDI查找的方式獲得目的地實例。
???
JndiDestinationResolver的fallbackToDynamicDestination屬性用于指定在JNDI查找失敗后,是否使
用動態(tài)目的地,默認值是false。JndiDestinationResolver的cache屬性用于指定是否對目的地實例進行緩存,默認值是
true。
?
1.3 BeanFactoryDestinationResolver
??? BeanFactoryDestinationResolver實現(xiàn)了DestinationResolver接口和BeanFactoryAware接口。它會根據(jù)指定的目的地名從BeanFactory中查找目的地實例。以下是相關(guān)的代碼:
-
public
?Destination?resolveDestinationName(Session?session,?String?destinationName,???
-
boolean
?pubSubDomain)?
throws
?JMSException?{??
-
????Assert.state(this.beanFactory?!=?null,?"BeanFactory?is?required");??
-
????try?{??
-
????????return?(Destination)?this.beanFactory.getBean(destinationName,?Destination.class);??
-
????}??
-
????catch?(BeansException?ex)?{??
-
????????throw?new?DestinationResolutionException(??
-
????????????"Failed?to?look?up?Destinaton?bean?with?name?'"?+?destinationName?+?"'",?ex);??
-
????}??
-
}??
?
2 JmsAccessor
???
抽象類JmsAccessor是JmsTemplate、SimpleMessageListenerContainer和
DefaultMessageListenerContainer等concrete
class的基類。JmsAccessor定義了如下幾個用于訪問JMS服務的共通屬性。
-
private
?ConnectionFactory?connectionFactory;??
-
private
?
boolean
?sessionTransacted?=?
false
;??
-
private
?
int
?sessionAcknowledgeMode?=?Session.AUTO_ACKNOWLEDGE;??
??? JmsAccessor提供了創(chuàng)建Connection和Session的方法,如下:
-
protected
?Connection?createConnection()?
throws
?JMSException?{??
-
????return?getConnectionFactory().createConnection();??
-
}??
-
??
-
protected
?Session?createSession(Connection?con)?
throws
?JMSException?{??
-
????return?con.createSession(isSessionTransacted(),?getSessionAcknowledgeMode());??
-
}??
?
2.1 JmsDestinationAccessor
???
抽象類JmsDestinationAccessor繼承自JmsAccessor,增加了destinationResolver和
pubSubDomain屬性。destinationResolver的默認值是DynamicDestinationResolver的實例,也就是
說默認采用動態(tài)目的地解析的方式;pubSubDomain用于指定是使用“發(fā)布/訂閱”模式還是使用“點對點”模式,默認值是false。
??? JmsDestinationAccessor提供了用于解析目的地的方法,如下:
-
protected
?Destination?resolveDestinationName(Session?session,?String?destinationName)???
-
throws
?JMSException?{??
-
????return?getDestinationResolver().resolveDestinationName(session,?destinationName,???
-
????????isPubSubDomain());??
-
}??
2.2 AbstractJmsListeningContainer
??? AbstractJmsListeningContainer繼承自JmsDestinationAccessor,作為所有Message
Listener Container的公共基類。它主要提供了JMS
connection的生命周期管理的功能,但是沒有對消息接收的方式(主動接收方式或者異步接收方式)等做任何假定。該類主要的屬性如下:
-
private
?String?clientId;??
-
private
?Connection?sharedConnection;??
??? clientId通常用于持久訂閱;sharedConnection保存了被共享的JMS connection。
?
??? 該類定義了如下的抽象方法,以便子類可以決定是否使用共享的JMS connection。
-
protected
?
abstract
?
boolean
?sharedConnectionEnabled();??
?
2.3 AbstractMessageListenerContainer
??? AbstractMessageListenerContainer繼承自AbstractJmsListeningContainer,也是作為所有Message Listener Container的公共基類。該類主要的屬性如下:
-
private
?
volatile
?Object?destination;??
-
private
?
volatile
?Object?messageListener;??
-
private
?
boolean
?exposeListenerSession?=?
true
;??
??? destination用于指定接收消息的目的地。
???
messageListener用于指定處理消息的listener。對于messageListener,它既可以是符合JMS規(guī)范的
javax.jms.MessageListener,也可以是Spring特有的
org.springframework.jms.listener.SessionAwareMessageListener。
SessionAwareMessageListener的定義如下:
-
public
?
interface
?SessionAwareMessageListener?{??
-
????void?onMessage(Message?message,?Session?session)?throws?JMSException;??
-
}??
??? 跟javax.jms.MessageListener相比,這個接口的onMessage方法增加了一個Session 類型的參數(shù),可以通過這個session發(fā)送回復消息(reply message)。
??? 如果使用了SessionAwareMessageListener 類型的message
listener,那么exposeListenerSession參數(shù)指定了傳入onMessage方法的session參數(shù)是否是創(chuàng)建了
MessageConsumer的session,默認值是true。如果是false,那么
AbstractMessageListenerContainer會在connection上新建一個session,并傳入onMessage方法。
2.4 AbstractPollingMessageListenerContainer
??? AbstractPollingMessageListenerContainer繼承自AbstractMessageListenerContainer,它提供了對于主動接收消息(polling)的支持,以及支持外部的事務管理。
-
private
?
boolean
?pubSubNoLocal?=?
false
;??
-
private
?
long
?receiveTimeout?=?DEFAULT_RECEIVE_TIMEOUT;??
-
private
?PlatformTransactionManager?transactionManager;??
??? 如果使用“發(fā)布/訂閱”模式,那么pubSubNoLocal 屬性指定通過某個連接發(fā)送到某個Topic的消息,是否應該被投遞回這個連接。
??? receiveTimeout屬性用于指定調(diào)用MessageConsumer的receive方法時的超時時間,默認值是1秒。需要注意的是,這個值應該比transactionManager 中指定的事務超時時間略小。
???
通常情況下,應該為transactionManager設置一個
org.springframework.transaction.jta.JtaTransactionManager的實例,此外也要設置一個支持
XA的ConnectionFactory。需要注意的是,XA 事務對性能有較大的影響。
??? 如果只是希望使用local JMS
transaction,那么只要設置sessionTransacted為true或者使用JmsTransactionManager即可。實際上,
如果設置了非JTA的transactionManager,那么sessionTransacted屬性會自動被設置成true。
??? 由于local JMS transaction無法同其它local transaction(例如local database
transaction)進行協(xié)調(diào),因此客戶端程序可能需要對重發(fā)的消息進行檢查。JMS規(guī)范要求:JMS
provider應該將重發(fā)消息的JMSRedelivered屬性設置為true。
2.5 SimpleMessageListenerContainer
???
SimpleMessageListenerContainer繼承自AbstractMessageListenerContainer,使用異步方式
接收消息(也就是通過MessageConsumer上注冊MessageListener的方式接收消息)。該類主要的屬性如下:
-
private
?
boolean
?pubSubNoLocal?=?
false
;??
-
private
?
int
?concurrentConsumers?=?
1
;??
-
private
?Set?sessions;??
-
private
?Set?consumers;??
-
private
?TaskExecutor?taskExecutor;??
??? 如果使用“發(fā)布/訂閱”模式,那么pubSubNoLocal 屬性指定通過某個連接發(fā)送到某個Topic的消息,是否應該被投遞回這個連接。
?
???
SimpleMessageListenerContainer允許創(chuàng)建多個Session和MessageConsumer來接收消息。具體的個數(shù)由
concurrentConsumers屬性指定。需要注意的是,應該只是在Destination為Queue的時候才使用多個
MessageConsumer(Queue中的一個消息只能被一個Consumer接收),雖然使用多個MessageConsumer會提高消息處理
的性能,但是消息處理的順序卻得不到保證:消息被接收的順序仍然是消息發(fā)送時的順序,但是由于消息可能會被并發(fā)處理,因此消息處理的順序可能和消息發(fā)送的
順序不同。此外,不應該在Destination為Topic的時候使用多個MessageConsumer,這是因為多個
MessageConsumer會接收到同樣的消息。
??? SimpleMessageListenerContainer創(chuàng)建的Session和MessageConsumer分別保存在sessions和consumers屬性中。
???
taskExecutor屬性的默認值是null,也就是說,對MessageListener(或者
SessionAwareMessageListener)的回調(diào)是在MessageConsumer的內(nèi)部線程中執(zhí)行。如果指定了
taskExecutor,那么回調(diào)是在TaskExecutor內(nèi)部的線程中執(zhí)行。以下是相關(guān)的代碼:
-
protected
?MessageConsumer?createListenerConsumer(
final
?Session?session)???
-
throws
?JMSException?{??
-
????Destination?destination?=?getDestination();??
-
????if?(destination?==?null)?{??
-
????????destination?=?resolveDestinationName(session,?getDestinationName());??
-
????}??
-
????MessageConsumer?consumer?=?createConsumer(session,?destination);??
-
??
-
????if?(this.taskExecutor?!=?null)?{??
-
????????consumer.setMessageListener(new?MessageListener()?{??
-
????????????public?void?onMessage(final?Message?message)?{??
-
????????????????taskExecutor.execute(new?Runnable()?{??
-
????????????????????public?void?run()?{??
-
????????????????????????processMessage(message,?session);??
-
????????????????????}??
-
????????????????});??
-
????????????}??
-
????????});??
-
????}??
-
????else?{??
-
????????consumer.setMessageListener(new?MessageListener()?{??
-
????????????public?void?onMessage(Message?message)?{??
-
????????????????processMessage(message,?session);??
-
????????????}??
-
????????});??
-
????}??
-
??
-
????return?consumer;??
-
}??
???
需要注意的是,如果指定了taskExecutor,那么消息在被taskExecutor內(nèi)部的線程處理前,可能已經(jīng)被確認過了(外層的
onMessage方法可能已經(jīng)執(zhí)行結(jié)束了)。因此如果使用事務Session或者Session.CLIENT_ACKNOWLEDGE類型的確認模
式,那么可能會導致問題。
?
???
該類的sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定義)總是返回true,
因此SimpleMessageListenerContainer會使用共享的JMS connection。
2.6 DefaultMessageListenerContainer
???
DefaultMessageListenerContainer繼承自
AbstractPollingMessageListenerContainer,主要使用同步的方式接收消息(也就是通過循環(huán)調(diào)用
MessageConsumer.receive的方式接收消息)。該類主要的屬性如下:
-
private
?
int
?concurrentConsumers?=?
1
;??
-
private
?
int
?maxConcurrentConsumers?=?
1
;??
-
private
?
int
?maxMessagesPerTask?=?Integer.MIN_VALUE;??
-
private
?
int
?idleTaskExecutionLimit?=?
1
;??
-
private
?
final
?Set?scheduledInvokers?=?
new
?HashSet();??
-
private
?TaskExecutor?taskExecutor;??
-
private
?
int
?cacheLevel?=?CACHE_AUTO;??
???
跟SimpleMessageListenerContainer一樣,DefaultMessageListenerContainer也支持創(chuàng)建多個
Session和MessageConsumer來接收消息。跟SimpleMessageListenerContainer不同的
是,DefaultMessageListenerContainer創(chuàng)建了concurrentConsumers所指定個數(shù)的
AsyncMessageListenerInvoker(實現(xiàn)了SchedulingAwareRunnable接口),并交給
taskExecutor運行。
???
maxMessagesPerTask屬性的默認值是Integer.MIN_VALUE,但是如果設置的taskExecutor(默認值是
SimpleAsyncTaskExecutor)實現(xiàn)了SchedulingTaskExecutor接口并且其
prefersShortLivedTasks方法返回true(也就是說該TaskExecutor傾向于短期任務),那么
maxMessagesPerTask屬性會自動被設置為10。
???
如果maxMessagesPerTask屬性的值小于0,那么AsyncMessageListenerInvoker.run方法會在循環(huán)中反復嘗試
接收消息,并在接收到消息后調(diào)用MessageListener(或者SessionAwareMessageListener);如果
maxMessagesPerTask屬性的值不小于0,那么AsyncMessageListenerInvoker.run方法里最多會嘗試接收消息
maxMessagesPerTask次,每次接收消息的超時時間由其父類
AbstractPollingMessageListenerContainer的receiveTimeout屬性指定。如果在這些嘗試中都沒有接收
到消息,那么AsyncMessageListenerInvoker的idleTaskExecutionCount屬性會被累加。在run方法執(zhí)行完
畢前會對idleTaskExecutionCount進行檢查,如果該值超過了
DefaultMessageListenerContainer.idleTaskExecutionLimit(默認值1),那么這個
AsyncMessageListenerInvoker可能會被銷毀。
???
所有AsyncMessageListenerInvoker實例都保存在scheduledInvokers中,實例的個數(shù)可以在
concurrentConsumers和maxConcurrentConsumers之間浮動。跟
SimpleMessageListenerContainer一樣,應該只是在Destination為Queue的時候才使用多個
AsyncMessageListenerInvoker實例。
?
??? cacheLevel屬性用于指定是否對JMS資源進行緩存,可選的值是CACHE_NONE = 0、CACHE_CONNECTION
= 1、CACHE_SESSION = 2、CACHE_CONSUMER = 3和CACHE_AUTO =
4。默認情況下,如果transactionManager屬性不為null,那么cacheLevel被自動設置為CACHE_NONE(不進行緩
存),否則cacheLevel被自動設置為CACHE_CONSUMER。
??? 如果cacheLevel屬性值大于等于CACHE_CONNECTION,那么sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定義)返回true,也就是說使用共享的JMS連接。
?
?
3 SingleConnectionFactory
???
SingleConnectionFactory實現(xiàn)了ConnectionFactory接口,其createConnection方法總是返回相同的
Connection??梢栽赟ingleConnectionFactory的構(gòu)造函數(shù)中傳入Connection對象或者
ConnectionFactory對象,用來創(chuàng)建被代理的連接對象。
SingleConnectionFactory.createConnection方法返回的連接是個代理,它忽略了對stop和close方法的調(diào)用
(連接會在SingleConnectionFactory.destroy方法中關(guān)閉)。
??? SingleConnectionFactory的reconnectOnException屬性用來指定是否在連接拋出JMSException的時候,對連接進行重置,重置后如果再調(diào)用createConnection方法,那么會返回一個新的連接。
???
需要注意的是,AbstractJmsListeningContainer類的抽象方法sharedConnectionEnabled指定了是否在
message listener container內(nèi)部使用共享的JMS連接。因此通常情況下不需要為單獨的message listener
container設置SingleConnectionFactory(及其子類);如果希望在不同的message listener
container之間共享JMS連接,那么可以考慮使用SingleConnectionFactory。
3.1 CachingConnectionFactory
??? CachingConnectionFactory繼承自SingleConnectionFactory,增加了對Session和MessageProducer緩存的功能。該類主要的屬性如下:
-
private
?
int
?sessionCacheSize?=?
1
;??
-
private
?
boolean
?cacheProducers?=?
true
;??
??? sessionCacheSize屬性指定了被緩存的Session實例的個數(shù)(默認值是1),也就是說,如果同時請求的Session個數(shù)大于sessionCacheSize,那么這些Session不會被緩存,而是正常的被創(chuàng)建和銷毀。
??? cacheProducers屬性指定了是否對MessageProducer進行緩存,默認值是true。