并發(fā) Collections 提供了線程安全、經(jīng)過良好調(diào)優(yōu)的數(shù)據(jù)結(jié)構(gòu),簡(jiǎn)化了并發(fā)編程。然而,在一些情形下,開發(fā)人員需要更進(jìn)一步,思考如何調(diào)節(jié)和/或限制線程執(zhí)行。由于 java.util.concurrent
的總體目標(biāo)是簡(jiǎn)化多線程編程,您可能希望該包包含同步實(shí)用程序,而它確實(shí)包含。
本文是 第 1 部分 的延續(xù),將介紹幾個(gè)比核心語言原語(監(jiān)視器)更高級(jí)的同步結(jié)構(gòu),但它們還未包含在 Collection 類中。一旦您了解了這些鎖和門的用途,使用它們將非常直觀。
1. Semaphore
在一些企業(yè)系統(tǒng)中,開發(fā)人員經(jīng)常需要限制未處理的特定資源請(qǐng)求(線程/操作)數(shù)量,事實(shí)上,限制有時(shí)候能夠提高系統(tǒng)的吞吐量,因?yàn)樗鼈儨p少了對(duì)特定資源的爭(zhēng)用。盡管完全可以手動(dòng)編寫限制代碼,但使用 Semaphore 類可以更輕松地完成此任務(wù),它將幫您執(zhí)行限制,如清單 1 所示:
清單 1. 使用 Semaphore 執(zhí)行限制
import java.util.*;import java.util.concurrent.*;
public class SemApp
{
public static void main(String[] args)
{
Runnable limitedCall = new Runnable() {
final Random rand = new Random();
final Semaphore available = new Semaphore(3);
int count = 0;
public void run()
{
int time = rand.nextInt(15);
int num = count++;
try
{
available.acquire();
System.out.println("Executing " +
"long-running action for " +
time + " seconds... #" + num);
Thread.sleep(time * 1000);
System.out.println("Done with #" +
num + "!");
available.release();
}
catch (InterruptedException intEx)
{
intEx.printStackTrace();
}
}
};
for (int i=0; i<10; i++)
new Thread(limitedCall).start();
}
}
|
即使本例中的 10 個(gè)線程都在運(yùn)行(您可以對(duì)運(yùn)行 SemApp
的 Java 進(jìn)程執(zhí)行 jstack
來驗(yàn)證),但只有 3 個(gè)線程是活躍的。在一個(gè)信號(hào)計(jì)數(shù)器釋放之前,其他 7 個(gè)線程都處于空閑狀態(tài)。(實(shí)際上,Semaphore
類支持一次獲取和釋放多個(gè) permit,但這不適用于本場(chǎng)景。)
回頁(yè)首
2. CountDownLatch
如果 Semaphore
是允許一次進(jìn)入一個(gè)(這可能會(huì)勾起一些流行夜總會(huì)的保安的記憶)線程的并發(fā)性類,那么 CountDownLatch
就像是賽馬場(chǎng)的起跑門柵。此類持有所有空閑線程,直到滿足特定條件,這時(shí)它將會(huì)一次釋放所有這些線程。
清單 2. CountDownLatch:讓我們?nèi)ベ愸R吧!
import java.util.*;
import java.util.concurrent.*;
class Race
{
private Random rand = new Random();
private int distance = rand.nextInt(250);
private CountDownLatch start;
private CountDownLatch finish;
private List<String> horses = new ArrayList<String>();
public Race(String... names)
{
this.horses.addAll(Arrays.asList(names));
}
public void run()
throws InterruptedException
{
System.out.println("And the horses are stepping up to the gate...");
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch finish = new CountDownLatch(horses.size());
final List<String> places =
Collections.synchronizedList(new ArrayList<String>());
for (final String h : horses)
{
new Thread(new Runnable() {
public void run() {
try
{
System.out.println(h +
" stepping up to the gate...");
start.await();
int traveled = 0;
while (traveled < distance)
{
// In a 0-2 second period of time....
Thread.sleep(rand.nextInt(3) * 1000);
// ... a horse travels 0-14 lengths
traveled += rand.nextInt(15);
System.out.println(h +
" advanced to " + traveled + "!");
}
finish.countDown();
System.out.println(h +
" crossed the finish!");
places.add(h);
}
catch (InterruptedException intEx)
{
System.out.println("ABORTING RACE!!!");
intEx.printStackTrace();
}
}
}).start();
}
System.out.println("And... they're off!");
start.countDown();
finish.await();
System.out.println("And we have our winners!");
System.out.println(places.get(0) + " took the gold...");
System.out.println(places.get(1) + " got the silver...");
System.out.println("and " + places.get(2) + " took home the bronze.");
}
}
public class CDLApp
{
public static void main(String[] args)
throws InterruptedException, java.io.IOException
{
System.out.println("Prepping...");
Race r = new Race(
"Beverly Takes a Bath",
"RockerHorse",
"Phineas",
"Ferb",
"Tin Cup",
"I'm Faster Than a Monkey",
"Glue Factory Reject"
);
System.out.println("It's a race of " + r.getDistance() + " lengths");
System.out.println("Press Enter to run the race....");
System.in.read();
r.run();
}
}
|
注意,在 清單 2 中,CountDownLatch
有兩個(gè)用途:首先,它同時(shí)釋放所有線程,模擬馬賽的起點(diǎn),但隨后會(huì)設(shè)置一個(gè)門閂模擬馬賽的終點(diǎn)。這樣,“主” 線程就可以輸出結(jié)果。 為了讓馬賽有更多的輸出注釋,可以在賽場(chǎng)的 “轉(zhuǎn)彎處” 和 “半程” 點(diǎn),比如賽馬跨過跑道的四分之一、二分之一和四分之三線時(shí),添加 CountDownLatch
。
回頁(yè)首
3. Executor
清單 1 和 清單 2 中的示例都存在一個(gè)重要的缺陷,它們要求您直接創(chuàng)建 Thread
對(duì)象。這可以解決一些問題,因?yàn)樵谝恍?JVM 中,創(chuàng)建 Thread
是一項(xiàng)重量型的操作,重用現(xiàn)有 Thread
比創(chuàng)建新線程要容易得多。而在另一些 JVM 中,情況正好相反:Thread
是輕量型的,可以在需要時(shí)很容易地新建一個(gè)線程。當(dāng)然,如果 Murphy 擁有自己的解決辦法(他通常都會(huì)擁有),那么您無論使用哪種方法對(duì)于您最終將部署的平臺(tái)都是不對(duì)的。
JSR-166 專家組(參見 參考資料)在一定程度上預(yù)測(cè)到了這一情形。Java 開發(fā)人員無需直接創(chuàng)建 Thread
,他們引入了 Executor
接口,這是對(duì)創(chuàng)建新線程的一種抽象。如清單 3 所示,Executor
使您不必親自對(duì) Thread
對(duì)象執(zhí)行 new
就能夠創(chuàng)建新線程:
清單 3. Executor
Executor exec = getAnExecutorFromSomeplace();
exec.execute(new Runnable() { ... });
|
使用 Executor
的主要缺陷與我們?cè)谒泄S中遇到的一樣:工廠必須來自某個(gè)位置。不幸的是,與 CLR 不同,JVM 沒有附帶一個(gè)標(biāo)準(zhǔn)的 VM 級(jí)線程池。
Executor
類實(shí)際上 充當(dāng)著一個(gè)提供 Executor
實(shí)現(xiàn)實(shí)例的共同位置,但它只有 new
方法(例如用于創(chuàng)建新線程池);它沒有預(yù)先創(chuàng)建實(shí)例。所以您可以自行決定是否希望在代碼中創(chuàng)建和使用 Executor
實(shí)例。(或者在某些情況下,您將能夠使用所選的容器/平臺(tái)提供的實(shí)例。)
ExecutorService 隨時(shí)可以使用
盡管不必?fù)?dān)心 Thread
來自何處,但 Executor
接口缺乏 Java 開發(fā)人員可能期望的某種功能,比如結(jié)束一個(gè)用于生成結(jié)果的線程并以非阻塞方式等待結(jié)果可用。(這是桌面應(yīng)用程序的一個(gè)常見需求,用戶將執(zhí)行需要訪問數(shù)據(jù)庫(kù)的 UI 操作,然后如果該操作花費(fèi)了很長(zhǎng)時(shí)間,可能希望在它完成之前取消它。)
對(duì)于此問題,JSR-166 專家創(chuàng)建了一個(gè)更加有用的抽象(ExecutorService
接口),它將線程啟動(dòng)工廠建模為一個(gè)可集中控制的服務(wù)。例如,無需每執(zhí)行一項(xiàng)任務(wù)就調(diào)用一次 execute()
,ExecutorService
可以接受一組任務(wù)并返回一個(gè)表示每項(xiàng)任務(wù)的未來結(jié)果的未來列表。
回頁(yè)首
4. ScheduledExecutorServices
盡管 ExecutorService
接口非常有用,但某些任務(wù)仍需要以計(jì)劃方式執(zhí)行,比如以確定的時(shí)間間隔或在特定時(shí)間執(zhí)行給定的任務(wù)。這就是 ScheduledExecutorService
的應(yīng)用范圍,它擴(kuò)展了 ExecutorService
。
如果您的目標(biāo)是創(chuàng)建一個(gè)每隔 5 秒跳一次的 “心跳” 命令,使用 ScheduledExecutorService
可以輕松實(shí)現(xiàn),如清單 4 所示:
清單 4. ScheduledExecutorService 模擬心跳
import java.util.concurrent.*;
public class Ping
{
public static void main(String[] args)
{
ScheduledExecutorService ses =
Executors.newScheduledThreadPool(1);
Runnable pinger = new Runnable() {
public void run() {
System.out.println("PING!");
}
};
ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS);
}
}
|
這項(xiàng)功能怎么樣?不用過于擔(dān)心線程,不用過于擔(dān)心用戶希望取消心跳時(shí)會(huì)發(fā)生什么,也不用明確地將線程標(biāo)記為前臺(tái)或后臺(tái);只需將所有的計(jì)劃細(xì)節(jié)留給 ScheduledExecutorService
。
順便說一下,如果用戶希望取消心跳,scheduleAtFixedRate
調(diào)用將返回一個(gè) ScheduledFuture
實(shí)例,它不僅封裝了結(jié)果(如果有),還擁有一個(gè) cancel
方法來關(guān)閉計(jì)劃的操作。
回頁(yè)首
5. Timeout 方法
為阻塞操作設(shè)置一個(gè)具體的超時(shí)值(以避免死鎖)的能力是 java.util.concurrent
庫(kù)相比起早期并發(fā)特性的一大進(jìn)步,比如監(jiān)控鎖定。
這些方法幾乎總是包含一個(gè) int
/TimeUnit
對(duì),指示這些方法應(yīng)該等待多長(zhǎng)時(shí)間才釋放控制權(quán)并將其返回給程序。它需要開發(fā)人員執(zhí)行更多工作 — 如果沒有獲取鎖,您將如何重新獲取? — 但結(jié)果幾乎總是正確的:更少的死鎖和更加適合生產(chǎn)的代碼。(關(guān)于編寫生產(chǎn)就緒代碼的更多信息,請(qǐng)參見 參考資料 中 Michael Nygard 編寫的 Release It!。)