場(chǎng)景,餐廳:
- 食客下單,有飲品、食物、甜點(diǎn)
- 侍應(yīng)接單,傳送給廚房
- 廚房分三個(gè)子流程處理,即飲品、食物、甜點(diǎn)子流程
- 等待三個(gè)子流程處理完,合并成一份交付
- 如果廚房發(fā)現(xiàn)某食物欠缺,會(huì)通知侍應(yīng),展開錯(cuò)誤處理,即通知食客更改食物,再交給廚房
- 侍應(yīng)將交付品傳送給食客
有一個(gè)主流程、三個(gè)子流程和一個(gè)聚合流程,聚合流程會(huì)聚合三個(gè)子流程的產(chǎn)物,通知主流程,再往下走。
并且主流程會(huì)感知子流程的錯(cuò)誤,并會(huì)交給相應(yīng)錯(cuò)誤處理流程處理,且將結(jié)果再交給聚合流程。
對(duì)應(yīng)SPRING INTEGRATION 的SCATTERGATHER模式:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
https://github.com/adnanmamajiwala/spring-integration-sample/tree/master/dsl-scatter-gather/src/main/java/com/si/dsl/scattergatherhttps://docs.spring.io/spring-integration/docs/5.1.x/reference/html/#scatter-gather