<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Concurrent學(xué)習(xí)—Executor框架


        java.util.concurrent包分成了三個部分,分別是:
                               java.util.concurrent
     
                               java.util.concurrent.atomic
                               java.util.concurrent.lock
        
    內(nèi)容涵蓋了并發(fā)集合類、線程池機(jī)制、同步互斥機(jī)制、線程安全的變量更新工具類、鎖等等常用工具。 

     

    并發(fā)編程的一種編程方式是把任務(wù)拆分為一些列的小任務(wù),即Runnable,然后在提交給一個Executor執(zhí)行,Executor.execute(Runnalbe) Executor在執(zhí)行時使用內(nèi)部的線程池完成操作。

        
    例子:
                  
    有一個很大的整數(shù)數(shù)組,需要求這個數(shù)組中所有整數(shù)的和,來計算結(jié)果。  
                
    JDK 7 中的 Fork/Join模式可以解決該問題,http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/)

        
    分析:
                 
    采用多線程(任務(wù)),并且還要分割List,每一小塊的數(shù)組采用一個線程(任務(wù))進(jìn)行計算其和,那么我們必須要等待所有的線程(任務(wù))完成之后才能得到正確的結(jié)果.

    步驟:

    • 分割數(shù)組,根據(jù)采用的線程(任務(wù))數(shù)平均分配,即array.length/threadCounts。
    • 定義一個記錄“很大數(shù)組”中所有整數(shù)和的變量sum,采用一個線程(任務(wù))處理一個分割后的子數(shù)組,計算子數(shù)組中所有整數(shù)和(subSum),然后把和(subSum)累加到sum上。
    • 等待所有線程(任務(wù))完成后輸出總和(sum)的值。

     

    /**
     * 并行計算數(shù)組的和, 測試類
     * 
     * 
    @author lsb
     *
     
    */

    public class MainTest {
        
    public static void main(String[] args) {
            
    int[] numbers = new int[] 123456781011 };
            CalcArrayTotal calc 
    = new CalcArrayTotal();
            Long sum 
    = calc.sum(numbers);
            System.out.println(sum);
            calc.close();
        }

    }


    主要實(shí)現(xiàn)類:

    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import com.li.senbiao.Thread.concurrent.test1.SumCalculator;

    public class CalcArrayTotal {

        
    private ExecutorService exec;

        
    private CompletionService<Long> completionService;

        
    private int cpuCoreNumber;

        
    public CalcArrayTotal() {
            cpuCoreNumber 
    = Runtime.getRuntime().availableProcessors();
            exec 
    = Executors.newFixedThreadPool(cpuCoreNumber);
            completionService 
    = new ExecutorCompletionService<Long>(exec);
        }


        
    public Long sum(final int[] numbers) {
            
    // 根據(jù)CPU核心個數(shù)拆分任務(wù),創(chuàng)建FutureTask并提交到Executor
            for (int i = 0; i < cpuCoreNumber; i++{
                
    int increment = numbers.length / cpuCoreNumber + 1;
                
    int start = increment * i;
                
    int end = increment * i + increment;
                
    if (end > numbers.length) {
                    end 
    = numbers.length;
                }

                SumCalculator subCalc 
    = new SumCalculator(numbers, start, end);
                
    if (!exec.isShutdown()) {
                    
    /**
                     * 生產(chǎn)者 submit() 執(zhí)行的 任務(wù)。使用者 take() 已完成的任務(wù), 
                     * 并按照完成這些任務(wù)的順序處理它們的結(jié)果  。
                     * 也就是調(diào)用CompletionService 的 take 方法是,
                     * 會返回按完成順序放回任務(wù)的結(jié)果, CompletionService 內(nèi)部維護(hù)了一個 阻塞隊(duì)列 BlockingQueue ,
                     * 如果沒有任務(wù)完成, take() 方法也會阻塞。
                     
    */

                    completionService.submit(subCalc);
                }

            }

            
    return getResult();
        }


        
    /**
         * 迭代每個只任務(wù),獲得部分和,相加返回
         
    */

        
    public Long getResult() {
            Long result 
    = 0L;
            
    for (int i = 0; i < cpuCoreNumber; i++{
                
    try {
                    Long subSum 
    = completionService.take().get();
                    result 
    += subSum;
                }
     catch (InterruptedException e) {
                    e.printStackTrace();
                }
     catch (ExecutionException e) {
                    e.printStackTrace();
                }

            }

            
    return result;
        }


        
    public void close() {
            exec.shutdown();
        }


    }


    一組的計算和:

    import java.util.concurrent.Callable;

    /**
     * 一組計算和值 
     * 
     * 
    @author lsb
     *
     
    */

    public class SumCalculator implements Callable<Long> {

        
    private int[] numbers;

        
    private int start;

        
    private int end;

        
    public SumCalculator(final int[] numbers, int start, int end) {
            
    this.numbers = numbers;
            
    this.start = start;
            
    this.end = end;
        }


        @Override
        
    public Long call() throws Exception {
            Long sum 
    = 0L;
            
    for (int i = start; i < end; i++{
                sum 
    += numbers[i];
            }

            
    return sum;
        }

    }






     


         一、Executors創(chuàng)建線程池

            Executors類,提供了一系列工廠方法用于創(chuàng)先線程池,返回的線程池都實(shí)現(xiàn)了ExecutorService接口。

             // 創(chuàng)建固定數(shù)目線程的線程池
            public static ExecutorService newFixedThreadPool(int nThreads)

           
            // 創(chuàng)建一個可緩存的線程池,調(diào)用execute 將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

            public static ExecutorService newCachedThreadPool() 
        
            // 創(chuàng)建一個單線程化的Executor

            public static ExecutorService newSingleThreadExecutor()

           
           //  創(chuàng)建一個支持定時及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來替代Timer類

            public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)



       二、ExecutorService 與生命周期

          ExecutorService 擴(kuò)展了Executor 并添加了一些生命周期管理的方法。一個Executor 的生命周期有三種狀態(tài),運(yùn)行 ,關(guān)閉 ,終止 。Executor 創(chuàng)建時處于運(yùn)行狀態(tài)。當(dāng)調(diào)用ExecutorService.shutdown() 后,處于關(guān)閉狀態(tài),isShutdown() 方法返 回true 。這時,不應(yīng)該再想Executor 中添加任務(wù),所有已添加的任務(wù)執(zhí)行完畢后,Executor 處于終止?fàn)顟B(tài),isTerminated() 返 回true 。

       如果Executor 處于關(guān)閉狀態(tài),往Executor 提交任務(wù)會拋出unchecked exception RejectedExecutionException 。

     

     

        三、使用Callable ,F(xiàn)uture 返回結(jié)果

         Future<V> 代表一個異步執(zhí)行的操作,通過get() 方法可以獲得操作的結(jié)果,如果異步操作還沒有完成,則,get() 會使當(dāng)前 線程阻塞。FutureTask<V> 實(shí)現(xiàn)了Future<V> 和Runable<V> 。Callable 代表一個 有返回值得操作。

    ExecutoreService 提供了submit() 方法,傳遞一個Callable ,或Runnable ,返回Future 。如果Executor 后臺線程池還沒有完成Callable 的計算,這調(diào)用返回Future 對象的get() 方法,會阻塞直到計算完成。





        

    posted on 2011-10-12 17:25 胡鵬 閱讀(2442) 評論(0)  編輯  收藏 所屬分類: java基礎(chǔ)

    導(dǎo)航

    <2011年10月>
    2526272829301
    2345678
    9101112131415
    16171819202122
    23242526272829
    303112345

    統(tǒng)計

    常用鏈接

    留言簿(3)

    隨筆分類

    隨筆檔案

    agile

    搜索

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 亚洲成人免费网址| 18成禁人视频免费网站| 亚洲国产av美女网站| 亚洲伊人久久综合中文成人网| 国产成人免费在线| a毛片免费在线观看| 四虎国产精品永免费| 亚洲一卡一卡二新区无人区| 99久久精品国产亚洲| 亚洲乱码日产一区三区| 亚洲国产成人久久笫一页| 成人免费毛片观看| 2021国产精品成人免费视频| 日韩a级无码免费视频| 一级做a免费视频观看网站| 亚洲aⅴ无码专区在线观看春色| 久久精品蜜芽亚洲国产AV| 日韩亚洲一区二区三区| 国产亚洲老熟女视频| 日韩亚洲精品福利| 国产精品另类激情久久久免费| 免费无码又爽又刺激聊天APP| 91久久成人免费| 最近中文字幕大全中文字幕免费| 一个人免费视频观看在线www| 国产vA免费精品高清在线观看| 日韩免费在线中文字幕| 99亚洲男女激情在线观看| 亚洲成av人无码亚洲成av人| 亚洲日韩精品无码专区加勒比☆ | 18禁亚洲深夜福利人口| 亚洲日韩久久综合中文字幕| 一本色道久久88亚洲精品综合 | 又黄又爽又成人免费视频| 最近免费中文字幕mv电影| 亚洲视频免费在线播放| 亚洲免费一级视频| 手机在线看永久av片免费| 手机在线毛片免费播放| 成人毛片18女人毛片免费96| 处破痛哭A√18成年片免费|