CompletionService
接口的實(shí)例可以充當(dāng)生產(chǎn)者和消費(fèi)者的中間處理引擎,從而達(dá)到將提交任務(wù)和處理結(jié)果的代碼進(jìn)行解耦的目的。生產(chǎn)者調(diào)用 submit
方法提交任務(wù),而消費(fèi)者調(diào)用 poll
(非阻塞)或 take
(阻塞)方法獲取下一個結(jié)果:這一特征看起來和阻塞隊(duì)列(BlockingQueue
)類似,兩者的區(qū)別在于 CompletionService
要負(fù)責(zé)任務(wù)的處理,而阻塞隊(duì)列則不會。
在 JDK 中,該接口只有一個實(shí)現(xiàn)類 ExecutorCompletionService
,該類使用創(chuàng)建時提供的 Executor
對象(通常是線程池)來執(zhí)行任務(wù),然后將結(jié)果放入一個阻塞隊(duì)列中:果然本就是一家親啊!ExecutorCompletionService
將線程池和阻塞隊(duì)列糅合在一起,僅僅通過三個方法,就實(shí)現(xiàn)了任務(wù)的異步處理,可謂并發(fā)編程初學(xué)者的神兵利器!
接下來看一個例子。樓主有一大堆 *.java 文件,需要計算它們的代碼總行數(shù)。利用 ExecutorCompletionService
可以寫出很簡單的多線程處理代碼:
public int countLines(List<Path> javaFiles) throws Exception {
// 根據(jù)處理器數(shù)量創(chuàng)建線程池。雖然多線程并不保證能夠提升性能,但適量地
// 開線程一般可以從系統(tǒng)騙取更多資源。
ExecutorService es = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2);
// 使用 ExecutorCompletionService 內(nèi)建的阻塞隊(duì)列。
CompletionService cs = new ExecutorCompletionService(es);
// 按文件向 CompletionService 提交任務(wù)。
for (final Path javaFile : javaFiles) {
cs.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// 略去計算單個文件行數(shù)的代碼。
return countLines(javaFile);
}
});
}
try {
int loc = 0;
int size = javaFiles.size();
for (int i = 0; i < size; i++) {
// take 方法等待下一個結(jié)果并返回 Future 對象。不直接返回計算結(jié)果是為了
// 捕獲計算時可能拋出的異常。
// poll 不等待,有結(jié)果就返回一個 Future 對象,否則返回 null。
loc += cs.take().get();
}
return loc;
} finally {
// 關(guān)閉線程池。也可以將線程池提升為字段以便重用。
// 如果任務(wù)線程(Callable#call)能響應(yīng)中斷,用 shutdownNow 更好。
es.shutdown();
}
}
最后,CompletionService
也不是到處都能用,它不適合處理任務(wù)數(shù)量有限但個數(shù)不可知的場景。例如,要統(tǒng)計某個文件夾中的文件個數(shù),在遍歷子文件夾的時候也會“遞歸地”提交新的任務(wù),但最后到底提交了多少,以及在什么時候提交完了所有任務(wù),都是未知數(shù),無論 CompletionService
還是線程池都無法進(jìn)行判斷。這種情況只能直接用線程池來處理。