JMS消息傳遞是一種異步過(guò)程,如果沒(méi)有接受者JMS Provider應(yīng)該選擇將消息持久化,策略主要有文件持久化和基于數(shù)據(jù)庫(kù)的持久化兩種,下文是關(guān)于將未消費(fèi)的消息持久化到MySql數(shù)據(jù)庫(kù)的。
首先,我們要做的是將MySql數(shù)據(jù)庫(kù)的驅(qū)動(dòng)包放置到ActiveMQ的安裝目錄下的lib里。
其次,我們需要改寫(xiě)activemq.xml文件,它在ActiveMQ的conf目錄中。具體修改部分請(qǐng)見(jiàn)下文中粗體部分:
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>

<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>

<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysqlDataSource"/>
</persistenceAdapter>
<!--
For better performances use VM cursor and small memory limit.
For more information, see:
http://activemq.apache.org/message-cursors.html
Also, if your producer is "hanging", it's probably due to producer flow control.
For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>

</broker>

<!-- MySql dataSource -->
<bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="hy"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

<!--
Uncomment to enable Camel
Take a look at activemq-camel.xml for more details
<import resource="camel.xml"/>
-->

<!--
Enable web consoles, REST and Ajax APIs and demos
Take a look at activemq-jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>

如上,在persistenceAdapter節(jié)點(diǎn)中指定mySql數(shù)據(jù)源,在broker節(jié)點(diǎn)外配置了bean:
<!-- MySql dataSource -->
<bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="hy"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
上面加粗的一句指明了消息存儲(chǔ)在mysql的test數(shù)據(jù)庫(kù)中,當(dāng)然你可以指定成任何你想要的數(shù)據(jù)庫(kù),注意在啟動(dòng)ActiveMq之前這個(gè)數(shù)據(jù)庫(kù)是要存在的,否則報(bào)錯(cuò)。另外需要贅述的一點(diǎn)是activemq.xml中不要有中文,看來(lái)白狗子對(duì)非英語(yǔ)的抵觸真是無(wú)處不在,我們所有中國(guó)人應(yīng)該奮發(fā)圖強(qiáng)恢復(fù)漢唐明榮光,讓異族不得不仰視我們,當(dāng)然這是后話。
下面就可以啟動(dòng)ActiveMQ了,向隊(duì)列發(fā)送幾條消息后,我們可以看看數(shù)據(jù)庫(kù)里發(fā)生了什么變化,如下:
上圖是ActiveMQ為了持久化消息而設(shè)立的三張表。

上圖是持久化到數(shù)據(jù)庫(kù)的未被消費(fèi)掉的消息。
如果這些待消費(fèi)消息被接收后,這張表就會(huì)空空如也,如下:
你可以使用
這個(gè)程序(注意包要自己導(dǎo)入一下)來(lái)測(cè)試一下。
以上就是將ActiveMQ中未消費(fèi)的消息持久化到MySql數(shù)據(jù)庫(kù)的過(guò)程,但愿對(duì)你有所幫助。