使用Spring集成ActiveMQ時,可以使用如下配置
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${brokerURL}" />
</bean>
<bean id="queueDestination"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="caojh" />
</bean>
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledConnectionFactory" />
<property name="defaultDestination" ref="queueDestination"></property>
<property name="messageConverter" ref="voteMsgConverter"></property>
</bean>
<bean id="voteMsgConverter"
class="com.netease.queue.domain.VoteMsgConverter">
</bean>
然后使用如下的代碼發(fā)送message
public void templateSend(long id, String location) {
JmsTemplate template = (JmsTemplate) context.getBean("jmsTemplate");
Vote vote = new Vote();
vote.setId(id);
vote.setUserid("caojh");
vote.setLocation(location);
template.convertAndSend(vote);
}
發(fā)送消息是沒有問題的,但是當密集發(fā)送大量消息時,會拋出地址占用,無法創(chuàng)建connection(或者在某些較老版本下,當消息數(shù)達到65535條的時候,也會無法再次發(fā)送,只能重啟隊列),具體異常如下
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS
processing; nested exception is javax.jms.JMSException: Could not connect to broker
URL: tcp://192.168.20.23:61616. Reason: java.net.BindException: Address already in use: connect
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
at net.kentop.astoam.device.MG800DeviceService.excute(MG800DeviceService.java:423)
at net.kentop.astoam.device.MG800DeviceService$HandlerReceiveMessage.
handlerUdpData(MG800DeviceService.java:936)
at net.kentop.mon4mg.monitor.UDPReceiverThread.run(UDPReceiverThread.java:51)
Caused by: javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616.
Reason: java.net.BindException: Address already in use: connect
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:35)
at org.apache.activemq.ActiveMQConnectionFactory.
createActiveMQConnection(ActiveMQConnectionFactory.java:286)
at org.apache.activemq.ActiveMQConnectionFactory.
createActiveMQConnection(ActiveMQConnectionFactory.java:230)
at org.apache.activemq.ActiveMQConnectionFactory.
createConnection(ActiveMQConnectionFactory.java:178)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:456)
4 more
無法發(fā)送的真正原因就在這里了
http://activemq.apache.org/jmstemplate-gotchas.html
這里有這么兩段話
The thing to remember is JmsTemplate is designed for use in EJBs using the EJB containers JMS pooling abstraction. So every method will typically create a connection, session, producer or consumer, do something, then close them all down again. The idea being that this will use the J2EE containers pooling mechanism to pool the JMS resources under the covers. Without using a pooled JMS provider from the EJB container this is the worst possible way of working with JMS; since typically each create/close of a connection, producer/consumer results in a request-response with the JMS broker.
You should only use JmsTemplate with a pooled JMS provider. In J2EE 1.4 or later that typically means a JCA based JMS ConnectionFactory. If you are in an EJB then make sure you use your J2EE containers ConnectionFactory, never a plain-old-connection factory. If you are not inside an EJB Then you should use our PooledConnectionFactory, then things will be nicely pooled. If you need to take part in XA transactions then look into our spring based JCA Container. 大致意思就是說,每次發(fā)送一條消息時,都會創(chuàng)建一個connection和session,發(fā)送/接收完畢后再全部銷毀。如果沒有相應的pool機制,要發(fā)送大量消息,就會頻繁的創(chuàng)建、銷毀連接,這將是一個相當糟糕的選擇。
至于解決的方法,第二段話也說的很清楚,要不就使用EJB容器的
ConnectionFactory,要么就使用ActiveMQ提供的
PooledConnectionFactory,這個類其實也是實現(xiàn)了ConnectionFactory接口
使用如下的配置,就可以避免上面的異常了
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${brokerURL}" />
</bean>
【題外話】是否可以使用同一個connection并發(fā)發(fā)送/接收消息
可以參考
http://activemq.apache.org/can-i-send-and-receive-messages-concurrently-on-one-jms-connection.html
文中寫的很明白,每個發(fā)送者/接收者應該使用獨立的session,每個connection可以隨意創(chuàng)建任意多個session