Posted on 2009-06-02 21:35
啥都寫點(diǎn) 閱讀(179)
評論(0) 編輯 收藏 所屬分類:
J2SE
關(guān)鍵技術(shù):
- 線程組ThreadGroup可以管理多個(gè)線程,所以讓線程池繼承ThreadGroup。
- 無條件關(guān)閉線程池時(shí),通過ThreadGroup的interrupt方法中斷池中所有線程。
- 有條件關(guān)閉線程池時(shí),通過ThreadGroup獲得池中所有活動線程的引用,依次調(diào)用Thread的join方法等待活動線程執(zhí)行完畢。當(dāng)所有線程運(yùn)行結(jié)束時(shí),線程池才算被關(guān)閉。
- 將任務(wù)放在LinkedList中,由于LinkedList不支持同步,所以在添加任務(wù)和獲取任務(wù)的方法聲明中必須使用synchronized關(guān)鍵字。
package book.thread.pool;
/**
* 定義任務(wù)的接口類
*/
public interface Task {
/**
* 執(zhí)行任務(wù)
* @throws Exception 執(zhí)行過程中可能出現(xiàn)的異常
*/
public void perform() throws Exception;
}
package book.thread.pool;
/**
* 一個(gè)簡單的任務(wù)
*/
public class MyTask implements Task{
/** 任務(wù)的ID */
private int taskID = 0;
public MyTask(int id){
this.taskID = id;
}
/**
* 實(shí)現(xiàn)Task接口的perform方法。
*/
public void perform() throws Exception{
System.out.println("MyTask " + taskID + ": start");
// 休眠一秒
try {
Thread.sleep(1000);
}
catch (InterruptedException ex) {
}
System.out.println("MyTask " + taskID + ": end");
}
}
package book.thread.pool;
import java.util.LinkedList;
/**
* 線程池,繼承ThreadGroup。
* ThreadGroup用于處理一組線程的類,它是一種樹狀結(jié)構(gòu),他的下層節(jié)點(diǎn)還可以是ThreadGroup對象
*/
public class MyThreadPool extends ThreadGroup {
/** 標(biāo)志線程池是否開啟 */
private boolean isAlive;
/** 線程池中的任務(wù)隊(duì)列 */
private LinkedList taskQueue;
/** 線程池中的線程ID */
private int threadID;
/** 線程池ID */
private static int threadPoolID;
/**
* 創(chuàng)建新的線程池,numThreads是池中的線程數(shù)
*/
public MyThreadPool(int numThreads) {
super("ThreadPool-" + (threadPoolID++));
//設(shè)置為該線程池是的daemon屬性為true,
//表示當(dāng)該線程池中所有線程都被銷毀時(shí),該線程池會自動被銷毀
super.setDaemon(true);
this.isAlive = true;
//新建一個(gè)任務(wù)隊(duì)列
this.taskQueue = new LinkedList();
//啟動numThreads個(gè)工作線程
for (int i = 0; i < numThreads; i++) {
new PooledThread().start();
}
}
/**
* 添加新任務(wù)
*/
public synchronized void performTask(Task task) {
if (!this.isAlive) {
// 線程被關(guān)則拋出IllegalStateException異常
throw new IllegalStateException();
}
if (task != null) {
//將任務(wù)放到任務(wù)隊(duì)列的尾部
this.taskQueue.add(task);
//通知工作線程取任務(wù)
notify();
}
}
/**
* 獲取任務(wù)
*/
protected synchronized Task getTask() throws InterruptedException {
//如果任務(wù)列表為空,而且線程池沒有被關(guān)閉,則繼續(xù)等待任務(wù)
while (this.taskQueue.size() == 0) {
if (!this.isAlive) {
return null;
}
wait();
}
//取任務(wù)列表的第一個(gè)任務(wù)
return (Task) this.taskQueue.removeFirst();
}
/**
* 關(guān)閉線程池,所有線程停止,不再執(zhí)行任務(wù)
*/
public synchronized void close() {
if (isAlive) {
this.isAlive = false;
//清除任務(wù)
this.taskQueue.clear();
//中止線程池中所有線程
this.interrupt();
}
}
/**
* 關(guān)閉線程池,并等待線程池中的所有任務(wù)被運(yùn)行完。
* 但是不能接受新的任務(wù)。
*/
public void join() {
//通知其他等待線程“該線程池已關(guān)閉”的消息
synchronized (this) {
isAlive = false;
notifyAll();
}
// 等待所有線程完成
// 首先建立一個(gè)新的線程數(shù)組。activeCount方法獲取線程池中活動線程的估計(jì)數(shù)
Thread[] threads = new Thread[this.activeCount()];
// 將線程池中的活動線程拷貝到新創(chuàng)建的線程數(shù)組threads中。
int count = this.enumerate(threads);
for (int i = 0; i < count; i++) {
try {
// 等待線程運(yùn)行結(jié)束
threads[i].join();
} catch (InterruptedException ex) {
}
}
}
/**
* 內(nèi)部類,用于執(zhí)行任務(wù)的工作線程
*/
private class PooledThread extends Thread {
//構(gòu)造方法
public PooledThread() {
//第一個(gè)參數(shù)為該線程所在的線程組對象,即當(dāng)前線程池對象
//第二個(gè)參數(shù)為線程名字
super(MyThreadPool.this, "PooledThread-" + (threadID++));
}
public void run() {
//如果該線程沒有被中止
while (!isInterrupted()) {
// 獲取任務(wù)
Task task = null;
try {
task = getTask();
} catch (InterruptedException ex) {
}
//只要線程池的任務(wù)列表不為空,getTask方法總能得到一個(gè)任務(wù)。
//若getTask()返回null,則表示線程池中已經(jīng)沒有任務(wù),而且線程池已被關(guān)閉。
if (task == null) {
return;
}
// 運(yùn)行任務(wù),吸收異常
try {
task.perform();
} catch (Throwable t) {
//當(dāng)線程組中的線程有未被捕獲的異常發(fā)生時(shí),JVM就會去調(diào)用uncaughtException方法。
uncaughtException(this, t);
}
}
}
}
}
package book.thread.pool;
/**
* 測試線程池
*/
public class PoolTest {
public static void main(String[] args) {
//線程池中的線程數(shù)
int numThreads = 3;
// 生成線程池
MyThreadPool threadPool = new MyThreadPool(numThreads);
// 任務(wù)數(shù)
int numTasks = 10;
// 運(yùn)行任務(wù)
for (int i=0; i<numTasks; i++) {
threadPool.performTask(new MyTask(i));
}
// 關(guān)閉線程池并等待所有任務(wù)完成
threadPool.join();
}
}
--
學(xué)海無涯