接近一周沒(méi)更新《Java線(xiàn)程》專(zhuān)欄了,主要是這周工作上比較忙,生活上也比較忙,呵呵,進(jìn)入正題,上一篇講述了并發(fā)包下的Lock,Lock可以更好的解決線(xiàn)程同步問(wèn)題,使之更面向?qū)ο螅⑶襌eadWriteLock在處理同步時(shí)更強(qiáng)大,那么同樣,線(xiàn)程間僅僅互斥是不夠的,還需要通信,本篇的內(nèi)容是基于上篇之上,使用Lock如何處理線(xiàn)程通信。

        那么引入本篇的主角,Condition,Condition 將 Object 監(jiān)視器方法(wait、notify 和 notifyAll)分解成截然不同的對(duì)象,以便通過(guò)將這些對(duì)象與任意 Lock 實(shí)現(xiàn)組合使用,為每個(gè)對(duì)象提供多個(gè)等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和語(yǔ)句的使用,Condition 替代了 Object 監(jiān)視器方法的使用。下面將之前寫(xiě)過(guò)的一個(gè)線(xiàn)程通信的例子替換成用Condition實(shí)現(xiàn)(Java線(xiàn)程(三)),代碼如下:

  1. public class ThreadTest2 {  
  2.     public static void main(String[] args) {  
  3.         final Business business = new Business();  
  4.         new Thread(new Runnable() {  
  5.             @Override  
  6.             public void run() {  
  7.                 threadExecute(business, "sub");  
  8.             }  
  9.         }).start();  
  10.         threadExecute(business, "main");  
  11.     }     
  12.     public static void threadExecute(Business business, String threadType) {  
  13.         for(int i = 0; i < 100; i++) {  
  14.             try {  
  15.                 if("main".equals(threadType)) {  
  16.                     business.main(i);  
  17.                 } else {  
  18.                     business.sub(i);  
  19.                 }  
  20.             } catch (InterruptedException e) {  
  21.                 e.printStackTrace();  
  22.             }  
  23.         }  
  24.     }  
  25. }  
  26. class Business {  
  27.     private boolean bool = true;  
  28.     private Lock lock = new ReentrantLock();  
  29.     private Condition condition = lock.newCondition();   
  30.     public /*synchronized*/ void main(int loop) throws InterruptedException {  
  31.         lock.lock();  
  32.         try {  
  33.             while(bool) {                 
  34.                 condition.await();//this.wait();  
  35.             }  
  36.             for(int i = 0; i < 100; i++) {  
  37.                 System.out.println("main thread seq of " + i + ", loop of " + loop);  
  38.             }  
  39.             bool = true;  
  40.             condition.signal();//this.notify();  
  41.         } finally {  
  42.             lock.unlock();  
  43.         }  
  44.     }     
  45.     public /*synchronized*/ void sub(int loop) throws InterruptedException {  
  46.         lock.lock();  
  47.         try {  
  48.             while(!bool) {  
  49.                 condition.await();//this.wait();  
  50.             }  
  51.             for(int i = 0; i < 10; i++) {  
  52.                 System.out.println("sub thread seq of " + i + ", loop of " + loop);  
  53.             }  
  54.             bool = false;  
  55.             condition.signal();//this.notify();  
  56.         } finally {  
  57.             lock.unlock();  
  58.         }  
  59.     }  
  60. }  
        在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統(tǒng)線(xiàn)程的通信方式,Condition都可以實(shí)現(xiàn),這里注意,Condition是被綁定到Lock上的,要?jiǎng)?chuàng)建一個(gè)Lock的Condition必須用newCondition()方法。

        這樣看來(lái),Condition和傳統(tǒng)的線(xiàn)程通信沒(méi)什么區(qū)別,Condition的強(qiáng)大之處在于它可以為多個(gè)線(xiàn)程間建立不同的Condition,下面引入API中的一段代碼,加以說(shuō)明。

  1. class BoundedBuffer {  
  2.    final Lock lock = new ReentrantLock();//鎖對(duì)象  
  3.    final Condition notFull  = lock.newCondition();//寫(xiě)線(xiàn)程條件   
  4.    final Condition notEmpty = lock.newCondition();//讀線(xiàn)程條件   
  5.   
  6.    final Object[] items = new Object[100];//緩存隊(duì)列  
  7.    int putptr/*寫(xiě)索引*/, takeptr/*讀索引*/, count/*隊(duì)列中存在的數(shù)據(jù)個(gè)數(shù)*/;  
  8.   
  9.    public void put(Object x) throws InterruptedException {  
  10.      lock.lock();  
  11.      try {  
  12.        while (count == items.length)//如果隊(duì)列滿(mǎn)了   
  13.          notFull.await();//阻塞寫(xiě)線(xiàn)程  
  14.        items[putptr] = x;//賦值   
  15.        if (++putptr == items.length) putptr = 0;//如果寫(xiě)索引寫(xiě)到隊(duì)列的最后一個(gè)位置了,那么置為0  
  16.        ++count;//個(gè)數(shù)++  
  17.        notEmpty.signal();//喚醒讀線(xiàn)程  
  18.      } finally {  
  19.        lock.unlock();  
  20.      }  
  21.    }  
  22.   
  23.    public Object take() throws InterruptedException {  
  24.      lock.lock();  
  25.      try {  
  26.        while (count == 0)//如果隊(duì)列為空  
  27.          notEmpty.await();//阻塞讀線(xiàn)程  
  28.        Object x = items[takeptr];//取值   
  29.        if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到隊(duì)列的最后一個(gè)位置了,那么置為0  
  30.        --count;//個(gè)數(shù)--  
  31.        notFull.signal();//喚醒寫(xiě)線(xiàn)程  
  32.        return x;  
  33.      } finally {  
  34.        lock.unlock();  
  35.      }  
  36.    }   
  37.  }  
        這是一個(gè)處于多線(xiàn)程工作環(huán)境下的緩存區(qū),緩存區(qū)提供了兩個(gè)方法,put和take,put是存數(shù)據(jù),take是取數(shù)據(jù),內(nèi)部有個(gè)緩存隊(duì)列,具體變量和方法說(shuō)明見(jiàn)代碼,這個(gè)緩存區(qū)類(lèi)實(shí)現(xiàn)的功能:有多個(gè)線(xiàn)程往里面存數(shù)據(jù)和從里面取數(shù)據(jù),其緩存隊(duì)列(先進(jìn)先出后進(jìn)后出)能緩存的最大數(shù)值是100,多個(gè)線(xiàn)程間是互斥的,當(dāng)緩存隊(duì)列中存儲(chǔ)的值達(dá)到100時(shí),將寫(xiě)線(xiàn)程阻塞,并喚醒讀線(xiàn)程,當(dāng)緩存隊(duì)列中存儲(chǔ)的值為0時(shí),將讀線(xiàn)程阻塞,并喚醒寫(xiě)線(xiàn)程,下面分析一下代碼的執(zhí)行過(guò)程:

        1. 一個(gè)寫(xiě)線(xiàn)程執(zhí)行,調(diào)用put方法;

        2. 判斷count是否為100,顯然沒(méi)有100;

        3. 繼續(xù)執(zhí)行,存入值;

        4. 判斷當(dāng)前寫(xiě)入的索引位置++后,是否和100相等,相等將寫(xiě)入索引值變?yōu)?,并將count+1;

        5. 僅喚醒讀線(xiàn)程阻塞隊(duì)列中的一個(gè);

        6. 一個(gè)讀線(xiàn)程執(zhí)行,調(diào)用take方法;

        7. ……

        8. 僅喚醒寫(xiě)線(xiàn)程阻塞隊(duì)列中的一個(gè)。

        這就是多個(gè)Condition的強(qiáng)大之處,假設(shè)緩存隊(duì)列中已經(jīng)存滿(mǎn),那么阻塞的肯定是寫(xiě)線(xiàn)程,喚醒的肯定是讀線(xiàn)程,相反,阻塞的肯定是讀線(xiàn)程,喚醒的肯定是寫(xiě)線(xiàn)程,那么假設(shè)只有一個(gè)Condition會(huì)有什么效果呢,緩存隊(duì)列中已經(jīng)存滿(mǎn),這個(gè)Lock不知道喚醒的是讀線(xiàn)程還是寫(xiě)線(xiàn)程了,如果喚醒的是讀線(xiàn)程,皆大歡喜,如果喚醒的是寫(xiě)線(xiàn)程,那么線(xiàn)程剛被喚醒,又被阻塞了,這時(shí)又去喚醒,這樣就浪費(fèi)了很多時(shí)間。

        本文來(lái)自:高爽|Coder,原文地址:http://blog.csdn.net/ghsau/article/details/7481142,轉(zhuǎn)載請(qǐng)注明。