[本文地址:
http://www.tkk7.com/Files/xylz/Inside.Java.Concurrency_32.ThreadPool.part5_ScheduledExecutorService.pdf]
周期性任務(wù)調(diào)度前世
在JDK 5.0之前,java.util.Timer/TimerTask是唯一的內(nèi)置任務(wù)調(diào)度方法,而且在很長(zhǎng)一段時(shí)間里很熱衷于使用這種方式進(jìn)行周期性任務(wù)調(diào)度。
首先研究下Timer/TimerTask的特性(至于javax.swing.Timer就不再研究了)。
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}
public class Timer {
private TaskQueue queue = new TaskQueue();
/**
* The timer thread.
*/
private TimerThread thread = new TimerThread(queue);
java.util.TimerThread.mainLoop()
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
。。。。。。
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
上面三段代碼反映了Timer/TimerTask的以下特性:
- Timer對(duì)任務(wù)的調(diào)度是基于絕對(duì)時(shí)間的。
- 所有的TimerTask只有一個(gè)線程TimerThread來執(zhí)行,因此同一時(shí)刻只有一個(gè)TimerTask在執(zhí)行。
- 任何一個(gè)TimerTask的執(zhí)行異常都會(huì)導(dǎo)致Timer終止所有任務(wù)。
- 由于基于絕對(duì)時(shí)間并且是單線程執(zhí)行,因此在多個(gè)任務(wù)調(diào)度時(shí),長(zhǎng)時(shí)間執(zhí)行的任務(wù)被執(zhí)行后有可能導(dǎo)致短時(shí)間任務(wù)快速在短時(shí)間內(nèi)被執(zhí)行多次或者干脆丟棄多個(gè)任務(wù)。
由于Timer/TimerTask有這些特點(diǎn)(缺陷),因此這就導(dǎo)致了需要一個(gè)更加完善的任務(wù)調(diào)度框架來解決這些問題。
周期性任務(wù)調(diào)度今生
java.util.concurrent.ScheduledExecutorService的出現(xiàn)正好彌補(bǔ)了Timer/TimerTask的缺陷。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
首先ScheduledExecutorService基于ExecutorService,是一個(gè)完整的線程池調(diào)度。另外在提供線程池的基礎(chǔ)上增加了四個(gè)調(diào)度任務(wù)的API。
- schedule(Runnable command,long delay, TimeUnit unit):在指定的延遲時(shí)間一次性啟動(dòng)任務(wù)(Runnable),沒有返回值。
- schedule(Callable<V> callable, long delay, TimeUnit unit):在指定的延遲時(shí)間一次性啟動(dòng)任務(wù)(Callable),攜帶一個(gè)結(jié)果。
- scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,后續(xù)操作具有給定的周期;也就是將在 initialDelay 后開始執(zhí)行,然后在 initialDelay+period 后執(zhí)行,接著在 initialDelay + 2 * period 后執(zhí)行,依此類推。如果任務(wù)的任何一個(gè)執(zhí)行遇到異常,則后續(xù)執(zhí)行都會(huì)被取消。否則,只能通過執(zhí)行程序的取消或終止方法來終止該任務(wù)。如果此任務(wù)的任何一個(gè)執(zhí)行要花費(fèi)比其周期更長(zhǎng)的時(shí)間,則將推遲后續(xù)執(zhí)行,但不會(huì)同時(shí)執(zhí)行。
- scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,隨后,在每一次執(zhí)行終止和下一次執(zhí)行開始之間都存在給定的延遲。如果任務(wù)的任一執(zhí)行遇到異常,就會(huì)取消后續(xù)執(zhí)行。否則,只能通過執(zhí)行程序的取消或終止方法來終止該任務(wù)。
上述API解決了以下幾個(gè)問題:
- ScheduledExecutorService任務(wù)調(diào)度是基于相對(duì)時(shí)間,不管是一次性任務(wù)還是周期性任務(wù)都是相對(duì)于任務(wù)加入線程池(任務(wù)隊(duì)列)的時(shí)間偏移。
- 基于線程池的ScheduledExecutorService允許多個(gè)線程同時(shí)執(zhí)行任務(wù),這在添加多種不同調(diào)度類型的任務(wù)是非常有用的。
- 同樣基于線程池的ScheduledExecutorService在其中一個(gè)任務(wù)發(fā)生異常時(shí)會(huì)退出執(zhí)行線程,但同時(shí)會(huì)有新的線程補(bǔ)充進(jìn)來進(jìn)行執(zhí)行。
- ScheduledExecutorService可以做到不丟失任務(wù)。
下面的例子演示了一個(gè)任務(wù)周期性調(diào)度的例子。
package xylz.study.concurrency.executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) throws Exception{
ScheduledExecutorService execService = Executors.newScheduledThreadPool(3);
execService.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName()+" -> "+System.currentTimeMillis());
try {
Thread.sleep(2000L);
} catch (Exception e) {
e.printStackTrace();
}
}
}, 1, 1, TimeUnit.SECONDS);
//
execService.scheduleWithFixedDelay(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName()+" -> "+System.currentTimeMillis());
try {
Thread.sleep(2000L);
} catch (Exception e) {
e.printStackTrace();
}
}
}, 1, 1, TimeUnit.SECONDS);
Thread.sleep(5000L);
execService.shutdown();
}
}
一次可能的輸出如下:
pool-1-thread-1 -> 1294672392657
pool-1-thread-2 -> 1294672392659
pool-1-thread-1 -> 1294672394657
pool-1-thread-2 -> 1294672395659
pool-1-thread-1 -> 1294672396657
在這個(gè)例子中啟動(dòng)了默認(rèn)三個(gè)線程的線程池,調(diào)度兩個(gè)周期性任務(wù)。第一個(gè)任務(wù)是每隔1秒執(zhí)行一次,第二個(gè)任務(wù)是以固定1秒的間隔執(zhí)行,這兩個(gè)任務(wù)每次執(zhí)行的時(shí)間都是2秒。上面的輸出有以下幾點(diǎn)結(jié)論:
- 任務(wù)是在多線程中執(zhí)行的,同一個(gè)任務(wù)應(yīng)該是在同一個(gè)線程中執(zhí)行。
- scheduleAtFixedRate是每次相隔相同的時(shí)間執(zhí)行任務(wù),如果任務(wù)的執(zhí)行時(shí)間比周期還長(zhǎng),那么下一個(gè)任務(wù)將立即執(zhí)行。例如這里每次執(zhí)行時(shí)間2秒,而周期時(shí)間只有1秒,那么每次任務(wù)開始執(zhí)行的間隔時(shí)間就是2秒。
- scheduleWithFixedDelay描述是下一個(gè)任務(wù)的開始時(shí)間與上一個(gè)任務(wù)的結(jié)束時(shí)間間隔相同。流入這里每次執(zhí)行時(shí)間2秒,而周期時(shí)間是1秒,那么兩個(gè)任務(wù)開始執(zhí)行的間隔時(shí)間就是2+1=3秒。
事實(shí)上ScheduledExecutorService的實(shí)現(xiàn)類ScheduledThreadPoolExecutor是繼承線程池類ThreadPoolExecutor的,因此它擁有線程池的全部特性。但是同時(shí)又是一種特殊的線程池,這個(gè)線程池的線程數(shù)大小不限,任務(wù)隊(duì)列是基于DelayQueue的無(wú)限任務(wù)隊(duì)列。具體的結(jié)構(gòu)和算法在以后的章節(jié)中分析。
由于ScheduledExecutorService擁有Timer/TimerTask的全部特性,并且使用更簡(jiǎn)單,支持并發(fā),而且更安全,因此沒有理由繼續(xù)使用Timer/TimerTask,完全可以全部替換。需要說明的一點(diǎn)事構(gòu)造ScheduledExecutorService線程池的核心線程池大小要根據(jù)任務(wù)數(shù)來定,否則可能導(dǎo)致資源的浪費(fèi)。
[本文地址:
http://www.tkk7.com/Files/xylz/Inside.Java.Concurrency_32.ThreadPool.part5_ScheduledExecutorService.pdf]
©2009-2014 IMXYLZ
|求賢若渴