java.util.concurrent
包含許多線程安全、測(cè)試良好、高性能的并發(fā)構(gòu)建塊。不客氣地說(shuō),創(chuàng)建 java.util.concurrent
的目的就是要實(shí)現(xiàn) Collection 框架對(duì)數(shù)據(jù)結(jié)構(gòu)所執(zhí)行的并發(fā)操作。通過(guò)提供一組可靠的、高性能并發(fā)構(gòu)建塊,開(kāi)發(fā)人員可以提高并發(fā)類(lèi)的線程安全、可伸縮性、性能、可讀性和可靠性。
如果一些類(lèi)名看起來(lái)相似,可能是因?yàn)?java.util.concurrent
中的許多概念源自 Doug Lea 的 util.concurrent
庫(kù)(請(qǐng)參閱 參考資料)。
JDK 5.0 中的并發(fā)改進(jìn)可以分為三組:
- JVM 級(jí)別更改。大多數(shù)現(xiàn)代處理器對(duì)并發(fā)對(duì)某一硬件級(jí)別提供支持,通常以 compare-and-swap (CAS)指令形式。CAS 是一種低級(jí)別的、細(xì)粒度的技術(shù),它允許多個(gè)線程更新一個(gè)內(nèi)存位置,同時(shí)能夠檢測(cè)其他線程的沖突并進(jìn)行恢復(fù)。它是許多高性能并發(fā)算法的基礎(chǔ)。在 JDK 5.0 之前,Java 語(yǔ)言中用于協(xié)調(diào)線程之間的訪問(wèn)的惟一原語(yǔ)是同步,同步是更重量級(jí)和粗粒度的。公開(kāi) CAS 可以開(kāi)發(fā)高度可伸縮的并發(fā) Java 類(lèi)。這些更改主要由 JDK 庫(kù)類(lèi)使用,而不是由開(kāi)發(fā)人員使用。
- 低級(jí)實(shí)用程序類(lèi) -- 鎖定和原子類(lèi)。使用 CAS 作為并發(fā)原語(yǔ),
ReentrantLock
類(lèi)提供與 synchronized
原語(yǔ)相同的鎖定和內(nèi)存語(yǔ)義,然而這樣可以更好地控制鎖定(如計(jì)時(shí)的鎖定等待、鎖定輪詢(xún)和可中斷的鎖定等待)和提供更好的可伸縮性(競(jìng)爭(zhēng)時(shí)的高性能)。大多數(shù)開(kāi)發(fā)人員將不再直接使用 ReentrantLock
類(lèi),而是使用在 ReentrantLock
類(lèi)上構(gòu)建的高級(jí)類(lèi)。
- 高級(jí)實(shí)用程序類(lèi)。這些類(lèi)實(shí)現(xiàn)并發(fā)構(gòu)建塊,每個(gè)計(jì)算機(jī)科學(xué)文本中都會(huì)講述這些類(lèi) -- 信號(hào)、互斥、閂鎖、屏障、交換程序、線程池和線程安全集合類(lèi)等。大部分開(kāi)發(fā)人員都可以在應(yīng)用程序中用這些類(lèi),來(lái)替換許多(如果不是全部)同步、
wait()
和 notify()
的使用,從而提高性能、可讀性和正確性。
本教程將重點(diǎn)介紹 java.util.concurrent
包提供的高級(jí)實(shí)用程序類(lèi) -- 線程安全集合、線程池和同步實(shí)用程序。這些是初學(xué)者和專(zhuān)家都可以使用的"現(xiàn)成"類(lèi)。
在第一小節(jié)中,我們將回顧并發(fā)的基本知識(shí),盡管它不應(yīng)取代對(duì)線程和線程安全的了解。那些一點(diǎn)都不熟悉線程的讀者應(yīng)該先參考一些關(guān)于線程的介紹,如"Introduction to Java Threads"教程(請(qǐng)參閱參考資料)。
接下來(lái)的幾個(gè)小節(jié)將研究 java.util.concurrent
中的高級(jí)實(shí)用程序類(lèi) -- 線程安全集合、線程池、信號(hào)和同步工具。
最后一小節(jié)將介紹 java.util.concurrent
中的低級(jí)并發(fā)構(gòu)建塊,并提供一些性能測(cè)評(píng)來(lái)顯示新 java.util.concurrent
類(lèi)的可伸縮性的改進(jìn)。
什么是線程?
所有重要的操作系統(tǒng)都支持進(jìn)程的概念 -- 獨(dú)立運(yùn)行的程序,在某種程度上相互隔離。
線程有時(shí)稱(chēng)為 輕量級(jí)進(jìn)程。與進(jìn)程一樣,它們擁有通過(guò)程序運(yùn)行的獨(dú)立的并發(fā)路徑,并且每個(gè)線程都有自己的程序計(jì)數(shù)器,稱(chēng)為堆棧和本地變量。然而,線程存在于進(jìn)程中,它們與同一進(jìn)程內(nèi)的其他線程共享內(nèi)存、文件句柄以及每進(jìn)程狀態(tài)。
今天,幾乎每個(gè)操作系統(tǒng)都支持線程,允許執(zhí)行多個(gè)可獨(dú)立調(diào)度的線程,以便共存于一個(gè)進(jìn)程中。因?yàn)橐粋€(gè)進(jìn)程中的線程是在同一個(gè)地址空間中執(zhí)行的,所以多個(gè)線程可以同時(shí)訪問(wèn)相同對(duì)象,并且它們從同一堆棧中分配對(duì)象。雖然這使線程更易于與其他線程共享信息,但也意味著您必須確保線程之間不相互干涉。
正確使用線程時(shí),線程能帶來(lái)諸多好處,其中包括更好的資源利用、簡(jiǎn)化開(kāi)發(fā)、高吞吐量、更易響應(yīng)的用戶界面以及能執(zhí)行異步處理。
Java 語(yǔ)言包括用于協(xié)調(diào)線程行為的原語(yǔ),從而可以在不違反設(shè)計(jì)原型或者不破壞數(shù)據(jù)結(jié)構(gòu)的前提下安全地訪問(wèn)和修改共享變量。
在 Java 程序中存在很多理由使用線程,并且不管開(kāi)發(fā)人員知道線程與否,幾乎每個(gè) Java 應(yīng)用程序都使用線程。許多 J2SE 和 J2EE 工具可以創(chuàng)建線程,如 RMI、Servlet、Enterprise JavaBeans 組件和 Swing GUI 工具包。
使用線程的理由包括:
- 更易響應(yīng)的用戶界面。 事件驅(qū)動(dòng)的 GUI 工具包(如 AWT 或 Swing)使用單獨(dú)的事件線程來(lái)處理 GUI 事件。從事件線程中調(diào)用通過(guò) GUI 對(duì)象注冊(cè)的事件監(jiān)聽(tīng)器。然而,如果事件監(jiān)聽(tīng)器將執(zhí)行冗長(zhǎng)的任務(wù)(如文檔拼寫(xiě)檢查),那么 UI 將出現(xiàn)凍結(jié),因?yàn)槭录€程直到冗長(zhǎng)任務(wù)完畢之后才能處理其他事件。通過(guò)在單獨(dú)線程中執(zhí)行冗長(zhǎng)操作,當(dāng)執(zhí)行冗長(zhǎng)后臺(tái)任務(wù)時(shí),UI 能繼續(xù)響應(yīng)。
- 使用多處理器。 多處理器(MP)系統(tǒng)變得越來(lái)越便宜,并且分布越來(lái)越廣泛。因?yàn)檎{(diào)度的基本單位通常是線程,所以不管有多少處理器可用,一個(gè)線程的應(yīng)用程序一次只能在一個(gè)處理器上運(yùn)行。在設(shè)計(jì)良好的程序中,通過(guò)更好地利用可用的計(jì)算機(jī)資源,多線程能夠提高吞吐量和性能。
- 簡(jiǎn)化建模。 有效使用線程能夠使程序編寫(xiě)變得更簡(jiǎn)單,并易于維護(hù)。通過(guò)合理使用線程,個(gè)別類(lèi)可以避免一些調(diào)度的詳細(xì)、交叉存取操作、異步 IO 和資源等待以及其他復(fù)雜問(wèn)題。相反,它們能專(zhuān)注于域的要求,簡(jiǎn)化開(kāi)發(fā)并改進(jìn)可靠性。
- 異步或后臺(tái)處理。 服務(wù)器應(yīng)用程序可以同時(shí)服務(wù)于許多遠(yuǎn)程客戶機(jī)。如果應(yīng)用程序從 socket 中讀取數(shù)據(jù),并且沒(méi)有數(shù)據(jù)可以讀取,那么對(duì)
read()
的調(diào)用將被阻塞,直到有數(shù)據(jù)可讀。在單線程應(yīng)用程序中,這意味著當(dāng)某一個(gè)線程被阻塞時(shí),不僅處理相應(yīng)請(qǐng)求要延遲,而且處理所有請(qǐng)求也將延遲。然而,如果每個(gè) socket 都有自己的 IO 線程,那么當(dāng)一個(gè)線程被阻塞時(shí),對(duì)其他并發(fā)請(qǐng)求行為沒(méi)有影響。
線程安全
如果將這些類(lèi)用于多線程環(huán)境中,雖然確保這些類(lèi)的線程安全比較困難,但線程安全卻是必需的。java.util.concurrent
規(guī)范進(jìn)程的一個(gè)目標(biāo)就是提供一組線程安全的、高性能的并發(fā)構(gòu)建塊,從而使開(kāi)發(fā)人員能夠減輕一些編寫(xiě)線程安全類(lèi)的負(fù)擔(dān)。
線程安全類(lèi)非常難以明確定義,大多數(shù)定義似乎都是完全循環(huán)的。快速 Google 搜索會(huì)顯示下列線程安全代碼定義的例子,但這些定義(或者更確切地說(shuō)是描述)通常沒(méi)什么幫助:
- . . . can be called from multiple programming threads without unwanted interaction between the threads.
- . . . may be called by more than on thread at a time without requiring any other action on the caller's part.
通過(guò)類(lèi)似這樣的定義,不奇怪我們?yōu)槭裁磳?duì)線程安全如此迷惑。這些定義幾乎就是在說(shuō)"如果可以從多個(gè)線程安全調(diào)用類(lèi),那么該類(lèi)就是線程安全的"。這當(dāng)然是線程安全的解釋?zhuān)珜?duì)我們區(qū)別線程安全類(lèi)和不安全類(lèi)沒(méi)有什么幫助。我們使用"安全"是為了說(shuō)明什么?
要成為線程安全的類(lèi),首先它必須在單線程環(huán)境中正確運(yùn)行。如果正確實(shí)現(xiàn)了類(lèi),那么說(shuō)明它符合規(guī)范,對(duì)該類(lèi)的對(duì)象的任何順序的操作(公共字段的讀寫(xiě)、公共方法的調(diào)用)都不應(yīng)該使對(duì)象處于無(wú)效狀態(tài);觀察將處于無(wú)效狀態(tài)的對(duì)象;或違反類(lèi)的任何變量、前置條件或后置條件。
而且,要成為線程安全的類(lèi),在從多個(gè)線程訪問(wèn)時(shí),它必須繼續(xù)正確運(yùn)行,而不管運(yùn)行時(shí)環(huán)境執(zhí)行那些線程的調(diào)度和交叉,且無(wú)需對(duì)部分調(diào)用代碼執(zhí)行任何其他同步。結(jié)果是對(duì)線程安全對(duì)象的操作將用于按固定的整體一致順序出現(xiàn)所有線程。
如果沒(méi)有線程之間的某種明確協(xié)調(diào),比如鎖定,運(yùn)行時(shí)可以隨意在需要時(shí)在多線程中交叉操作執(zhí)行。
在 JDK 5.0 之前,確保線程安全的主要機(jī)制是 synchronized
原語(yǔ)。訪問(wèn)共享變量(那些可以由多個(gè)線程訪問(wèn)的變量)的線程必須使用同步來(lái)協(xié)調(diào)對(duì)共享變量的讀寫(xiě)訪問(wèn)。java.util.concurrent
包提供了一些備用并發(fā)原語(yǔ),以及一組不需要任何其他同步的線程安全實(shí)用程序類(lèi)。
令人厭煩的并發(fā)
即使您的程序從沒(méi)有明確創(chuàng)建線程,也可能會(huì)有許多工具或框架代表您創(chuàng)建了線程,這時(shí)要求從這些線程調(diào)用的類(lèi)是線程安全的。這樣會(huì)對(duì)開(kāi)發(fā)人員帶來(lái)較大的設(shè)計(jì)和實(shí)現(xiàn)負(fù)擔(dān),因?yàn)殚_(kāi)發(fā)線程安全類(lèi)比開(kāi)發(fā)非線程安全類(lèi)有更多要注意的事項(xiàng),且需要更多的分析。
AWT 和 Swing
這些 GUI 工具包創(chuàng)建了稱(chēng)為時(shí)間線程的后臺(tái)線程,將從該線程調(diào)用通過(guò) GUI 組件注冊(cè)的監(jiān)聽(tīng)器。因此,實(shí)現(xiàn)這些監(jiān)聽(tīng)器的類(lèi)必須是線程安全的。
TimerTask
JDK 1.3 中引入的 TimerTask
工具允許稍后執(zhí)行任務(wù)或計(jì)劃定期執(zhí)行任務(wù)。在 Timer
線程中執(zhí)行 TimerTask
事件,這意味著作為 TimerTask
執(zhí)行的任務(wù)必須是線程安全的。
Servlet 和 JavaServer Page 技術(shù)
Servlet 容器可以創(chuàng)建多個(gè)線程,在多個(gè)線程中同時(shí)調(diào)用給定 servlet,從而進(jìn)行多個(gè)請(qǐng)求。因此 servlet 類(lèi)必須是線程安全的。
RMI
遠(yuǎn)程方法調(diào)用(remote method invocation,RMI)工具允許調(diào)用其他 JVM 中運(yùn)行的操作。實(shí)現(xiàn)遠(yuǎn)程對(duì)象最普遍的方法是擴(kuò)展 UnicastRemoteObject
。例示 UnicastRemoteObject
時(shí),它是通過(guò) RMI 調(diào)度器注冊(cè)的,該調(diào)度器可能創(chuàng)建一個(gè)或多個(gè)線程,將在這些線程中執(zhí)行遠(yuǎn)程方法。因此,遠(yuǎn)程類(lèi)必須是線程安全的。
正如所看到的,即使應(yīng)用程序沒(méi)有明確創(chuàng)建線程,也會(huì)發(fā)生許多可能會(huì)從其他線程調(diào)用類(lèi)的情況。幸運(yùn)的是,java.util.concurrent
中的類(lèi)可以大大簡(jiǎn)化編寫(xiě)線程安全類(lèi)的任務(wù)。
例子 -- 非線程安全 servlet
下列 servlet 看起來(lái)像無(wú)害的留言板 servlet,它保存每個(gè)來(lái)訪者的姓名。然而,該 servlet 不是線程安全的,而這個(gè) servlet 應(yīng)該是線程安全的。問(wèn)題在于它使用 HashSet
存儲(chǔ)來(lái)訪者的姓名,HashSet
不是線程安全的類(lèi)。
當(dāng)我們說(shuō)這個(gè) servlet 不是線程安全的時(shí),是說(shuō)它所造成的破壞不僅僅是丟失留言板輸入。在最壞的情況下,留言板數(shù)據(jù)結(jié)構(gòu)都可能被破壞并且無(wú)法恢復(fù)。
public class UnsafeGuestbookServlet extends HttpServlet {
private Set visitorSet = new HashSet();
protected void doGet(HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws ServletException, IOException {
String visitorName = httpServletRequest.getParameter("NAME");
if (visitorName != null)
visitorSet.add(visitorName);
}
}
通過(guò)將 visitorSet
的定義更改為下列代碼,可以使該類(lèi)變?yōu)榫€程安全的:
private Set visitorSet = Collections.synchronizedSet(new HashSet());
如上所示的例子顯示線程的內(nèi)置支持是一把雙刃劍 -- 雖然它使構(gòu)建多線程應(yīng)用程序變得很容易,但它同時(shí)要求開(kāi)發(fā)人員更加注意并發(fā)問(wèn)題,甚至在使用留言板 servlet 這樣普通的東西時(shí)也是如此。
線程安全集合
JDK 1.2 中引入的 Collection 框架是一種表示對(duì)象集合的高度靈活的框架,它使用基本接口 List
、Set
和 Map
。通過(guò) JDK 提供每個(gè)集合的多次實(shí)現(xiàn)(HashMap
、Hashtable
、TreeMap
、WeakHashMap
、HashSet
、TreeSet
、Vector
、ArrayList
、LinkedList
等等)。其中一些集合已經(jīng)是線程安全的(Hashtable
和 Vector
),通過(guò)同步的封裝工廠(Collections.synchronizedMap()
、synchronizedList()
和 synchronizedSet()
),其余的集合均可表現(xiàn)為線程安全的。
java.util.concurrent
包添加了多個(gè)新的線程安全集合類(lèi)(ConcurrentHashMap
、CopyOnWriteArrayList
和 CopyOnWriteArraySet
)。這些類(lèi)的目的是提供高性能、高度可伸縮性、線程安全的基本集合類(lèi)型版本。
java.util
中的線程集合仍有一些缺點(diǎn)。例如,在迭代鎖定時(shí),通常需要將該鎖定保留在集合中,否則,會(huì)有拋出 ConcurrentModificationException
的危險(xiǎn)。(這個(gè)特性有時(shí)稱(chēng)為條件線程安全;有關(guān)的更多說(shuō)明,請(qǐng)參閱 參考資料。)此外,如果從多個(gè)線程頻繁地訪問(wèn)集合,則常常不能很好地執(zhí)行這些類(lèi)。java.util.concurrent
中的新集合類(lèi)允許通過(guò)在語(yǔ)義中的少量更改來(lái)獲得更高的并發(fā)。
JDK 5.0 還提供了兩個(gè)新集合接口 -- Queue
和 BlockingQueue
。Queue
接口與 List
類(lèi)似,但它只允許從后面插入,從前面刪除。通過(guò)消除 List
的隨機(jī)訪問(wèn)要求,可以創(chuàng)建比現(xiàn)有 ArrayList
和 LinkedList
實(shí)現(xiàn)性能更好的 Queue
實(shí)現(xiàn)。因?yàn)?List
的許多應(yīng)用程序?qū)嶋H上不需要隨機(jī)訪問(wèn),所以Queue
通常可以替代 List
,來(lái)獲得更好的性能。
弱一致的迭代器
java.util
包中的集合類(lèi)都返回 fail-fast 迭代器,這意味著它們假設(shè)線程在集合內(nèi)容中進(jìn)行迭代時(shí),集合不會(huì)更改它的內(nèi)容。如果 fail-fast 迭代器檢測(cè)到在迭代過(guò)程中進(jìn)行了更改操作,那么它會(huì)拋出 ConcurrentModificationException
,這是不可控異常。
在迭代過(guò)程中不更改集合的要求通常會(huì)對(duì)許多并發(fā)應(yīng)用程序造成不便。相反,比較好的是它允許并發(fā)修改并確保迭代器只要進(jìn)行合理操作,就可以提供集合的一致視圖,如 java.util.concurrent
集合類(lèi)中的迭代器所做的那樣。
java.util.concurrent
集合返回的迭代器稱(chēng)為弱一致的(weakly consistent)迭代器。對(duì)于這些類(lèi),如果元素自從迭代開(kāi)始已經(jīng)刪除,且尚未由 next()
方法返回,那么它將不返回到調(diào)用者。如果元素自迭代開(kāi)始已經(jīng)添加,那么它可能返回調(diào)用者,也可能不返回。在一次迭代中,無(wú)論如何更改底層集合,元素不會(huì)被返回兩次。
CopyOnWriteArrayList 和 CopyOnWriteArraySet
可以用兩種方法創(chuàng)建線程安全支持?jǐn)?shù)據(jù)的 List
-- Vector
或封裝 ArrayList
和 Collections.synchronizedList()
。java.util.concurrent
包添加了名稱(chēng)繁瑣的 CopyOnWriteArrayList
。為什么我們想要新的線程安全的 List
類(lèi)?為什么 Vector
還不夠?
最簡(jiǎn)單的答案是與迭代和并發(fā)修改之間的交互有關(guān)。使用 Vector
或使用同步的 List
封裝器,返回的迭代器是 fail-fast 的,這意味著如果在迭代過(guò)程中任何其他線程修改 List,迭代可能失敗。
Vector
的非常普遍的應(yīng)用程序是存儲(chǔ)通過(guò)組件注冊(cè)的監(jiān)聽(tīng)器的列表。當(dāng)發(fā)生適合的事件時(shí),該組件將在監(jiān)聽(tīng)器的列表中迭代,調(diào)用每個(gè)監(jiān)聽(tīng)器。為了防止 ConcurrentModificationException
,迭代線程必須復(fù)制列表或鎖定列表,以便進(jìn)行整體迭代,而這兩種情況都需要大量的性能成本。
CopyOnWriteArrayList
類(lèi)通過(guò)每次添加或刪除元素時(shí)創(chuàng)建支持?jǐn)?shù)組的新副本,避免了這個(gè)問(wèn)題,但是進(jìn)行中的迭代保持對(duì)創(chuàng)建迭代器時(shí)的當(dāng)前副本進(jìn)行操作。雖然復(fù)制也會(huì)有一些成本,但是在許多情況下,迭代要比修改多得多,在這些情況下,寫(xiě)入時(shí)復(fù)制要比其他備用方法具有更好的性能和并發(fā)性。
如果應(yīng)用程序需要 Set
語(yǔ)義,而不是 List
,那么還有一個(gè) Set
版本 -- CopyOnWriteArraySet
。
ConcurrentHashMap
正如已經(jīng)存在線程安全的 List
的實(shí)現(xiàn),您可以用多種方法創(chuàng)建線程安全的、基于 hash 的 Map
-- Hashtable
,并使用 Collections.synchronizedMap()
封裝 HashMap
。JDK 5.0 添加了 ConcurrentHashMap
實(shí)現(xiàn),該實(shí)現(xiàn)提供了相同的基本線程安全的 Map
功能,但它大大提高了并發(fā)性。
Hashtable
和 synchronizedMap
所采取的獲得同步的簡(jiǎn)單方法(同步 Hashtable
中或者同步的 Map
封裝器對(duì)象中的每個(gè)方法)有兩個(gè)主要的不足。首先,這種方法對(duì)于可伸縮性是一種障礙,因?yàn)橐淮沃荒苡幸粋€(gè)線程可以訪問(wèn) hash 表。同時(shí),這樣仍不足以提供真正的線程安全性,許多公用的混合操作仍然需要額外的同步。雖然諸如 get()
和 put()
之類(lèi)的簡(jiǎn)單操作可以在不需要額外同步的情況下安全地完成,但還是有一些公用的操作序列,例如迭代或者 put-if-absent(空則放入),需要外部的同步,以避免數(shù)據(jù)爭(zhēng)用。
Hashtable
和 Collections.synchronizedMap
通過(guò)同步每個(gè)方法獲得線程安全。這意味著當(dāng)一個(gè)線程執(zhí)行一個(gè) Map
方法時(shí),無(wú)論其他線程要對(duì) Map
進(jìn)行什么樣操作,都不能執(zhí)行,直到第一個(gè)線程結(jié)束才可以。
對(duì)比來(lái)說(shuō),ConcurrentHashMap
允許多個(gè)讀取幾乎總是并發(fā)執(zhí)行,讀和寫(xiě)操作通常并發(fā)執(zhí)行,多個(gè)同時(shí)寫(xiě)入經(jīng)常并發(fā)執(zhí)行。結(jié)果是當(dāng)多個(gè)線程需要訪問(wèn)同一 Map
時(shí),可以獲得更高的并發(fā)性。
在大多數(shù)情況下,ConcurrentHashMap
是 Hashtable
或 Collections.synchronizedMap(new HashMap())
的簡(jiǎn)單替換。然而,其中有一個(gè)顯著不同,即 ConcurrentHashMap
實(shí)例中的同步不鎖定映射進(jìn)行獨(dú)占使用。實(shí)際上,沒(méi)有辦法鎖定 ConcurrentHashMap
進(jìn)行獨(dú)占使用,它被設(shè)計(jì)用于進(jìn)行并發(fā)訪問(wèn)。為了使集合不被鎖定進(jìn)行獨(dú)占使用,還提供了公用的混合操作的其他(原子)方法,如 put-if-absent。ConcurrentHashMap
返回的迭代器是弱一致的,意味著它們將不拋出 ConcurrentModificationException
,將進(jìn)行"合理操作"來(lái)反映迭代過(guò)程中其他線程對(duì) Map
的修改。
原始集合框架包含三個(gè)接口:List
、Map
和 Set
。List
描述了元素的有序集合,支持完全隨即訪問(wèn) -- 可以在任何位置添加、提取或刪除元素。
LinkedList
類(lèi)經(jīng)常用于存儲(chǔ)工作元素(等待執(zhí)行的任務(wù))的列表或隊(duì)列。然而,List
提供的靈活性比該公用應(yīng)用程序所需要的多得多,這個(gè)應(yīng)用程序通常在后面插入元素,從前面刪除元素。但是要支持完整 List 接口則意味著 LinkedList
對(duì)于這項(xiàng)任務(wù)不像原來(lái)那樣有效。Queue
接口比 List
簡(jiǎn)單得多,僅包含 put()
和 take()
方法,并允許比 LinkedList
更有效的實(shí)現(xiàn)。
Queue
接口還允許實(shí)現(xiàn)來(lái)確定存儲(chǔ)元素的順序。ConcurrentLinkedQueue
類(lèi)實(shí)現(xiàn)先進(jìn)先出(first-in-first-out,F(xiàn)IFO)隊(duì)列,而 PriorityQueue
類(lèi)實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列(也稱(chēng)為堆),它對(duì)于構(gòu)建調(diào)度器非常有用,調(diào)度器必須按優(yōu)先級(jí)或預(yù)期的執(zhí)行時(shí)間執(zhí)行任務(wù)。
interface Queue<E> extends Collection<E> {
boolean offer(E x);
E poll();
E remove() throws NoSuchElementException;
E peek();
E element() throws NoSuchElementException;
}
實(shí)現(xiàn) Queue
的類(lèi)是:
LinkedList
已經(jīng)進(jìn)行了改進(jìn)來(lái)實(shí)現(xiàn) Queue
。
PriorityQueue
非線程安全的優(yōu)先級(jí)對(duì)列(堆)實(shí)現(xiàn),根據(jù)自然順序或比較器返回元素。
ConcurrentLinkedQueue
快速、線程安全的、無(wú)阻塞 FIFO 隊(duì)列。
線程最普遍的一個(gè)應(yīng)用程序是創(chuàng)建一個(gè)或多個(gè)線程,以執(zhí)行特定類(lèi)型的任務(wù)。Timer
類(lèi)創(chuàng)建線程來(lái)執(zhí)行 TimerTask
對(duì)象,Swing 創(chuàng)建線程來(lái)處理 UI 事件。在這兩種情況中,在單獨(dú)線程中執(zhí)行的任務(wù)都假定是短期的,這些線程是為了處理大量短期任務(wù)而存在的。
在其中每種情況中,這些線程一般都有非常簡(jiǎn)單的結(jié)構(gòu):
while (true) {
if (no tasks)
wait for a task;
execute the task;
}
通過(guò)例示從 Thread
獲得的對(duì)象并調(diào)用 Thread.start()
方法來(lái)創(chuàng)建線程。可以用兩種方法創(chuàng)建線程:通過(guò)擴(kuò)展 Thread
和覆蓋 run()
方法,或者通過(guò)實(shí)現(xiàn) Runnable
接口和使用 Thread(Runnable)
構(gòu)造函數(shù):
class WorkerThread extends Thread {
public void run() { /* do work */ }
}
Thread t = new WorkerThread();
t.start();
或者:
Thread t = new Thread(new Runnable() {
public void run() { /* do work */ }
}
t.start();
重新使用線程
因?yàn)槎鄠€(gè)原因,類(lèi)似 Swing GUI 的框架為事件任務(wù)創(chuàng)建單一線程,而不是為每項(xiàng)任務(wù)創(chuàng)建新的線程。首先是因?yàn)閯?chuàng)建線程會(huì)有間接成本,所以創(chuàng)建線程來(lái)執(zhí)行
簡(jiǎn)單任務(wù)將是一種資源浪費(fèi)。通過(guò)重新使用事件線程來(lái)處理多個(gè)事件,啟動(dòng)和拆卸成本(隨平臺(tái)而變)會(huì)分?jǐn)傇诙鄠€(gè)事件上。
Swing 為事件使用單一后臺(tái)線程的另一個(gè)原因是確保事件不會(huì)互相干涉,因?yàn)橹钡角耙皇录Y(jié)束,下一事件才開(kāi)始處理。該方法簡(jiǎn)化了事件處理程序的編寫(xiě)。
使用多個(gè)線程,將要做更多的工作來(lái)確保一次僅一個(gè)線程地執(zhí)行線程相關(guān)的代碼。
大多數(shù)服務(wù)器應(yīng)用程序(如 Web 服務(wù)器、POP 服務(wù)器、數(shù)據(jù)庫(kù)服務(wù)器或文件服務(wù)器)代表遠(yuǎn)程客戶機(jī)處理請(qǐng)求,這些客戶機(jī)通常使用 socket 連接到服務(wù)器。對(duì)于每個(gè)請(qǐng)求,通常要進(jìn)行少量處理(獲得該文件的代碼塊,并將其發(fā)送回 socket),但是可能會(huì)有大量(且不受限制)的客戶機(jī)請(qǐng)求服務(wù)。
用于構(gòu)建服務(wù)器應(yīng)用程序的簡(jiǎn)單化模型會(huì)為每個(gè)請(qǐng)求創(chuàng)建新的線程。下列代碼段實(shí)現(xiàn)簡(jiǎn)單的 Web 服務(wù)器,它接受端口 80 的 socket 連接,并創(chuàng)建新的線程來(lái)處理請(qǐng)求。不幸的是,該代碼不是實(shí)現(xiàn) Web 服務(wù)器的好方法,因?yàn)樵谥刎?fù)載條件下它將失敗,停止整臺(tái)服務(wù)器。
class UnreliableWebServer {
public static void main(String[] args) {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable r = new Runnable() {
public void run() {
handleRequest(connection);
}
};
// Don't do this!
new Thread(r).start();
}
}
}
當(dāng)服務(wù)器被請(qǐng)求吞沒(méi)時(shí),UnreliableWebServer
類(lèi)不能很好地處理這種情況。每次有請(qǐng)求時(shí),就會(huì)創(chuàng)建新的類(lèi)。根據(jù)操作系統(tǒng)和可用內(nèi)存,可以創(chuàng)建的線程數(shù)是有限的。
不幸的是,您通常不知道限制是多少 -- 只有當(dāng)應(yīng)用程序因?yàn)?OutOfMemoryError
而崩潰時(shí)才發(fā)現(xiàn)。
如果足夠快地在這臺(tái)服務(wù)器上拋出請(qǐng)求的話,最終其中一個(gè)線程創(chuàng)建將失敗,生成的 Error
會(huì)關(guān)閉整個(gè)應(yīng)用程序。當(dāng)一次僅能有效支持很少線程時(shí),沒(méi)有必要?jiǎng)?chuàng)建上千個(gè)
線程,無(wú)論如何,這樣使用資源可能會(huì)損害性能。創(chuàng)建線程會(huì)使用相當(dāng)一部分內(nèi)存,其中包括有兩個(gè)堆棧(Java 和 C),以及每線程數(shù)據(jù)結(jié)構(gòu)。如果創(chuàng)建過(guò)多線程,其中
每個(gè)線程都將占用一些 CPU 時(shí)間,結(jié)果將使用許多內(nèi)存來(lái)支持大量線程,每個(gè)線程都運(yùn)行得很慢。這樣就無(wú)法很好地使用計(jì)算資源。
使用線程池解決問(wèn)題
為任務(wù)創(chuàng)建新的線程并不一定不好,但是如果創(chuàng)建任務(wù)的頻率高,而平均任務(wù)持續(xù)時(shí)間低,我們可以看到每項(xiàng)任務(wù)創(chuàng)建一個(gè)新的線程將產(chǎn)生性能(如果負(fù)載不可預(yù)知,還
有穩(wěn)定性)問(wèn)題。
如果不是每項(xiàng)任務(wù)創(chuàng)建一個(gè)新的線程,則服務(wù)器應(yīng)用程序必須采取一些方法來(lái)限制一次可以處理的請(qǐng)求數(shù)。這意味著每次需要啟動(dòng)新的任務(wù)時(shí),它不能僅調(diào)用下列代碼。
new Thread(runnable).start()
管理一大組小任務(wù)的標(biāo)準(zhǔn)機(jī)制是組合工作隊(duì)列和線程池。工作隊(duì)列就是要處理的任務(wù)的隊(duì)列,前面描述的 Queue
類(lèi)完全適合。線程池是線程的集合,每個(gè)線程都提取公用
工作隊(duì)列。當(dāng)一個(gè)工作線程完成任務(wù)處理后,它會(huì)返回隊(duì)列,查看是否有其他任務(wù)需要處理。如果有,它會(huì)轉(zhuǎn)移到下一個(gè)任務(wù),并開(kāi)始處理。
線程池為線程生命周期間接成本問(wèn)題和資源崩潰問(wèn)題提供了解決方案。通過(guò)對(duì)多個(gè)任務(wù)重新使用線程,創(chuàng)建線程的間接成本將分布到多個(gè)任務(wù)中。作為一種額外好處,因?yàn)?br/>請(qǐng)求到達(dá)時(shí),線程已經(jīng)存在,從而可以消除由創(chuàng)建線程引起的延遲。因此,可以立即處理請(qǐng)求,使應(yīng)用程序更易響應(yīng)。而且,通過(guò)正確調(diào)整線程池中的線程數(shù),可以強(qiáng)制超
出特定限制的任何請(qǐng)求等待,直到有線程可以處理它,它們等待時(shí)所消耗的資源要少于使用額外線程所消耗的資源,這樣可以防止資源崩潰。
Executor 框架
java.util.concurrent
包中包含靈活的線程池實(shí)現(xiàn),但是更重要的是,它包含用于管理實(shí)現(xiàn) Runnable
的任務(wù)的執(zhí)行的整個(gè)框架。該框架稱(chēng)為 Executor 框架。
Executor
接口相當(dāng)簡(jiǎn)單。它描述將運(yùn)行 Runnable
的對(duì)象:
public interface Executor {
void execute(Runnable command);
}
任務(wù)運(yùn)行于哪個(gè)線程不是由該接口指定的,這取決于使用的 Executor
的實(shí)現(xiàn)。它可以運(yùn)行于后臺(tái)線程,如 Swing 事件線程,或者運(yùn)行于線程池,或者調(diào)用線程,或者
新的線程,它甚至可以運(yùn)行于其他 JVM!通過(guò)同步的 Executor
接口提交任務(wù),從任務(wù)執(zhí)行策略中刪除任務(wù)提交。Executor
接口獨(dú)自關(guān)注任務(wù)提交 -- 這是
Executor
實(shí)現(xiàn)的選擇,確定執(zhí)行策略。這使在部署時(shí)調(diào)整執(zhí)行策略(隊(duì)列限制、池大小、優(yōu)先級(jí)排列等等)更加容易,更改的代碼最少。
java.util.concurrent
中的大多數(shù) Executor
實(shí)現(xiàn)還實(shí)現(xiàn) ExecutorService
接口,這是對(duì) Executor
的擴(kuò)展,它還管理執(zhí)行服務(wù)的生命周期。這使它們更易
于
管理,并向生命可能比單獨(dú) Executor
的生命更長(zhǎng)的應(yīng)用程序提供服務(wù)。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout,
TimeUnit unit);
// other convenience methods for submitting tasks
}
Executor
java.util.concurrent
包包含多個(gè) Executor
實(shí)現(xiàn),每個(gè)實(shí)現(xiàn)都實(shí)現(xiàn)不同的執(zhí)行策略。什么是執(zhí)行策略?執(zhí)行策略定義何時(shí)在哪個(gè)線程中運(yùn)行任務(wù),
執(zhí)行任務(wù)可能消耗的資源級(jí)別(線程、內(nèi)存等等),以及如果執(zhí)行程序超載該怎么辦。
執(zhí)行程序通常通過(guò)工廠方法例示,而不是通過(guò)構(gòu)造函數(shù)。Executors
類(lèi)包含用于構(gòu)造許多不同類(lèi)型的 Executor
實(shí)現(xiàn)的靜態(tài)工廠方法:
Executors.newCachedThreadPool()
創(chuàng)建不限制大小的線程池,但是當(dāng)以前創(chuàng)建的線程可以使用時(shí)將重新使用那些線程。如果沒(méi)有現(xiàn)有線程可用,
- 將創(chuàng)建新的線程并將其添加到池中。使用不到 60 秒的線程將終止并從緩存中刪除。
Executors.newFixedThreadPool(int n)
創(chuàng)建線程池,其重新使用在不受限制的隊(duì)列之外運(yùn)行的固定線程組。在關(guān)閉前,所有線程都會(huì)因?yàn)閳?zhí)行
- 過(guò)程中的失敗而終止,如果需要執(zhí)行后續(xù)任務(wù),將會(huì)有新的線程來(lái)代替這些線程。
Executors.newSingleThreadExecutor()
創(chuàng)建 Executor,其使用在不受限制的隊(duì)列之外運(yùn)行的單一工作線程,與 Swing 事件線程非常相似。
- 保證順序執(zhí)行任務(wù),在任何給定時(shí)間,不會(huì)有多個(gè)任務(wù)處于活動(dòng)狀態(tài)。
更可靠的 Web 服務(wù)器 -- 使用 Executor
前面 如何不對(duì)任務(wù)進(jìn)行管理 中的代碼顯示了如何不用編寫(xiě)可靠服務(wù)器應(yīng)用程序。幸運(yùn)的是,修復(fù)這個(gè)示例非常簡(jiǎn)單,只需將 Thread.start()
調(diào)用替換
為向 Executor
提交任務(wù)即可:
class ReliableWebServer {
Executor pool =
Executors.newFixedThreadPool(7);
public static void main(String[] args) {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable r = new Runnable() {
public void run() {
handleRequest(connection);
}
};
pool.execute(r);
}
}
}
注意,本例與前例之間的區(qū)別僅在于 Executor
的創(chuàng)建以及如何提交執(zhí)行的任務(wù)。
定制 ThreadPoolExecutor
Executors
中的 newFixedThreadPool
和 newCachedThreadPool
工廠方法返回的 Executor
是類(lèi) ThreadPoolExecutor
的實(shí)例,是高度可定制的。
通過(guò)使用包含 ThreadFactory
變量的工廠方法或構(gòu)造函數(shù)的版本,可以定義池線程的創(chuàng)建。ThreadFactory
是工廠對(duì)象,其構(gòu)造執(zhí)行程序要使用的新線程。
使用定制的線程工廠,創(chuàng)建的線程可以包含有用的線程名稱(chēng),并且這些線程是守護(hù)線程,屬于特定線程組或具有特定優(yōu)先級(jí)。
下面是線程工廠的例子,它創(chuàng)建守護(hù)線程,而不是創(chuàng)建用戶線程:
public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
}
有時(shí),Executor
不能執(zhí)行任務(wù),因?yàn)樗呀?jīng)關(guān)閉或者因?yàn)?Executor
使用受限制隊(duì)列存儲(chǔ)等待任務(wù),而該隊(duì)列已滿。在這種情況下,需要咨詢(xún)執(zhí)行程序的
RejectedExecutionHandler
來(lái)確定如何處理任務(wù) -- 拋出異常(默認(rèn)情況),放棄任務(wù),在調(diào)用者的線程中執(zhí)行任務(wù),或放棄隊(duì)列中最早的任務(wù)以為
新任務(wù)騰出空間。ThreadPoolExecutor.setRejectedExecutionHandler
可以設(shè)置拒絕的執(zhí)行處理程序。
還可以擴(kuò)展 ThreadPoolExecutor
,并覆蓋方法 beforeExecute
和 afterExecute
,以添加裝置,添加記錄,添加計(jì)時(shí),重新初始化線程本地變量,
或進(jìn)行其他執(zhí)行定制。
需要特別考慮的問(wèn)題
使用 Executor
框架會(huì)從執(zhí)行策略中刪除任務(wù)提交,一般情況下,人們希望這樣,那是因?yàn)樗试S我們靈活地調(diào)整執(zhí)行策略,不必更改許多位置的代碼。然而,當(dāng)提交代
碼暗含假設(shè)特定執(zhí)行策略時(shí),存在多種情況,在這些情況下,
重要的是選擇的 Executor 實(shí)現(xiàn)一致的執(zhí)行策略。
這類(lèi)情況中的其中的一種就是一些任務(wù)同時(shí)等待其他任務(wù)完成。在這種情況下,當(dāng)線程池沒(méi)有足夠的線程時(shí),如果所有當(dāng)前執(zhí)行的任務(wù)都在等待另一項(xiàng)任務(wù),而該任務(wù)因?yàn)?br/>線程池已滿不能執(zhí)行,那么線程池可能會(huì)死鎖。
另一種相似的情況是一組線程必須作為共同操作組一起工作。在這種情況下,需要確保線程池能夠容納所有線程。
如果應(yīng)用程序?qū)μ囟▓?zhí)行程序進(jìn)行了特定假設(shè),那么應(yīng)該在 Executor
定義和初始化的附近對(duì)這些進(jìn)行說(shuō)明,從而使善意的更改不會(huì)破壞應(yīng)用程序的正確功能。
創(chuàng)建 Executor
時(shí),人們普遍會(huì)問(wèn)的一個(gè)問(wèn)題是"線程池應(yīng)該有多大?"。當(dāng)然,答案取決于硬件和將執(zhí)行的任務(wù)類(lèi)型(它們是受計(jì)算限制或是受 IO 的限制?)。
如果線程池太小,資源可能不能被充分利用,在一些任務(wù)還在工作隊(duì)列中等待執(zhí)行時(shí),可能會(huì)有處理器處于閑置狀態(tài)。
另一方面,如果線程池太大,則將有許多有效線程,因?yàn)榇罅烤€程或有效任務(wù)使用內(nèi)存,或者因?yàn)槊宽?xiàng)任務(wù)要比使用少量線程有更多上下文切換,性能可能會(huì)受損。
所以假設(shè)為了使處理器得到充分使用,線程池應(yīng)該有多大?如果知道系統(tǒng)有多少處理器和任務(wù)的計(jì)算時(shí)間和等待時(shí)間的近似比率,Amdahl 法則提供很好的近似公式。
用 WT 表示每項(xiàng)任務(wù)的平均等待時(shí)間,ST 表示每項(xiàng)任務(wù)的平均服務(wù)時(shí)間(計(jì)算時(shí)間)。則 WT/ST 是每項(xiàng)任務(wù)等待所用時(shí)間的百分比。對(duì)于 N 處理器系統(tǒng),池中可以
近似有 N*(1+WT/ST) 個(gè)線程。
好的消息是您不必精確估計(jì) WT/ST。"合適的"池大小的范圍相當(dāng)大;只需要避免"過(guò)大"和"過(guò)小"的極端情況即可。
Future 接口
Future
接口允許表示已經(jīng)完成的任務(wù)、正在執(zhí)行過(guò)程中的任務(wù)或者尚未開(kāi)始執(zhí)行的任務(wù)。通過(guò) Future
接口,可以嘗試取消尚未完成的任務(wù),查詢(xún)?nèi)蝿?wù)已經(jīng)完成還是
取消了,以及提取(或等待)任務(wù)的結(jié)果值。
FutureTask
類(lèi)實(shí)現(xiàn)了 Future
,并包含一些構(gòu)造函數(shù),允許將 Runnable
或 Callable
(會(huì)產(chǎn)生結(jié)果的 Runnable
)和 Future
接口封裝。因?yàn)?FutureTask
也實(shí)現(xiàn) Runnable
,所以可以只將 FutureTask
提供給 Executor
。一些提交方法(如 ExecutorService.submit()
)除了提交任務(wù)之外,還將返回 Future
接口。
Future.get()
方法檢索任務(wù)計(jì)算的結(jié)果(或如果任務(wù)完成,但有異常,則拋出 ExecutionException
)。如果任務(wù)尚未完成,那么 Future.get()
將被阻塞,
直到任務(wù)完成;如果任務(wù)已經(jīng)完成,那么它將立即返回結(jié)果。
使用 Future 構(gòu)建緩存
該示例代碼與 java.util.concurrent
中的多個(gè)類(lèi)關(guān)聯(lián),突出顯示了 Future
的功能。它實(shí)現(xiàn)緩存,使用 Future
描述緩存值,該值可能已經(jīng)計(jì)算,或者可能在其他線程中"正在構(gòu)造"。
它利用 ConcurrentHashMap
中的原子 putIfAbsent()
方法,確保僅有一個(gè)線程試圖計(jì)算給定關(guān)鍵字的值。如果其他線程隨后請(qǐng)求同一關(guān)鍵字的值,它僅能等待(通過(guò) Future.get()
的幫助)第一個(gè)線程完成。因此兩個(gè)線程不會(huì)計(jì)算相同的值。
public class Cache<K, V> {
ConcurrentMap<K, FutureTask<V>> map = new ConcurrentHashMap();
Executor executor = Executors.newFixedThreadPool(8);
public V get(final K key) {
FutureTask<V> f = map.get(key);
if (f == null) {
Callable<V> c = new Callable<V>() {
public V call() {
// return value associated with key
}
};
f = new FutureTask<V>(c);
FutureTask old = map.putIfAbsent(key, f);
if (old == null)
executor.execute(f);
else
f = old;
}
return f.get();
}
}
CompletionService
CompletionService
將執(zhí)行服務(wù)與類(lèi)似 Queue
的接口組合,從任務(wù)執(zhí)行中刪除任務(wù)結(jié)果的處理。CompletionService
接口包含用來(lái)提交將要執(zhí)行的任務(wù)的 submit()
方法和用來(lái)詢(xún)問(wèn)下一完成任務(wù)的 take()
/poll()
方法。
CompletionService
允許應(yīng)用程序結(jié)構(gòu)化,使用 Producer/Consumer 模式,其中生產(chǎn)者創(chuàng)建任務(wù)并提交,消費(fèi)者請(qǐng)求完成任務(wù)的結(jié)果并處理這些結(jié)果。CompletionService
接口由 ExecutorCompletionService
類(lèi)實(shí)現(xiàn),該類(lèi)使用 Executor
處理任務(wù)并從 CompletionService
導(dǎo)出 submit/poll/take 方法。
下列代碼使用 Executor
和 CompletionService
來(lái)啟動(dòng)許多"solver"任務(wù),并使用第一個(gè)生成非空結(jié)果的任務(wù)的結(jié)果,然后取消其余任務(wù):
void solve(Executor e, Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch(ExecutionException ignore) {}
}
}
finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
java.util.concurrent
中其他類(lèi)別的有用的類(lèi)也是同步工具。這組類(lèi)相互協(xié)作,控制一個(gè)或多個(gè)線程的執(zhí)行流。
Semaphore
、CyclicBarrier
、CountdownLatch
和 Exchanger
類(lèi)都是同步工具的例子。每個(gè)類(lèi)都有線程可以調(diào)用的方法,
方法是否被阻塞取決于正在使用的特定同步工具的狀態(tài)和規(guī)則。
Semaphore
Semaphore 類(lèi)實(shí)現(xiàn)標(biāo)準(zhǔn) Dijkstra 計(jì)數(shù)信號(hào)。計(jì)數(shù)信號(hào)可以認(rèn)為具有一定數(shù)量的許可權(quán),該許可權(quán)可以獲得或釋放。如果有剩余的許可權(quán),acquire()
方法將成功,
否則該方法將被阻塞,直到有可用的許可權(quán)(通過(guò)其他線程釋放許可權(quán))。線程一次可以獲得多個(gè)許可權(quán)。
計(jì)數(shù)信號(hào)可以用于限制有權(quán)對(duì)資源進(jìn)行并發(fā)訪問(wèn)的線程數(shù)。該方法對(duì)于實(shí)現(xiàn)資源池或限制 Web 爬蟲(chóng)(Web crawler)中的輸出 socket 連接非常有用。
注意信號(hào)不跟蹤哪個(gè)線程擁有多少許可權(quán);這由應(yīng)用程序來(lái)決定,以確保何時(shí)線程釋放許可權(quán),該信號(hào)表示其他線程擁有許可權(quán)或者正在釋放許可權(quán),以及其他線程知道
它的許可權(quán)已釋放。
互斥
計(jì)數(shù)信號(hào)的一種特殊情況是互斥,或者互斥信號(hào)。互斥就是具有單一許可權(quán)的計(jì)數(shù)信號(hào),意味著在給定時(shí)間僅一個(gè)線程可以具有許可權(quán)(也稱(chēng)為二進(jìn)制信號(hào))。互斥可以
用于管理對(duì)共享資源的獨(dú)占訪問(wèn)。
雖然互斥許多地方與鎖定一樣,但互斥還有一個(gè)鎖定通常沒(méi)有的其他功能,就是互斥可以由具有許可權(quán)的線程之外的其他線程來(lái)釋放。這在死鎖恢復(fù)時(shí)會(huì)非常有用。
CyclicBarrier 類(lèi)可以幫助同步,它允許一組線程等待整個(gè)線程組到達(dá)公共屏障點(diǎn)。CyclicBarrier
是使用整型變量構(gòu)造的,其確定組中的線程數(shù)。當(dāng)一個(gè)線程到達(dá)屏障時(shí)(通過(guò)調(diào)用 CyclicBarrier.await()
),它會(huì)被阻塞,直到所有線程都到達(dá)屏障,然后在該點(diǎn)允許所有線程繼續(xù)執(zhí)行。該操作與許多家庭逛商業(yè)街相似 -- 每個(gè)家庭成員都自己走,并商定 1:00 在電影院集合。當(dāng)您到電影院但不是所有人都到了時(shí),您會(huì)坐下來(lái)等其他人到達(dá)。然后所有人一起離開(kāi)。
認(rèn)為屏障是循環(huán)的是因?yàn)樗梢灾匦率褂茫灰坏┧芯€程都已經(jīng)在屏障處集合并釋放,則可以將該屏障重新初始化到它的初始狀態(tài)。
還可以指定在屏障處等待時(shí)的超時(shí);如果在該時(shí)間內(nèi)其余線程還沒(méi)有到達(dá)屏障,則認(rèn)為屏障被打破,所有正在等待的線程會(huì)收到 BrokenBarrierException
。
下列代碼將創(chuàng)建 CyclicBarrier
并啟動(dòng)一組線程,每個(gè)線程將計(jì)算問(wèn)題的一部分,等待所有其他線程結(jié)束之后,再檢查解決方案是否達(dá)成一致。如果不一致,那么每個(gè)工作線程將開(kāi)始另一個(gè)迭代。該例將使用 CyclicBarrier
變量,它允許注冊(cè) Runnable
,在所有線程到達(dá)屏障但還沒(méi)有釋放任何線程時(shí)執(zhí)行 Runnable
。
class Solver { // Code sketch
void solve(final Problem p, int nThreads) {
final CyclicBarrier barrier =
new CyclicBarrier(nThreads,
new Runnable() {
public void run() { p.checkConvergence(); }}
);
for (int i = 0; i < nThreads; ++i) {
final int id = i;
Runnable worker = new Runnable() {
final Segment segment = p.createSegment(id);
public void run() {
try {
while (!p.converged()) {
segment.update();
barrier.await();
}
}
catch(Exception e) { return; }
}
};
new Thread(worker).start();
}
}
CountdownLatch
類(lèi)與 CyclicBarrier
相似,因?yàn)樗慕巧菍?duì)已經(jīng)在它們中間分?jǐn)偭藛?wèn)題的一組線程進(jìn)行協(xié)調(diào)。它也是使用整型變量構(gòu)造的,指明計(jì)數(shù)的初始值,但是與 CyclicBarrier
不同的是,CountdownLatch
不能重新使用。
其中,CyclicBarrier
是到達(dá)屏障的所有線程的大門(mén),只有當(dāng)所有線程都已經(jīng)到達(dá)屏障或屏障被打破時(shí),才允許這些線程通過(guò),CountdownLatch
將到達(dá)和等待功能分離。任何線程都可以通過(guò)調(diào)用 countDown()
減少當(dāng)前計(jì)數(shù),這種不會(huì)阻塞線程,而只是減少計(jì)數(shù)。await() 方法的行為與 CyclicBarrier.await()
稍微有所不同,調(diào)用 await()
任何線程都會(huì)被阻塞,直到閂鎖計(jì)數(shù)減少為零,在該點(diǎn)等待的所有線程才被釋放,對(duì) await()
的后續(xù)調(diào)用將立即返回。
當(dāng)問(wèn)題已經(jīng)分解為許多部分,每個(gè)線程都被分配一部分計(jì)算時(shí),CountdownLatch
非常有用。在工作線程結(jié)束時(shí),它們將減少計(jì)數(shù),協(xié)調(diào)線程可以在閂鎖處等待當(dāng)前這一批計(jì)算結(jié)束,然后繼續(xù)移至下一批計(jì)算。
相反地,具有計(jì)數(shù) 1 的 CountdownLatch
類(lèi)可以用作"啟動(dòng)大門(mén)",來(lái)立即啟動(dòng)一組線程;工作線程可以在閂鎖處等待,協(xié)調(diào)線程減少計(jì)數(shù),從而立即釋放所有工作線程。下例使用兩個(gè) CountdownLatche
。一個(gè)作為啟動(dòng)大門(mén),一個(gè)在所有工作線程結(jié)束時(shí)釋放線程:
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let them run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
}
Exchanger
類(lèi)方便了兩個(gè)共同操作線程之間的雙向交換;這樣,就像具有計(jì)數(shù)為 2 的 CyclicBarrier
,并且兩個(gè)線程在都到達(dá)屏障時(shí)可以"交換"一些狀態(tài)。(Exchanger
模式有時(shí)也稱(chēng)為聚集。)
Exchanger
通常用于一個(gè)線程填充緩沖(通過(guò)讀取 socket),而另一個(gè)線程清空緩沖(通過(guò)處理從 socket 收到的命令)的情況。當(dāng)兩個(gè)線程在屏障處集合時(shí),它們交換緩沖。下列代碼說(shuō)明了這項(xiàng)技術(shù):
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = new DataBuffer();
DataBuffer initialFullBuffer = new DataBuffer();
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.full())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.empty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}
鎖定和原子之Lock
Java 語(yǔ)言?xún)?nèi)置了鎖定工具 -- synchronized
關(guān)鍵字。當(dāng)線程獲得監(jiān)視器時(shí)(內(nèi)置鎖定),其他線程如果試圖獲得相同鎖定,那么它們將被阻塞,直到第一個(gè)線程釋放該鎖定。同步還確保隨后獲得相同鎖定的線程可以看到之前的線程在具有該鎖定時(shí)所修改的變量的值,從而確保如果類(lèi)正確地同步了共享狀態(tài)的訪問(wèn)權(quán),那么線程將不會(huì)看到變量的"失效"值,這是緩存或編譯器優(yōu)化的結(jié)果。
雖然同步?jīng)]有什么問(wèn)題,但它有一些限制,在一些高級(jí)應(yīng)用程序中會(huì)造成不便。Lock
接口將內(nèi)置監(jiān)視器鎖定的鎖定行為普遍化,允許多個(gè)鎖定實(shí)現(xiàn),同時(shí)提供一些內(nèi)置鎖定缺少的功能,如計(jì)時(shí)的等待、可中斷的等待、鎖定輪詢(xún)、每個(gè)鎖定有多個(gè)條件等待集合以及無(wú)阻塞結(jié)構(gòu)的鎖定。
interface Lock {
void lock();
void lockInterruptibly() throws IE;
boolean tryLock();
boolean tryLock(long time,
TimeUnit unit) throws IE;
void unlock();
Condition newCondition() throws
UnsupportedOperationException;
}
ReentrantLock
ReentrantLock
是具有與隱式監(jiān)視器鎖定(使用 synchronized 方法和語(yǔ)句訪問(wèn))相同的基本行為和語(yǔ)義的 Lock 的實(shí)現(xiàn),但它具有擴(kuò)展的能力。
作為額外收獲,在競(jìng)爭(zhēng)條件下,ReentrantLock
的實(shí)現(xiàn)要比現(xiàn)在的 synchronized 實(shí)現(xiàn)更具有可伸縮性。(有可能在 JVM 的將來(lái)版本中改進(jìn) synchronized 的競(jìng)爭(zhēng)性能。)
這意味著當(dāng)許多線程都競(jìng)爭(zhēng)相同鎖定時(shí),使用 ReentrantLock
的吞吐量通常要比 synchronized
好。換句話說(shuō),當(dāng)許多線程試圖訪問(wèn) ReentrantLock
保護(hù)的共享資源時(shí),
JVM 將花費(fèi)較少的時(shí)間來(lái)調(diào)度線程,而用更多個(gè)時(shí)間執(zhí)行線程。
雖然 ReentrantLock
類(lèi)有許多優(yōu)點(diǎn),但是與同步相比,它有一個(gè)主要缺點(diǎn) -- 它可能忘記釋放鎖定。建議當(dāng)獲得和釋放 ReentrantLock
時(shí)使用下列結(jié)構(gòu):
Lock lock = new ReentrantLock();
...
lock.lock();
try {
// perform operations protected by lock
}
catch(Exception ex) {
// restore invariants
}
finally {
lock.unlock();
}
因?yàn)殒i定失誤(忘記釋放鎖定)的風(fēng)險(xiǎn),所以對(duì)于基本鎖定,強(qiáng)烈建議您繼續(xù)使用 synchronized
,除非真的需要 ReentrantLock
額外的靈活性和可伸縮性。
ReentrantLock
是用于高級(jí)應(yīng)用程序的高級(jí)工具 -- 有時(shí)需要,但有時(shí)用原來(lái)的方法就很好。
Condition
就像 Lock
接口是同步的具體化,Condition
接口是 Object
中 wait()
和 notify()
方法的具體化。Lock
中的一個(gè)方法是 newCondition()
,
它要求鎖定向該鎖定返回新的 Condition
對(duì)象限制。await()
、signal()
和 signalAll()
方法類(lèi)似于 wait()
、notify()
和 notifyAll()
,
但增加了靈活性,每個(gè) Lock
都可以創(chuàng)建多個(gè)條件變量。這簡(jiǎn)化了一些并發(fā)算法的實(shí)現(xiàn)。
ReadWriteLock
ReentrantLock
實(shí)現(xiàn)的鎖定規(guī)則非常簡(jiǎn)單 -- 每當(dāng)一個(gè)線程具有鎖定時(shí),其他線程必須等待,直到該鎖定可用。有時(shí),當(dāng)對(duì)數(shù)據(jù)結(jié)構(gòu)的讀取通常多于修改時(shí),
可以使用更復(fù)雜的稱(chēng)為讀寫(xiě)鎖定的鎖定結(jié)構(gòu),它允許有多個(gè)并發(fā)讀者,同時(shí)還允許一個(gè)寫(xiě)入者獨(dú)占鎖定。該方法在一般情況下(只讀)提供了更大的并發(fā)性,同時(shí)
在必要時(shí)仍提供獨(dú)占訪問(wèn)的安全性。ReadWriteLock
接口和 ReentrantReadWriteLock
類(lèi)提供這種功能 -- 多讀者、單寫(xiě)入者鎖定規(guī)則,可以用這種功能
來(lái)保護(hù)共享的易變資源。
原子變量
即使大多數(shù)用戶將很少直接使用它們,原子變量類(lèi)(AtomicInteger
、AtomicLong
、AtomicReference
等等)也有充分理由是最顯著的新并發(fā)類(lèi)。這些類(lèi)公開(kāi)對(duì)
JVM 的低級(jí)別改進(jìn),允許進(jìn)行具有高度可伸縮性的原子讀-修改-寫(xiě)操作。大多數(shù)現(xiàn)代 CPU 都有原子讀-修改-寫(xiě)的原語(yǔ),比如比較并交換(CAS)或加載鏈接/條件存儲(chǔ)
(LL/SC)。原子變量類(lèi)使用硬件提供的最快的并發(fā)結(jié)構(gòu)來(lái)實(shí)現(xiàn)。
許多并發(fā)算法都是根據(jù)對(duì)計(jì)數(shù)器或數(shù)據(jù)結(jié)構(gòu)的比較并交換操作來(lái)定義的。通過(guò)暴露高性能的、高度可伸縮的 CAS 操作(以原子變量的形式),用 Java 語(yǔ)言實(shí)現(xiàn)高性能、
無(wú)等待、無(wú)鎖定的并發(fā)算法已經(jīng)變得可行。
幾乎 java.util.concurrent
中的所有類(lèi)都是在 ReentrantLock
之上構(gòu)建的,ReentrantLock
則是在原子變量類(lèi)的基礎(chǔ)上構(gòu)建的。所以,雖然僅少數(shù)并發(fā)專(zhuān)家
使用原子變量類(lèi),但 java.util.concurrent
類(lèi)的很多可伸縮性改進(jìn)都是由它們提供的。
原子變量主要用于為原子地更新"熱"字段提供有效的、細(xì)粒度的方式,"熱"字段是指由多個(gè)線程頻繁訪問(wèn)和更新的字段。另外,原子變量還是計(jì)數(shù)器或生成序號(hào)的自然
機(jī)制。
性能與可伸縮性
雖然 java.util.concurrent
努力的首要目標(biāo)是使編寫(xiě)正確、線程安全的類(lèi)更加容易,但它還有一個(gè)次要目標(biāo),就是提供可伸縮性。可伸縮性與性能完全不同,實(shí)際上,
可伸縮性有時(shí)要以性能為代價(jià)來(lái)獲得。
性能是"可以快速執(zhí)行此任務(wù)的程度"的評(píng)測(cè)。可伸縮性描述應(yīng)用程序的吞吐量如何表現(xiàn)為它的工作量和可用計(jì)算資源增加。可伸縮的程序可以按比例使用更多的處理器、
內(nèi)存或 I/O 帶寬來(lái)處理更多個(gè)工作量。當(dāng)我們?cè)诓l(fā)環(huán)境中談?wù)摽缮炜s性時(shí),我們是在問(wèn)當(dāng)許多線程同時(shí)訪問(wèn)給定類(lèi)時(shí),這個(gè)類(lèi)的執(zhí)行情況。
java.util.concurrent
中的低級(jí)別類(lèi) ReentrantLock
和原子變量類(lèi)的可伸縮性要比內(nèi)置監(jiān)視器(同步)鎖定高得多。因此,使用 ReentrantLock
或原子變量
類(lèi)來(lái)協(xié)調(diào)共享訪問(wèn)的類(lèi)也可能更具有可伸縮性。
Hashtable 與 ConcurrentHashMap
作為可伸縮性的例子,ConcurrentHashMap
實(shí)現(xiàn)設(shè)計(jì)的可伸縮性要比其線程安全的上一代 Hashtable
的可伸縮性強(qiáng)得多。Hashtable
一次只允許一個(gè)線程訪問(wèn)
Map
;ConcurrentHashMap
允許多個(gè)讀者并發(fā)執(zhí)行,讀者與寫(xiě)入者并發(fā)執(zhí)行,以及一些寫(xiě)入者并發(fā)執(zhí)行。因此,如果許多線程頻繁訪問(wèn)共享映射,使用
ConcurrentHashMap
的總的吞吐量要比使用 Hashtable
的好。
下表大致說(shuō)明了 Hashtable
和 ConcurrentHashMap
之間的可伸縮性差別。在每次運(yùn)行時(shí),N 個(gè)線程并發(fā)執(zhí)行緊密循環(huán),它們從 Hashtable
或
ConcurrentHashMap
中檢索隨即關(guān)鍵字,60% 的失敗檢索將執(zhí)行 put()
操作,2% 的成功檢索執(zhí)行 remove()
操作。測(cè)試在運(yùn)行 Linux 的雙處理器
Xeon 系統(tǒng)中執(zhí)行。數(shù)據(jù)顯示 10,000,000 個(gè)迭代的運(yùn)行時(shí)間,對(duì)于 ConcurrentHashMap
,標(biāo)準(zhǔn)化為一個(gè)線程的情況。可以看到直到許多線程,
ConcurrentHashMap
的性能仍保持可伸縮性,而 Hashtable
的性能在出現(xiàn)鎖定競(jìng)爭(zhēng)時(shí)幾乎立即下降。
與通常的服務(wù)器應(yīng)用程序相比,這個(gè)測(cè)試中的線程數(shù)看起來(lái)很少。然而,因?yàn)槊總€(gè)線程未進(jìn)行其他操作,僅是重復(fù)地選擇使用該表,所以這樣可以模擬在執(zhí)行
一些實(shí)際工作的情況下使用該表的大量線程的競(jìng)爭(zhēng)。
線程 |
ConcurrentHashMap |
Hashtable |
1 |
1.0 |
1.51 |
2 |
1.44 |
17.09 |
4 |
1.83 |
29.9 |
8 |
4.06 |
54.06 |
16 |
7.5 |
119.44 |
32 |
15.32 |
237.2 |
Lock 與 synchronized 與原子
下列基準(zhǔn)說(shuō)明了使用 java.util.concurrent
可能改進(jìn)可伸縮性的例子。該基準(zhǔn)將模擬旋轉(zhuǎn)骰子,使用線性同余隨機(jī)數(shù)生成器。有三個(gè)可用的隨機(jī)數(shù)生成器的實(shí)現(xiàn):一個(gè)使用同步來(lái)管理生成器的狀態(tài)(單一變量),一個(gè)使用 ReentrantLock
,另一個(gè)則使用 AtomicLong
。下圖顯示了在 8-way Ultrasparc3 系統(tǒng)上,逐漸增加線程數(shù)量時(shí)這三個(gè)版本的相對(duì)吞吐量。(該圖對(duì)原子變量方法的可伸縮性描述比較保守。)
圖 1. 使用同步、Lock 和 AtomicLong 的相對(duì)吞吐量

