如果說CountDownLatch是一次性的,那么CyclicBarrier正好可以循環使用。它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。所謂屏障點就是一組任務執行完畢的時刻。
清單1 一個使用CyclicBarrier的例子
package xylz.study.concurrency.lock;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
final CyclicBarrier barrier;
final int MAX_TASK;
public CyclicBarrierDemo(int cnt) {
barrier = new CyclicBarrier(cnt + 1);
MAX_TASK = cnt;
}
public void doWork(final Runnable work) {
new Thread() {
public void run() {
work.run();
try {
int index = barrier.await();
doWithIndex(index);
} catch (InterruptedException e) {
return;
} catch (BrokenBarrierException e) {
return;
}
}
}.start();
}
private void doWithIndex(int index) {
if (index == MAX_TASK / 3) {
System.out.println("Left 30%.");
} else if (index == MAX_TASK / 2) {
System.out.println("Left 50%");
} else if (index == 0) {
System.out.println("run over");
}
}
public void waitForNext() {
try {
doWithIndex(barrier.await());
} catch (InterruptedException e) {
return;
} catch (BrokenBarrierException e) {
return;
}
}
public static void main(String[] args) {
final int count = 10;
CyclicBarrierDemo demo = new CyclicBarrierDemo(count);
for (int i = 0; i < 100; i++) {
demo.doWork(new Runnable() {
public void run() {
//do something
try {
Thread.sleep(1000L);
} catch (Exception e) {
return;
}
}
});
if ((i + 1) % count == 0) {
demo.waitForNext();
}
}
}
}
清單1描述的是一個周期性處理任務的例子,在這個例子中有一對的任務(100個),希望每10個為一組進行處理,當前僅當上一組任務處理完成后才能進行下一組,另外在每一組任務中,當任務剩下50%,30%以及所有任務執行完成時向觀察者發出通知。
在這個例子中,CyclicBarrierDemo 構建了一個count+1的任務組(其中一個任務時為了外界方便掛起主線程)。每一個子任務里,人物本身執行完畢后都需要等待同組內其它任務執行完成后才能繼續。同時在剩下任務50%、30%已經0時執行特殊的其他任務(發通知)。
很顯然CyclicBarrier有以下幾個特點:
-
await()方法將掛起線程,直到同組的其它線程執行完畢才能繼續
-
await()方法返回線程執行完畢的索引,注意,索引時從任務數-1開始的,也就是第一個執行完成的任務索引為parties-1,最后一個為0,這個parties為總任務數,清單中是cnt+1
-
CyclicBarrier 是可循環的,顯然名稱說明了這點。在清單1中,每一組任務執行完畢就能夠執行下一組任務。
另外除了CyclicBarrier除了以上特點外,還有以下幾個特點:
-
如果屏障操作不依賴于掛起的線程,那么任何線程都可以執行屏障操作。在清單1中可以看到并沒有指定那個線程執行50%、30%、0%的操作,而是一組線程(cnt+1)個中任何一個線程只要到達了屏障點都可以執行相應的操作
-
CyclicBarrier 的構造函數允許攜帶一個任務,這個任務將在0%屏障點執行,它將在await()==0后執行。
-
CyclicBarrier 如果在await時因為中斷、失敗、超時等原因提前離開了屏障點,那么任務組中的其他任務將立即被中斷,以InterruptedException異常離開線程。
-
所有await()之前的操作都將在屏障點之前運行,也就是CyclicBarrier 的內存一致性效果
CyclicBarrier 的所有API如下:
-
public CyclicBarrier(int parties) 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處于等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。
-
public CyclicBarrier(int parties, Runnable barrierAction) 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處于等待狀態時啟動,并在啟動 barrier 時執行給定的屏障操作,該操作由最后一個進入 barrier 的線程執行。
-
public int await() throws InterruptedException, BrokenBarrierException 在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。
-
public int await(long timeout,TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException 在所有參與者都已經在此屏障上調用 await 方法之前將一直等待,或者超出了指定的等待時間。
-
public int getNumberWaiting() 返回當前在屏障處等待的參與者數目。此方法主要用于調試和斷言。
-
public int getParties() 返回要求啟動此 barrier 的參與者數目。
-
public boolean isBroken() 查詢此屏障是否處于損壞狀態。
-
public void reset() 將屏障重置為其初始狀態。
針對以上API,下面來探討下CyclicBarrier 的實現原理,以及為什么有這樣的API。
清單2 CyclicBarrier.await*()的實現片段
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
清單2有點復雜,這里一點一點的剖析,并且還原到最原始的狀態。
利用前面學到的知識,我們知道要想讓線程等待其他線程執行完畢,那么已經執行完畢的線程(進入await*()方法)就需要park(),直到超時或者被中斷,或者被其它線程喚醒。
前面說過CyclicBarrier 的特點是要么大家都正常執行完畢,要么大家都異常被中斷,不會其中有一個被中斷而其它正常執行完畢的現象存在。這種特點叫all-or-none。類似的概念是原子操作中的要么大家都執行完,要么一個操作都不執行完。當前這其實是兩個概念了。要完成這樣的特點就必須有一個狀態來描述曾經是否有過線程被中斷(broken)了,這樣后面執行完的線程就該知道是否需要繼續等待了。而在CyclicBarrier 中Generation 就是為了完成這件事情的。Generation的定義非常簡單,整個結構就只有一個變量boolean broken = false;,定義是否發生了broken操作。
由于有競爭資源的存在(broken/index),所以毫無疑問需要一把鎖lock。拿到鎖后整個過程是這樣的:
-
檢查是否存在中斷位(broken),如果存在就立即以BrokenBarrierException異常返回。此異常描述的是線程進入屏障被破壞的等待狀態。否則進行2。
-
檢查當前線程是否被中斷,如果是那么就設置中斷位(使其它將要進入等待的線程知道),另外喚醒已經等待的線程,同時以InterruptedException異常返回,表示線程要處理中斷。否則進行3。
-
將剩余任務數減1,如果此時剩下的任務數為0,也就是達到了公共屏障點,那么就執行屏障點任務(如果有的話),同時創建新的Generation(在這個過程中會喚醒其它所有線程,因此當前線程是屏障點線程,那么其它線程就都應該在等待狀態)。否則進行4。
-
到這里說明還沒有到達屏障點,那么此時線程就應該park()。很顯然在下面的for循環中就是要park線程。這里park線程采用的是Condition.await()方法。也就是trip.await*()。為什么需要Condition?因為所有的await*()其實等待的都是一個條件,一旦條件滿足就應該都被喚醒,所以Condition整好滿足這個特點。所以到這里就會明白為什么在步驟3中到達屏障點時創建新的Generation的時候是一定要喚醒其它線程的原因了。
上面4個步驟其實只是描述主體結構,事實上整個過程中有非常多的邏輯來處理異常引發的問題,比如執行屏障點任務引發的異常,park線程超時引發的中斷異常和超時異常等等。所以對于await()而言,異常的處理比業務邏輯的處理更復雜,這就解釋了為什么await()的時候可能引發InterruptedException,BrokenBarrierException,TimeoutException 三種異常。
清單3 生成下一個循環周期并喚醒其它線程
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
清單3 描述了如何生成下一個循環周期的過程,在這個過程中當然需要使用Condition.signalAll()喚醒所有已經執行完成并且正在等待的線程。另外這里count描述的是還有多少線程需要執行,是為了線程執行完畢索引計數。
isBroken() 方法描述的就是generation.broken,也即線程組是否發生了異常。這里再一次解釋下為什么要有這個狀態的存在。
如果一個將要位于屏障點或者已經位于屏障點的而執行屏障點任務的線程發生了異常,那么即使喚醒了其它等待的線程,其它等待的線程也會因為循環等待而“死去”,因為再也沒有一個線程來喚醒這些第二次進行park的線程了。還有一個意圖是,如果屏障點都已經損壞了,那么其它將要等待屏障點的再線程掛起就沒有意義了。
寫到這里的時候非常不幸,用了4年多了臺燈終于“壽終正寢了”。
其實CyclicBarrier 還有一個reset方法,描述的是手動立即將所有線程中斷,恢復屏障點,進行下一組任務的執行。也就是與重新創建一個新的屏障點相比,可能維護的代價要小一些(減少同步,減少上一個CyclicBarrier 的管理等等)。
本來是想和Semaphore 一起將的,最后發現鋪開后就有點長了,而且也不利于理解和吸收,所以放到下一篇吧。
參考資料:
-
使用 CyclicBarrier 做線程間同步
-
CyclicBarrier And CountDownLatch Tutorial
-
線程—CyclicBarrier
-
Java線程學習筆記(十)CountDownLatch 和CyclicBarrier
-
關于多線程同步的初步教程--Barrier的設計及使用
-
Thread coordination with CountDownLatch and CyclicBarrier
-
如何充分利用多核CPU,計算很大的List中所有整數的和
©2009-2014 IMXYLZ
|求賢若渴