1.ThreadPoolExecutor#execute(Runnable command)
public void execute(Runnable command)
{
// 如果任務為空,則直接拋出空指針異常
if (command == null)
throw new NullPointerException();
// 1.如果線程池線程數目UnderCorePoolSize且RUNNING則直接添加worker線程并啟動
// 2.如果超過了corePoolSize或者addIfUnderCorePoolSize失敗則

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
{
// 如果線程池是RUNNING狀態且可將任務command加入workQueue(即不違反容量限制)

if (runState == RUNNING && workQueue.offer(command))
{
// 因為是并發執行.如果此時發現線程池狀態不再是RUNNING(可能執行了類似shutdown的操作)或者線程池中已無Worker線程
if (runState != RUNNING || poolSize == 0)
//1.如果線程池狀態不再是RUNNING且此時command依然在隊列中,即還未執行則直接拒絕.
// 2.否則如果線程池狀態 < STOP,即可能是SHUTDOWN狀態且任務隊列中依然有任務且工作線程的數目不足corePoolSize,則額外添加一個Worker線程并啟動
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
// 如果在UnderMaximumPoolSize下增加worker線程失敗則執行拒絕策略,直接調用RejectedExecutionHandler#rejectedExecution
reject(command); // is shutdown or saturated
}
} 2. addIfUnderCorePoolSize(Runnable firstTask)
// poolSize < corePoolSize && RUNNING的情況下添加worker線程并啟動worker線程

private boolean addIfUnderCorePoolSize(Runnable firstTask)
{
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
// 鎖
mainLock.lock();

try
{
// 初始poolSize為0,runState為0,即RUNNING
// RUNNING = 0 / SHUTDOWN = 1 / STOP = 2
// TERMINATED = 3
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);

} finally
{
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
} 3.addThread(Runnable firstTask)

private Thread addThread(Runnable firstTask)
{
// 初始化Worker,傳入firstTask
Worker w = new Worker(firstTask);
// 利用線程工廠新建線程,注意這里傳入的參數是w
Thread t = threadFactory.newThread(w);

if (t != null)
{
w.thread = t;
// 添加至workers
workers.add(w);
// ++poolSize
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
} 4.Worker
private final class Worker implements Runnable
{

/** *//**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();


/** *//**
* Initial task to run before entering run loop. Possibly null.
*/
private Runnable firstTask;


/** *//**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
volatile long completedTasks;


/** *//**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;


Worker(Runnable firstTask)
{
this.firstTask = firstTask;
}


boolean isActive()
{
return runLock.isLocked();
}


/** *//**
* 中斷線程如果沒有正在運行任務(可能在等待任務)
* {@link ThreadPoolExecutor#interruptIdleWorkers}
* {@link ThreadPoolExecutor#getTask}
* {@link ThreadPoolExecutor#shutdown}
*/

void interruptIfIdle()
{
final ReentrantLock runLock = this.runLock;

if (runLock.tryLock())
{

try
{
// 注意只有該方法是被其他線程調用才會執行interrupt.
// 1.個人認為如果是當前自身線程執行到這里的時候,說明getTask返回了null.線程就會結束了.
// 2.Worker線程在自身任務的執行中調用此方法時沒有作用的.即恰恰說明了運行時不被中斷.(因為不太可能存在這樣的類似業務,內部線程自己在運行任務的時候中斷自己.沒有任何作用.你懂的.這壓根就是錯誤的做法)
// 3.還有一個很重要的原因是:這里加了運行鎖.即如果此時有任務正在運行則獨占runLock,則其他線程必須等待任務完畢釋放鎖才可以進行interrupt.
if (thread != Thread.currentThread())
thread.interrupt();

} finally
{
runLock.unlock();
}
}
}


/** *//**
* Interrupts thread even if running a task.
*/

void interruptNow()
{
// 直接進行中斷,無論是內部線程還是其他線程
// 無論是否正在運行任務
// 沒有獲得鎖
// 此時如果線程正在等待任務或者任務執行過程中阻塞都可以被中斷
// 個人認為該方法也肯定是由外部線程進行調用的,而非內部的線程,你懂的.用了也沒有作用.
thread.interrupt();
}


/** *//**
* 運行任務在beforeExecute/afterExecute之間
*/

private void runTask(Runnable task)
{
final ReentrantLock runLock = this.runLock;
runLock.lock();

try
{

/**//*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
// 這段代碼乍看起來可能有些奇怪.個人認為是因為多線程的原因,如線程池調用了shutdownNow方法.
// 1.如果線程池是RUNNING/SHUTDOWN且之前被中斷過,則清除中斷狀態(interrupted) 2.再次檢查如果執行了shutdownNow的話,則會直接interrupt thread.而此時的中斷狀態可能被清除了.->需要需要再次調用interrupt重置中斷狀態.(還需要仔細考證)
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();

boolean ran = false;
// 任務執行前的一些業務,空實現,子類可覆蓋
// 任務完成或者任務執行出現異常則可通過afterExecute(空實現)追蹤
beforeExecute(thread, task);

try
{
task.run();
ran = true;
afterExecute(task, null);
// 任務計數
++completedTasks;

} catch (RuntimeException ex)
{
if (!ran)
afterExecute(task, ex);
throw ex;
}

} finally
{
runLock.unlock();
}
}


/** *//**
* Work線程主任務循環
*/

public void run()
{

try
{
Runnable task = firstTask;
firstTask = null;
// 1.如果第一個任務不為null則一定會執行第一個任務
// 2.如果getTask為null則線程結束.

while (task != null || (task = getTask()) != null)
{
runTask(task);
task = null;
}

} finally
{
// 跳出while,線程即結束
// 1.completedTaskCount 計數
// 2.workers.remove(w) 從workers移除
// 3.--poolSize,如果poolSize為0則tryTerminate
workerDone(this);
}
}
} 5.Runnable getTask()
Runnable getTask()
{

for (;;)
{

try
{
int state = runState;
// 線程池運行狀態為STOP或者TERMINATED,直接返回null,則Worker線程跳出while,終止
if (state > SHUTDOWN)
return null;
Runnable r;
// 如果線程池運行狀態恰好是SHUTDOWN,則繼續從隊列獲取任務(隊列為空則返回null),也在該狀態下如果線程池不為空則一直獲取任務
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
// RUNNING狀態下,poolSize超出了corePoolSize 或者allowCoreThreadTimeOut(允許核心線程超時) {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean value)}
// 在keepAliveTime時間內等待可用的元素,等待時可被中斷.如果超時則返回null.
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
// Running狀態下,poolSize未超出corPoolSize且不允許核心線程超時,則在元素變得可用之前一直等待,可被中斷
r = workQueue.take();
if (r != null)
return r;
// 如果此時返回的任務為null且worker線程可退出(該方法其實是重復校驗,因為是并發執行.所以可能任務隊列已經有了任務等條件出現)

if (workerCanExit())
{
// 如果此時線程池狀態不是RUNNING
if (runState >= SHUTDOWN) // Wake up others
// 喚醒可能阻塞的任務,{@link Worker#interruptIfIdle}
interruptIdleWorkers();
// 返回null,結束任務
return null;
}
// Else retry
// 繼續for-循環

} catch (InterruptedException ie)
{
// On interruption, re-check runState
}
}
} 6.workerCanExit()
// 判斷worker線程是否可退出

private boolean workerCanExit()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;

try
{
// 運行狀態為STOP或者TERMINATED
// 或者任務隊列為空
// 或者池中至少有一個線程且允許核心線程超時
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));

} finally
{
mainLock.unlock();
}
return canExit;
} 7.tryTerminate()
// 嘗試終止

private void tryTerminate()
{
// 如果當前池中沒有線程

if (poolSize == 0)
{
int state = runState;
// 如果當前運行狀態時是Running/SHUTDOWN且任務隊列不為空

if (state < STOP && !workQueue.isEmpty())
{
// 重新設置為運行狀態
state = RUNNING; // disable termination check below
// 添加一個firstTask為null的worker并啟動.因為隊列不為空則可以getTask
Thread t = addThread(null);
if (t != null)
t.start();
}
// 如果運行狀態為STOP或者SHUTDOWN則置狀態為TERMINATED并喚醒等待終止的線程 {@link #awaitTermination(long timeout, TimeUnit unit)}

if (state == STOP || state == SHUTDOWN)
{
runState = TERMINATED;
termination.signalAll();
terminated();// 此方法暫時未實現
}
}
} 8.awaitTermination(long timeout, TimeUnit unit)
// 等待線程池終止 {@link #tryTerminate()}
public boolean awaitTermination(long timeout, TimeUnit unit)

throws InterruptedException
{
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

try
{
// 注這是一個無限循環,直到線程池終止或者超時

for (;;)
{
if (runState == TERMINATED)
return true;
if (nanos <= 0)
return false;
// {@link Condition#long awaitNanos(long nanosTimeout)}
// 此方法返回的是一個估算(nanosTimeout - awaitTime),如果小于等于0則表示沒有剩余時間,即超時.不過如果返回值是一個正值的話且線程池未終止的話->所以由將返回值繼續傳入了參數->確保肯定會發生超時而導致nanos<=0而跳出循環
nanos = termination.awaitNanos(nanos);
}

} finally
{
mainLock.unlock();
}
} 9.shutdown()
public void shutdown()
{
// 檢查是否有shutdown的權限
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

try
{

if (security != null)
{ // Check if caller can modify our threads
// 檢查所有的worker線程是否有修改線程的權限
for (Worker w : workers)
security.checkAccess(w.thread);
}

int state = runState;
// 設置線程池當前狀態是RUNNING,則設置為SHUTDOWN狀態
if (state < SHUTDOWN)
runState = SHUTDOWN;


try
{
// 嘗試打斷空閑的worker線程

for (Worker w : workers)
{
w.interruptIfIdle();
}

} catch (SecurityException se)
{ // Try to back out
// 如果出現異常,則還原狀態
runState = state;
// tryTerminate() here would be a no-op 這個注釋的意思是出現了這個異常,tryTerminate是不起作用的.因為tryTerminate的條件是poolSize == 0.但是異常說明interruptIfIdle失敗則不可能poolSize == 0.
// 繼續向上拋出異常,這個異常是SecurityException
throw se;
}
// 嘗試終止(隊列為空的時候直接終止)
tryTerminate(); // Terminate now if pool and queue empty

} finally
{
mainLock.unlock();
}
} 10.shutdownNow()
public List<Runnable> shutdownNow()
{
// 檢查shutdown權限以及修改工作線程的權限
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

try
{

if (security != null)
{ // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}

int state = runState;
// 置狀態為STOP(可能未RUNNING或者SHUTDOWN)
if (state < STOP)
runState = STOP;


try
{

for (Worker w : workers)
{
// 直接中斷
w.interruptNow();
}

} catch (SecurityException se)
{ // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}

// 將隊列中的所有可用元素添加list中并返回
List<Runnable> tasks = drainQueue();
// 嘗試終止
tryTerminate(); // Terminate now if pool and queue empty
return tasks;

} finally
{
mainLock.unlock();
}
}11.總結:
1.corePoolSize/maximumPoolSize/keepAliveTime/workQueue/threadFactory/rejectedExecutionHandler 為線程池6大參數.
2.corePoolSize:當線程池poolSize少于corePoolSize時,則會新增worker線程.
3.線程池數目超過corePoolSize則向workQueue offer 任務.如果offer失敗則在maximumPoolSize下新增worker線程;如果超過了maximumPoolSize,則執行拒絕策略.
4.keepAliveTime:poolSize超過了corePoolSize時(或者允許core thread timeout),此參數指明workQueue pool的超時時間,超時則返回null,即表示當前線程空閑.(workerCanExit中有判斷workQueue為空的條件)然后worker線程結束(被回收).
5.Worker有兩個方法interruptIfIdle,這個方法會先獲得運行鎖,即如果當前有任務運行(占有鎖),則其他線程無法中斷.只有執行完workQueue的任務才會結束并釋放鎖.(shutdown);而另一個方法interruptNow則是不管任何條件,直接interrupt.
posted on 2013-12-26 11:43
landon 閱讀(1665)
評論(2) 編輯 收藏 所屬分類:
Sources