<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Concurrent學習—Executor框架


        java.util.concurrent包分成了三個部分,分別是:
                               java.util.concurrent
     
                               java.util.concurrent.atomic
                               java.util.concurrent.lock
        
    內容涵蓋了并發集合類、線程池機制、同步互斥機制、線程安全的變量更新工具類、鎖等等常用工具。 

     

    并發編程的一種編程方式是把任務拆分為一些列的小任務,即Runnable,然后在提交給一個Executor執行,Executor.execute(Runnalbe) Executor在執行時使用內部的線程池完成操作。

        
    例子:
                  
    有一個很大的整數數組,需要求這個數組中所有整數的和,來計算結果。  
                
    JDK 7 中的 Fork/Join模式可以解決該問題,http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/)

        
    分析:
                 
    采用多線程(任務),并且還要分割List,每一小塊的數組采用一個線程(任務)進行計算其和,那么我們必須要等待所有的線程(任務)完成之后才能得到正確的結果.

    步驟:

    • 分割數組,根據采用的線程(任務)數平均分配,即array.length/threadCounts。
    • 定義一個記錄“很大數組”中所有整數和的變量sum,采用一個線程(任務)處理一個分割后的子數組,計算子數組中所有整數和(subSum),然后把和(subSum)累加到sum上。
    • 等待所有線程(任務)完成后輸出總和(sum)的值。

     

    /**
     * 并行計算數組的和, 測試類
     * 
     * 
    @author lsb
     *
     
    */

    public class MainTest {
        
    public static void main(String[] args) {
            
    int[] numbers = new int[] 123456781011 };
            CalcArrayTotal calc 
    = new CalcArrayTotal();
            Long sum 
    = calc.sum(numbers);
            System.out.println(sum);
            calc.close();
        }

    }


    主要實現類:

    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import com.li.senbiao.Thread.concurrent.test1.SumCalculator;

    public class CalcArrayTotal {

        
    private ExecutorService exec;

        
    private CompletionService<Long> completionService;

        
    private int cpuCoreNumber;

        
    public CalcArrayTotal() {
            cpuCoreNumber 
    = Runtime.getRuntime().availableProcessors();
            exec 
    = Executors.newFixedThreadPool(cpuCoreNumber);
            completionService 
    = new ExecutorCompletionService<Long>(exec);
        }


        
    public Long sum(final int[] numbers) {
            
    // 根據CPU核心個數拆分任務,創建FutureTask并提交到Executor
            for (int i = 0; i < cpuCoreNumber; i++{
                
    int increment = numbers.length / cpuCoreNumber + 1;
                
    int start = increment * i;
                
    int end = increment * i + increment;
                
    if (end > numbers.length) {
                    end 
    = numbers.length;
                }

                SumCalculator subCalc 
    = new SumCalculator(numbers, start, end);
                
    if (!exec.isShutdown()) {
                    
    /**
                     * 生產者 submit() 執行的 任務。使用者 take() 已完成的任務, 
                     * 并按照完成這些任務的順序處理它們的結果  。
                     * 也就是調用CompletionService 的 take 方法是,
                     * 會返回按完成順序放回任務的結果, CompletionService 內部維護了一個 阻塞隊列 BlockingQueue ,
                     * 如果沒有任務完成, take() 方法也會阻塞。
                     
    */

                    completionService.submit(subCalc);
                }

            }

            
    return getResult();
        }


        
    /**
         * 迭代每個只任務,獲得部分和,相加返回
         
    */

        
    public Long getResult() {
            Long result 
    = 0L;
            
    for (int i = 0; i < cpuCoreNumber; i++{
                
    try {
                    Long subSum 
    = completionService.take().get();
                    result 
    += subSum;
                }
     catch (InterruptedException e) {
                    e.printStackTrace();
                }
     catch (ExecutionException e) {
                    e.printStackTrace();
                }

            }

            
    return result;
        }


        
    public void close() {
            exec.shutdown();
        }


    }


    一組的計算和:

    import java.util.concurrent.Callable;

    /**
     * 一組計算和值 
     * 
     * 
    @author lsb
     *
     
    */

    public class SumCalculator implements Callable<Long> {

        
    private int[] numbers;

        
    private int start;

        
    private int end;

        
    public SumCalculator(final int[] numbers, int start, int end) {
            
    this.numbers = numbers;
            
    this.start = start;
            
    this.end = end;
        }


        @Override
        
    public Long call() throws Exception {
            Long sum 
    = 0L;
            
    for (int i = start; i < end; i++{
                sum 
    += numbers[i];
            }

            
    return sum;
        }

    }






     


         一、Executors創建線程池

            Executors類,提供了一系列工廠方法用于創先線程池,返回的線程池都實現了ExecutorService接口。

             // 創建固定數目線程的線程池
            public static ExecutorService newFixedThreadPool(int nThreads)

           
            // 創建一個可緩存的線程池,調用execute 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

            public static ExecutorService newCachedThreadPool() 
        
            // 創建一個單線程化的Executor

            public static ExecutorService newSingleThreadExecutor()

           
           //  創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類

            public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)



       二、ExecutorService 與生命周期

          ExecutorService 擴展了Executor 并添加了一些生命周期管理的方法。一個Executor 的生命周期有三種狀態,運行 ,關閉 ,終止 。Executor 創建時處于運行狀態。當調用ExecutorService.shutdown() 后,處于關閉狀態,isShutdown() 方法返 回true 。這時,不應該再想Executor 中添加任務,所有已添加的任務執行完畢后,Executor 處于終止狀態,isTerminated() 返 回true 。

       如果Executor 處于關閉狀態,往Executor 提交任務會拋出unchecked exception RejectedExecutionException 。

     

     

        三、使用Callable ,Future 返回結果

         Future<V> 代表一個異步執行的操作,通過get() 方法可以獲得操作的結果,如果異步操作還沒有完成,則,get() 會使當前 線程阻塞。FutureTask<V> 實現了Future<V> 和Runable<V> 。Callable 代表一個 有返回值得操作。

    ExecutoreService 提供了submit() 方法,傳遞一個Callable ,或Runnable ,返回Future 。如果Executor 后臺線程池還沒有完成Callable 的計算,這調用返回Future 對象的get() 方法,會阻塞直到計算完成。





        

    posted on 2011-10-12 17:25 胡鵬 閱讀(2442) 評論(0)  編輯  收藏 所屬分類: java基礎

    導航

    <2011年10月>
    2526272829301
    2345678
    9101112131415
    16171819202122
    23242526272829
    303112345

    統計

    常用鏈接

    留言簿(3)

    隨筆分類

    隨筆檔案

    agile

    搜索

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 免费欧洲美女牲交视频| 女人18毛片a级毛片免费视频| 国产伦精品一区二区三区免费迷 | 久久亚洲精品无码网站| 免费无码黄十八禁网站在线观看| 久久国产亚洲高清观看| 亚洲综合激情视频| 久久免费国产视频| 亚洲国产精品久久久久婷婷软件 | 免费国产va在线观看| 亚洲AV伊人久久青青草原 | 亚洲av不卡一区二区三区| 一级毛片在线观看免费| 亚洲精品国产电影午夜| 无码专区永久免费AV网站| 亚洲色偷偷偷综合网| 国产男女猛烈无遮挡免费视频| 无码一区二区三区亚洲人妻| 亚洲高清无码在线观看| a级片免费观看视频| 亚洲综合区图片小说区| 免费无码黄网站在线观看| 亚洲成AV人片高潮喷水| 99精品热线在线观看免费视频| 亚洲国产美女精品久久久久| 无码视频免费一区二三区| 美女扒开屁股让男人桶爽免费| 国产91精品一区二区麻豆亚洲| a毛片视频免费观看影院| 亚洲的天堂av无码| 毛片a级毛片免费播放下载| 黄网站色视频免费看无下截| 亚洲AV综合色区无码另类小说 | 亚洲人成影院在线观看| 国产麻豆一精品一AV一免费| 亚洲成aⅴ人在线观看| 日本人护士免费xxxx视频| 亚洲精品日韩专区silk| 日韩免费视频观看| 久久国产精品一区免费下载| 最新国产精品亚洲|