java.util.concurrent包分成了三個部分,分別是:
java.util.concurrent、
java.util.concurrent.atomic
java.util.concurrent.lock
內(nèi)容涵蓋了并發(fā)集合類、線程池機(jī)制、同步互斥機(jī)制、線程安全的變量更新工具類、鎖等等常用工具。
并發(fā)編程的一種編程方式是把任務(wù)拆分為一些列的小任務(wù),即Runnable,然后在提交給一個Executor執(zhí)行,Executor.execute(Runnalbe) 。Executor在執(zhí)行時使用內(nèi)部的線程池完成操作。
例子:
有一個很大的整數(shù)數(shù)組,需要求這個數(shù)組中所有整數(shù)的和,來計算結(jié)果。
(JDK 7 中的 Fork/Join模式可以解決該問題,http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/)
分析:
采用多線程(任務(wù)),并且還要分割List,每一小塊的數(shù)組采用一個線程(任務(wù))進(jìn)行計算其和,那么我們必須要等待所有的線程(任務(wù))完成之后才能得到正確的結(jié)果.
步驟:
- 分割數(shù)組,根據(jù)采用的線程(任務(wù))數(shù)平均分配,即array.length/threadCounts。
- 定義一個記錄“很大數(shù)組”中所有整數(shù)和的變量sum,采用一個線程(任務(wù))處理一個分割后的子數(shù)組,計算子數(shù)組中所有整數(shù)和(subSum),然后把和(subSum)累加到sum上。
- 等待所有線程(任務(wù))完成后輸出總和(sum)的值。

/** *//**
* 并行計算數(shù)組的和, 測試類
*
* @author lsb
*
*/

public class MainTest {

public static void main(String[] args) {

int[] numbers = new int[]
{ 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };
CalcArrayTotal calc = new CalcArrayTotal();
Long sum = calc.sum(numbers);
System.out.println(sum);
calc.close();
}
}
主要實(shí)現(xiàn)類:
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.li.senbiao.Thread.concurrent.test1.SumCalculator;


public class CalcArrayTotal {

private ExecutorService exec;

private CompletionService<Long> completionService;

private int cpuCoreNumber;


public CalcArrayTotal() {
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
completionService = new ExecutorCompletionService<Long>(exec);
}


public Long sum(final int[] numbers) {
// 根據(jù)CPU核心個數(shù)拆分任務(wù),創(chuàng)建FutureTask并提交到Executor

for (int i = 0; i < cpuCoreNumber; i++) {
int increment = numbers.length / cpuCoreNumber + 1;
int start = increment * i;
int end = increment * i + increment;

if (end > numbers.length) {
end = numbers.length;
}
SumCalculator subCalc = new SumCalculator(numbers, start, end);

if (!exec.isShutdown()) {

/** *//**
* 生產(chǎn)者 submit() 執(zhí)行的 任務(wù)。使用者 take() 已完成的任務(wù),
* 并按照完成這些任務(wù)的順序處理它們的結(jié)果 。
* 也就是調(diào)用CompletionService 的 take 方法是,
* 會返回按完成順序放回任務(wù)的結(jié)果, CompletionService 內(nèi)部維護(hù)了一個 阻塞隊(duì)列 BlockingQueue ,
* 如果沒有任務(wù)完成, take() 方法也會阻塞。
*/
completionService.submit(subCalc);
}
}
return getResult();
}


/** *//**
* 迭代每個只任務(wù),獲得部分和,相加返回
*/

public Long getResult() {
Long result = 0L;

for (int i = 0; i < cpuCoreNumber; i++) {

try {
Long subSum = completionService.take().get();
result += subSum;

} catch (InterruptedException e) {
e.printStackTrace();

} catch (ExecutionException e) {
e.printStackTrace();
}
}
return result;
}


public void close() {
exec.shutdown();
}

}

一組的計算和:
import java.util.concurrent.Callable;


/** *//**
* 一組計算和值
*
* @author lsb
*
*/

public class SumCalculator implements Callable<Long> {

private int[] numbers;

private int start;

private int end;


public SumCalculator(final int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

@Override

public Long call() throws Exception {
Long sum = 0L;

for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
一、Executors創(chuàng)建線程池
Executors類,提供了一系列工廠方法用于創(chuàng)先線程池,返回的線程池都實(shí)現(xiàn)了ExecutorService接口。
// 創(chuàng)建固定數(shù)目線程的線程池
public static ExecutorService newFixedThreadPool(int nThreads)
// 創(chuàng)建一個可緩存的線程池,調(diào)用execute 將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
public static ExecutorService newCachedThreadPool()
// 創(chuàng)建一個單線程化的Executor
public static ExecutorService newSingleThreadExecutor()
// 創(chuàng)建一個支持定時及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來替代Timer類
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
二、ExecutorService 與生命周期
ExecutorService 擴(kuò)展了Executor 并添加了一些生命周期管理的方法。一個Executor 的生命周期有三種狀態(tài),運(yùn)行 ,關(guān)閉 ,終止 。Executor 創(chuàng)建時處于運(yùn)行狀態(tài)。當(dāng)調(diào)用ExecutorService.shutdown() 后,處于關(guān)閉狀態(tài),isShutdown() 方法返 回true 。這時,不應(yīng)該再想Executor 中添加任務(wù),所有已添加的任務(wù)執(zhí)行完畢后,Executor 處于終止?fàn)顟B(tài),isTerminated() 返 回true 。
如果Executor 處于關(guān)閉狀態(tài),往Executor 提交任務(wù)會拋出unchecked exception RejectedExecutionException 。
三、使用Callable ,F(xiàn)uture 返回結(jié)果
Future<V> 代表一個異步執(zhí)行的操作,通過get() 方法可以獲得操作的結(jié)果,如果異步操作還沒有完成,則,get() 會使當(dāng)前 線程阻塞。FutureTask<V> 實(shí)現(xiàn)了Future<V> 和Runable<V> 。Callable 代表一個 有返回值得操作。
ExecutoreService 提供了submit() 方法,傳遞一個Callable ,或Runnable ,返回Future 。如果Executor 后臺線程池還沒有完成Callable 的計算,這調(diào)用返回Future 對象的get() 方法,會阻塞直到計算完成。