前一部分,通過XML的使用方式講解了
The Cafe Sample(小賣部訂餐例子)
本筆記接下講解以Annotation的方式來使用Spring-integration的例子。還是之前的那個(gè)示例:
The Cafe Sample(小賣部訂餐例子)
小賣部有一個(gè)訂飲料服務(wù),客戶可以通過訂單來訂購所需要飲料。小賣部提供兩種咖啡飲料
LATTE(拿鐵咖啡)和MOCHA(摩卡咖啡)。每種又都分冷飲和熱飲
整個(gè)流程如下:
1.有一個(gè)下訂單模塊,用戶可以按要求下一個(gè)或多個(gè)訂單。
2.有一個(gè)訂單處理模塊,處理訂單中那些是關(guān)于訂購飲料的。
3.有一個(gè)飲料訂購處理模塊,處理拆分訂購的具體是那些種類的飲料,把具體需要生產(chǎn)的飲料要求發(fā)給生產(chǎn)模塊
4.有一個(gè)生產(chǎn)模塊,進(jìn)行生產(chǎn)。
5.等生成完成后,有一個(gè)訂單確認(rèn)模塊(Waiter),把訂單的生成的飲料輸出。
Spring Integration以Annotation的方式,讓xml配置簡(jiǎn)化了很多
先來看一下XML方式,進(jìn)行示例的開發(fā):
配置文件如下:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans:beans xmlns="http://www.springframework.org/schema/integration"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:beans="http://www.springframework.org/schema/beans"
5 xmlns:context="http://www.springframework.org/schema/context"
6 xmlns:stream="http://www.springframework.org/schema/integration/stream"
7 xsi:schemaLocation="http://www.springframework.org/schema/beans
8 http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
9 http://www.springframework.org/schema/context
10 http://www.springframework.org/schema/context/spring-context-2.5.xsd
11 http://www.springframework.org/schema/integration
12 http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
13 http://www.springframework.org/schema/integration/stream
14 http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd">
15
16 <!-- 開啟 Annotation支持 -->
17 <annotation-config />
18 <!-- 設(shè)置Spring 掃描 Annotation 包路徑 -->
19 <context:component-scan base-package="org.springframework.integration.samples.cafe.annotation" />
20 <!-- 配置一個(gè)GateWay組件,提供消息的發(fā)送和接收。接口Cafe,提供一個(gè)void placeOrder(Order order);方法
21 該方法標(biāo)記了@Gateway(requestChannel="orders"), 實(shí)現(xiàn)向orders隊(duì)列實(shí)現(xiàn)數(shù)據(jù)的發(fā)送
22 -->
23 <gateway id="cafe" service-interface="org.springframework.integration.samples.cafe.Cafe" />
24 <!-- 訂單Channel -->
25 <channel id="orders" />
26 <!-- 飲料訂單Channel,處理飲料的類別 -->
27 <channel id="drinks" />
28 <!-- 冷飲生產(chǎn)Channel 最大待處理的數(shù)據(jù)量為 10-->
29 <channel id="coldDrinks">
30 <queue capacity="10" />
31 </channel>
32 <!-- 熱飲生產(chǎn)Channel 最大待處理的數(shù)據(jù)量為 10-->
33 <channel id="hotDrinks">
34 <queue capacity="10" />
35 </channel>
36 <!-- 定義最終進(jìn)行生產(chǎn)的消息隊(duì)列 -->
37 <channel id="preparedDrinks" />
38 <!-- 定義一個(gè) stream 適配器,接收 deliveries隊(duì)列的消息后,直接輸出到屏幕-->
39 <stream:stdout-channel-adapter id="deliveries" />
40
41 </beans:beans>
我們來看一下整體服務(wù)是怎么啟動(dòng)的
首先我們來看一下CafeDemo這個(gè)類,它觸發(fā)下定單操作
1 public class CafeDemo {
2
3 public static void main(String[] args) {
4 //加載Spring 配置文件 "cafeDemo.xml"
5 AbstractApplicationContext context = null;
6 if(args.length > 0) {
7 context = new FileSystemXmlApplicationContext(args);
8 }
9 else {
10 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11 }
12 //取得 Cafe實(shí)列
13 Cafe cafe = (Cafe) context.getBean("cafe");
14 //準(zhǔn)備 發(fā)送100條消息(訂單)
15 for (int i = 1; i <= 100; i++) {
16 Order order = new Order(i);
17 // 一杯熱飲 參數(shù)說明1.飲料類型 2.數(shù)量 3.是否是冷飲(true表示冷飲)
18 order.addItem(DrinkType.LATTE, 2, false);
19 // 一杯冷飲 參數(shù)說明1.飲料類型 2.數(shù)量 3.是否是冷飲(true表示冷飲)
20 order.addItem(DrinkType.MOCHA, 3, true);
21 //下發(fā)訂單,把消息發(fā)給 orders 隊(duì)列
22 cafe.placeOrder(order);
23 }
24 }
25
26 }
下面是Cafe接口的源代碼
public interface Cafe {
//定義GateWay, 把消息發(fā)送到 orders 隊(duì)列, Message的payLoad屬性,保存 order參數(shù)值
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
OrderSplitter 源代碼
1 //設(shè)置成Spring-integration組件
2 @MessageEndpoint
3 public class OrderSplitter {
4
5 //實(shí)現(xiàn)Splitter模式, 接收 orders隊(duì)列的消息,調(diào)用orderSplitter Bean的split方法,進(jìn)行消息的分解
6 //并把分解后的消息,發(fā)送到drinks隊(duì)列
7 @Splitter(inputChannel="orders", outputChannel="drinks")
8 public List<OrderItem> split(Order order) {
9 return order.getItems();
10 }
11
12 }
OrderSplitter.split把消息拆分后,變成多個(gè)消息,發(fā)送到drinks隊(duì)列.由drinkRouter進(jìn)行消息的接收。
1 //設(shè)置成Spring-integration組件
2 @MessageEndpoint
3 public class DrinkRouter {
4
5 //實(shí)現(xiàn)Router模式,接收 drinks隊(duì)列的消息, 并觸發(fā) drinkRouter Bean的 resolveOrderItemChannel方法
6 //由在 resolveOrderItemChannel該方法的返回值(String--隊(duì)列名稱)表示把消息路由到那個(gè)隊(duì)列上(coldDrinks或hotDrinks)
7 @Router(inputChannel="drinks")
8 public String resolveOrderItemChannel(OrderItem orderItem) {
9 return (orderItem.isIced()) ? "coldDrinks" : "hotDrinks";
10 }
11
12 }
下面看一下,如果是一杯冷飲,則消息發(fā)送到 coldDrinks隊(duì)列
如果是一杯熱飲,則消息發(fā)送到 hotDrinks隊(duì)列
接下來看coldDrinks, hotDrink 的隊(duì)列由誰來監(jiān)聽:
查看源代碼后,是由Barista.java來處理
1 //設(shè)置成Spring-integration組件
2 @Component
3 public class Barista {
4
5 private long hotDrinkDelay = 5000;
6
7 private long coldDrinkDelay = 1000;
8
9 private AtomicInteger hotDrinkCounter = new AtomicInteger();
10
11 private AtomicInteger coldDrinkCounter = new AtomicInteger();
12
13
14 public void setHotDrinkDelay(long hotDrinkDelay) {
15 this.hotDrinkDelay = hotDrinkDelay;
16 }
17
18 public void setColdDrinkDelay(long coldDrinkDelay) {
19 this.coldDrinkDelay = coldDrinkDelay;
20 }
21
22 //配置接收"hotDrinks"隊(duì)列,處理后,把結(jié)果發(fā)給隊(duì)列prepareColdDrink
23 @ServiceActivator(inputChannel="hotDrinks", outputChannel="preparedDrinks")
24 public Drink prepareHotDrink(OrderItem orderItem) {
25 try {
26 Thread.sleep(this.hotDrinkDelay);
27 System.out.println(Thread.currentThread().getName()
28 + " prepared hot drink #" + hotDrinkCounter.incrementAndGet() + " for order #"
29 + orderItem.getOrder().getNumber() + ": " + orderItem);
30 return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
31 orderItem.getShots());
32 } catch (InterruptedException e) {
33 Thread.currentThread().interrupt();
34 return null;
35 }
36 }
37
38 //配置接收"coldDrinks"隊(duì)列,處理后,把結(jié)果發(fā)給隊(duì)列prepareColdDrink
39 @ServiceActivator(inputChannel="coldDrinks", outputChannel="preparedDrinks")
40 public Drink prepareColdDrink(OrderItem orderItem) {
41 try {
42 Thread.sleep(this.coldDrinkDelay);
43 System.out.println(Thread.currentThread().getName()
44 + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + " for order #"
45 + orderItem.getOrder().getNumber() + ": " + orderItem);
46 return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
47 orderItem.getShots());
48 } catch (InterruptedException e) {
49 Thread.currentThread().interrupt();
50 return null;
51 }
52 }
53
54 }
接下來,已經(jīng)把訂單需要生產(chǎn)的飲料已經(jīng)完成,現(xiàn)在可以交給服務(wù)員(waier)交給客人了。
這里使用的aggregate模式,讓服務(wù)器等待這個(gè)訂單的所有飲料生產(chǎn)完后的,交給客戶.
1 //設(shè)置成Spring-integration組件
2 @MessageEndpoint
3 public class Waiter {
4
5 //配置 aggregator模式。
6 @Aggregator(inputChannel = "preparedDrinks", outputChannel = "deliveries", timeout = 5 * 60 * 1000)
7 public Delivery prepareDelivery(List<Drink> drinks) {
8 return new Delivery(drinks);
9 }
10
11 }
12
最后我們使用一個(gè) stream channel adaptor把訂單生產(chǎn)完成的飲料輸出。
<!-- 定義一個(gè) stream 適配器,接收 deliveries隊(duì)列的消息后,直接輸出到屏幕-->
<stream:stdout-channel-adapter id="deliveries"/>
spring-integration官網(wǎng):
http://www.springsource.org/spring-integration
Good Luck!
Yours Matthew!