基于線程安全的一些原則來編程當(dāng)然可以避免并發(fā)問題,但不是所有人都能寫出高質(zhì)量的線程安全的代碼,并且如果代碼里到處都是線程安全的控制也極大地影響了代碼可讀性和可維護(hù)性。因此,Java平臺為了解決這個(gè)問題,提供了很多線程安全的類和并發(fā)工具,通過這些類和工具就能更簡便地寫線程安全的代碼。歸納一下有以下幾種:
- 同步容器類
- 并發(fā)容器類
- 生產(chǎn)者和消費(fèi)者模式
- 阻塞和可中斷方法
- Synchronizer
這些類和方法的使用都可以從JDK DOC查到,但在具體使用中還是有很多問題需要注意
同步容器類
同步容器類就是一些經(jīng)過同步處理了的容器類,比如List有Vector,Map有Hashtable,查看其源碼發(fā)現(xiàn)其保證線程安全的方式就是把每個(gè)對外暴露的存取方法用synchronized關(guān)鍵字同步化,這樣做我們立馬會想到有以下問題:
1)性能有問題
同步化了所有存取方法,就表明所有對這個(gè)容器對象的操作將會串行,這樣做來得倒是干凈,但性能的代價(jià)也是很可觀的
2)復(fù)合操作問題
同步容器類只是同步了單一操作,如果客戶端是一組復(fù)合操作,它就沒法同步了,依然需要客戶端做額外同步,比如以下代碼:
- public static Object getLast(Vector list) {
- int lastIndex = list.size() - 1;
- return list.get(lastIndex);
- }
- public static void deleteLast(Vector list) {
- int lastIndex = list.size() - 1;
- list.remove(lastIndex);
- }
getLast和deleteLast都是復(fù)合操作,由先前對原子性的分析可以判斷,這依然存在線程安全問題,有可能會拋出ArrayIndexOutOfBoundsException的異常,錯(cuò)誤產(chǎn)生的邏輯如下所示:

