.概述
1.1 JMS與ActiveMQ特性
JMS始終在JavaEE五花八門的協議里,WebService滿天飛的時候占一位置,是因為:
- 它可以把不影響用戶執行結果又比較耗時的任務(比如發郵件通知管理員)異步的扔給JMS 服務端去做,而盡快的把屏幕返還給用戶。
- 服務端能夠多線程排隊響應高并發的請求,并保證請求不丟失。
- 可以在Java世界里達到最高的解耦。客戶端與服務端無需直連,甚至無需知曉對方是誰、在哪里、有多少人,只要對流過的信息作響應就行了,在企業應用環境復雜時作用明顯。
ActiveMQ
的特性:
- 完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,也是Apache Geronimo默認的JMS provider。
- POJO withdout EJB Container,不需要實現EJB繁瑣復雜的Message Bean接口和配置。
- Spring Base,可以使用Spring的各種特性如IOC、AOP 。
- Effective,基于Jencks的JCA Container實現 pool connection,control transactions and manage security。
1.2 SpringSide 的完全POJO的JMS方案
SpringSide 2.0
在BookStore示例中,演示了用戶下訂單時,將發通知信到用戶郵箱的動作,通過JMS交給JMS服務端異步完成,避免了郵件服務器的堵塞而影響用戶的下訂。
全部代碼于examples\bookstore\src\java\org\springside\bookstore\components\activemq 目錄中。
一個JMS場景通常需要三者參與:
- 一個POJO的的Message Producer,負責使用Spring的JMS Template發送消息。
- 一個Message Converter,負責把Java對象如訂單(Order)轉化為消息,使得Producer能夠直接發送POJO。
- 一個MDP Message Consumer,負責接收并處理消息。
SpringSide 2.0
采用了ActiveMQ 4.1-incubator 與Spring 2.0 集成,對比SS1.0M3,有三個值得留意的地方,使得代碼中幾乎不見一絲JMS的侵入代碼:
- 采用Spring2.0的Schema式簡化配置。
- 實現Message Converter轉化消息與對象,使得Producer能夠直接發送POJO而不是JMS Message。
- 使用了Spring2.0的DefaultMessageListenerContainer與MessageListenerAdapter,消息接收者不用實現MessageListener 接口。
- 同時,Spring 2.0 的DefaultMessageListenerContainer 代替了SS1.0M3中的Jenck(JCA Container)
,充當MDP Container的角色。
2.引入ActiveMQ的XSD
ActiveMQ4.1 響應Spring 2.0號召,支持了引入XML Schema namespace的簡單配置語法,簡化了配置的語句。
在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ的XML Scheam 配置文件),如下:
<beans
xmlns="http:
xmlns:amq="http:
xmlns:xsi="http:
xsi:schemaLocation="http: http:
由于ActiveMQ4.1 SnapShot的那個XSD有部分錯誤,因此使用的是自行修改過的XSD。
先在ClassPath根目錄放一個修改過的activemq-core-4.1-incubator-SNAPSHOT.xsd。
在ClassPath 下面建立META-INF\spring.schemas 內容如下。這個spring.schemas是spring自定義scheam的配置文件,請注意"http:\://"部分寫法
3. 配置方案
3.1 基礎零件
1. 配置ActiveMQ Broker
暫時采用在JVM中嵌入這種最簡單的模式, 當spring初始化時候,ActiveMQ embedded Broker 就會啟動了。
<!-- lets create an embedded ActiveMQ Broker -->
<amq:broker useJmx="false" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp:/>
</amq:transportConnectors>
</amq:broker>
2. 配置(A)ConnectionFactory
由于前面配置的Broker是JVM embedded 所以URL為:vm://localhost
<!-- ActiveMQ connectionFactory to use -->
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm:/>
3 配置(B)Queue
<!-- ActiveMQ destinations to use -->
<amq:queue name="destination" physicalName="org.apache.activemq.spring.Test.spring.embedded"/>
4. 配置(C)Converter
配置Conveter,使得Producer能夠直接發送Order對象,而不是JMS的Message對象。
<!-- OrderMessage converter -->
<bean id="orderMessageConverter" class="org.springside.bookstore.components.activemq.OrderMessageConverter"/>
3.2 發送端
1 配置JmsTemplate
Spring提供的Template,綁定了(A)ConnectionFactory與(C)Converter。
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="orderMessageConverter"/>
</bean>
2.Producer
消息發送者,使用JmsTemplate發送消息,綁定了JmsTemplate (含A、C)與(B)Queue。
<!-- POJO which send Message uses Spring JmsTemplate,綁定JMSTemplate 與Queue -->
<bean id="orderMessageProducer" class="org.springside.bookstore.components.activemq.OrderMessageProducer">
<property name="template" ref="jmsTemplate"/>
<property name="destination" ref="destination"/>
</bean>
3.3 接收端
1.接收處理者(MDP)
使用Spring的MessageListenerAdapter,指定負責處理消息的POJO及其方法名,綁定(C)Converter。
<!-- Message Driven POJO (MDP),綁定Converter -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="org.springside.bookstore.components.activemq.OrderMessageConsumer">
<property name="mailService" ref="mailService"/>
</bean>
</constructor-arg>
<!-- may be other method -->
<property name="defaultListenerMethod" value="sendEmail"/>
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="orderMessageConverter"/>
</bean>
2. listenerContainer
負責調度MDP, 綁定(A) connectionFactory, (B)Queue和MDP。
<!-- this is the attendant message listener container -->
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
互相綁定的關系有點暈,發送端和接收端都以不同形式綁定了(A) connectionFactory, (B)Queue和 (C)Converter。
4. 下篇
1. 說明
請先閱讀ActiveMQ4.1 +Spring2.0的POJO JMS方案(上)
本篇將補充說明了:
1) 使用數據庫持久化消息,保證服務器重啟時消息不會丟失
2) 使用Jencks作正宗的JCA Container。
2.持久化消息
2.1 給Broker加入Persistence 配置
在配置文件applicationContext-activemq-embedded-persitence.xml中的<amq:broker>節點加入
<amq:persistenceAdapter>
<amq:jdbcPersistenceAdapter id="jdbcAdapter" dataSource="#hsql-ds" createTablesOnStartup="true" useDatabaseLock="false"/>
</amq:persistenceAdapter>
請注意MSSQL(2000/2005)和HSQL由于不支持[SELECT * ACTIVEMQ_LOCK FOR UPDATE ]語法,因此不能使用默認的userDatabaseLock="true",只能設置成useDatabaseLock="false"
2.2 配置多種數據源
配置多種數據源,給jdbcPersistenceAdapter使用,SpringSide 中使用的內嵌HSQL
<!-- The HSQL Datasource that will be used by the Broker -->
<bean id="hsql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
<property name="url" value="jdbc:hsqldb:res:hsql/activemq"/>
<property name="username" value="sa"/>
<property name="password" value=""/>
<property name="poolPreparedStatements" value="true"/>
</bean>
2. 3 說明
筆者僅僅使用了jdbcPersistenceAdapter,其實在ActiveMQ的XSD已經描述了多種PersistenceAdapter,可以參考對應的XSD文件.
另外對于數據庫的差異主要表現在設置了userDatabaseLock="true"之后,ActiveMQ使用的[SELECT * ACTIVEMQ_LOCK FOR UPDATE] 上面,會導致一些數據庫出錯(測試中MSSQL2000/2005,HSQL都會導致出錯)。另外HSQL的腳本請參見activemq.script。
3. Jenck(JCA Container)
Spring 2.0本身使用DefaultMessageListenerContainer 可以充當MDP中的Container角色,但是鑒于Jencks是JCA標準的,它不僅僅能夠提供jms的jca整合,包括其他資源比如jdbc都可以做到jca管理
所以,同時完成了這個ActiveMQ+Spring+Jencks 配置演示,更多的針對生產系統的JCA特性展示,會在稍后的開發計劃討論中確定。
此文檔適用于說明使用 Jecncks 和 使用Spring 2.0(DefaultMessageListenerContainer) 充當MDP Container時的區別,同時演示Jecnks 的Spring 2.0 新配置實例。
3.1 引入ActiveMQ ResourceAdapter 和Jencks 的XSD
在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ ResourceAdapter 和Jencks 的XML Scheam 配置文件),如下:
ActiveMQ4.1 響應Spring 2.0號召,支持了引入XML Schema namespace的簡單配置語法,簡化了配置的語句。
在ApplicationContext.xml(Spring的配置文件)中引入ActiveMQ的XML Scheam 配置文件),如下:
<beans
xmlns="http: xmlns:amq="http://activemq.org/config/1.0" xmlns:ampra="http://activemq.org/ra/1.0" xmlns:jencks="http://jencks.org/1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http: http: http: http:
由于ActiveMQ RA和Jencks 那個XSD 仍然有部分錯誤,因此使用的是自行修改過的XSD。(是xs:any元素引起的錯誤)
先在ClassPath根目錄放一個修改過的activemq-ra-4.1-incubator-SNAPSHOT.xsd和jencks-1.3.xsd。
同樣修改 ClassPath 下面META-INF\spring.schemas 增加內容如下。這個spring.schemas是spring自定義scheam的配置文件,請注意"http:\://"部分寫法
3.2 配置方案
3.2.1 基礎零件
1. 配置ActiveMQ Broker 參見 ActiveMQ+Spring
2. 配置ActiveMQ Resource Adapter
<amqra:managedConnectionFactory id="jmsManagedConnectionFactory" resourceAdapter="#resourceAdapter"/><amqra:resourceAdapter id="resourceAdapter" serverUrl="vm: />
3. 配置Jencks 基礎配置
具體的配置可以參見Jencks的XSD
<!-- jencks PoolFactory config-->
<jencks:singlePoolFactory id="poolingSupport" maxSize="16" minSize="5" blockingTimeoutMilliseconds="60" idleTimeoutMinutes="60" matchOne="true" matchAll="true" selectOneAssumeMatch="true" /> <!-- jencks XATransactionFactory -->
<jencks:xATransactionFactory id="transactionSupport" useTransactionCaching="true" useThreadCaching="true" />
<!-- jencks ConnectionManagerFactory -->
<jencks:connectionManagerFactory id="connectionManager" containerManagedSecurity="false" poolingSupport="#poolingSupport" transactionSupport="#transactionSupport" /> <!-- jencks TransactionContextManagerFactory -->
<jencks:transactionContextManagerFactory id="transactionContextManagerFactory"/>
4. 配置給JmsTemplate使用的connectionFactory (主要是生成者/發送者 使用)
這里注意下,在配置jmsTemplate的使用的targetConnectionFactory就是使用jencks配置的connectionManager
<!-- spring config jms with jca-->
<bean id="jmsManagerConnectionFactory" class="org.springframework.jca.support.LocalConnectionFactoryBean">
<property name="managedConnectionFactory">
<ref local="jmsManagedConnectionFactory" />
</property>
<property name="connectionManager">
<ref local="connectionManager" />
</property>
</bean>
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsManagerConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="orderMessageConverter" />
</bean>
5. 配置Spring 2.0的MessageListenerAdapter,保證不需要用戶實現MessageListener
見ActiveMQ+Spring
6.配置Jecnks 充當MDP的Container
就是把上面的MessageListenerAdapter配置到Jencks里面,完成整個MDP的配置
<!-- Jencks Container-->
<jencks:jcaContainer> <jencks:bootstrapContext>
<jencks:bootstrapContextFactory threadPoolSize="25" />
</jencks:bootstrapContext>
<jencks:connectors>
<!-- use jencks container (use spring MessageListenerAdapter)-->
<jencks:connector ref="messageListener">
<jencks:activationSpec>
<amqra:activationSpec destination="org.apache.activemq.spring.Test.spring.embedded" destinationType="javax.jms.Queue" />
</jencks:activationSpec>
</jencks:connector> </jencks:connectors> <jencks:resourceAdapter>
<amqra:resourceAdapter serverUrl="vm: />
</jencks:resourceAdapter>
</jencks:jcaContainer>