如果要對(duì)JMS BROKER生產(chǎn)和消費(fèi)MESSAGE,一種方式是用JmsTemplate發(fā)送和消費(fèi)消息,另一種方式是SPRING INTEGRATION。
SPRING INTEGRATION是實(shí)現(xiàn)了EIP模式的一種框架,即使用CHANNEL和JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER,完全脫離了JmsTemplate的API。
如果需要實(shí)現(xiàn)這種場(chǎng)景:從BROKER取一條消息,處理消息,且處理途中不要再從BROKER再取消息,處理完后再取消息,再處理。
這樣要求手動(dòng)開始和停止JMS LISTENER,即手動(dòng)開始和停止JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER。
@Bean
@InboundChannelAdapter(value = "loaderResponseChannel")
public MessageSource loaderResponseSource() throws Exception {
return Jms
.inboundAdapter(oracleConnectionFactory())
.configureJmsTemplate(
t -> t.deliveryPersistent(true)
.jmsMessageConverter(jacksonJmsMessageConverter())
).destination(jmsInbound).get();
}
當(dāng)使用@InboundChannelAdapter時(shí),會(huì)自動(dòng)注冊(cè)一個(gè)SourcePollingChannelAdapter ,但這個(gè)名字比較長(zhǎng):configrationName.loaderResponseSource.inboundChannelAdapter。
呼叫這個(gè)實(shí)例的start()和stop()方法即可。
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("controlBus")
.controlBus()
.get();
}
Message operation = MessageBuilder.withPayload("@configrationName.loaderResponseSource.inboundChannelAdapter.start()").build();
operationChannel.send(operation)
https://stackoverflow.com/questions/45632469/shutdown-spring-integration-with-jms-inboundadapterhttps://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/system-management-chapter.html#control-bushttps://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.javahttps://stackoverflow.com/questions/50428552/how-to-stop-or-suspend-polling-after-batch-job-fail