關于endpoint配置補充
endpoint配置相對比較靈活,下面再來看一個例子:
<endpoint input-channel="inputChannel"
default-output-channel="outputChannel"
handler-ref="helloService"
handler-method="sayHello"/>
<beans:bean id="helloService" class="org.springframework.integration.samples.helloworld.HelloService"/>
1 public class HelloService {
2
3 public String sayHello(String name) {
4 return "Hello " + name;
5 }
6 }
上面這個例子就演示了把 HelloService配置成一個MessageEndpoint組件,消息從"inputChannel"隊列接收后,
調用HelloService.sayHello方法,等sayHello方法返回后,根據default-output-channel="outputChannel"的配置
把返回的結果保存到message.payload屬性后發送給"outputChannel"隊列
也可改成Annotation的方式,配置方法如下:
<annotation-driven/>
<message-bus auto-create-channels="true"/>
<context:component-scan base-package="org.springframework.integration.samples.helloworld"/>
<beans:bean id="helloService" class="org.springframework.integration.samples.helloworld.HelloService"/>
1 @MessageEndpoint(input="inputChannel", defaultOutput="outputChannel")
2 public class HelloService {
3
4 @Handler
5 public String sayHello(String name) {
6 return "Hello " + name;
7 }
8 }
設置并發操作屬性
xml配置:
<endpoint input-channel="exampleChannel" handler-ref="exampleHandler"/>
<!-- 設置并發設置 core核心線程數 max最大線程數 queue-capacity 隊列最大消息數 keep-alive idle線程生命時間-->
<concurrency core="5" max="25" queue-capacity="20" keep-alive="120"/>
</endpoint>
annotation配置
1 @MessageEndpoint(input="fooChannel")
2 @Concurrency(coreSize=5, maxSize=20, queueCapacity=20, keepAliveSeconds=120)
3 public class FooService {
4
5 @Handler
6 public void bar(Foo foo) {
7 
8 }
9 }
下面總結一下常見的annotation的使用方法
@MessageEndpoint
它表示處理消息對象的終端節點。一般與其它的元數據標記一起使用。
下面會具體介紹與該元數據標記一起使用的其它標識的使用方法。
@MessageEndpoint源代碼:
1 @Target(ElementType.TYPE)
2 @Retention(RetentionPolicy.RUNTIME)
3 @Inherited
4 @Documented
5 @Component
6 public @interface MessageEndpoint {
7
8 String input() default ""; //接收消息的隊列名稱
9 String defaultOutput() default ""; //默認發送消息的隊列名稱(只有在不設置@Router情況下才有效)
10 int pollPeriod() default 0; //發送消息的輪循時間間隔(只有在不設置@Router情況下才有效)
11 }
@Handler
消息回調處理的方法。與@MessageEndpoint一起配置
(只限M3版,M4以及后續版本可以單獨使用,具體使用方法還要等具體的實現出來),收到input隊列的消息后,回調@Handler標識的方法
回調方法的參數類型必須與message.payload屬性類型相同
注:如果回調方法有返回值, 則回調方法處理完成后,會將返回值設置到message.payload屬性后,
發送消息到@MessageEndpoint的defaultOutput隊列。如果defaultOutput沒有設定,則將拋出異常。
@Handler源代碼:
1 @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
2 @Retention(RetentionPolicy.RUNTIME)
3 @Inherited
4 @Documented
5 public @interface Handler {
6
7 }
下面的例子演示了,從"channel1"隊列接收消息后由@Handler標記的方法處理。消息的payload屬性值的類型必須與方法的參數類型相同
1 @MessageEndpoint(input="channel1")
2 public class FooService {
3
4 @Handler
5 public void bar(Foo foo) {
6 
7 }
8 }
@Polled
必須與@MessageEndPoint 一起使用。但也可以與@Router,@Splitter標識配合使用,后面再分別舉例說明。
@Polled 用于開啟一個輪循的方式調用方法的功能
它有三個參數:
period: 輪循時間間隔(單位:微秒) 默認為 1000
initialDelay: 輪循延遲時間(單位:微秒) 默認為0
fixedRate: 默認為false
@Polled 源代碼
1 @Target(ElementType.METHOD)
2 @Retention(RetentionPolicy.RUNTIME)
3 @Inherited
4 @Documented
5 public @interface Polled {
6 int period() default 1000;
7 long initialDelay() default PollingSchedule.DEFAULT_INITIAL_DELAY;
8 boolean fixedRate() default PollingSchedule.DEFAULT_FIXED_RATE;
9
10 }
下面來看幾個例子:
例子1:
1 @MessageEndpoint(defaultOutput="outputChannel")
2 public class Counter {
3
4 private AtomicInteger count = new AtomicInteger();
5
6 @Polled(period=3000)
7 public int getNumber() {
8 return count.incrementAndGet();
9 }
10 }
這個例子功能是,MessageBus啟動后,由@Polled標記 每3秒觸發getNumber方法,當getNumber方法返回后,
把返回值設置到message.payload屬性后,發送到outputChannel隊列中。
例子2:
1 @MessageEndpoint
2 public class Counter {
3
4 private AtomicInteger count = new AtomicInteger();
5
6 @Polled(period=3000)
7 public int getNumber() {
8 return count.incrementAndGet();
9 }
10
11 @Router
12 public String resolveChannel(int i) {
13 if (i % 2 == 0) {
14 return "even";
15 }
16 return "odd";
17 }
18 }
這個例子功能是,MessageBus啟動后,由@Polled標記 每3秒觸發getNumber方法, 當getNumber方法返回后,
由于resolveChannel方法設置了@Router 標識,則把有getNumber方法返回值都會傳給resolveChannel方法,最后
會根據方法的返回值(即隊列名稱) (@Router標記功能),把消息發到even隊列或是odd隊列
注:如果@MessageEndpoint類中,在方法上標記了@Router標記后,@MessageEndpoint的defaultOutput就變成無效了。
@Splitter
必須與@MessageEndPoint 一起使用。
該元數據標識用于分解消息內容,它所在的方法的返回值必須是一個集合(collection).如果集合元素不是Message類型
但發送時,自動把集合中的元素對象保存到Message.payload屬性后發送。集合中有多少個元素,則會發送多少次消息。
1 @Target(ElementType.METHOD)
2 @Retention(RetentionPolicy.RUNTIME)
3 @Documented
4 @Handler
5 public @interface Splitter {
6 String channel(); //消息發送的隊列名
7 }
下面兩個例子實現效果是一樣的
例子1
1 @MessageEndpoint(input="inputChannel")
2 public class HelloService {
3
4 @Splitter(channel="outputChannel")
5 public List<String> sayHello(String name) {
6 String s = "Hello " + name;
7 List<String> list = new ArrayList<String>();
8 list.add(s);
9 return list;
10 }
11 }
12
例子2
1 @MessageEndpoint(input="inputChannel")
2 public class HelloService {
3
4 @Splitter(channel="outputChannel")
5 public List<Message> sayHello(String name) {
6 String s = "Hello " + name;
7 List<Message> list = new ArrayList<Message>();
8 Message message = new GenericMessage<String>(s);
9 list.add(message);
10 return list;
11 }
12 }
@Router
消息隊列路由功能。 必須與@MessageEndPoint 一起使用。
它所在的方法的返回值類型必須是MessageChannle或是String(channel name)類型
具體的例子參考上面。
@Publisher
必須與@MessageEndPoint 一起使用。
說明:@Publisher 標識是根據 after-returning 切面的AOP 在方面返回值時,發送消息到指定消息隊列 .
下面的例子:說明 foo方法調用后,返回值會發送到fooChannel消息隊列
1 @Publisher(channel="fooChannel")
2 public String foo() {
3 return "bar";
4 }
@Subscriber
必須與@MessageEndPoint 一起使用。
接收指定隊列的消息內容。它實現的異步的消息監聽事件。
一旦有消息接收到,則會根據message.payload 值作為參數,回調@Subscriber 標識標記的方法
1 @Subscriber(channel="fooChannel")
2 public void log(String foo) {
3 System.out.println(foo);
4 }
Good Luck!
Yours Matthew!
2008年4月24日