你所不知道的五件事情--java.util.concurrent(第一部分)
--使用并發(fā)集合類進(jìn)行多線程編程
這是Ted Neward在IBM developerWorks中5 things系列文章中的一篇,講述了關(guān)于Java并發(fā)集合API的一些應(yīng)用竅門,值得大家學(xué)習(xí)。(2010.05.24最后更新)
摘要:編寫既要性能良好又要防止應(yīng)用崩潰的多線程代碼確實(shí)很難--這也正是我們需要java.util.concurrent的原因。Ted Neward向你展示了像CopyOnWriteArrayList,BlockingQueue和ConcurrentMap這樣的并發(fā)集合類是如何為了并發(fā)編程需要而改進(jìn)標(biāo)準(zhǔn)集合類的。
并發(fā)集合API是Java 5的一大新特性,但由于對(duì)Annotation和泛型的熱捧,許多Java開發(fā)者忽視了這些API。另外(可能更真實(shí)的是),因?yàn)樵S多開發(fā)者猜想并發(fā)集合 API肯定很復(fù)雜,就像去嘗試解決一些問(wèn)題那樣,所以開發(fā)者們會(huì)回避java.util.concurrent包。
事實(shí)上,java.util.concurrent的很多類并不需要你費(fèi)很大力就能高效地解決通常的并發(fā)問(wèn)題。繼續(xù)看下去,你就能學(xué)到 java.util.concurrent中的類,如CopyOnWriteArrayList和BlockingQueue,是怎樣幫助你解決多線程編程可怕的挑戰(zhàn)。
1. TimeUnit
java.util.concurrent.TimeUnit本身并不是集合框架類,這個(gè)枚舉使得代碼非常易讀。使用TimeUnit能夠?qū)㈤_發(fā)者從與毫秒相關(guān)的困苦中解脫出來(lái),轉(zhuǎn)而他們自己的方法或API。
TimeUnit能與所有的時(shí)間單元協(xié)作,范圍從毫秒和微秒到天和小時(shí),這就意味著它能處理開發(fā)者可能用到的幾乎所有時(shí)間類型。還要感謝這個(gè)枚舉類型聲明的時(shí)間轉(zhuǎn)換方法,當(dāng)時(shí)間加快時(shí),它甚至能細(xì)致到把小時(shí)轉(zhuǎn)換回毫秒。
2. CopyOnWriteArrayList
制作數(shù)組的干凈復(fù)本是一項(xiàng)成本極高的操作,在時(shí)間和內(nèi)存這兩方面均有開銷,以至于在通常的應(yīng)用中不能考慮該方法;開發(fā)者常常求助于使用同步的 ArrayList來(lái)替代前述方法。但這也是一個(gè)比較有代價(jià)的選項(xiàng),因?yàn)楫?dāng)每次你遍歷訪問(wèn)該集合中的內(nèi)容時(shí),你不得不同步所有的方法,包括讀和寫,以確保內(nèi)存一致性。
在有大量用戶在讀取ArrayList而只有很少用戶對(duì)其進(jìn)行修改的這一場(chǎng)景中,上述方法將使成本結(jié)構(gòu)變得緩慢。
CopyOnWriteArrayList就是解決這一問(wèn)題的一個(gè)極好的寶貝工具。它的Javadoc描述到,ArrayList通過(guò)創(chuàng)建數(shù)組的干凈復(fù)本來(lái)實(shí)現(xiàn)可變操作(添加,修改,等等),而CopyOnWriteArrayList則是ArrayList的一個(gè)"線程安全"的變體。
對(duì)于任何修改操作,該集合類會(huì)在內(nèi)部將其內(nèi)容復(fù)制到一個(gè)新數(shù)組中,所以當(dāng)讀用戶訪問(wèn)數(shù)組的內(nèi)容時(shí)不會(huì)招致任何同步開銷(因?yàn)樗鼈儧](méi)有對(duì)可變數(shù)據(jù)進(jìn)行操作)。
本質(zhì)上,創(chuàng)建CopyOnWriteArrayList的想法,是出于應(yīng)對(duì)當(dāng)ArrayList無(wú)法滿足我們要求時(shí)的場(chǎng)景:經(jīng)常讀,而很少寫的集合對(duì)象,例如針對(duì)JavaBean事件的Listener。
3. BlockingQueue
BlockingQueue接口表明它是一個(gè)Queue,這就意味著它的元素是按先進(jìn)先出(FIFO)的次序進(jìn)行存儲(chǔ)的。以特定次序插入的元素會(huì)以相同的次序被取出--但根據(jù)插入保證,任何從空隊(duì)列中取出元素的嘗試都會(huì)堵塞調(diào)用線程直到該元素可被取出時(shí)為止。同樣地,任何向一個(gè)已滿隊(duì)列中插入元素的嘗試將會(huì)堵塞調(diào)用線程直到該隊(duì)列的存儲(chǔ)空間有空余時(shí)為止。
在不需要顯式地關(guān)注同步問(wèn)題時(shí),如何將由一個(gè)線程聚集的元素"交給"另一個(gè)線程進(jìn)行處理呢,BlockingQueue很靈巧地解決了這個(gè)問(wèn)題。Java Tutorial中Guarded Blocks一節(jié)是很好的例子。它使用手工同步和wait()/notifyAll()方法創(chuàng)建了一個(gè)單點(diǎn)(single-slot)受限緩沖,當(dāng)一個(gè)新的元素可被消費(fèi)且當(dāng)該點(diǎn)已經(jīng)準(zhǔn)備好被一個(gè)新的元素填充時(shí),該方法就會(huì)在線程之間發(fā)出信號(hào)。(詳情請(qǐng)見Guarded Blocks)
盡管教程Guarded Blocks中的代碼可以正常工作,但它比較長(zhǎng),有些凌亂,而且完全不直觀。誠(chéng)然,在Java平臺(tái)的早期時(shí)代,Java開發(fā)者們不得不;但現(xiàn)在已經(jīng)是 2010年了--問(wèn)題已經(jīng)得到改進(jìn)?
清單1展示的程序重寫了Guarded Blocks中的代碼,其中我使用ArrayBlockingQueue替代了手工編寫的Drop。
清單1. BlockingQueue
import java.util.*;
import java.util.concurrent.*;
class Producer
implements Runnable
{
private BlockingQueue<String> drop;
List<String> messages = Arrays.asList(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"Wouldn't you eat ivy too?");
public Producer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
for (String s : messages)
drop.put(s);
drop.put("DONE");
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
class Consumer
implements Runnable
{
private BlockingQueue<String> drop;
public Consumer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
String msg = null;
while (!((msg = drop.take()).equals("DONE")))
System.out.println(msg);
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
public class ABQApp
{
public static void main(String[] args)
{
BlockingQueue<String> drop = new ArrayBlockingQueue(1, true);
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
ArrayBlockingQueue也崇尚"公平"--即意味著,它能給予讀和寫線程先進(jìn)先出的訪問(wèn)次序。該方法可能是一種更高效的策略,但它也加大了造成線程饑餓的風(fēng)險(xiǎn)。(就是說(shuō),當(dāng)其它讀線程持有鎖時(shí),該策略可更高效地允許讀線程進(jìn)行執(zhí)行,但這也就會(huì)產(chǎn)生讀線程的常量流使寫線程總是無(wú)法執(zhí)行的風(fēng)險(xiǎn))
BlockingQueue也支持在方法中使用時(shí)間參數(shù),當(dāng)插入或取出元素出了問(wèn)題時(shí),方法需要返回以發(fā)出操作失敗的信號(hào),而該時(shí)間參數(shù)指定了在返回前應(yīng)該阻塞多長(zhǎng)時(shí)間。
4. ConcurrentMap
Map有一些細(xì)微的并發(fā)Bug,會(huì)使許多粗心的Java開發(fā)者誤入歧途。ConcurrentMap則是一個(gè)簡(jiǎn)單的決定方案。
當(dāng)有多個(gè)線程在訪問(wèn)一個(gè)Map時(shí),通常在儲(chǔ)存一個(gè)鍵/值對(duì)之前通常會(huì)使用方法containsKey()或get()去確定給出的鍵是否存在。即使用同步的Map,某個(gè)線程仍可在處理的過(guò)程中潛入其中,然后獲得對(duì)Map的控制權(quán)。問(wèn)題在于,在get()方法的開始處獲得了鎖,然后在調(diào)用方法put()去重新獲得該鎖之前會(huì)先釋放它。這就導(dǎo)致了競(jìng)爭(zhēng)條件:兩個(gè)線程之間的競(jìng)爭(zhēng),根據(jù)哪個(gè)線程先執(zhí)行,其結(jié)果將不盡相同。
如果兩個(gè)線程在同一時(shí)刻調(diào)用一個(gè)方法,一個(gè)測(cè)試鍵是否存在,另一個(gè)則置入新的鍵/值對(duì),那么在此過(guò)程中,第一個(gè)線程的值將會(huì)丟失。幸運(yùn)地是,ConcurrentMap接口支持一組額外的方法,設(shè)計(jì)這些方法是為了在一個(gè)鎖中做兩件事情:例如,putIfAbsent()首先進(jìn)行測(cè)試,之后只有當(dāng)該鍵還未存儲(chǔ)到Map中時(shí),才執(zhí)行置入操作。
5. SynchronousQueues
根據(jù)Javadoc的描述,SynchronousQueue是一個(gè)很有趣的創(chuàng)造物:
一個(gè)阻塞隊(duì)列在每次的插入操作中必須等等另一線程執(zhí)行對(duì)應(yīng)的刪除線程,反之亦然。同步隊(duì)列并沒(méi)有任何內(nèi)部的存儲(chǔ)空間,一個(gè)都沒(méi)有。
本質(zhì)上,SynchronousQueue是之前提及的BlockingQueue的另一種實(shí)現(xiàn)。使用ArrayBlockingQueue利用的阻塞語(yǔ)義,SynchronousQueue給予我們一種極輕量級(jí)的途徑在兩個(gè)線程之間交換單個(gè)元素。在清單2中,我用SynchronousQueue替代 ArrayBlockingQueue重寫了清單1的代碼:
清單2 SynchronousQueue
import java.util.*;
import java.util.concurrent.*;
class Producer
implements Runnable
{
private BlockingQueue<String> drop;
List<String> messages = Arrays.asList(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"Wouldn't you eat ivy too?");
public Producer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
for (String s : messages)
drop.put(s);
drop.put("DONE");
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
class Consumer
implements Runnable
{
private BlockingQueue<String> drop;
public Consumer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
String msg = null;
while (!((msg = drop.take()).equals("DONE")))
System.out.println(msg);
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
public class SynQApp
{
public static void main(String[] args)
{
BlockingQueue<String> drop = new SynchronousQueue<String>();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
上述實(shí)現(xiàn)看起來(lái)幾乎相同,但該應(yīng)用程序已新加了一個(gè)好處,在這個(gè)實(shí)現(xiàn)中,只有當(dāng)有線程正在等待消費(fèi)某個(gè)元素時(shí),SynchronousQueue才會(huì)允許將該元素插入到隊(duì)列中。
就實(shí)踐方式來(lái)看,SynchronousQueue類似于Ada或CSP等語(yǔ)言中的"交會(huì)通道(Rendezvous Channel)"。在其它環(huán)境中,有時(shí)候被稱為"連接"。
結(jié)論
當(dāng)Java運(yùn)行時(shí)類庫(kù)預(yù)先已經(jīng)提供了方便使用的等價(jià)物時(shí),為什么還要費(fèi)力地向集合框架中引入并發(fā)呢?本系列的下一篇文章將探索 java.util.concurrent命名空間的更多內(nèi)容。
請(qǐng)關(guān)注你所不知道的五件事情--java.util.concurrent(第二部分)