場景,餐廳:
- 食客下單,有飲品、食物、甜點
- 侍應(yīng)接單,傳送給廚房
- 廚房分三個子流程處理,即飲品、食物、甜點子流程
- 等待三個子流程處理完,合并成一份交付
- 如果廚房發(fā)現(xiàn)某食物欠缺,會通知侍應(yīng),展開錯誤處理,即通知食客更改食物,再交給廚房
- 侍應(yīng)將交付品傳送給食客
有一個主流程、三個子流程和一個聚合流程,聚合流程會聚合三個子流程的產(chǎn)物,通知主流程,再往下走。
并且主流程會感知子流程的錯誤,并會交給相應(yīng)錯誤處理流程處理,且將結(jié)果再交給聚合流程。
對應(yīng)SPRING INTEGRATION 的SCATTERGATHER模式:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
https://github.com/adnanmamajiwala/spring-integration-sample/tree/master/dsl-scatter-gather/src/main/java/com/si/dsl/scattergatherhttps://docs.spring.io/spring-integration/docs/5.1.x/reference/html/#scatter-gather