公平與不公平
java.util.concurrent
中許多類(lèi)中的另外一個(gè)定制元素是"公平"的問(wèn)題。公平鎖定或公平信號(hào)是指在其中根據(jù)先進(jìn)先出(FIFO)的原則給與線程鎖定或信號(hào)。ReentrantLock
、Semaphore
和 ReentrantReadWriteLock
的構(gòu)造函數(shù)都可以使用變量確定鎖定是否公平,或者是否允許闖入(線程獲得鎖定,即使它們等待的時(shí)間不是最長(zhǎng))。
雖然闖入鎖定的想法可能有些可笑,但實(shí)際上不公平、闖入的鎖定非常普遍,且通常很受歡迎。使用同步訪問(wèn)的內(nèi)置鎖定不是公平鎖定(且沒(méi)有辦法使它們公平)。相反,它們提供較弱的生病保證,要求所有線程最終都將獲得鎖定。
大多數(shù)應(yīng)用程序選擇(且應(yīng)該選擇)闖入鎖定而不是公平鎖定的原因是性能。在大多數(shù)情況下,完全的公平不是程序正確性的要求,真正公平的成本相當(dāng)高。下表向前面的面板中的表中添加了第四個(gè)數(shù)據(jù)集,并由一個(gè)公平鎖定管理對(duì) PRNG 狀態(tài)的訪問(wèn)。注意闖入鎖定與公平鎖定之間吞吐量的巨大差別。
圖 2. 使用同步、Lock、公平鎖定和 AtomicLong 的相對(duì)吞吐量

結(jié)束語(yǔ)
java.util.concurrent
包中包含大量有用的構(gòu)建快,可以用它們來(lái)改進(jìn)并發(fā)類(lèi)的性能、可伸縮性、線程安全和可維護(hù)性。
通過(guò)這些構(gòu)建快,應(yīng)該可以不再需要在您的代碼中大量使用同步、wait/notify 和 Thread.start()
,而用更高級(jí)別、標(biāo)準(zhǔn)化的、
高性能并發(fā)實(shí)用程序來(lái)替換它們。
凡是有該標(biāo)志的文章,都是該blog博主Caoer(草兒)原創(chuàng),凡是索引、收藏
、轉(zhuǎn)載請(qǐng)注明來(lái)處和原文作者。非常感謝。