如何在 Spring 使用@Async,@EnableAsync注釋進行異步處理:
異步處理適用那些與業務邏輯(橫切關注點)不直接相關或者不作為其他業務邏輯輸入的部分,也可在分布式系統中解耦。
*譯注:橫切關注點(cross-cutting concerns)指一些具有橫越多個模塊的行為,使用傳統的軟件開發方法不能夠達到有效模塊化的一類特殊關注點。*
Spring 中,`@Async`注解可以標記異步操作。然而,使用`@Async`時有一些限制,僅僅把它加在方法上并不能確保方法會在獨立的線程中執行。如果你只是偶爾用到 `@Async`,需要格外當心。
1. @Async 的工作機制
首先為方法添加 `Async` 注解。接著,Spring 會基于 `proxyTargetClass` 屬性,為包含 `Async` 定義的對象創建代理(JDK Proxy/CGlib)。
最后,Spring 會嘗試搜索與當前上下文相關的線程池,把該方法作為獨立的執行路徑提交。確切地說,Spring 會搜索唯一的 `TaskExecutor` bean 或者名為 `taskExecutor` 的 bean。如果找不到,則使用默認的 `SimpleAsyncTaskExecutor`。
要完成上面的過程,使用中需要注意幾個限制,否則會出現 `Async` 不起作用的情況。
2. @Async 的限制
1. 必須在標記 `@ComponentScan` 或 `@configuration` 的類中使用 `@Async`。
未來實現類獲取異步處理結果
如果想要獲取異步處理的結果,可以通過未來接口的實現類調用得到()方法獲得。
未來接口的常見實現類有FutureTask。
在SpringBoot中,一般用AsyncResult作為異步結果。
future 缺點:
使用Future
獲得初始化執行結果時,可以使用初始化附加方法get()
,或者替換看isDone()
是否為true
,這兩種方法都不是很好,因為主線程也會被迫等待。
從Java 8開始約會了CompletableFuture
,它針對Future
了改進之處,可以針對某些對象,當初始化任務完成或發生異常時,自動調用對象的替代方法。下面會詳細解釋:
示例:spring boot工程初步處理業務類
1.AsyncTaskManager
@Service
@EnableAsync
public class AsyncTaskManager {
/**
* 這個業務注入的類
*/
@Autowired
private MessageDao messageDao;
/**
* @Async注解表示異步,后面的參數對應于線程池配置類ExecutorConfig中的方法名asyncServiceExecutor()
* 如果不寫后面的參數,直接使用@Async注解,則是使用默認的線程池
* Future<String>為異步返回的結果。可以通過get()方法獲取結果。
* @param s
* @throws Exception
*/
@Async(value = "asyncTaskExecutor")
public void transTask(String s) throws Exception {
messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+"--"+s+" ;time="+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
TimeUnit.SECONDS.sleep(6);
}
/**
* 異步調用,有返回值,必須是Future類型,不然報錯
* 如果不寫后面的參數,直接用@Async,則是使用默認的線程池。
* 使用Future獲得異步執行結果時,要么調用阻塞方法get(),要么輪詢看isDone()是否為true,這兩種方法都不是很好,因為主線程也會被迫等待
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public Future<String> transTaskForFuture(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}
/**
* 基于回調的listenableFuture比上種子線程直接返回Future優質是,主線程不用等待,任務在完成后會自動執行回調代碼。
* 因此在調用時要注冊回調代碼,包括成功回調和失敗回調
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public ListenableFuture<String> transTaskForCallback(String s) {
String result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
return AsyncResult.forExecutionException(e);
}
return AsyncResult.forValue(result);
}
/**
* 從Java 8開始引入了CompletableFuture,它針對Future做了改進,可以傳入回調對象,當異步任務完成或者發生異常時,自動調用回調對象的回調方法
* 最主要是可以提供復雜的
* CompletableFuture可以指定異步處理流程:
* thenAccept()處理正常結果;
* exceptional()處理異常結果;
* thenApplyAsync()用于串行化另一個CompletableFuture;
* anyOf()和allOf()用于并行化多個CompletableFuture。
* 詳解請看 https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
* @param s
* @return
*/
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture(String s) {
Object result=null;
try {
result=messageDao.getMessage(s);
System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
TimeUnit.SECONDS.sleep(6);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
@Async(value = "asyncTaskExecutor")
public CompletableFuture<Object> transTaskForCompletableFuture2(int s) {
Object result=null;
try {
result=messageDao.getUserCode(s);
System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
return AsyncResult.forExecutionException(e).completable();
}
return AsyncResult.forValue(result).completable();
}
}
Dao層業務類:
@Repository
public class MessageDao {
public String getMessage(String s){
return s;
}
public String callBackMessage(String s){
return "這是注冊回調返回結果s="+s;
}
public String getUserCode(int id){
return "000"+id;
}
public String getUserName(String code){
return "李四";
}
public String getUserDepartment(String code){
return "技術開發部";
}
}
線程池ThreadPoolTaskExecutor
SpringBoot中的線程池一般用ThreadPoolTaskExecutor類
。ThreadPoolTaskExecutor繼承關系如下:
ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor
關系結構圖為:

2.自定義線程池配置如下:
@Configuration
public class AsyncTaskConfig {
/**
* IO密集型任務 = 一般為2*CPU核心數(常出現于線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等)
* CPU密集型任務 = 一般為CPU核心數+1(常出現于線程中:復雜算法)
* 混合型任務 = 視機器配置和復雜度自測而定
*/
@Bean(name = "asyncTaskExecutor")
public ThreadPoolTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//1: 核心線程數目
executor.setCorePoolSize(4);
//2: 指定最大線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
executor.setMaxPoolSize(10);
//3: 隊列中最大的數目
executor.setQueueCapacity(200);
//4: 線程名稱前綴
executor.setThreadNamePrefix("LocustTask-");
//5:當pool已經達到max size的時候,如何處理新任務
// CallerRunsPolicy: 會在execute 方法的調用線程中運行被拒絕的任務,如果執行程序已關閉,則會丟棄該任務
// AbortPolicy: 拋出java.util.concurrent.RejectedExecutionException異常
// DiscardOldestPolicy: 拋棄舊的任務
// DiscardPolicy: 拋棄當前的任務
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//6: 線程空閑后的最大存活時間(默認值 60),當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
executor.setKeepAliveSeconds(60);
//7:線程空閑時間,當線程空閑時間達到keepAliveSeconds(秒)時,線程會退出,直到線程數量等于corePoolSize,如果allowCoreThreadTimeout=true,則會直到線程數量等于0
executor.setAllowCoreThreadTimeOut(false);
executor.initialize();
return executor;
}
}
@EnableAsync開啟初步
@EnableAsync表示開啟初始,可以放在@Controller層上方,也可以放在Applicationclass的上方,也可以直接放在業務類上例:
AsyncTaskManager
@Controller
@EnableAsync
public class XXXController {
@Autowired
private
AsyncTaskManager
asyncTaskManager;
@GetMapping("/user/getList")
@ResponseBody
public String getUserData(){
return
asyncTaskManager
.getAsyncResult();
}
}
Junint 4單元測試類如下
AsyncTaskTest:


1 public class AsyncTaskTest extends BaseTest {
2
3 @Autowired
4 private AsyncTaskManager asyncTaskManager;
5
6 @Autowired
7 private MessageDao messageDao;
8
9 /**
10 * 單無測試方法,沒有辦法測試多線程池郊果,因為單測試方法運行完后,整個JVM進程會水銷毀,所有測試只能啟動tomcat進行測試。
11 *
12 * @throws Exception
13 */
14 @Test
15 public void testAsyncTask() throws Exception {
16 for (int i = 1; i <= 10; i++) {
17 asyncTaskManager.transTask("2222");
18 }
19 }
20
21 /**
22 * 主線等待子線完成后,獲取返回結果
23 *
24 * @throws Exception
25 */
26 @Test
27 public void testAsyncTaskForFuture() throws Exception {
28 Future<String> future = asyncTaskManager.transTaskForFuture("AAA---BBB");
29 while (true) {
30 if (future.isDone() && !future.isCancelled()) {
31 System.out.println(Thread.currentThread().getName() + "子線程執行完畢");
32 break;
33 } else {
34 Thread.sleep(2000);
35 System.out.println("主線程" + Thread.currentThread().getName() + "待子線程執行完畢
");
36 }
37 }
38 }
39
40 /**
41 * 在調用時候,主線不用等待,可以注冊回調類和方法進行
42 *
43 * @throws Exception
44 */
45 @Test
46 public void testAsyncTaskForCallback() throws Exception {
47 // 在主要線程設置 獨有上下文變量
48 ThreadContext.setUserId(222222222222L);
49 ListenableFuture<String> future = asyncTaskManager.transTaskForCallback("AAA---BBB");
50 future.addCallback(
51 successCallback -> {
52 try {
53 String s = future.get(2L, TimeUnit.SECONDS);
54 String result = messageDao.callBackMessage(s);
55 //在線程池中子線程獲取父線程設置變量
56 System.out.println("回調結果:" + result + ";parent thread value:" + ThreadContext.getUserId());
57 } catch (Exception e) {
58 e.printStackTrace();
59 }
60 },
61 FailureCallback -> {
62 System.out.println("子線程執行失敗
.");
63 }
64 );
65 Thread.sleep(20000);
66 }
67
68 /**
69 * 驗證多線程常用的場景比如有: 4個任務需要4個線程去執行,同時成功后才執行相應操作
70 * A,B,C,D 4 個任務
71 * CompletableFuture.allOf()方法
72 * 由于 allOf 聚合了多個 CompletableFuture 實例,所以它是沒有返回值的。這也是它的一個缺點
73 * @throws Exception
74 */
75 @Test
76 public void testAsyncTaskForAllOf() throws Exception {
77 CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
78 CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
79 CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
80 CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
81 // 1: 把4個線程返回 completableFuture_3 組合成一個
82 CompletableFuture alloff=CompletableFuture.allOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
83 // 2:如果沒有后續的動作,可以直接 join()和get() 執行結果,主線程一直被阻塞,一直等到用戶線程返回,如果不使用join 和get 主線程不會被阻塞
84 // CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區別在于 join() 拋出的是 unchecked Exception。
85 String result= (String)alloff.join();
86 System.out.println("所有任務同時完成"+result);
87 Thread.sleep(20000);
88 }
89
90 /**
91 * 驗證多線程常用的場景比如有: 4個任務需要4個線程去執行,同時成功后才執行相應操作
92 * A,B,C,D 4 個任務
93 * CompletableFuture.anyOf()方法 其中有一個執行成功,就算完成
94 *
95 * @throws Exception
96 */
97 @Test
98 public void testAsyncTaskForAnyOf() throws Exception {
99 CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
100 CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
101 CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
102 CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
103 CompletableFuture anyOf=CompletableFuture.anyOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
104 //這里利用Jdk8函數式接口lambda表達式來實現匿名內部類,?是泛型通配符
105 Object s=anyOf.get(1500,TimeUnit.MILLISECONDS);
106 System.out.println(" anyof 輸出結果 s="+s);
107 Thread.sleep(20000);
108 }
109
110 /**
111 * 驗證多線程常用的場景比如有: 3個任務需要3個線程去執行
112 * 根據 A 方法 異步返回結果,分別去異步執行 查詢員工名稱和部門,然后返回結果
113 * @throws Exception
114 */
115 @Test
116 public void testAsyncTaskForCompletableFuture2() throws Exception {
117 CompletableFuture<Object> completableFuture_A = asyncTaskManager.task1("task-1");
118 // 1: 如果A成功后返回結果,作為B的入參去執行(thenApply 方法 都是在自己當前線程中執行)
119 CompletableFuture<Object> fetchNameFuture_B = completableFuture_A.thenApplyAsync((result) ->{
120 return messageDao.getUserName((String)result);
121 }
122 );
123 //2:B 執行成功后結果作為入參,執行C,然后返回
124 CompletableFuture<Object> fetchNameFuture_C=fetchNameFuture_B.thenApplyAsync((result)->{
125 return messageDao.getUserDepartment((String)result);
126 });
127 // join()會一直程序會一直block
128 System.out.println(fetchNameFuture_C.join());
129 // 手動完成一個complete,會立即執行,可以看到future調用complete(T t)會立即執行。但是complete(T t)只能調用一次,后續的重復調用會失效
130 //future已經執行完畢能夠返回結果,此時再調用complete(T t)則會無效
131 System.out.println(fetchNameFuture_B.complete("complete"));
132 Thread.sleep(90000);
133 }
134
135 /**
136 * 這個方法驗證把兩個異步線程的結果聚合起來返回
137 * @throws Exception
138 */
139 @Test
140 public void testAsyncTaskForThenCombine() throws Exception {
141 //1: 第一個查詢查詢員工消息,
142 CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
143 CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
144
145 CompletableFuture<Object> future=futureA.thenCombine(futureB,(resultA,resultB)->{
146 return resultA+";"+resultB;
147 });
148 Object s=future.join();
149 System.out.println(" result:"+ s);
150 // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
151 Thread.sleep(20000);
152 }
153 /**
154 * 這個方法驗證thenAcceptBoth接口是指,接受兩個異步線程,等待兩個完成后,做下一步動作,它的第二個參數是一個消費型的函數接口
155 * BiConsumer 這就標明它可以對上邊傳入的異步線程的結果做處理(改變傳入線程結果的值),并且沒有返回值
156 * @throws Exception
157 */
158 @Test
159 public void testAsyncTaskForThenAcceptBoth() throws Exception {
160 //1: 第一個查詢查詢員工消息
161 CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
162 CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
163
164 CompletableFuture<Void> allResult=futureA.thenAcceptBoth(futureB,(resultA,resultB)->{
165 String result=messageDao.getUserDepartment(resultA+";"+resultB);
166 System.out.println("======result="+result);
167 });
168 // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
169 Thread.sleep(20000);
170 }
171
172 /**
173 * 驗證futureA,futureB 兩個異步線程,其中一個返回,就返回。
174 * @throws Exception
175 */
176
177 @Test
178 public void testAsyncTaskForAcceptEither() throws Exception {
179 //1: 第一個查詢查詢員工消息
180 CompletableFuture <Object> futureA = asyncTaskManager.task1(“ task-1”);
181 CompletableFuture <Object> futureB = asyncTaskManager.task2(“ task-2”);
182 futureA.acceptEither(futureB,(result)-> {
183 字符串s = messageDao.getUserName(result +“”);
184 System.out.println(“它的一個串行返回返回的結果:” + s);
185 });
186 // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:187
線程。睡眠(20000);
188 }
189 }
190