[本文地址:http://www.tkk7.com/Files/xylz/Inside.Java.Concurrency_35.ThreadPool.part8_Future.ScheduledThreadPoolExecutor.pdf]
線程池任務(wù)執(zhí)行結(jié)果
這一節(jié)來探討下線程池中任務(wù)執(zhí)行的結(jié)果以及如何阻塞線程、取消任務(wù)等等。
1 package info.imxylz.study.concurrency.future;
2
3 public class SleepForResultDemo implements Runnable {
4
5 static boolean result = false;
6
7 static void sleepWhile(long ms) {
8 try {
9 Thread.sleep(ms);
10 } catch (Exception e) {}
11 }
12
13 @Override
14 public void run() {
15 //do work
16 System.out.println("Hello, sleep a while.");
17 sleepWhile(2000L);
18 result = true;
19 }
20
21 public static void main(String[] args) {
22 SleepForResultDemo demo = new SleepForResultDemo();
23 Thread t = new Thread(demo);
24 t.start();
25 sleepWhile(3000L);
26 System.out.println(result);
27 }
28
29 }
30
在沒有線程池的時代里面,使用Thread.sleep(long)去獲取線程執(zhí)行完畢的場景很多。顯然這種方式很笨拙,他需要你事先知道任務(wù)可能的執(zhí)行時間,并且還會阻塞主線程,不管任務(wù)有沒有執(zhí)行完畢。
1 package info.imxylz.study.concurrency.future;
2
3 public class SleepLoopForResultDemo implements Runnable {
4
5 boolean result = false;
6
7 volatile boolean finished = false;
8
9 static void sleepWhile(long ms) {
10 try {
11 Thread.sleep(ms);
12 } catch (Exception e) {}
13 }
14
15 @Override
16 public void run() {
17 //do work
18 try {
19 System.out.println("Hello, sleep a while.");
20 sleepWhile(2000L);
21 result = true;
22 } finally {
23 finished = true;
24 }
25 }
26
27 public static void main(String[] args) {
28 SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
29 Thread t = new Thread(demo);
30 t.start();
31 while (!demo.finished) {
32 sleepWhile(10L);
33 }
34 System.out.println(demo.result);
35 }
36
37 }
38
使用volatile與while死循環(huán)的好處就是等待的時間可以稍微小一點,但是依然有CPU負(fù)載高并且阻塞主線程的問題。最簡單的降低CPU負(fù)載的方式就是使用Thread.join().
SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
Thread t = new Thread(demo);
t.start();
t.join();
System.out.println(demo.result);
顯然這也是一種不錯的方式,另外還有自己寫鎖使用wait/notify的方式。其實join()從本質(zhì)上講就是利用while和wait來實現(xiàn)的。
上面的方式中都存在一個問題,那就是會阻塞主線程并且任務(wù)不能被取消。為了解決這個問題,線程池中提供了一個Future接口。

在Future接口中提供了5個方法。
- V get() throws InterruptedException, ExecutionException: 等待計算完成,然后獲取其結(jié)果。
- V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待為使計算完成所給定的時間之后,獲取其結(jié)果(如果結(jié)果可用)。
- boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務(wù)的執(zhí)行。
- boolean isCancelled():如果在任務(wù)正常完成前將其取消,則返回 true。
- boolean isDone():如果任務(wù)已完成,則返回 true。 可能由于正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true。
API看起來容易,來研究下異常吧。get()請求獲取一個結(jié)果會阻塞當(dāng)前進程,并且可能拋出以下三種異常:
- InterruptedException:執(zhí)行任務(wù)的線程被中斷則會拋出此異常,此時不能知道任務(wù)是否執(zhí)行完畢,因此其結(jié)果是無用的,必須處理此異常。
- ExecutionException:任務(wù)執(zhí)行過程中(Runnable#run())方法可能拋出RuntimeException,如果提交的是一個java.util.concurrent.Callable<V>接口任務(wù),那么java.util.concurrent.Callable.call()方法有可能拋出任意異常。
- CancellationException:實際上get()方法還可能拋出一個CancellationException的RuntimeException,也就是任務(wù)被取消了但是依然去獲取結(jié)果。
對于get(long timeout, TimeUnit unit)而言,除了get()方法的異常外,由于有超時機制,因此還可能得到一個TimeoutException。
boolean cancel(boolean mayInterruptIfRunning)方法比較復(fù)雜,各種情況比較多:
- 如果任務(wù)已經(jīng)執(zhí)行完畢,那么返回false。
- 如果任務(wù)已經(jīng)取消,那么返回false。
- 循環(huán)直到設(shè)置任務(wù)為取消狀態(tài),對于未啟動的任務(wù)將永遠(yuǎn)不再執(zhí)行,對于正在運行的任務(wù),將根據(jù)mayInterruptIfRunning是否中斷其運行,如果不中斷那么任務(wù)將繼續(xù)運行直到結(jié)束。
- 此方法返回后任務(wù)要么處于運行結(jié)束狀態(tài),要么處于取消狀態(tài)。isDone()將永遠(yuǎn)返回true,如果cancel()方法返回true,isCancelled()始終返回true。
來看看Future接口的實現(xiàn)類java.util.concurrent.FutureTask<V>具體是如何操作的。
在FutureTask中使用了一個AQS數(shù)據(jù)結(jié)構(gòu)來完成各種狀態(tài)以及加鎖、阻塞的實現(xiàn)。
在此AQS類java.util.concurrent.FutureTask.Sync中一個任務(wù)用4中狀態(tài):

初始情況下任務(wù)狀態(tài)state=0,任務(wù)執(zhí)行(innerRun)后狀態(tài)變?yōu)檫\行狀態(tài)RUNNING(state=1),執(zhí)行完畢后變成運行結(jié)束狀態(tài)RAN(state=2)。任務(wù)在初始狀態(tài)或者執(zhí)行狀態(tài)被取消后就變?yōu)闋顟B(tài)CANCELLED(state=4)。AQS最擅長無鎖情況下處理幾種簡單的狀態(tài)變更的。
void innerRun() {
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
執(zhí)行一個任務(wù)有四步:設(shè)置運行狀態(tài)、設(shè)置當(dāng)前線程(AQS需要)、執(zhí)行任務(wù)(Runnable#run或者Callable#call)、設(shè)置執(zhí)行結(jié)果。這里也可以看到,一個任務(wù)只能執(zhí)行一次,因為執(zhí)行完畢后它的狀態(tài)不在為初始值0,要么為CANCELLED,要么為RAN。
取消一個任務(wù)(cancel)又是怎樣進行的呢?對比下前面取消任務(wù)的描述是不是很簡單,這里無非利用AQS的狀態(tài)來改變?nèi)蝿?wù)的執(zhí)行狀態(tài),最終達(dá)到放棄未啟動或者正在執(zhí)行的任務(wù)的目的。
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0);
done();
return true;
}
到目前為止我們依然沒有說明到底是如何阻塞獲取一個結(jié)果的。下面四段代碼描述了這個過程。
1 V innerGet() throws InterruptedException, ExecutionException {
2 acquireSharedInterruptibly(0);
3 if (getState() == CANCELLED)
4 throw new CancellationException();
5 if (exception != null)
6 throw new ExecutionException(exception);
7 return result;
8 }
9 //AQS#acquireSharedInterruptibly
10 public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
11 if (Thread.interrupted())
12 throw new InterruptedException();
13 if (tryAcquireShared(arg) < 0)
14 doAcquireSharedInterruptibly(arg); //park current Thread for result
15 }
16 protected int tryAcquireShared(int ignore) {
17 return innerIsDone()? 1 : -1;
18 }
19
20 boolean innerIsDone() {
21 return ranOrCancelled(getState()) && runner == null;
22 }
當(dāng)調(diào)用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式了。這里獲取一個共享變量的狀態(tài)是任務(wù)是否結(jié)束(innerIsDone()),也就是任務(wù)是否執(zhí)行完畢或者被取消。如果不滿足條件,那么在AQS中就會doAcquireSharedInterruptibly(arg)掛起當(dāng)前線程,直到滿足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,因此性能消耗是很低的。
至于將Runnable接口轉(zhuǎn)換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一個簡單實現(xiàn)。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
延遲、周期性任務(wù)調(diào)度的實現(xiàn)
java.util.concurrent.ScheduledThreadPoolExecutor是默認(rèn)的延遲、周期性任務(wù)調(diào)度的實現(xiàn)。
有了整個線程池的實現(xiàn),再回頭來看延遲、周期性任務(wù)調(diào)度的實現(xiàn)應(yīng)該就很簡單了,因為所謂的延遲、周期性任務(wù)調(diào)度,無非添加一系列有序的任務(wù)隊列,然后按照執(zhí)行順序的先后來處理整個任務(wù)隊列。如果是周期性任務(wù),那么在執(zhí)行完畢的時候加入下一個時間點的任務(wù)即可。
由此可見,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一區(qū)別在于任務(wù)是有序(按照執(zhí)行時間順序)的,并且需要到達(dá)時間點(臨界點)才能執(zhí)行,并不是任務(wù)隊列中有任務(wù)就需要執(zhí)行的。也就是說唯一不同的就是任務(wù)隊列BlockingQueue<Runnable> workQueue不一樣。ScheduledThreadPoolExecutor的任務(wù)隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基于java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現(xiàn)。
DelayQueue是基于有序隊列PriorityQueue實現(xiàn)的。PriorityQueue 也叫優(yōu)先級隊列,按照自然順序?qū)υ剡M行排序,類似于TreeMap/Collections.sort一樣。
同樣是有序隊列,DelayQueue和PriorityQueue區(qū)別在什么地方?
由于DelayQueue在獲取元素時需要檢測元素是否“可用”,也就是任務(wù)是否達(dá)到“臨界點”(指定時間點),因此加入元素和移除元素會有一些額外的操作。
典型的,移除元素需要檢測元素是否達(dá)到“臨界點”,增加元素的時候如果有一個元素比“頭元素”更早達(dá)到臨界點,那么就需要通知任務(wù)隊列。因此這需要一個條件變量final Condition available 。
移除元素(出隊列)的過程是這樣的:
- 總是檢測隊列的頭元素(順序最小元素,也是最先達(dá)到臨界點的元素)
- 檢測頭元素與當(dāng)前時間的差,如果大于0,表示還未到底臨界點,因此等待響應(yīng)時間(使用條件變量available)
- 如果小于或者等于0,說明已經(jīng)到底臨界點或者已經(jīng)過了臨界點,那么就移除頭元素,并且喚醒其它等待任務(wù)隊列的線程。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
同樣加入元素也會有相應(yīng)的條件變量操作。當(dāng)前僅當(dāng)隊列為空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒“等待線程”去檢測元素。因為頭元素都沒有喚醒那么比頭元素更延遲的元素就更加不會喚醒。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}
有了任務(wù)隊列后再來看Future在ScheduledThreadPoolExecutor中是如何操作的。
java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask<V>是繼承java.util.concurrent.FutureTask<V>的,區(qū)別在于執(zhí)行任務(wù)是否是周期性的。
private void runPeriodic() {
boolean ok = ScheduledFutureTask.super.runAndReset();
boolean down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
long p = period;
if (p > 0)
time += p;
else
time = now() - p;
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
// This might have been the final executed delayed
// task. Wake up threads to check.
else if (down)
interruptIdleWorkers();
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
if (isPeriodic())
runPeriodic();
else
ScheduledFutureTask.super.run();
}
}
如果不是周期性任務(wù)調(diào)度,那么就和java.util.concurrent.FutureTask.Sync的調(diào)度方式是一樣的。如果是周期性任務(wù)(isPeriodic())那么就稍微有所不同的。

