java.util.concurrent的作者是Doug Lea : 世界上對Java影響力最大的個人,在jdk1.5之前大家一定熟悉他的backport-util-concurrent.jar."這個鼻梁掛著眼鏡,留著德王威廉二世的胡子,臉上永遠掛著謙遜靦腆笑容,服務于紐約州立大學Oswego分校計算器科學系的老大爺。",他可是并發編程的大師級人物哦!
Since jdk1.5,在java.util.concurrent包下的線程池模型是基于queue的,threadpool只有一個,而queue卻有多個LinkedBlockingQueue,SynchronousQueue,ScheduledThreadPoolExecutor.DelayedWorkQueue等可參見java.util.concurrent.Executors.注意:我下面的問題是針對LinkedBlockingQueue的,參考的src為jdk1.6.
Threadpool通過以下的3個屬性來標志池中的線程數:
corePoolSize(類似minimumPoolSize),poolSize(當前池中的線程數),maximumPoolSize(最大的線程數).
這3個屬性表達的意思是每次新創建或結束一個線程poolSize++/--,在最忙的情況下threadpool創建的線程數不能超過maximumPoolSize,
當空閑的情況下poolSize應該降到corePoolSize,當然threadpool如果從創建時它就從來沒有處理過一次請求的話,那么poolSize當然為0.
通過以上2段的說明下面我要引出我所要講的問題:
我們來看一下java.util.concurrent.ThreadPoolExecutor的execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
它表達的主體意思是:如果當前的poolSize<corePoolSize,那么就增加線程直到poolSize==corePoolSize.
如果poolSize已經到達corePoolSize,那么就把command(task) put to workQueue,如果workQueue為LinkedBlockingQueue的話,
那么只有當workQueue offer commands達到workQueue.capacity后,threadpool才會繼續增加線程直到maximumPoolSize.
1.*****如果LinkedBlockingQueue.capacity被設置為Integer.MAX_VALUE,那么池中的線程幾乎不可能到達maximumPoolSize.*****
所以你如果使用了Executors.newFixedThreadPool的話,那么maximumPoolSize和corePoolSize是一樣的并且LinkedBlockingQueue.capacity==Integer.MAX_VALUE,或者如果這樣new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,new LinkedBlockingQueue<Runnable>(/*Integer.MAX_VALUE*/))的話,
上述的使用都將導致maximumPoolSize是無效的,也就是說線程池中的線程數不會超出corePoolSize.
這個也讓那些tomcat6的開發人員可能也郁悶了,他們不得不改寫LinkedBlockingQueue,以tomcat-6.0.20-src為例:
org.apache.tomcat.util.net.NioEndpoint.TaskQueue extends LinkedBlockingQueue<Runnable> override offer method:
public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
parent = tp;
this.endpoint = ep;
}
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
//this is an approximation, so it could use some tuning
if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
org.apache.tomcat.util.net.NioEndpoint.start()-->
TaskQueue taskqueue = new TaskQueue();/***queue.capacity==Integer.MAX_VALUE***/
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor, this);
2.*****如果把LinkedBlockingQueue.capacity設置為一個適當的值遠小于Integer.MAX_VALUE,那么只有put到queue的任務數到達LinkedBlockingQueue的capacity后,才會繼續增加池中的線程,使得poolSize超出corePoolSize但不超過maximumPoolSize,這個時候來增加線程數是不是有點晚了呢??????*****.
這樣一來reject(command)也可能隨之而來了,LinkedBlockingQueue.capacity設置為何值又是個頭疼的問題.
所以ThreadPoolExecutor+LinkedBlockingQueue表達的意思是首先會增加線程數到corePoolSize,但只有queue的任務容量到達最大capacity后,才會繼續在corePoolSize的基數上增加線程來處理任務,直到maximumPoolSize.
但為什么我們不能這樣呢:將LinkedBlockingQueue.capacity設置為Integer.MAX_VALUE,讓task盡可能的得到處理,同時在忙的情況下,增加池中的線程充到maximumPoolSize來盡快的處理這些任務.即便是把LinkedBlockingQueue.capacity設置為一個適當的值<<<遠小于Integer.MAX_VALUE,也不一定非得在任務數到達LinkedBlockingQueue的capacity之后才去增加線程使poolSize超出corePoolSize趨向maximumPoolSize.
所以java util concurrent中的ThreadPoolExecutor+LinkedBlockingQueue組合的缺點也就出來了:如果我們想讓線程池盡可能多的處理大量的任務的話,我們會把LinkedBlockingQueue.capacity設置為Integer.MAX_VALUE,但是如果這樣的話池中的線程數量就不能充到最大maximumPoolSize,也就不能充分發揮線程池的最大處理能力.如果我們把LinkedBlockingQueue.capacity設置為一個較小的值,那么線程池中的線程數量會充到最大maximumPoolSize,但是如果池中的線程都忙的話,線程池又會reject請求的任務,因為隊列已滿.
如果我們把LinkedBlockingQueue.capacity設置為一個較大的值但不是Integer.MAX_VALUE,那么等到線程池的線程數量準備開始超出corePoolSize時,也就是任務隊列滿了,這個時候才去增加線程的話,請求任務的執行會有一定的延時,也就是沒有得到及時的處理.
其實也就是說ThreadPoolExecutor缺乏靈敏的線程調度機制,沒有根據當前任務的執行情況,是忙,還是閑,以及隊列中的待處理任務的數量級進行動態的調配線程數,使得它的處理效率受到影響.
那么什么是忙的情況的判斷呢?
busy[1]:如果poolSize==corePoolSize,并且現在忙著執行任務的線程數(currentBusyWorkers)等于poolSize.[而不管現在put到queue的任務數是否到達queue.capacity]
busy[2].1:如果poolSize==corePoolSize,并且put到queue的任務數已到達queue.capacity.[queue.capacity是針對有任務隊列極限限制的情況]
busy[2].2:線程池的基本目標是盡可能的快速處理大量的請求任務,那么就不一定非得在put到queue的任務數到達queue的capacity之后才判斷為忙的情況,只要queue中現有的任務數(task_counter)與poolSize或者maximumPoolSize存在一定的比例時就可以判斷為忙情,比如task_counter>=poolSize或者maximumPoolSize的(NumberOfProcessor+1)倍,這樣queue.capacity這個限制可以取消了.
在上述busy[1],busy[2]這2種情況下都應增加線程數,直至maximumPoolSize,使請求的任務得到最快的處理.
前面講的是忙的時候ThreadPoolExecutor+LinkedBlockingQueue在處理上的瑕疵,那么空閑的時候又要如何呢?
如果corePoolSize<poolSize<maximumPoolSize,那么線程等待keepAliveTime之后應該降為corePoolSize,嘿嘿,這個就真的成了bug了哦,一個很難發現的bug,poolSize是被降下來了,可是很可能降過了頭<corePoolSize,甚至降為0也有可能.
ThreadPoolExecutor.Worker.run()-->ThreadPoolExecutor.getTask():
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
/*queue is empty,這里timeout之后,return null,之后call workerCanExit() return true.*/
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}//end getTask.
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}//end workerCanExit.
在workerCanExit() return true之后,poolSize仍然大于corePoolSize,pooSize的值沒有變化,
ThreadPoolExecutor.Worker.run()將結束-->ThreadPoolExecutor.Worker.workerDone-->這個時候才將poolSize--,可惜晚了,在多線程的環境下,poolSize的值將變為小于corePoolSize,而不是等于corePoolSize!!!!!!
例如:如果poolSize(6)大于corePoolSize(5),那么同時timeout的就不一定是一條線程,而是多條,它們都有可能退出run,使得poolSize--減過了corePoolSize.
提一下java.util.concurrent.ThreadPoolExecutor的allowCoreThreadTimeOut方法, @since 1.6 public void allowCoreThreadTimeOut(boolean value);
它表達的意思是在空閑的時候讓線程等待keepAliveTime,timeout后使得poolSize能夠降為0.[其實我是希望它降為minimumPoolSize,特別是在服務器的環境下,我們需要線程池保持一定數量的線程來及時處理"零零碎碎的,斷斷續續的,一股一波的,不是很有壓力的"請求],當然你可以把corePoolSize當作minimumPoolSize,而不調用該方法.
針對上述java util concurrent線程池的瑕疵,我對java util concurrent線程池模型進行了修正,特別是在"忙"(busy[1],busy[2])的情況下的任務處理進行了優化,使得線程池盡可能快的處理盡可能多的任務.
下面提供了高效的線程池的源碼購買:
java版threadpool:
http://item.taobao.com/auction/item_detail-0db2-9078a9045826f273dcea80aa490f1a8b.jhtml
c [not c++]版threadpool in windows NT:
http://item.taobao.com/auction/item_detail-0db2-28e37cb6776a1bc526ef5a27aa411e71.jhtml