Lessons
In our first lesson, you will get introduced to the concepts of Enterprise Application Integration. You will learn about the and Enterprise integration patterns that can be applied to simplify integration between different platforms and the Integration strategies that can be followed for this purpose. Finally, we will discuss how and why to implement a Message driven architecture and how to achieve both Synchronous and asynchronous communication among nodes.
In this lesson, you will get to understand how Spring Integration works under the hood. The core concepts of Spring Integration messaging system (like message channels and endpoints) will be introduced. Additionally, the components that build the framework will be discussed, including the channel adapters, transformers, filters, routers etc. Finally, the two distinct methods of communication (synchronous and asynchronous) are explained and the lesson ends with a discussion on error handling.
In this lesson, we will focus on the integration with external web services. Spring Integration comes with the necessary functionality (adapters, channels etc.) to support web services out of the box. A full example is built from scratch in order to better understand the topic.
In this lesson, we will focus on integrating our application with JMS messaging. For this purpose, we will use Active MQ, which will be our broker. We will show examples of sending and receiving JMS messages by using the Spring Integration JMS channel adapters. Following these examples, we will see some ways of customizing these invocations by configuring message conversion and destination resolution.
In this lesson, we will wrap everything up by providing a complete application that uses several of the components provided by Spring Integration in order to provide a service to its users. We will discuss the system architecture, the actual implementation and the relevant error handling.
In this lesson, we will examine different mechanisms of monitoring or gathering more information about what is going on within the messaging system. Some of these mechanisms consist of managing or monitoring the application through MBeans, which are part of the JMX specification. Another mechanism discussed in this chapter is how we will implement the EIP idempotent receiver pattern using a metadata store. Finally, the last mechanism described is the control bus. This will let us send messages that will invoke operations on components in the application context.
vi /etc/resolv.conf
nameserver 8.8.8.8
如果要對JMS BROKER生產和消費MESSAGE,一種方式是用JmsTemplate發送和消費消息,另一種方式是SPRING INTEGRATION。
SPRING INTEGRATION是實現了EIP模式的一種框架,即使用CHANNEL和JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER,完全脫離了JmsTemplate的API。
如果需要實現這種場景:從BROKER取一條消息,處理消息,且處理途中不要再從BROKER再取消息,處理完后再取消息,再處理。
這樣要求手動開始和停止JMS LISTENER,即手動開始和停止JMS-INBOUND-ADAPTER、JMS-OUTBOUND-ADAPTER。
@Bean
@InboundChannelAdapter(value = "loaderResponseChannel")
public MessageSource loaderResponseSource() throws Exception {
return Jms
.inboundAdapter(oracleConnectionFactory())
.configureJmsTemplate(
t -> t.deliveryPersistent(true)
.jmsMessageConverter(jacksonJmsMessageConverter())
).destination(jmsInbound).get();
}
當使用@InboundChannelAdapter時,會自動注冊一個SourcePollingChannelAdapter ,但這個名字比較長:configrationName.loaderResponseSource.inboundChannelAdapter。
呼叫這個實例的start()和stop()方法即可。
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("controlBus")
.controlBus()
.get();
}
Message operation = MessageBuilder.withPayload("@configrationName.loaderResponseSource.inboundChannelAdapter.start()").build();
operationChannel.send(operation)
https://stackoverflow.com/questions/45632469/shutdown-spring-integration-with-jms-inboundadapterhttps://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/system-management-chapter.html#control-bushttps://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.javahttps://stackoverflow.com/questions/50428552/how-to-stop-or-suspend-polling-after-batch-job-fail
CountDownLatch、CyclicBarrier和Semaphore這三個并發輔助類,可以在線程中呼叫,使得線程暫停等,但各有不同。
1、初始化,并傳入計數器
2、向不同的線程傳入CountDownLatch實例
3、如果在某一線程中呼叫await(),則此線程被掛起,直到計數器為0,才往下執行
4、如果在某一線程中呼叫countDown(),計數器減1
5、最終如果計數器值為0時,則CountDownLatch實例不再起作用了,即為一次性的
1、初始化,并傳入計數器值,也可傳入一個Runnable類,會在計數器為0時,被執行
2、向不同的線程傳入CyclicBarrier實例
3、如果在某一線程中呼叫await(),則此線程被掛起,直到計數器為0,才往下執行
4、其他線程呼叫await(),則此線程被掛起,直到計數器為0,才往下執行
5、最終如果計數器值為0時,則CyclicBarrier實例會將計數器值恢復,又可重用
1、初始化,并傳入計數器值
2、向不同的線程傳入Semaphore實例
3、如果在某一線程中呼叫acquire(),則Semaphore實例會將計數器值減1,如果計數器值為-1,則將計數器值置為0,此線程被掛起,直到計數器值大于1時,才往下執行
4、此線程需呼叫release(),使得計數器值+1,以便其他線程在計數器值為0時不受阻
CountDownLatch 例子:
public class Test {
public static void main(String[] args) {
final CountDownLatch latch =
new CountDownLatch(2);
new Thread(){
public void run() {
try {
System.out.println("子線程"+Thread.currentThread().getName()+"正在執行");
Thread.sleep(3000);
System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢");
latch.countDown();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("子線程"+Thread.currentThread().getName()+"正在執行");
Thread.sleep(3000);
System.out.println("子線程"+Thread.currentThread().getName()+"執行完畢");
latch.countDown();
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try {
System.out.println("等待2個子線程執行完畢

");
latch.await();
System.out.println("2個子線程已經執行完畢");
System.out.println("繼續執行主線程");
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
結果:
線程Thread-0正在執行
線程Thread-1正在執行
等待2個子線程執行完畢

線程Thread-0執行完畢
線程Thread-1執行完畢
2個子線程已經執行完畢
繼續執行主線程
CyclicBarrier例子:
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier =
new CyclicBarrier(N,
new Runnable() {
@Override
public void run() {
System.out.println("當前線程"+Thread.currentThread().getName());
}
});
for(
int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer
extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據

");
try {
Thread.sleep(5000);
//以睡眠來模擬寫入數據操作
System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其他線程寫入完畢");
cyclicBarrier.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有線程寫入完畢,繼續處理其他任務

");
}
}
}
執行結果:
線程Thread-0正在寫入數據

線程Thread-1正在寫入數據

線程Thread-2正在寫入數據

線程Thread-3正在寫入數據

線程Thread-0寫入數據完畢,等待其他線程寫入完畢
線程Thread-1寫入數據完畢,等待其他線程寫入完畢
線程Thread-2寫入數據完畢,等待其他線程寫入完畢
線程Thread-3寫入數據完畢,等待其他線程寫入完畢
當前線程Thread-3
所有線程寫入完畢,繼續處理其他任務

所有線程寫入完畢,繼續處理其他任務

所有線程寫入完畢,繼續處理其他任務

所有線程寫入完畢,繼續處理其他任務

Semaphore例子:
public class Test {
public static void main(String[] args) {
int N = 8;
//工人數
Semaphore semaphore =
new Semaphore(5);
//機器數目
for(
int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker
extends Thread{
private int num;
private Semaphore semaphore;
public Worker(
int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+
this.num+"占用一個機器在生產

");
Thread.sleep(2000);
System.out.println("工人"+
this.num+"釋放出機器");
semaphore.release();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
執行結果:
工人0占用一個機器在生產

工人1占用一個機器在生產

工人2占用一個機器在生產

工人4占用一個機器在生產

工人5占用一個機器在生產

工人0釋放出機器
工人2釋放出機器
工人3占用一個機器在生產

工人7占用一個機器在生產

工人4釋放出機器
工人5釋放出機器
工人1釋放出機器
工人6占用一個機器在生產

工人3釋放出機器
工人7釋放出機器
工人6釋放出機器
https://www.cnblogs.com/dolphin0520/p/3920397.html
https://juejin.im/post/5aeec3ebf265da0ba76fa327