[本文地址:http://www.tkk7.com/Files/xylz/Inside.Java.Concurrency_34.ThreadPool.part7_ThreadPoolExecutor_execute.pdf]
線程池任務(wù)執(zhí)行流程
我們從一個(gè)API開始接觸Executor是如何處理任務(wù)隊(duì)列的。
java.util.concurrent.Executor.execute(Runnable)
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
線程池中所有任務(wù)執(zhí)行都依賴于此接口。這段話有以下幾個(gè)意思:
- 任務(wù)可能在將來某個(gè)時(shí)刻被執(zhí)行,有可能不是立即執(zhí)行。為什么這里有兩個(gè)“可能”?繼續(xù)往下面看。
- 任務(wù)可能在一個(gè)新的線程中執(zhí)行或者線程池中存在的一個(gè)線程中執(zhí)行。
- 任務(wù)無法被提交執(zhí)行有以下兩個(gè)原因:線程池已經(jīng)關(guān)閉或者線程池已經(jīng)達(dá)到了容量限制。
- 所有失敗的任務(wù)都將被“當(dāng)前”的任務(wù)拒絕策略RejectedExecutionHandler 處理。
回答上面兩個(gè)“可能“。任務(wù)可能被執(zhí)行,那不可能的情況就是上面說的情況3;可能不是立即執(zhí)行,是因?yàn)槿蝿?wù)可能還在隊(duì)列中排隊(duì),因此還在等待分配線程執(zhí)行。了解完了字面上的問題,我們再來看具體的實(shí)現(xiàn)。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
這一段代碼看起來挺簡單的,其實(shí)這就是線程池最重要的一部分,如果能夠完全理解這一塊,線程池還是挺容易的。整個(gè)執(zhí)行流程是這樣的:
- 如果任務(wù)command為空,則拋出空指針異常,返回。否則進(jìn)行2。
- 如果當(dāng)前線程池大小 大于或等于 核心線程池大小,進(jìn)行4。否則進(jìn)行3。
- 創(chuàng)建一個(gè)新工作隊(duì)列(線程,參考上一節(jié)),成功直接返回,失敗進(jìn)行4。
- 如果線程池正在運(yùn)行并且任務(wù)加入線程池隊(duì)列成功,進(jìn)行5,否則進(jìn)行7。
- 如果線程池已經(jīng)關(guān)閉或者線程池大小為0,進(jìn)行6,否則直接返回。
- 如果線程池已經(jīng)關(guān)閉則執(zhí)行拒絕策略返回,否則啟動(dòng)一個(gè)新線程來進(jìn)行執(zhí)行任務(wù),返回。
- 如果線程池大小 不大于 最大線程池?cái)?shù)量,則啟動(dòng)新線程來進(jìn)行執(zhí)行,否則進(jìn)行拒絕策略,結(jié)束。
文字描述步驟不夠簡單?下面圖形詳細(xì)表述了此過程。

老實(shí)說這個(gè)圖比上面步驟更難以理解,那么從何入手呢。
流程的入口很簡單,我們就是要執(zhí)行一個(gè)任務(wù)(Runnable command),那么它的結(jié)束點(diǎn)在哪或者有哪幾個(gè)?
根據(jù)左邊這個(gè)圖我們知道可能有以下幾種出口:
(1)圖中的P1、P7,我們根據(jù)這條路徑可以看到,僅僅是將任務(wù)加入任務(wù)隊(duì)列(offer(command))了;
(2)圖中的P3,這條路徑不將任務(wù)加入任務(wù)隊(duì)列,但是啟動(dòng)了一個(gè)新工作線程(Worker)進(jìn)行掃尾操作,用戶處理為空的任務(wù)隊(duì)列;
(3)圖中的P4,這條路徑?jīng)]有將任務(wù)加入任務(wù)隊(duì)列,但是啟動(dòng)了一個(gè)新工作線程(Worker),并且工作現(xiàn)場的第一個(gè)任務(wù)就是當(dāng)前任務(wù);
(4)圖中的P5、P6,這條路徑?jīng)]有將任務(wù)加入任務(wù)隊(duì)列,也沒有啟動(dòng)工作線程,僅僅是拋給了任務(wù)拒絕策略。P2是任務(wù)加入了任務(wù)隊(duì)列卻因?yàn)榫€程池已經(jīng)關(guān)閉于是又從任務(wù)隊(duì)列中刪除,并且拋給了拒絕策略。
如果上面的解釋還不清楚,可以去研究下面兩段代碼:
java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)
那么什么時(shí)候一個(gè)任務(wù)被立即執(zhí)行呢?
在線程池運(yùn)行狀態(tài)下,如果線程池大小 小于 核心線程池大小或者線程池已滿(任務(wù)隊(duì)列已滿)并且線程池大小 小于 最大線程池大小(此時(shí)線程池大小 大于 核心線程池大小的),用程序描述為:
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())
上面的條件就是一個(gè)任務(wù)能夠被立即執(zhí)行的條件。
有了execute的基礎(chǔ),我們看看ExecutorService中的幾個(gè)submit方法的實(shí)現(xiàn)。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
很簡單,不是么?對于一個(gè)線程池來說復(fù)雜的地方也就在execute方法的執(zhí)行流程。在下一節(jié)中我們來討論下如何獲取任務(wù)的執(zhí)行結(jié)果,也就是Future類的使用和原理。
©2009-2014 IMXYLZ
|求賢若渴