接系列3,在該系列中我們一起探討一下CachedThreadPool。
線程池是Conncurrent包提供給我們的一個(gè)重要的禮物。使得我們沒有必要維護(hù)自個(gè)實(shí)現(xiàn)的心里很沒底的線程池了。但如何充分利用好這些線程池來加快我們開發(fā)與測(cè)試效率呢?當(dāng)然是知己知彼。本系列就說說對(duì)CachedThreadPool使用的一下問題。
下面是對(duì)CachedThreadPool的一個(gè)測(cè)試,程序有問題嗎?
package net.blogjava.vincent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CachedThreadPoolIssue {
/**
* @param args
*/
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
for(int i = 1; i<8000; i++)
es.submit(new task());
}
}
class task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如果對(duì)JVM沒有特殊的設(shè)置,并在Window平臺(tái)上,那么就會(huì)有一下異常的發(fā)生:
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at java.util.concurrent.AbstractExecutorService.submit(Unknown Source)
at net.blogjava.vincent.CachedThreadPoolIssue.main(CachedThreadPoolIssue.java:19)
看看Doc對(duì)該線程池的介紹:
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.
有以下幾點(diǎn)需要注意:
1. 指出會(huì)重用先前的線程,不錯(cuò)。
2. 提高了短Task的吞吐量。
3. 線程如果60s沒有使用就會(huì)移除出Cache。
好像跟剛才的錯(cuò)誤沒有關(guān)系,其實(shí)就第一句話說了問題,它會(huì)按需要?jiǎng)?chuàng)建新的線程,上面的例子一下提交8000個(gè)Task,意味著該線程池就會(huì)創(chuàng)建8000線程,當(dāng)然,這遠(yuǎn)遠(yuǎn)高于JVM限制了。
注:在JDK1.5中,默認(rèn)每個(gè)線程使用1M內(nèi)存,8000M !!! 可能嗎!!
所以我感覺這應(yīng)該是我遇到的第一個(gè)Concurrent不足之處,既然這么設(shè)計(jì),那么就應(yīng)該在中Doc指出,應(yīng)該在使用避免大量Task提交到給CachedThreadPool.
可能讀者不相信,那么下面的例子說明了他創(chuàng)建的Thread。
在ThreadPoolExecutor提供的API中,看到它提供beforeExecute 和afterExecute兩個(gè)可以在子類中重載的方法,該方法在線程池中線程執(zhí)行Task之前與之后調(diào)用。所以我們?cè)赽eforeExexute中查看目前線程編號(hào)就可以確定目前的線程數(shù)目.
package net.blogjava.vincent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CachedThreadPoolIssue {
/**
* @param args
*/
public static void main(String[] args) {
ExecutorService es = new LogThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for(int i = 1; i<8000; i++)
es.submit(new task());
}
}
class task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(600000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class LogThreadPoolExecutor extends ThreadPoolExecutor{
public LogThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(t.getName());
}
protected void afterExecute(Runnable r, Throwable t) {
}
}
測(cè)試結(jié)果如圖:

當(dāng)線程數(shù)達(dá)到5592是,只有在任務(wù)管理器Kill該進(jìn)程了。
如何解決該問題呢,其實(shí)在剛才實(shí)例化時(shí)就看出來了,只需將
new LogThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
Integer.MAX_VALUE 改為合適的大小。對(duì)于該參數(shù)的含義,涉及到線程池的實(shí)現(xiàn),將會(huì)在下個(gè)系列中指出。
當(dāng)然,其他的解決方案就是控制Task的提交速率,避免超過其最大限制。