解決辦法就是通過對這些復(fù)合操作加鎖
3)迭代器并發(fā)問題
Java Collection進(jìn)行迭代的標(biāo)準(zhǔn)時(shí)使用Iterator,無論是使用老的方式迭代循環(huán),還是Java 5提供for-each新方式,都需要對迭代的整個(gè)過程加鎖,不然就會有Concurrentmodificationexception異常拋出。
此外有些迭代也是隱含的,比如容器類的toString方法,或containsAll, removeAll, retainAll等方法都會隱含地對容器進(jìn)行迭代
并發(fā)容器類
正是由于同步容器類有以上問題,導(dǎo)致這些類成了雞肋,于是Java 5推出了并發(fā)容器類,Map對應(yīng)的有ConcurrentHashMap,List對應(yīng)的有CopyOnWriteArrayList。與同步容器類相比,它有以下特性:
- 更加細(xì)化的鎖機(jī)制。同步容器直接把容器對象做為鎖,這樣就把所有操作串行化,其實(shí)這是沒必要的,過于悲觀,而并發(fā)容器采用更細(xì)粒度的鎖機(jī)制,保證一些不會發(fā)生并發(fā)問題的操作進(jìn)行并行執(zhí)行
- 附加了一些原子性的復(fù)合操作。比如putIfAbsent方法
- 迭代器的弱一致性。它在迭代過程中不再拋出Concurrentmodificationexception異常,而是弱一致性。在并發(fā)高的情況下,有可能size和isEmpty方法不準(zhǔn)確,但真正在并發(fā)環(huán)境下這些方法也沒什么作用。
- CopyOnWriteArrayList采用寫入時(shí)復(fù)制的方式避開并發(fā)問題。這其實(shí)是通過冗余和不可變性來解決并發(fā)問題,在性能上會有比較大的代價(jià),但如果寫入的操作遠(yuǎn)遠(yuǎn)小于迭代和讀操作,那么性能就差別不大了
生產(chǎn)者和消費(fèi)者模式
大學(xué)時(shí)學(xué)習(xí)操作系統(tǒng)多會為生產(chǎn)者和消費(fèi)者模式而頭痛,也是每次考試肯定會涉及到的,而Java知道大家很憷這個(gè)模式的并發(fā)復(fù)雜性,于是乎提供了阻塞隊(duì)列(BlockingQueue)來滿足這個(gè)模式的需求。阻塞隊(duì)列說起來很簡單,就是當(dāng)隊(duì)滿的時(shí)候?qū)懢€程會等待,直到隊(duì)列不滿的時(shí)候;當(dāng)隊(duì)空的時(shí)候讀線程會等待,直到隊(duì)不空的時(shí)候。實(shí)現(xiàn)這種模式的方法很多,其區(qū)別也就在于誰的消耗更低和等待的策略更優(yōu)。以LinkedBlockingQueue的具體實(shí)現(xiàn)為例,它的put源碼如下:
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- int c = -1;
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- putLock.lockInterruptibly();
- try {
- try {
- while (count.get() == capacity)
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to a non-interrupted thread
- throw ie;
- }
- insert(e);
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();
- }
撇開其鎖的具體實(shí)現(xiàn),其流程就是我們在操作系統(tǒng)課上學(xué)習(xí)到的標(biāo)準(zhǔn)生產(chǎn)者模式,看來那些枯燥的理論還是有用武之地的。其中,最核心的還是Java的鎖實(shí)現(xiàn),有興趣的朋友可以再進(jìn)一步深究一下
阻塞和可中斷方法
由LinkedBlockingQueue的put方法可知,它是通過線程的阻塞和中斷阻塞來實(shí)現(xiàn)等待的。當(dāng)調(diào)用一個(gè)會拋出InterruptedException的方法時(shí),就成為了一個(gè)阻塞的方法,要為響應(yīng)中斷做好準(zhǔn)備。處理中斷可有以下方法:
- 傳遞InterruptedException。把捕獲的InterruptedException再往上拋,使其調(diào)用者感知到,當(dāng)然在拋之前需要完成你自己應(yīng)該做的清理工作,LinkedBlockingQueue的put方法就是采取這種方式
- 中斷其線程。在不能拋出異常的情況下,可以直接調(diào)用Thread.interrupt()將其中斷。
Synchronizer
Synchronizer不是一個(gè)類,而是一種滿足一個(gè)種規(guī)則的類的統(tǒng)稱。它有以下特性:
- 它是一個(gè)對象
- 封裝狀態(tài),而這些狀態(tài)決定著線程執(zhí)行到某一點(diǎn)是通過還是被迫等待
- 提供操作狀態(tài)的方法
其實(shí)BlockingQueue就是一種Synchronizer。Java還提供了其他幾種Synchronizer
1)CountDownLatch
CountDownLatch是一種閉鎖,它通過內(nèi)部一個(gè)計(jì)數(shù)器count來標(biāo)示狀態(tài),當(dāng)count>0時(shí),所有調(diào)用其await方法的線程都需等待,當(dāng)通過其countDown方法將count降為0時(shí)所有等待的線程將會被喚起。使用實(shí)例如下所示:
- public class TestHarness {
- public long timeTasks(int nThreads, final Runnable task)
- throws InterruptedException {
- final CountDownLatch startGate = new CountDownLatch(1);
- final CountDownLatch endGate = new CountDownLatch(nThreads);
- for (int i = 0; i < nThreads; i++) {
- Thread t = new Thread() {
- public void run() {
- try {
- startGate.await();
- try {
- task.run();
- } finally {
- endGate.countDown();
- }
- } catch (InterruptedException ignored) { }
- }
- };
- t.start();
- }
- long start = System.nanoTime();
- startGate.countDown();
- endGate.await();
- long end = System.nanoTime();
- return end-start;
- }
- }
2)Semaphore
Semaphore類實(shí)際上就是操作系統(tǒng)中談到的信號量的一種實(shí)現(xiàn),其原理就不再累述,可見探索并發(fā)編程------操作系統(tǒng)篇
具體使用就是通過其acquire和release方法來完成,如以下示例:
- public class BoundedHashSet<T> {
- private final Set<T> set;
- private final Semaphore sem;
- public BoundedHashSet(int bound) {
- this.set = Collections.synchronizedSet(new HashSet<T>());
- sem = new Semaphore(bound);
- }
- public boolean add(T o) throws InterruptedException {
- sem.acquire();
- boolean wasAdded = false;
- try {
- wasAdded = set.add(o);
- return wasAdded;
- }
- finally {
- if (!wasAdded)
- sem.release();
- }
- }
- public boolean remove(Object o) {
- boolean wasRemoved = set.remove(o);
- if (wasRemoved)
- sem.release();
- return wasRemoved;
- }
- }
3)關(guān)卡
關(guān)卡和閉鎖類似,也是阻塞一組線程,直到某件事情發(fā)生,而不同在于關(guān)卡是等到符合某種條件的所有線程都達(dá)到關(guān)卡點(diǎn)。具體使用上可以用CyclicBarrier來應(yīng)用關(guān)卡
以上是Java提供的一些并發(fā)工具,既然是工具就有它所適用的場景,因此需要知道它的特性,這樣才能在具體場景下選擇最合適的工具。