SPRING CLOUD STREAM 3.x 版本時,之前的一些編程模式,如@Enablebindding,@StreamListenner等注釋被廢棄了,這是由于一些框架的代碼必需由用戶編寫,如配置框架用的Input MessageChannel,Output MessageChannel,連接MessageHandler與MessageChannel等,被視為不必要的動作。為了簡化用戶代碼,于是推出Functional Programming Model。
引入了新名詞:Supplier、Function與Consumer。實際上這幾個類可視為Adapter,如果之前已經有存在的Service類,且方法名為各種各樣,可以重新包裝成Supplier、Function與Consumer,并在固定的方法名:apply/get/accept中調用Service的方法。
Supplier
當在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Output MessageChannel,并連接上此Bean,后續只需要在BINDDING中加入對應的Destination Name,即可向BROKER發消息了。
Consumer
當在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input MessageChannel,并連接上此Bean,后續只需要在BINDDING中加入對應的Destination Name,即可收到BROKER推送關于此Destination的消息了。
Function
當在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input和Output MessageChannel,并連接上此Bean,后續只需要在BINDDING中分別對Input和Output MessageChannel加入對應的Destination Name1/Name2,即可收到BROKER推送關于此Destination的消息,也可以向BROKER發消息了。
與SPRING INTEGRATION的整合
如果要對消息進行復雜處理,如拆分消息、聚合消息、IF ELSE消息等,就要借助SPRING INTEGRATION了。
@Bean
public IntegrationFlow upperCaseFlow(LoanService loanService) {
return IntegrationFlows
//turn this IntegrationFlow as a gateway, here is a Function interface
//with loadCheckerFunction as bean name
.from(LoadCheckerFunction.class, gateway -> gateway.beanName("loadCheckerFunction"))
.handle(loanService, "check")
.logAndReply(LoggingHandler.Level.WARN);
}
public interface LoadCheckerFunction extends Function<Loan, Loan>{
}
IntegrationFlows.from(Class<?> serviceInterface)是可以將本IntegrationFlow包裝成serviceInterface的實現類,如果調用此接口,最終會返回IntegrationFlow最后一個步驟的實體,如果這個serviceInterface是Function的話,剛好和SPRING CLOUD STREAM對接上。
后續在spring.cloud.stream.function.definition加入此Bean的名稱loadCheckerFunction,SPRING CLOUD STREAM就會幫你生成一個Input和Output MessageChannel,并連接上此Bean,再在BINDDING中分別對Input和Output MessageChannel加入對應的Destination Name1/Name2,即可收到BROKER推送關于此Destination的消息,也可以向BROKER發消息。
application.yaml
# This setting can increase or decrease the rate of message production (1000 = 1s)
# spring.cloud.stream.poller.fixed-delay=1000
# This setting can control which function method in our code will be triggered if there are multiple
# spring.cloud.function.definition=supplyLoan
# Give the autogenerated binding a friendlier name
spring:
application:
name: loan-check-rabbit
banner:
location: classpath:/banner-rabbit.txt
cloud:
stream:
function.definition: loadCheckerFunction
#BindingProperties
bindings:
loadCheckerFunction-in-0:
destination: queue.pretty.log.messages
binder: local_rabbit
loadCheckerFunction-out-0:
destination: queue.pretty.approved.messages
binder: local_rabbit
#BinderProperties
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 10.80.27.69
port: 5672
username: guest
password: guest
virtual-host: my-virtual-host
Reference
https://spring.io/blog/2019/10/25/spring-cloud-stream-and-spring-integration