一.簡介ActiveMQ
ActiveMQ 是最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現
二.下載ActiveMQ
首先去http://activemq.apache.org/download.html
下載穩定版本4.1.0release
Download Here
Description ==Download Link ==PGP Signature file of download
Binary for Windows== apache-activemq-4.1.0-incubator.zip== incubator-activemq-4.1.0.zip.asc
Binary-for-Unix/Linux/Cygwin==apache-activemq-4.1.0-incubator.tar.gz==incubator-activemq-4.1.0.tar.gz.as
解壓后目錄如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目錄,包含最基本的activeMQ配置文件)
+data (默認是空的)
+docs (index,replease版本里面沒有文檔,-.-b不知道為啥不帶)
+example (幾個例子
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
-NOTICE.txt
-README.txt
-user-guide.html
三.啟動ActiveMQ
可以使用bin\activemq.bat(activemq) 啟動,如果一切順利,你就會看見類似下面的信息(此處解壓到D盤根目錄下).幾個小提示
1. 這個僅僅是最基礎的ActiveMQ的配置,很多地方都沒有配置因此不要直接使用這個配置用于生產系統
2. 有的時候由于端口被占用,導致ActiveMQ錯誤,ActiveMQ可能需要以下端口1099(JMX),61616(默認的TransportConnector)
3. 如果沒有物理網卡,或者MS的LoopBackAdpater Multicast會報一個錯誤
四.監控ActiveMQ
啟動JDK自帶的java控制臺查看程序查看客戶端(如C:\jdk1.6.0_07\bin\jconsole.exe)
遠程進程:127.0.0.1:1099
五. 測試ActiveMQ
由于ActiveMQ是一個獨立的jms provider,所以我們不需要其他任何第三方服務器就可以馬上做我們的測試了.編譯example目錄下面的程序ProducerTool/ConsumerTool 是JMS參考里面提到的典型應用,Producer產生消息,Consumer消費消息,而且這個例子還可以加入參數幫助你測試剛才啟動的本地ActiveMQ或者是遠程的ActiveMQ
ProducerTool [url] broker的地址,默認的是tcp://localhost:61616
[true|flase] 是否使用topic,默認是false
[subject] subject的名字,默認是TOOL.DEFAULT
[durabl] 是否持久化消息,默認是false
[messagecount] 發送消息數量,默認是10
[messagesize] 消息長度,默認是255
[clientID] durable為true的時候,需要配置clientID
[timeToLive] 消息存活時間
[sleepTime] 發送消息中間的休眠時間
[transacte] 是否采用事務
ConsumerTool [url] broker的地址,默認的是tcp://localhost:61616
[true|flase] 是否使用topic,默認是false
[subject] subject的名字,默認是TOOL.DEFAULT
[durabl] 是否持久化消息,默認是false
[maxiumMessages] 接受最大消息數量,0表示不限制
[clientID] durable為true的時候,需要配置clientID
[transacte] 是否采用事務
[sleepTime] 接受消息中間的休眠時間,默認是0,onMeesage方法不休眠
[receiveTimeOut] 接受超時
六.使用應用程序發送點到點消息隊列
TestSender類
package test;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class TestSender {
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("conf/applicationContext.xml");
JmsTemplate template = (JmsTemplate) ctx.getBean("myJmsTemplate");
template.setDeliveryMode(DeliveryMode.PERSISTENT);
template.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message = session.createTextMessage();
message.setStringProperty("name", "wangwu");
message.setStringProperty("password", "ww");
System.out.println("send success");
return message;
}
});
}
}
applicationContext.xml配置文件
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0
http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd
">
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
</property>
</bean>
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="defaultDestinationName" value="Hello.Queue" />
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
</bean>
</beans>
七. 使用應用程序接受點到點消息隊列
ExampleListener類
package test;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
public class ExampleListener implements SessionAwareMessageListener {
public void onMessage(Message message, Session session) throws JMSException {
if(message instanceof TextMessage) {
System.out.println("TextMessage begin");
System.out.println("name = " + message.getStringProperty("name"));
System.out.println("password = " +message.getStringProperty("password"));
}
if (message instanceof ObjectMessage) {
System.out.println("ObjectMessage");
} else if (message instanceof TextMessage) {
System.out.println("TextMessage");
} else if (message instanceof StreamMessage) {
System.out.println("StreamMessage");
} else if (message instanceof MapMessage) {
System.out.println("MapMessage");
}
}
}
TestReceiver類
package test;
import javax.jms.JMSException;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class TestReceiver {
public static void main(String[] args) throws JMSException {
new FileSystemXmlApplicationContext("conf/context.xml");
}
}
context.xml配置文件.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0
http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd
">
<bean id="ExampleListener" class="test.ExampleListener" />
<bean id="destinationa" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>Hello.Queue</value>
</constructor-arg>
</bean>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
</property>
</bean>
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="destinationa" />
<property name="messageListener" ref="ExampleListener" />
</bean>
</beans>
八. 使用WEB程序發送點到點消息隊列
TestWebSender類
package com.jl.material.adapter.impl;
import javax.jms.JMSException;
import com.jl.framework.jms.JmsManager;
import com.jl.framework.jms.JmsManagerFactory;
import com.jl.framework.jms.MessageCreateable;
import com.jl.framework.jms.MessageCreatorDefault;
import com.jl.framework.jms.util.JmsUtil;
import com.jl.material.util.MaterialConstants;
public class TestWebSender{
public void sender(String name, String password) {
MessageCreateable mc = new MessageCreatorDefault();
mc.setStringProperty("name", name);
mc.setStringProperty("password", password);
JmsManagerFactory jmsManagerFactory = new JmsManagerFactory();
JmsManager jmsTXManager = jmsManagerFactory.createJmsManager(MaterialConstants.MATERIAL_MODULE_NAME);
try {
jmsTXManager.send(JmsUtil.getDestinationFromConfig("quality_synVerifyBatch_queue_M2Q"), mc);
jmsTXManager.commit();
} catch (JMSException e) {
jmsTXManager.rollback();
throw new RuntimeException(e);
}
}
}
九. 使用WEB程序接受點到點消息隊列
TestWebReceiver類
package com.jl.material.adapter.impl;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import com.jl.framework.jms.util.support.JMSCallbackable;
public class TestWebReceiver implements JMSCallbackable {
public void mdCallback(Object TextMessage_textMessage) {}
public void logJMSMessageInfo(String arg0) {}
public void mdCallback(ObjectMessage arg0) throws Exception {}
public void mdCallback(TextMessage textMessage) throws Exception {
String name = textMessage.getStringProperty("name");
String password = textMessage.getStringProperty("password");
System.out.println("name = " + name);
System.out.println("password = " + password);
}
public void mdCallback(StreamMessage arg0) throws Exception {}
public void mdCallback(MapMessage arg0) throws Exception {}
}
test_JMS_Spring_Listener.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<bean id="test_testWebReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
<property name="jmsCallbackable">
<bean class="com.jl.material.adapter.impl.TestWebReceiver "/>
</property>
</bean>
<bean id="test_testWebReceiver _queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>${quality_assayVerifyBatch_queue_Q2M}</value>
</constructor-arg>
</bean>
<bean id="test_listenerContainerA"
class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsRecieveFactory" />
<property name="destination" ref=" test_testWebReceiver _queue ">
<property name="messageListener" ref=" test_testWebReceiver " />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
</bean>
</beans>
jms.properties資源文件
jms.sendip=tcp://10.1.1.40:61616,tcp://activemq:61616
jms.listenip=tcp://10.1.1.40:61616,tcp://activemq:61616
quality_assayVerifyBatch_queue_Q2M=
Quality_Assay_VerifyBatch_Q2M.Queue
context.xml配置文件
<?xml version='1.0' encoding='utf-8'?>
<Context path="/identity" reloadable="false">
<Environment name="jms/jms.sendip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />
<Environment name="jms/jms.listenip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />
</Context>
十. 使用發布/訂閱方式
使用該方式的發布方基本與點到點方式一樣,區別只在隊列名的后綴從 .Queue 變成了 .Topic
區別主要在接收方配置文件
<bean id="pound_removeBindingInfoReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
<property name="jmsCallbackable">
<bean class="com.jl.pound.adapter.impl.RemoveBindingInfoReceiver">
<property name="initialInfomationService" ref="initialInfomationService" />
<property name="productPoundServiceFacade" ref="productPoundServiceFacade" />
</bean>
</property>
</bean>
<bean id="pound_removeBindingInfomation_topic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg><value>${CraftPound_RemoveBindingInfomation}</value></constructor-arg>
</bean>
<bean id="pound_listenerContainerB"
class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
<property name="concurrentConsumers" value="1"/>
<property name="connectionFactory" ref="jmsRecieveFactory" />
<property name="destination" ref="pound_removeBindingInfomation_topic" />
<property name="messageListener" ref="pound_removeBindingInfoReceiver" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
<property name="subscriptionDurable" value="true"></property>
<property name="pubSubDomain" value="true"></property>
<!-- <property name="clientId" value="100554"></property> -->
<property name="clientId" ref="generateClientIdentityCode1"></property>
</bean>
<bean id="generateClientIdentityCode1"
class="com.jl.pound.security.GenerateClientIdentityCode">
<property name="prefix" value="1"></property>
</bean>
import org.apache.log4j.Logger;
import org.springframework.beans.factory.FactoryBean;
import com.**.util.NetworkUtils;
public class GenerateClientIdentityCode implements FactoryBean {
/**
* LOGGER for sample classify delegate
*/
private static final Logger LOGGER = Logger.getLogger(GenerateClientIdentityCode.class);
/**
* prefix
*/
private String prefix;
/**
* @return the prefix
*/
public String getPrefix() {
return prefix;
}
/**
* @param prefix the prefix to set
*/
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public String generateIdentityCode() {
String mac = NetworkUtils.getMACAddress();
mac = prefix + mac;
LOGGER.info("this client identity code is:" + mac);
return mac;
}
public Object getObject() throws Exception {
return generateIdentityCode();
}
public Class getObjectType() {
return String.class;
}
public boolean isSingleton() {
return false;
}
}
在JMS中,Topic實現publish和subscribe語義。一條消息被publish時,它將發到所有感興趣的訂閱者,所以零到多個subscriber將接收到消息的一個拷貝。但是在消息代理接收到消息時,只有激活訂閱的subscriber能夠獲得消息的一個拷貝。
JMS Queue執行load balancer語義。一條消息僅能被一個consumer收到。如果在message發送的時候沒有可用的consumer,那么它將被保存一直到能處理該message的consumer可用。如果一個consumer收到一條message后卻不響應它,那么這條消息將被轉到另一個consumer那兒。一個Queue可以有很多consumer,并且在多個可用的consumer中負載均衡
可以使用queue方式發送注冊郵件 好友動態數據等
<一>表說明:
當在啟動ActiveMQ時,先判斷表是否存在,如果不存在,將去創建表,如下:
(1)ACTIVEMQ_ACKS:持久訂閱者列表
1.CONTAINER:類型://主題
如:topic://basicInfo.topic
2.SUB_DEST:應該是描述,與1內容相同
3.CLIENT_ID:持久訂閱者的標志ID,必須唯一
4.SUB_NAME:持久訂閱者的名稱.(durableSubscriptionName)
5.SELECTOR:消息選擇器,consumer可以選擇自己想要的
6.LAST_ACKED_ID:最后一次確認ID,這個字段存的該該訂閱者最后一次收到的消息的ID
(2)ACTIVEMQ_LOCK:進行數據訪問的排斥鎖
1.ID:值為1
2.TIME:時間
3.BROKER_NAME:broker的名稱
這個表似為集群使用,但現在ActiveMQ并不能共享數據庫.
(3)ACTIVEMQ_MSGS:存儲Queue和Topic消息的表
1.ID:消息的ID
2.CONTAINER: 類型://主題
如:queue://my.queue
Topic://basicInfo.topic
3.MSGID_PROD:發送消息者的標志
MSGID_PROD =ID:[computerName][…..]
注意computerName,不要使用中文,消息對象中會存儲這個部分,解析connectID時會出現Bad String錯誤.
4.MSGID_SEQ:還不知用處
5.EXPIRATION:到期時間.
6.MSG:消息本身,Blob類型.
可以在JmsTemplate發送配置中,加上<property name=”timeToLive” value=”432000000”/>,5天的生命期,如果消息一直沒有被處理,消息會被刪除,但是表中會存在CONTAINER為queue://ActiveMQ.DLQ的記錄.也就是說,相當于將過期的消息發給了一個ActiveMQ自定義的刪除隊列..
<二>關于ActiveMQ的持久訂閱消息刪除操作
1.主題消息只有一條,所有訂閱了這個消息的持久訂閱者都要收到消息,只有所有訂閱者收到消息并確認(Acknowledge)之后.才會刪除.
說明:ActiveMQ支持批量(optimizeAcknowledge為true)確認,以提高性能
2.ActiveMQ執行刪除Topic消息的cleanup()操作的時間間隔為5 minutes..
柳德才
13691193654
18942949207
QQ:422157370
liudecai_zan@126.com湖北-武漢-江夏-廟山