<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 22, comments - 32, trackbacks - 0, articles - 73
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    如何在 Spring 使用@Async,@EnableAsync注釋進行異步處理:

    異步處理適用那些與業務邏輯(橫切關注點)不直接相關或者不作為其他業務邏輯輸入的部分,也可在分布式系統中解耦。

    *譯注:橫切關注點(cross-cutting concerns)指一些具有橫越多個模塊的行為,使用傳統的軟件開發方法不能夠達到有效模塊化的一類特殊關注點。*

    Spring 中,`@Async`注解可以標記異步操作。然而,使用`@Async`時有一些限制,僅僅把它加在方法上并不能確保方法會在獨立的線程中執行。如果你只是偶爾用到 `@Async`,需要格外當心。

    1. @Async 的工作機制

    首先為方法添加 `Async` 注解。接著,Spring 會基于 `proxyTargetClass` 屬性,為包含 `Async` 定義的對象創建代理(JDK Proxy/CGlib)。
    最后,Spring 會嘗試搜索與當前上下文相關的線程池,把該方法作為獨立的執行路徑提交。確切地說,Spring 會搜索唯一的 `TaskExecutor` bean 或者名為 `taskExecutor` 的 bean。如果找不到,則使用默認的 `SimpleAsyncTaskExecutor`。

    要完成上面的過程,使用中需要注意幾個限制,否則會出現 `Async` 不起作用的情況。

    2. @Async 的限制

    1. 必須在標記 `@ComponentScan` 或 `@configuration` 的類中使用 `@Async`。

    未來實現類獲取異步處理結果

    如果想要獲取異步處理的結果,可以通過未來接口的實現類調用得到()方法獲得。
    未來接口的常見實現類有FutureTask。
    在SpringBoot中,一般用AsyncResult作為異步結果。

    future 缺點:

    使用Future獲得初始化執行結果時,可以使用初始化附加方法get(),或者替換看isDone()是否為true,這兩種方法都不是很好,因為主線程也會被迫等待。

    從Java 8開始約會了CompletableFuture,它針對Future了改進之處,可以針對某些對象,當初始化任務完成或發生異常時,自動調用對象的替代方法。下面會詳細解釋:

    示例:spring boot工程初步處理業務類
    1.AsyncTaskManager
    @Service
    @EnableAsync
    public class AsyncTaskManager {
    /**
    * 這個業務注入的類
    */
    @Autowired
    private MessageDao messageDao;

    /**
    * @Async注解表示異步,后面的參數對應于線程池配置類ExecutorConfig中的方法名asyncServiceExecutor()
    * 如果不寫后面的參數,直接使用@Async注解,則是使用默認的線程池
    * Future<String>為異步返回的結果。可以通過get()方法獲取結果。
    * @param s
    * @throws Exception
    */
    @Async(value = "asyncTaskExecutor")
    public void transTask(String s) throws Exception {
    messageDao.getMessage(s);
    System.out.println(Thread.currentThread().getName()+"--"+s+" ;time="+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
    TimeUnit.SECONDS.sleep(6);
    }

    /**
    * 異步調用,有返回值,必須是Future類型,不然報錯
    * 如果不寫后面的參數,直接用@Async,則是使用默認的線程池。
    * 使用Future獲得異步執行結果時,要么調用阻塞方法get(),要么輪詢看isDone()是否為true,這兩種方法都不是很好,因為主線程也會被迫等待
    * @param s
    * @return
    */
    @Async(value = "asyncTaskExecutor")
    public Future<String> transTaskForFuture(String s) {
    String result=null;
    try {
    result=messageDao.getMessage(s);
    System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
    TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
    return AsyncResult.forExecutionException(e);
    }
    return AsyncResult.forValue(result);
    }

    /**
    * 基于回調的listenableFuture比上種子線程直接返回Future優質是,主線程不用等待,任務在完成后會自動執行回調代碼。
    * 因此在調用時要注冊回調代碼,包括成功回調和失敗回調
    * @param s
    * @return
    */
    @Async(value = "asyncTaskExecutor")
    public ListenableFuture<String> transTaskForCallback(String s) {
    String result=null;
    try {
    result=messageDao.getMessage(s);
    System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
    TimeUnit.SECONDS.sleep(6);
    } catch (InterruptedException e) {
    return AsyncResult.forExecutionException(e);
    }
    return AsyncResult.forValue(result);
    }

    /**
    * 從Java 8開始引入了CompletableFuture,它針對Future做了改進,可以傳入回調對象,當異步任務完成或者發生異常時,自動調用回調對象的回調方法
    * 最主要是可以提供復雜的
    * CompletableFuture可以指定異步處理流程:
    * thenAccept()處理正常結果;
    * exceptional()處理異常結果;
    * thenApplyAsync()用于串行化另一個CompletableFuture;
    * anyOf()和allOf()用于并行化多個CompletableFuture。
    * 詳解請看 https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
    * @param s
    * @return
    */
    @Async(value = "asyncTaskExecutor")
    public CompletableFuture<Object> transTaskForCompletableFuture(String s) {
    Object result=null;
    try {
    result=messageDao.getMessage(s);
    System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
    TimeUnit.SECONDS.sleep(6);
    } catch (Exception e) {
    return AsyncResult.forExecutionException(e).completable();
    }
    return AsyncResult.forValue(result).completable();
    }
    @Async(value = "asyncTaskExecutor")
    public CompletableFuture<Object> transTaskForCompletableFuture2(int s) {
    Object result=null;
    try {
    result=messageDao.getUserCode(s);
    System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
    TimeUnit.SECONDS.sleep(2);
    } catch (Exception e) {
    return AsyncResult.forExecutionException(e).completable();
    }
    return AsyncResult.forValue(result).completable();
    }
    }
    Dao層業務類:


    @Repository
    public class MessageDao {

    public String getMessage(String s){
    return s;
    }

    public String callBackMessage(String s){
    return "這是注冊回調返回結果s="+s;
    }

    public String getUserCode(int id){
    return "000"+id;
    }
    public String getUserName(String code){
    return "李四";
    }
    public String getUserDepartment(String code){
    return "技術開發部";
    }
    }
     

    線程池ThreadPoolTask​​Executor

    SpringBoot中的線程池一般用ThreadPoolTask​​Executor類
    。ThreadPoolTask​​Executor繼承關系如下:

    ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor
    

    關系結構圖為:

    2.自定義線程池配置如下:

    @Configuration
    public class AsyncTaskConfig {
    /**
    * IO密集型任務 = 一般為2*CPU核心數(常出現于線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等)
    * CPU密集型任務 = 一般為CPU核心數+1(常出現于線程中:復雜算法)
    * 混合型任務 = 視機器配置和復雜度自測而定
    */
    @Bean(name = "asyncTaskExecutor")
    public ThreadPoolTaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //1: 核心線程數目
    executor.setCorePoolSize(4);
    //2: 指定最大線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
    executor.setMaxPoolSize(10);
    //3: 隊列中最大的數目
    executor.setQueueCapacity(200);
    //4: 線程名稱前綴
    executor.setThreadNamePrefix("LocustTask-");
    //5:當pool已經達到max size的時候,如何處理新任務
    // CallerRunsPolicy: 會在execute 方法的調用線程中運行被拒絕的任務,如果執行程序已關閉,則會丟棄該任務
    // AbortPolicy: 拋出java.util.concurrent.RejectedExecutionException異常
    // DiscardOldestPolicy: 拋棄舊的任務
    // DiscardPolicy: 拋棄當前的任務
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //6: 線程空閑后的最大存活時間(默認值 60),當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
    executor.setKeepAliveSeconds(60);
    //7:線程空閑時間,當線程空閑時間達到keepAliveSeconds(秒)時,線程會退出,直到線程數量等于corePoolSize,如果allowCoreThreadTimeout=true,則會直到線程數量等于0
    executor.setAllowCoreThreadTimeOut(false);
    executor.initialize();
    return executor;
    }
    }

    @EnableAsync開啟初步

    @EnableAsync表示開啟初始,可以放在@Controller層上方,也可以放在Applicationclass的上方,也可以直接放在業務類上例AsyncTaskManager

    @Controller
    @EnableAsync
    public class XXXController {
        @Autowired
        private AsyncTaskManager asyncTaskManager;
    
        @GetMapping("/user/getList")
        @ResponseBody
        public String getUserData(){
            return asyncTaskManager.getAsyncResult();
        }
    }
    
    Junint 4單元測試類如下
    AsyncTaskTest:

      1 public class AsyncTaskTest extends BaseTest {
      2 
      3     @Autowired
      4     private AsyncTaskManager asyncTaskManager;
      5 
      6     @Autowired
      7     private MessageDao messageDao;
      8 
      9     /**
     10      * 單無測試方法,沒有辦法測試多線程池郊果,因為單測試方法運行完后,整個JVM進程會水銷毀,所有測試只能啟動tomcat進行測試。
     11      *
     12      * @throws Exception
     13      */
     14     @Test
     15     public void testAsyncTask() throws Exception {
     16         for (int i = 1; i <= 10; i++) {
     17             asyncTaskManager.transTask("2222");
     18         }
     19     }
     20 
     21     /**
     22      * 主線等待子線完成后,獲取返回結果
     23      *
     24      * @throws Exception
     25      */
     26     @Test
     27     public void testAsyncTaskForFuture() throws Exception {
     28         Future<String> future = asyncTaskManager.transTaskForFuture("AAA---BBB");
     29         while (true) {
     30             if (future.isDone() && !future.isCancelled()) {
     31                 System.out.println(Thread.currentThread().getName() + "子線程執行完畢");
     32                 break;
     33             } else {
     34                 Thread.sleep(2000);
     35                 System.out.println("主線程" + Thread.currentThread().getName() + "待子線程執行完畢");
     36             }
     37         }
     38     }
     39 
     40     /**
     41      * 在調用時候,主線不用等待,可以注冊回調類和方法進行
     42      *
     43      * @throws Exception
     44      */
     45     @Test
     46     public void testAsyncTaskForCallback() throws Exception {
     47         // 在主要線程設置 獨有上下文變量
     48         ThreadContext.setUserId(222222222222L);
     49         ListenableFuture<String> future = asyncTaskManager.transTaskForCallback("AAA---BBB");
     50         future.addCallback(
     51             successCallback -> {
     52                 try {
     53                     String s = future.get(2L, TimeUnit.SECONDS);
     54                     String result = messageDao.callBackMessage(s);
     55                     //在線程池中子線程獲取父線程設置變量
     56                     System.out.println("回調結果:" + result + ";parent thread value:" + ThreadContext.getUserId());
     57                 } catch (Exception e) {
     58                     e.printStackTrace();
     59                 }
     60             },
     61             FailureCallback -> {
     62                 System.out.println("子線程執行失敗.");
     63             }
     64         );
     65         Thread.sleep(20000);
     66     }
     67 
     68     /**
     69      * 驗證多線程常用的場景比如有: 4個任務需要4個線程去執行,同時成功后才執行相應操作
     70      * A,B,C,D 4 個任務
     71      * CompletableFuture.allOf()方法
     72      * 由于 allOf 聚合了多個 CompletableFuture 實例,所以它是沒有返回值的。這也是它的一個缺點
     73      * @throws Exception
     74      */
     75     @Test
     76     public void testAsyncTaskForAllOf() throws Exception {
     77         CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
     78         CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
     79         CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
     80         CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
     81         // 1: 把4個線程返回 completableFuture_3 組合成一個
     82         CompletableFuture alloff=CompletableFuture.allOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
     83         // 2:如果沒有后續的動作,可以直接 join()和get() 執行結果,主線程一直被阻塞,一直等到用戶線程返回,如果不使用join 和get 主線程不會被阻塞
     84         // CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區別在于 join() 拋出的是 unchecked Exception。
     85         String result= (String)alloff.join();
     86         System.out.println("所有任務同時完成"+result);
     87         Thread.sleep(20000);
     88     }
     89 
     90     /**
     91      * 驗證多線程常用的場景比如有: 4個任務需要4個線程去執行,同時成功后才執行相應操作
     92      * A,B,C,D 4 個任務
     93      * CompletableFuture.anyOf()方法 其中有一個執行成功,就算完成
     94      *
     95      * @throws Exception
     96      */
     97     @Test
     98     public void testAsyncTaskForAnyOf() throws Exception {
     99         CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
    100         CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
    101         CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
    102         CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
    103         CompletableFuture anyOf=CompletableFuture.anyOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
    104         //這里利用Jdk8函數式接口lambda表達式來實現匿名內部類,?是泛型通配符
    105         Object  s=anyOf.get(1500,TimeUnit.MILLISECONDS);
    106         System.out.println(" anyof 輸出結果 s="+s);
    107         Thread.sleep(20000);
    108     }
    109 
    110     /**
    111      * 驗證多線程常用的場景比如有: 3個任務需要3個線程去執行
    112      * 根據 A 方法 異步返回結果,分別去異步執行 查詢員工名稱和部門,然后返回結果
    113      * @throws Exception
    114      */
    115     @Test
    116     public void testAsyncTaskForCompletableFuture2() throws Exception {
    117         CompletableFuture<Object> completableFuture_A = asyncTaskManager.task1("task-1");
    118         // 1: 如果A成功后返回結果,作為B的入參去執行(thenApply 方法 都是在自己當前線程中執行)
    119         CompletableFuture<Object> fetchNameFuture_B = completableFuture_A.thenApplyAsync((result) ->{
    120             return messageDao.getUserName((String)result);
    121             }
    122         );
    123         //2:B 執行成功后結果作為入參,執行C,然后返回
    124         CompletableFuture<Object> fetchNameFuture_C=fetchNameFuture_B.thenApplyAsync((result)->{
    125             return messageDao.getUserDepartment((String)result);
    126         });
    127         // join()會一直程序會一直block
    128         System.out.println(fetchNameFuture_C.join());
    129         // 手動完成一個complete,會立即執行,可以看到future調用complete(T t)會立即執行。但是complete(T t)只能調用一次,后續的重復調用會失效
    130         //future已經執行完畢能夠返回結果,此時再調用complete(T t)則會無效
    131         System.out.println(fetchNameFuture_B.complete("complete"));
    132         Thread.sleep(90000);
    133     }
    134 
    135     /**
    136      * 這個方法驗證把兩個異步線程的結果聚合起來返回
    137      * @throws Exception
    138      */
    139     @Test
    140     public void testAsyncTaskForThenCombine() throws Exception {
    141         //1: 第一個查詢查詢員工消息,
    142         CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
    143         CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
    144 
    145         CompletableFuture<Object> future=futureA.thenCombine(futureB,(resultA,resultB)->{
    146             return resultA+";"+resultB;
    147         });
    148         Object s=future.join();
    149         System.out.println(" result:"+ s);
    150         // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
    151         Thread.sleep(20000);
    152     }
    153     /**
    154      * 這個方法驗證thenAcceptBoth接口是指,接受兩個異步線程,等待兩個完成后,做下一步動作,它的第二個參數是一個消費型的函數接口
    155      *  BiConsumer 這就標明它可以對上邊傳入的異步線程的結果做處理(改變傳入線程結果的值),并且沒有返回值
    156      * @throws Exception
    157      */
    158     @Test
    159     public void testAsyncTaskForThenAcceptBoth() throws Exception {
    160         //1: 第一個查詢查詢員工消息
    161         CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
    162         CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
    163 
    164         CompletableFuture<Void> allResult=futureA.thenAcceptBoth(futureB,(resultA,resultB)->{
    165             String result=messageDao.getUserDepartment(resultA+";"+resultB);
    166             System.out.println("======result="+result);
    167         });
    168         // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
    169         Thread.sleep(20000);
    170     }
    171 
    172     /**
    173      * 驗證futureA,futureB 兩個異步線程,其中一個返回,就返回。
    174      * @throws Exception
    175      */
    176 
    177     @Test
    178     public void testAsyncTaskForAcceptEither() throws Exception {
    179         //1: 第一個查詢查詢員工消息
    180         CompletableFuture <Object> futureA = asyncTaskManager.task1(“ task-1”);
    181          CompletableFuture <Object> futureB = asyncTaskManager.task2(“ task-2”);
    182          futureA.acceptEither(futureB,(result)-> {
    183              字符串s = messageDao.getUserName(result +“”);
    184              System.out.println(“它的一個串行返回返回的結果:” + s);
    185          });
    186          //  主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:187
             線程。睡眠(20000);
    188      }
    189  }
    190 

    主站蜘蛛池模板: 亚洲伦乱亚洲h视频| 久久国产精品免费网站| 午夜亚洲WWW湿好爽| 亚洲18在线天美| 亚洲av无码久久忘忧草| 亚洲人成7777影视在线观看| 亚洲国产日韩在线成人蜜芽| 亚洲国产高清在线精品一区| 亚洲男人的天堂久久精品| 亚洲中文字幕一二三四区苍井空| 亚洲五月丁香综合视频| 亚洲中文字幕久久无码| 亚洲AV无码一区二区三区久久精品| 亚洲AV成人精品日韩一区| 羞羞网站免费观看| 亚美影视免费在线观看| 永久在线观看免费视频| 精品熟女少妇a∨免费久久| 亚洲精品视频在线观看免费| 三年片在线观看免费大全| 黄a大片av永久免费| 亚洲成人国产精品| 亚洲精品无码久久千人斩| 亚洲国产精品久久久久网站| 亚洲中文无码线在线观看| 亚洲av无码专区在线观看亚| 日韩在线视频免费| 国产一区二区免费视频| 国产成人精品免费视频大| 日本免费一区二区三区最新vr| 亚洲国产成人久久综合碰| 亚洲成AV人片在线观看| 亚洲av极品无码专区在线观看| 亚洲AV无码AV日韩AV网站| 久久免费视频一区| 国产精品免费精品自在线观看| 精品少妇人妻AV免费久久洗澡| 亚洲成人一区二区| 337p欧洲亚洲大胆艺术| 亚洲av日韩aⅴ无码色老头| 三级黄色在线免费观看|