接近一周沒更新《Java線程》專欄了,主要是這周工作上比較忙,生活上也比較忙,呵呵,進入正題,上一篇講述了并發包下的Lock,Lock可以更好的解決線程同步問題,使之更面向對象,并且ReadWriteLock在處理同步時更強大,那么同樣,線程間僅僅互斥是不夠的,還需要通信,本篇的內容是基于上篇之上,使用Lock如何處理線程通信。
那么引入本篇的主角,Condition,Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供多個等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。下面將之前寫過的一個線程通信的例子替換成用Condition實現(Java線程(三)),代碼如下:
- public class ThreadTest2 {
- public static void main(String[] args) {
- final Business business = new Business();
- new Thread(new Runnable() {
- @Override
- public void run() {
- threadExecute(business, "sub");
- }
- }).start();
- threadExecute(business, "main");
- }
- public static void threadExecute(Business business, String threadType) {
- for(int i = 0; i < 100; i++) {
- try {
- if("main".equals(threadType)) {
- business.main(i);
- } else {
- business.sub(i);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- class Business {
- private boolean bool = true;
- private Lock lock = new ReentrantLock();
- private Condition condition = lock.newCondition();
- public /*synchronized*/ void main(int loop) throws InterruptedException {
- lock.lock();
- try {
- while(bool) {
- condition.await();//this.wait();
- }
- for(int i = 0; i < 100; i++) {
- System.out.println("main thread seq of " + i + ", loop of " + loop);
- }
- bool = true;
- condition.signal();//this.notify();
- } finally {
- lock.unlock();
- }
- }
- public /*synchronized*/ void sub(int loop) throws InterruptedException {
- lock.lock();
- try {
- while(!bool) {
- condition.await();//this.wait();
- }
- for(int i = 0; i < 10; i++) {
- System.out.println("sub thread seq of " + i + ", loop of " + loop);
- }
- bool = false;
- condition.signal();//this.notify();
- } finally {
- lock.unlock();
- }
- }
- }
在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通信方式,Condition都可以實現,這里注意,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。
這樣看來,Condition和傳統的線程通信沒什么區別,Condition的強大之處在于它可以為多個線程間建立不同的Condition,下面引入API中的一段代碼,加以說明。
- class BoundedBuffer {
- final Lock lock = new ReentrantLock();//鎖對象
- final Condition notFull = lock.newCondition();//寫線程條件
- final Condition notEmpty = lock.newCondition();//讀線程條件
-
- final Object[] items = new Object[100];//緩存隊列
- int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/;
-
- public void put(Object x) throws InterruptedException {
- lock.lock();
- try {
- while (count == items.length)//如果隊列滿了
- notFull.await();//阻塞寫線程
- items[putptr] = x;//賦值
- if (++putptr == items.length) putptr = 0;//如果寫索引寫到隊列的最后一個位置了,那么置為0
- ++count;//個數++
- notEmpty.signal();//喚醒讀線程
- } finally {
- lock.unlock();
- }
- }
-
- public Object take() throws InterruptedException {
- lock.lock();
- try {
- while (count == 0)//如果隊列為空
- notEmpty.await();//阻塞讀線程
- Object x = items[takeptr];//取值
- if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到隊列的最后一個位置了,那么置為0
- --count;//個數--
- notFull.signal();//喚醒寫線程
- return x;
- } finally {
- lock.unlock();
- }
- }
- }
這是一個處于多線程工作環境下的緩存區,緩存區提供了兩個方法,put和take,put是存數據,take是取數據,內部有個緩存隊列,具體變量和方法說明見代碼,這個緩存區類實現的功能:有多個線程往里面存數據和從里面取數據,其緩存隊列(先進先出后進后出)能緩存的最大數值是100,多個線程間是互斥的,當緩存隊列中存儲的值達到100時,將寫線程阻塞,并喚醒讀線程,當緩存隊列中存儲的值為0時,將讀線程阻塞,并喚醒寫線程,下面分析一下代碼的執行過程:
1. 一個寫線程執行,調用put方法;
2. 判斷count是否為100,顯然沒有100;
3. 繼續執行,存入值;
4. 判斷當前寫入的索引位置++后,是否和100相等,相等將寫入索引值變為0,并將count+1;
5. 僅喚醒讀線程阻塞隊列中的一個;
6. 一個讀線程執行,調用take方法;
7. ……
8. 僅喚醒寫線程阻塞隊列中的一個。
這就是多個Condition的強大之處,假設緩存隊列中已經存滿,那么阻塞的肯定是寫線程,喚醒的肯定是讀線程,相反,阻塞的肯定是讀線程,喚醒的肯定是寫線程,那么假設只有一個Condition會有什么效果呢,緩存隊列中已經存滿,這個Lock不知道喚醒的是讀線程還是寫線程了,如果喚醒的是讀線程,皆大歡喜,如果喚醒的是寫線程,那么線程剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。
本文來自:高爽|Coder,原文地址:http://blog.csdn.net/ghsau/article/details/7481142,轉載請注明。