先從功能/結(jié)構(gòu)上分析下。第一種情況假設(shè)提交的任務(wù)每次執(zhí)行花費10s,間隔(delay/period)為20s,對于scheduleAtFixedRate而言,每次執(zhí)行開始時間20s,對于scheduleWithFixedDelay來說每次執(zhí)行開始時間30s。第二種情況假設(shè)提交的任務(wù)每次執(zhí)行時間花費20s,間隔(delay/period)為10s,對于scheduleAtFixedRate而言,每次執(zhí)行開始時間10s,對于scheduleWithFixedDelay來說每次執(zhí)行開始時間30s。(具體分析可以參考這里)
也就是說scheduleWithFixedDelay的執(zhí)行開始時間為(delay+cost),而對于scheduleAtFixedRate來說執(zhí)行開始時間為max(period,cost)。
回頭再來看上面源碼runPeriodic()就很容易了。但特別要提醒的,如果任務(wù)的任何一個執(zhí)行遇到異常,則后續(xù)執(zhí)行都會被取消,這從runPeriodic()就能看出。要強調(diào)的第二點就是同一個周期性任務(wù)不會被同時執(zhí)行。就比如說盡管上面第二種情況的scheduleAtFixedRate任務(wù)每隔10s執(zhí)行到達(dá)一個時間點,但是由于每次執(zhí)行時間花費為20s,因此每次執(zhí)行間隔為20s,只不過執(zhí)行的任務(wù)次數(shù)會多一點。但從本質(zhì)上講就是每隔20s執(zhí)行一次,如果任務(wù)隊列不取消的話。
為什么不會同時執(zhí)行?
這是因為ScheduledFutureTask執(zhí)行的時候會將任務(wù)從隊列中移除來,執(zhí)行完畢以后才會添加下一個同序列的任務(wù),因此任務(wù)隊列中其實最多只有同序列的任務(wù)的一份副本,所以永遠(yuǎn)不會同時執(zhí)行(盡管要執(zhí)行的時間在過去)。
ScheduledThreadPoolExecutor使用一個無界(容量無限,整數(shù)的最大值)的容器(DelayedWorkQueue隊列),根據(jù)ThreadPoolExecutor的原理,只要當(dāng)容器滿的時候才會啟動一個大于corePoolSize的線程數(shù)。因此實際上ScheduledThreadPoolExecutor是一個固定線程大小的線程池,固定大小為corePoolSize,構(gòu)造函數(shù)里面的Integer.MAX_VALUE其實是不生效的(盡管PriorityQueue使用數(shù)組實現(xiàn)有PriorityQueue大小限制,如果你的任務(wù)數(shù)超過了2147483647就會導(dǎo)致OutOfMemoryError,這個參考PriorityQueue的grow方法)。
再回頭看scheduleAtFixedRate等方法就容易多了。無非就是往任務(wù)隊列中添加一個未來某一時刻的ScheduledFutureTask任務(wù),如果是scheduleAtFixedRate那么period/delay就是正數(shù),如果是scheduleWithFixedDelay那么period/delay就是一個負(fù)數(shù),如果是0那么就是一次性任務(wù)。直接調(diào)用父類ThreadPoolExecutor的execute/submit等方法就相當(dāng)于period/delay是0,并且initialDelay也是0。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
if (initialDelay < 0) initialDelay = 0;
long triggerTime = now() + unit.toNanos(initialDelay);
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Object>(command,
null,
triggerTime,
unit.toNanos(period)));
delayedExecute(t);
return t;
}
另外需要補充說明的一點,前面說過java.util.concurrent.FutureTask.Sync任務(wù)只能執(zhí)行一次,那么在runPeriodic()里面怎么又將執(zhí)行過的任務(wù)加入隊列中呢?這是因為java.util.concurrent.FutureTask.Sync提供了一個innerRunAndReset()方法,此方法不僅執(zhí)行任務(wù)還將任務(wù)的狀態(tài)還原成0(初始狀態(tài))了,所以此任務(wù)就可以重復(fù)執(zhí)行。這就是為什么runPeriodic()里面調(diào)用runAndRest()的緣故。
boolean innerRunAndReset() {
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
innerSetException(ex);
return false;
}
}
后話
整個并發(fā)實踐原理和實現(xiàn)(源碼)上的東西都講完了,后面幾個小節(jié)是一些總結(jié)和掃尾的工作,包括超時機制、異常處理等一些細(xì)節(jié)問題。也就是說大部分只需要搬出一些理論和最佳實踐知識出來就好了,不會有大量費腦筋的算法分析和原理、思想探討之類的。后面的章節(jié)也會加快一些進度。
老實說從剛開始的好奇到中間的興奮,再到現(xiàn)在的徹悟,收獲還是很多,個人覺得這是最認(rèn)真、最努力也是自我最滿意的一次技術(shù)研究和探討,同時在這個過程中將很多技術(shù)細(xì)節(jié)都串聯(lián)起來了,慢慢就有了那種技術(shù)相通的感覺。原來有了理論以后再去實踐、再去分析問題、解決問題和那種純解決問題得到的經(jīng)驗完全不一樣。整個專輯下來不僅僅是并發(fā)包這一點點知識,設(shè)計到硬件、軟件、操作系統(tǒng)、網(wǎng)絡(luò)、安全、性能、算法、理論等等,總的來說這也算是一次比較成功的研究切入點,這比Guice那次探討要深入和持久的多。
--
©2009-2014 IMXYLZ
|求賢若渴