http://www-128.ibm.com/developerworks/cn/java/j-csp2/
用 JCSP 進(jìn)行并發(fā)編程

級別: 中級

Abhijit Belapurkar
高級技術(shù)架構(gòu)師, Infosys Technologies Limited
2005 年 7 月 07 日

在這篇由三部分構(gòu)成的面向 Java 程序員的通信順序進(jìn)程 (CSP)介紹的第二期中,Abhijit Belapurkar 將介紹如何使用基于 Java 的 JCSP 庫來編寫能夠確保沒有并發(fā)問題(例如爭用風(fēng)險、 死鎖、活動鎖、資源耗盡)的 Java 應(yīng)用程序。

CSP 是對并發(fā)對象之間的復(fù)雜交互進(jìn)行建模的范式。使用 CSP 的主要優(yōu)勢之一是:對程序每一階段所包含對象的行為進(jìn)行精確地指定和驗證。CSP 的理論和實踐對于并發(fā)設(shè)計和編程領(lǐng)域有深遠(yuǎn)的影響。它是 occam 這樣的編程語言的基礎(chǔ),對其他語言(例如 Ada)的設(shè)計也有影響。就像在本文 第 1 部分 簡要討論過的,由于適合在 Java 平臺上進(jìn)行安全、優(yōu)雅的多線程編程,所以 CSP 對 Java 開發(fā)人員也是無價的。

在我的這篇由三部分組成的 Java 平臺 CSP 編程介紹的第 2 部分中,我把重點(diǎn)放在 CSP 理論和實踐上,特別是它在 Java 語言中多線程程序設(shè)計的應(yīng)用。我將從 CSP 理論的概述開始介紹,然后介紹基于 Java 的 JCSP 庫實現(xiàn),JCSP 的核心是 CSP 。

CSP 基礎(chǔ)
CSP 的基本構(gòu)造是進(jìn)程和進(jìn)程之間各種形式的通信。CSP 中的每件事都是進(jìn)程,甚至(子)進(jìn)程網(wǎng)絡(luò)也是進(jìn)程。但是,在進(jìn)程之間沒有直接交互 —— 所有交互都通過 CSP 的同步對象(例如各級進(jìn)程訂閱的通信通道和事件邊界)實現(xiàn)的。

CSP 進(jìn)程 與典型的 Java 對象不同:封裝在進(jìn)程組件中的數(shù)據(jù) 操縱數(shù)據(jù)的算法都是私有的。也就是說,進(jìn)程沒有對外可以調(diào)用的方法(除了啟動進(jìn)程必須調(diào)用的方法之外),算法只能在進(jìn)程自己的控制線程內(nèi)執(zhí)行。如果把這種方法與 Java 語言中的方法調(diào)用進(jìn)行對比,就可以立即看出 CSP 是如何消除顯式鎖定的需求的:

不要錯過本系列的其余部分!
“適用于 Java 程序員的 CSP”是對通信順序進(jìn)程(Communicating Sequential Processes —— CSP)進(jìn)行介紹的由三部分組成的一個系列。CSP 是并發(fā)編程的一個范式,它承認(rèn)了并發(fā)編程的復(fù)雜性,卻并沒有把復(fù)雜性留給開發(fā)人員。請參閱本系列的其他部分:

第 2 部分:用 JCSP 進(jìn)行并發(fā)編程

第 3 部分: JCSP 的高級主題

在 Java 語言中,在對象上調(diào)用的方法總是在調(diào)用者的線程中運(yùn)行。但也有一個特殊的控制線程是通過系統(tǒng)中的多個對象進(jìn)行工作的。對于大部分情況來說,對象沒有自己的生命 —— 它們只是在運(yùn)行線程調(diào)用它們的方法時才存在。因此,不同的執(zhí)行線程可以在同一時間試圖調(diào)用同一對象的同一方法,就像 第 1 部分所討論的那樣。顯然,這種情況在 CSP 中永遠(yuǎn)不會發(fā)生。

通信通道和進(jìn)程網(wǎng)絡(luò)
進(jìn)程間通信最簡單的機(jī)制就是通過通道讀寫數(shù)據(jù)。CSP 中基本的通道構(gòu)造是同步的(synchronous)點(diǎn)對點(diǎn)的(point-to-point);也就是說,它不包含內(nèi)部緩沖,并且把一個進(jìn)程連接到另外一個進(jìn)程。從這個基本通道開始,有可能構(gòu)建多個閱讀器/寫入器通道(即一對多、多對一和多對多)。

CSP 中的進(jìn)程構(gòu)成了復(fù)雜系統(tǒng)的基本構(gòu)造塊 —— 一個進(jìn)程可以同一個或多個其他進(jìn)程連接起來(全都設(shè)置成并行的),從而構(gòu)成一個進(jìn)程網(wǎng)絡(luò)。可以把這個網(wǎng)絡(luò)本身想像成一個進(jìn)程,這個進(jìn)程還可以遞歸地與其他進(jìn)程、它們自己的網(wǎng)絡(luò)或者其他類似東西組合在一起,形成一個為了最好地解決手上問題而設(shè)計的復(fù)雜排列的金字塔。

如果單獨(dú)考慮,那么進(jìn)程僅僅是一個獨(dú)立的串行程序,它只與外部 I/O 設(shè)備交互。這個程序本身并不需要考慮在 I/O 通道另一端的進(jìn)程是否存在或?qū)Ψ降男再|(zhì)。

CSP 理論已經(jīng)在許多基于 Java 的框架中實現(xiàn)了,包括面向 Java 的通信順序進(jìn)程(Communicating Sequential Processes for Java,JCSP) 庫。

關(guān)于 CSP 的更多內(nèi)容
本文提供了對 CSP 復(fù)雜主題的一般性介紹。如果對于深入理論底層的數(shù)學(xué)機(jī)制有興趣,那么請參閱 C.A.R. Hoare 的原文章以及他針對這一主題撰寫的書。要想獲得 CSP 理論的最新發(fā)展(這些年已經(jīng)做了更新),請參閱 Bill Roscoe 撰寫的書。要想獲得廣泛的參考來源,請參考牛津大學(xué)計算機(jī)實驗室和 WoTUG 主頁的 CSP 歸檔。還請參閱 參考資料,以獲取所有這些參考和更多內(nèi)容的鏈接。

JCSP 庫
JCSP 庫由英國坎特伯雷市肯特大學(xué)的 Peter Welch 教授和 Paul Austin 開發(fā)(請參閱 參考資料)。對于本文余下的大部分內(nèi)容來說,我會把重點(diǎn)放在 CSP 概念在 JCSP 中的實現(xiàn)方式上。因為 Java 語言沒有提供對 CSP 構(gòu)造的自帶支持,所以 JCSP 庫內(nèi)部使用 Java 語言 實際 支持的、自帶的并發(fā)構(gòu)造,例如 synchronizedwaitnotify。為了幫助您正確地理解 JCSP 的工作方式,我將從這些 Java 構(gòu)造的角度對 JCSP 庫中某些類的內(nèi)部實現(xiàn)進(jìn)行了解釋。

注意,后續(xù)章節(jié)中的示例基于或來自 JCSP 庫的 Javadoc 文檔,或者基于可以在 JCSP 主頁上得到的演示文稿。

JCSP 中的進(jìn)程
在 JCSP 中,進(jìn)程實際上就是實現(xiàn)了 CSProcess 接口的類。清單 1 顯示了這個接口:

清單 1. CSProcess 接口

package jcsp.lang;

public interface CSProcess
{
    public void run();
}

注意,CSProcess 接口看起來就像 Java 語言的 Runnable 接口,而且它也充當(dāng)著類似的角色。雖然 JCSP 目前是用標(biāo)準(zhǔn) Java API 實現(xiàn)的,但是并不需要這樣,而且在未來可能真的不需要這樣。出于這個原因,在 JCSP 中沒有直接使用 Runnable 接口。

驗證 JCSP 程序
Peter Welch 教授和其他人構(gòu)建了一個正式的 CSP 模型,從而可以用 CSP 術(shù)語對多線程 Java 程序進(jìn)行分析,并驗證程序是否會造成導(dǎo)致爭用風(fēng)險、死鎖和資源耗盡的 bug。因為 JCSP 庫使用模型底部的監(jiān)視器機(jī)制 (即 synchronized()wait()notify()notifyAll()) ,所以基于 JCSP 的應(yīng)用程序可以用各種軟件工程工具進(jìn)行驗證,其中包括一些商業(yè)化支持的工具。請參閱 參考資料,學(xué)習(xí)關(guān)于 FDR2 的內(nèi)容,這是一個針對基于 CSP 的程序的模型檢測工具。

JCSP 定義了兩個接口用于從通道讀取對象和向通道寫入對象。從通道讀取對象的接口叫作 ChannelInput ,它只有一個方法,叫作 read()。如果進(jìn)程調(diào)用一個實現(xiàn) ChannelInput 接口的對象的這個方法,那么進(jìn)程會阻塞,直到在通道另一端的進(jìn)程實際向通道寫入了一個對象。 一旦在通道上有對象可用,對象就被返回給調(diào)用進(jìn)程。類似地,ChannelOutput 接口也只有一個方法,叫作 write(Object o)。如果進(jìn)程調(diào)用 一個實現(xiàn) ChannelOutput 接口的對象的這個方法,進(jìn)程也會阻塞,直到通道接受對象。正如前面提到過的,最簡單的通道類型沒有緩沖,所以它在另一端(讀取)的進(jìn)程調(diào)用 read() 之前不會接受對象。

從現(xiàn)在開始,我將使用代碼示例來演示這些和其他 JCSP 構(gòu)造如何工作。在清單 2 中,可以看到一個非常簡單的進(jìn)程,它輸出 1 到 100 之間的所有偶數(shù):

清單 2. 生成 1 到 100 之間偶數(shù)的進(jìn)程

import jcsp.lang.*;

public class SendEvenIntsProcess implements CSProcess 
{
    private ChannelOutput out;

    public SendEvenIntsProcess(ChannelOutput out)
    {
      this.out = out;
    }

    public void run()
    {
      for (int i = 2; i <= 100; i = i + 2)
      {
        out.write (new Integer (i));
      }
    }
}

與每一個寫進(jìn)程對應(yīng),必須有一個讀進(jìn)程。如果不存在這樣的進(jìn)程,則會造成 SendEvenIntsProcessChannelOutput 對象的 out 進(jìn)行第一次寫操作之后立即無限期阻塞。清單 3 演示了一個簡單的讀進(jìn)程,該進(jìn)程與清單 2 介紹的寫進(jìn)程對應(yīng):

清單 3. 對應(yīng)的消費(fèi)者進(jìn)程

import jcsp.lang.*;

public class ReadEvenIntsProcess implements CSProcess
{
    private ChannelInput in;
    public ReadEvenIntsProcess(ChannelInput in)
    {
      this.in = in;
    }

    public void run()
    {
      while (true)
      {
        Integer d = (Integer)in.read();
        System.out.println("Read: " + d.intValue());
      }
    }
}

JCSP 中的通道
到目前為止,我只有兩個獨(dú)立的進(jìn)程。下一步就是使用一個用作共享同步機(jī)制的公共通道把它們聯(lián)系在一起,然后從中剔除一個進(jìn)程。channel 接口是 JCSP 的 ChannelInputChannelOutput 接口的子接口,是讀取和寫入對象的公共接口。這個接口有許多可能的實現(xiàn),就像下面描述的一樣:

  • One2OneChannel,顧名思義,實現(xiàn)了“單一寫入器/單一閱讀器”類型的通道。

  • One2AnyChannel 實現(xiàn)了“單一寫入器/多閱讀器”對象通道。(注意,這不是廣播機(jī)制,實際上,為了從通道讀取對象,多個閱讀器要進(jìn)行相互競爭;在指定時間只有一個閱讀器能使用通道和寫入器進(jìn)行溝通。)

  • Any2OneChannel 實現(xiàn)了 “多寫入器/單一閱讀器”對象通道。同上面的情況一樣,寫入進(jìn)程彼此競爭使用通道。在指定時間,只有閱讀器和眾多寫入器中的一個在實際使用通道。

  • Any2AnyChannel 實現(xiàn)了“多寫入器/多閱讀器”對象通道。讀取進(jìn)程彼此競爭使用的通道,寫入進(jìn)程也一樣。在指定時間只有一個閱讀器和一個寫入器在實際使用通道。

在清單 3 的示例中,我只有一個寫入器進(jìn)程和一個閱讀器進(jìn)程,所以 One2OneChannel 類就足夠了。驅(qū)動器程序的示例代碼如清單 4 所示:

清單 4. 驅(qū)動器程序

import jcsp.lang.*;

public class DriverProgram
{
    public static void main(String[] args)
    {
      One2OneChannel chan = new One2OneChannel();
      new Parallel
      (
        new CSProcess[]
	    {
	      new SendEvenIntsProcess (chan),
	      new ReadEvenIntsProcess (chan)
	    }
      ).run ();
    }
}

正如代碼表示的,我首先實例化一個新的 One2OneChannel 對象,然后把它傳遞給 SendEvenIntsProcessReadEventIntsProcess 進(jìn)程的構(gòu)造函數(shù)。這樣做是因為 One2OneChannel 同時實現(xiàn)了兩個接口 —— ChannelInputChannelOutput

通道內(nèi)部
因為通道在 JCSP 中是重要的概念,所以在進(jìn)行下一步之前,要確定您確實理解了它們的工作方式。正如我在前面提到的,通道在默認(rèn)情況下是非緩沖的,但是也可以把它們變成緩沖的。實現(xiàn)方式是:通道本身并不處理緩沖特性,而是把這個責(zé)任委托給其他類,其他類必須實現(xiàn)叫作 ChannelDataStore 的接口。JCSP 為這個接口提供了多個內(nèi)置實現(xiàn),其中包括以下幾個實現(xiàn):

  • ZeroBuffer,對應(yīng)默認(rèn)的非緩沖特性。

  • Buffer,為與之相關(guān)聯(lián)的通道提供了一個阻塞的先進(jìn)先出的緩沖語義。

  • InfiniteBuffer,也提供先進(jìn)先出語義,但是如果緩沖為空,那么可以將閱讀器阻塞。寫入器永遠(yuǎn)不會阻塞,因為緩沖可以無限擴(kuò)展,或者至少到了底層內(nèi)存系統(tǒng)設(shè)置的限制為止。

通道實戰(zhàn)
考慮一個實際使用的通道示例。當(dāng)我創(chuàng)建了如清單 4 所示的 One2OneChannel 實例時,我把它內(nèi)部的 ChannelDatasource 設(shè)置成 ZeroBuffer 的一個新實例。ZeroBuffer 只能保存一個對象(或整數(shù))。它有一個內(nèi)部狀態(tài)變量,該變量的起始值為 EMPTY,只要放進(jìn)一個對象,該變量的值就變成 FULL 了。

當(dāng) SendEvenIntsProcess 進(jìn)程在它的 out 通道上進(jìn)行 write 操作時,會發(fā)生什么呢?One2OneChannel 類的 write() 方法是一個 synchronized() 方法。因此,發(fā)送方進(jìn)程運(yùn)行所在的線程(很快就會看到發(fā)送方進(jìn)程和閱讀器進(jìn)程運(yùn)行在獨(dú)立的線程中)就會得到與這個通道實例相關(guān)聯(lián)的監(jiān)視器鎖,并繼續(xù)處理方法。在該方法中,業(yè)務(wù)的第一個順序就是調(diào)用內(nèi)部持有的 ZeroBuffer 實例的 put 方法,把對象(或者在這個示例中是整數(shù))寫到 ZeroBuffer 實例。這樣就把緩沖的狀態(tài)變成 FULL。這時,調(diào)用線程調(diào)用 wait,造成線程進(jìn)入監(jiān)視器的 等候集,后面進(jìn)行的操作是釋放監(jiān)視器鎖和阻塞線程。

稍后,閱讀器線程調(diào)用通道上的 read 操作(這也是一個同步的方法,所以閱讀器線程在繼續(xù)處理之前必須得到監(jiān)視器鎖)。因為內(nèi)部緩沖的狀態(tài)是 FULL,所以可用數(shù)據(jù)將被返回,并發(fā)出一個 notify()notify() 喚醒發(fā)送方線程,然后發(fā)送方線程退出監(jiān)視器等候集,并重新申請監(jiān)視器鎖。

在反過來的場景中,如果閱讀器線程調(diào)用通道上的 read 方法時,通道的內(nèi)部緩沖狀態(tài)是 EMPTY,那么閱讀器線程就不得不 wait,在這種情況下,發(fā)送方線程要在把數(shù)據(jù)對象寫入內(nèi)部緩沖之后通知閱讀器線程。

Parallel 構(gòu)造
清單 4 中,您可能已經(jīng)注意到驅(qū)動器程序引入了一個新類,叫作 ParallelParallel 類是由 JCSP 以預(yù)定義 CSProcess 的形式提供的,它接受一組獨(dú)立的 CSProcess 實例,并“平行地”運(yùn)行它們 (除了最后一個之外,所有進(jìn)程都在獨(dú)立的線程中運(yùn)行;最后一個進(jìn)程由 Parallel 對象在自己的控制線程中運(yùn)行)。 Parallel 進(jìn)程的 run 方法只有在所有的部件進(jìn)程終止的時候才終止。所以 Parallel 進(jìn)程是一種把多個獨(dú)立進(jìn)程組織起來的機(jī)制,它用通道(在驅(qū)動器程序中示例中)作為“線”把進(jìn)程連在一起。

了解 Parallel 構(gòu)造的另一個途徑是說:它可以把小的、簡單的組件組合成更高層次的進(jìn)程。實際上,Parallel 允許通過迭代把前面迭代中創(chuàng)建的組件與新的組件連接起來,創(chuàng)建出任意復(fù)雜程度的完全連接的進(jìn)程網(wǎng)絡(luò)。生成的進(jìn)程網(wǎng)絡(luò)可以像一個 CSProcess 對象一樣公開和使用。

Parallel 示例
JCSP 庫提供了一組即插即用的組件,不過僅僅是出于教育的目的,正好適合我的目的:進(jìn)入其中幾個的內(nèi)部實現(xiàn),可以很好的表現(xiàn)如何在 JCSP 中組合成網(wǎng)絡(luò)化的并發(fā)進(jìn)程。我用下面的示例進(jìn)程來表現(xiàn) JCSP 中 Parallel 構(gòu)造的內(nèi)部工作方式:

  • PlusInt 在兩個輸入流中都接受整數(shù),把整數(shù)加在一起,然后把結(jié)果輸出到輸出流。

  • Delta2Int 平行地把到達(dá)它的輸入流的每個整數(shù)廣播到它的兩個輸出通道。

  • PrefixInt 在它的整數(shù)輸入流之前加上一個(用戶配置的)整數(shù)。(也就是說,在這個進(jìn)程的輸出通道上有整數(shù)可用之前,第一個輸出是預(yù)先配置的整數(shù)。后面的輸出才是從輸入流得到的整數(shù)。)

  • IntegrateInt 是一個用 Parallel 構(gòu)造組合了前三個進(jìn)程的進(jìn)程。它的功能是輸出來自它的輸入通道的整數(shù)的中間匯總值。

IntegrateInt 類的 run 方法如清單 5 所示:

清單 5. IntegrateInt 進(jìn)程

import jcsp.lang.*;

public class IntegrateInt implements CSProcess 
{
  private final ChannelInputInt in;
  private final ChannelOutputInt out;

  public IntegrateInt (ChannelInputInt in, ChannelOutputInt out)
  {
    this.in = in;
    this.out = out;
  }

  public void run()
  {
      One2OneChannelInt a = new One2OneChannelInt ();
      One2OneChannelInt b = new One2OneChannelInt ();
      One2OneChannelInt c = new One2OneChannelInt ();

      new Parallel 
      (
        new CSProcess[]
        {
          new PlusInt (in, c, a),
          new Delta2Int (a, out, b),
          new PrefixInt (0, b, c)
        }
      ).run ();
  }
}

注意,與 請單 4 中使用的通道相比,這個示例中使用了不同種類的通道。 IntegrateInt 類使用 ChannelInputIntChannelOutputInt 通道,顧名思義,可以用它們傳遞 int 類型的整數(shù)。相比之下,清單 4 中的驅(qū)動器程序使用了 ChannelInputChannelOutput,它們是 對象 通道,可以用來在通道中從發(fā)送方給接收方發(fā)送任意對象。出于這個原因,在清單 4 中傳遞 int 值之前,我不得不把 int 值包裝成 Integer 對象。

在清單 5 中,還需要注意觀察什么呢?實際上,PrefixInt 進(jìn)程的第一個輸出是 0,它是通過 PlusInt 進(jìn)程添加到輸入通道到達(dá)的第一個整數(shù)上的。這個結(jié)果被寫入通道 a,它構(gòu)成了 Delta2Int 進(jìn)程的輸入通道。Delta2Int 進(jìn)程把整數(shù)結(jié)果寫到 out (進(jìn)程的整體輸出通道)并把它發(fā)送到 PrefixInt 進(jìn)程。然后 PrefixInt 進(jìn)程把整數(shù)作為輸入發(fā)送給 PlusInt 進(jìn)程,并添加到流中的第二個整數(shù),如此類推。

IntegrateInt 進(jìn)程組成的圖示如圖 1 所示:

圖 1. IntegrateInt 進(jìn)程
IntegrateInt 進(jìn)程

網(wǎng)絡(luò)中的網(wǎng)絡(luò)
IntegrateInt 進(jìn)程就是這樣由三個小進(jìn)程組成,它本身可以當(dāng)作一個復(fù)合進(jìn)程來用。JCSP 庫提供了一個叫作 SquaresInt 的進(jìn)程,顧名思義,它生成一個整數(shù)流,整數(shù)流是自然數(shù) (1、2、3、4,等等)的平方。這個進(jìn)程的代碼如清單 6 所示:

清單 6. SquaresInt 進(jìn)程

public class SquaresInt implements CSProcess 
{
  private final ChannelOutputInt out;

  public SquaresInt (ChannelOutputInt out)
  {
    this.out = out;
  }

  public void run()
  {
      One2OneChannelInt a = new One2OneChannelInt ();
      One2OneChannelInt b = new One2OneChannelInt ();

      new Parallel 
      (
        new CSProcess[]
        {
          new NumbersInt (a),
          new IntegrateInt (a, b),
          new PairsInt (b, out)
        }
      ).run ();
  }
}

我可以肯定您已經(jīng)注意到清單 6 顯示的兩個新進(jìn)程。NumbersInt 是一個內(nèi)置進(jìn)程,它只是在其輸出通道中輸出從 0 開始的自然數(shù)。PairsInt 進(jìn)程則把連續(xù)的一對輸入值相加并輸出結(jié)果。這兩個新進(jìn)程和 IntegrateInt 一起構(gòu)成了 SquaresInt 進(jìn)程,如圖 2 中的圖表所示:

圖 2. SquaresInt 進(jìn)程
SquaresInt 進(jìn)程

SquaresInt 的工作方式
在進(jìn)入下一部分之前,先來考慮 SquaresInt 進(jìn)程的內(nèi)部工作方式。在下面可以看到 SquaresInt 內(nèi)部每個通道上的交通流向:


Channel "a":	[0, 1, 2, 3, 4, 5, 6, 7, 8, ...ad infinitum]
Channel "b":	[0, 1, 3, 6, 10, 15, 21, 28, 36, ...ad infinitum]
Channel "out":	[1, 4, 9, 16, 25, 36, 49, 64, 81 ...ad infinitum]

您有沒有看這樣的模式:寫入通道 a 的整數(shù)造成它們也被寫入通道 b,因此也寫到通道 out?在第一次“滴答”當(dāng)中,NumbersInt 進(jìn)程把整數(shù) 0 寫入通道 aIntegrateInt 進(jìn)程也把整數(shù) 0 (是當(dāng)前匯總的值)寫入通道 bPairsInt 進(jìn)程在這次滴答中什么都不產(chǎn)生,因為它需要處理兩個輸入。在第二次滴答中,NumbersInt 進(jìn)程在它的輸出通道上寫入整數(shù) 1。這造成 IntegrateInt 進(jìn)程把匯總值修改成 0+1=1,所以把整數(shù) 1 寫入通道 b

這時, PairsInt 有了兩個整數(shù)輸入可以處理 —— 整數(shù) 0 來自前一次滴答,整數(shù) 1 來自當(dāng)前滴答。它把它們加在一起,并把輸出 0+1=1 寫到通道 out。請注意 1 是 1 的平方,所以我們現(xiàn)在可能是在正確的軌道上。繼續(xù)把示例前進(jìn)到下一個(第三個)滴答,NumbersInt 進(jìn)程把把整數(shù) 2 寫入通道 a。這使 IntegrateInt 進(jìn)程把匯總值更新為 1 (前一個匯總值) + 2 (新值) = 3 并把這個整數(shù)寫入通道 b

PairsInt 進(jìn)程看到最后兩個整數(shù)是什么?它們是 1 (在前一次滴答期間) 和 3 (在當(dāng)前滴答期間)。所以,進(jìn)程把這兩個整數(shù)加在一起,并把 1+3=4 寫入通道 out。您會注意到 4 是 2 的平方,這意味著 SquaresInt 工作起來就像它應(yīng)當(dāng)工作的那樣。實際上,應(yīng)當(dāng)繼續(xù)運(yùn)行這個程序到任意數(shù)量的滴答,這樣就可以驗證寫入通道 out 的整數(shù)總是在序列中的下一個整數(shù)的平方。我在下一節(jié)精確地這一操作。

數(shù)學(xué)問題
就在您納悶的時候,我想解釋一下生成平方值的數(shù)學(xué)基礎(chǔ)。假設(shè)在 NumbersInt 進(jìn)程已經(jīng)把整數(shù)輸出到某個 n-1 的時候,您偷看到了箱子內(nèi)部。IntegrateInt 進(jìn)程最后生成(而且通過共享通道 b 放到 PairsInt 進(jìn)程)的中間匯總會是 [1+2+3+...+(n-1)] = (n-1)(n-2)/2

在下一次滴答期間,NumbersInt 會輸出 n,這造成 IntegrateInt 進(jìn)程的中間匯總增長為 (1+2+3+...+n) = n(n-1)/2。然后這個匯總會通過共享通道 b 傳給 PairsInt 進(jìn)程。 PairsInt 會把這兩個數(shù)加在一起,生成 [(n-1)(n-2)/2 + n(n-1)/2] = [(n-2) + n](n-1)/2 = (2n-2)(n-1)/2 = (n-1)exp2

接下來,NumbersInt 進(jìn)程會產(chǎn)生(n+1)。與之對應(yīng),IntegrateInt 進(jìn)程會把 n(n+1)/2 送到 PairsInt 進(jìn)程。然后 PairsInt 會生成 [n(n-1)/2 + n(n+1)/2] = nexp2。針對所有的 n 對這進(jìn)行通用化,就會按照期望的那樣產(chǎn)生全部平方。

JCSP 中的確定性
以上示例演示了 CSP 的復(fù)合語言 —— 即如何用 Parallel 構(gòu)造把細(xì)致的無狀態(tài)的組件組成分層的網(wǎng)絡(luò)。所有這類相互通信的平行進(jìn)程的分層網(wǎng)絡(luò)的賣點(diǎn)就是:它們是完全確定的。在這個上下文環(huán)境中 確定 意味著什么呢?它意味著這類分層網(wǎng)絡(luò)的輸出只取決于提供給它的輸入,而不用考慮網(wǎng)絡(luò)運(yùn)行的運(yùn)行時環(huán)境(JVM)的特性。也就是說,進(jìn)程網(wǎng)絡(luò)獨(dú)立于 JVM 的調(diào)度策略,也獨(dú)立于它所分布的多處理器。(我在這里假設(shè)的是個單一節(jié)點(diǎn),但是,沒有什么固有的東西會防礙把這個討論引入物理上分布在多個節(jié)點(diǎn)上而在進(jìn)程之間通過線路進(jìn)行通信的進(jìn)程網(wǎng)絡(luò)上。)

確定性會是工具包中的強(qiáng)大工具,因為它可以讓您清晰地推斷出程序的行為,不必?fù)?dān)心運(yùn)行時環(huán)境對它可能產(chǎn)生的影響。同時,確定性不是并發(fā)性編程惟一可能的技術(shù)或必需的技術(shù)。因為下一個(也是最后一個)實例將顯示,非確定性在 JSP 中也是同樣強(qiáng)大的實用概念。

JCSP 中的非確定性
非確定是許多真實的應(yīng)用程序的因子,在這些應(yīng)用程序,可見的輸出是某個功能或者事件發(fā)生的順序。換句話說,當(dāng)結(jié)果取決于設(shè)計的調(diào)度,而 不是 取決于事件時,就是非確定性在并發(fā)應(yīng)用程序中發(fā)揮作用的地方了。您將會看到,JCSP 顯式地處理這類問題。

例如,假設(shè)一個進(jìn)程對于 下面要做什么 有許多備選項,每個備選項都有一個與之關(guān)聯(lián)的 警衛(wèi)(guard),警衛(wèi)必須處于“就緒(ready)”狀態(tài),這樣才能讓備選項得以考慮。進(jìn)程可以從可用的備選項(也就是就緒的)中選擇一個選項;選擇本身可能基于不同的策略,可能是任意選擇、最高優(yōu)先級選擇或者公平選擇。

事件選擇策略
在 JCSP 的特定上下文中,提供了一個叫作 Guard 的抽象類,競爭進(jìn)程選擇的事件必須繼續(xù)它。進(jìn)程本身使用另一個預(yù)先提供的類,叫作 Alternative,這些警衛(wèi)對象必須以對象數(shù)組的形式傳遞給它的構(gòu)造函數(shù)。Alternative 類為三種事件選擇策略提供了方法。

Alternative 類的 select() 方法對應(yīng)著 任意選擇 策略。select() 方法調(diào)用一直受阻塞,直到一個或多個警衛(wèi)就緒為止(請記住,所有競爭的警衛(wèi)對于 Alternative 類來說都是已知的)。其中一個就緒的警衛(wèi)被隨機(jī)選中,它的索引(在傳遞進(jìn)去的警衛(wèi)數(shù)組中)也被返回。

priSelect() 方法對應(yīng)著 最高優(yōu)先級 策略。也就是說,如果不止一個警衛(wèi)就緒,則返回索引值最低的那個;這里面的假設(shè)是:在數(shù)組中傳遞給 Alternative 構(gòu)造函數(shù)的警衛(wèi)已經(jīng)按照優(yōu)先級順序進(jìn)行降序排序了。

最后,方法 fairSelect 是在多個就緒警衛(wèi)中進(jìn)行 公平 選擇:在這個方法的連續(xù)調(diào)用中,在其他就緒而且可用的警衛(wèi)沒被選中之前,不會有某個就緒的警衛(wèi)被選中兩次。所以,如果警衛(wèi)的總數(shù)是 n,那么在最壞的情況下,就緒的警衛(wèi)沒獲得選中的次數(shù)不會連續(xù)超過 n 次。

如果進(jìn)程不關(guān)心如何選擇多個就緒警衛(wèi),那么任意選擇策略最合適;如果進(jìn)程想保證沒有資源耗盡或者最差服務(wù)次數(shù),例如在實時系統(tǒng)中,那么任意選擇就不太適用了。在前面的情況下,推薦使用 fairSelect 方法,而在后面的情況下,用 priSelect() 方法最好。

警衛(wèi)類型
大體來說,JCSP 提供了三類警衛(wèi):

  • 通道警衛(wèi) 總是對應(yīng)著進(jìn)程等候從中讀取數(shù)據(jù)的通道。也就是說,只有在通道另一端的進(jìn)程已經(jīng)輸出數(shù)據(jù),而該數(shù)據(jù)還沒有被進(jìn)程輸入的時候,警衛(wèi)才就緒。

  • 計時器警衛(wèi) 總是和設(shè)置(絕對)超時對應(yīng)。也就是說,如果超時,則計時器警衛(wèi)就會就緒。

  • 跳過警衛(wèi) 總是就緒。

JCSP 中的通道警衛(wèi) 可以是以下類型:AltingChannelInput/AltingChannelInputInt,只要在對應(yīng)的通道中有了對象或整數(shù)數(shù)據(jù),則這兩個通道將就緒;或者 AltingChannelAccept,如果在通道中出現(xiàn)不可接受的“CALL”(這一點(diǎn)后面有更多介紹),則通道就會就緒。這些都是抽象類,它們擁有 One2OneAny2One 類型通道形式的具體實現(xiàn)。JCSP 中的計時器 警衛(wèi)屬于 CSTimer 類型,而 跳過警衛(wèi) 則是以 Skip 類的形式提供的。

運(yùn)作中的警衛(wèi)
我用一個簡單的示例,演示如何用 JCSP 警衛(wèi)實現(xiàn)并發(fā)應(yīng)用程序中的非確定性,借此總結(jié)對 JCSP 的介紹。假設(shè)您必須開發(fā)一個乘法(或者 倍增) 設(shè)計,讀取的整數(shù)在輸出通道以固定速率到達(dá),可以用某個乘數(shù)乘以它們,然后把它們寫入其輸出通道。設(shè)備可以用一個初始乘數(shù)開始,但是這個乘數(shù)每 5 秒鐘自動加倍。

這個故事中介紹的方法是這樣的:系統(tǒng)中存在著第二個控制器進(jìn)程,它能通過專用通道向設(shè)備發(fā)送 suspend operation 信號。這使設(shè)備中止自身,并把乘數(shù)的當(dāng)前值通過第二個通道發(fā)送給控制器。

在中止的時候,設(shè)備只應(yīng)當(dāng)允許全部進(jìn)入的整數(shù)不經(jīng)變化地通過它的輸出通道。控制器進(jìn)程 —— 可能在用設(shè)備發(fā)送給它的乘數(shù)做了某些計算中的一種 —— 通過專用通道把一個新乘數(shù)發(fā)送回設(shè)備。(請注意:只要設(shè)備處于 中止 狀態(tài),就會被迫接受這個乘數(shù)。)

更新過的乘數(shù)插入到設(shè)備,并充當(dāng)設(shè)備的喚醒信號。設(shè)備現(xiàn)在繼續(xù)執(zhí)行它的放大操作,用新更新的乘數(shù)乘上輸入的整數(shù)。計時器這時也重置,所以新的乘數(shù)也在 5 秒之后被設(shè)置成加倍數(shù)值,如此類推。

圖 3 中的圖表說明了這個放大設(shè)備:

圖 3. 放大設(shè)備
ScaleInt 進(jìn)程

ScaleInt 進(jìn)程
放大設(shè)備的源代碼在清單 7 中顯示。這個示例中的非確定性是因為:output 的值基于 ininject 流的值(同時還基于這些值到達(dá)的順序)。

清單 7. ScaleInt 進(jìn)程

import jcsp.lang.*;
import jcsp.plugNplay.ints.*;

public class ScaleInt implements CSProcess
{
  private int s;
  private final ChannelOutputInt out, factor;
  private final AltingChannelInputInt in, suspend, inject;

  public ScaleInt (int s, AltingChannelInputInt suspend, AltingChannelInputInt in, 
    ChannelOutputInt factor, AltingChannelInputInt inject, ChannelOutputInt out)
  {
    this.s = s;
	this.in = in;
	this.out = out;
	this.suspend = suspend;
	this.factor = factor;
	this.inject = inject;
  }

  public void run()
  {
	final long second = 1000;               // Java timings are in millisecs
	final long doubleInterval = 5*second;
	final CSTimer timer = new CSTimer ();

	final Alternative normalAlt = new Alternative (new Guard[] {suspend, timer, in});
	
	final int NORMAL_SUSPEND=0, NORMAL_TIMER=1, NORMAL_IN = 2;

	final Alternative suspendedAlt = new Alternative (new Guard[] {inject, in});
	
	final int SUSPENDED_INJECT=0, SUSPENDED_IN = 1;
	
	long timeout = timer.read () + doubleInterval;
	timer.setAlarm (timeout);

	while (true)
	{
	  switch (normalAlt.priSelect ())
	  {
		case NORMAL_SUSPEND:
		  suspend.read ();              // don't care what's sent
		  factor.write (s);             // reply with the crucial information
		  boolean suspended = true;
		  while (suspended)
		  {
		    switch (suspendedAlt.priSelect ())
			{
			  case SUSPENDED_INJECT:    // this is the resume signal as well
			    s = inject.read ();     // get the new scaling factor
				suspended = false;      // and resume normal operations
				timeout = timer.read () + doubleInterval;
				timer.setAlarm (timeout);
				break;
			  case SUSPENDED_IN:
			    out.write (in.read ());
				break;
			}
		  }
		  break;
		case NORMAL_TIMER:
		  timeout = timer.read () + doubleInterval;
		  timer.setAlarm (timeout);
		  s = s*2;
		  break;
		case NORMAL_IN:
		  out.write (s * in.read ());
		  break;
	  }
    }
  }
}

import jcsp.lang.*;
import jcsp.plugNplay.ints.*;

public class Controller implements CSProcess
{
  private long interval;
  private final ChannelOutputInt suspend, inject;
  private final ChannelInputInt factor;

  public Controller (long interval, ChannelOutputInt suspend, ChannelOutputInt inject, 
    ChannelInputInt factor)
  { 
    this.interval = interval;
    this.suspend = suspend;
    this.inject = inject;
    this.factor = factor;
  }

  public void run ()
  {
	int currFactor = 0;
	final CSTimer tim = new CSTimer ();
	long timeout = tim.read ();
	while (true)
	{
	  timeout += interval;
	  tim.after (timeout);        // blocks until timeout reached
	  suspend.write (0);          // suspend signal (value irrelevant)
	  currFactor = factor.read ();			
	  currFactor ++;              // compute new factor
	  inject.write (currFactor);  // inject new factor
	}
  }
}

import jcsp.lang.*;
import jcsp.plugNplay.ints.*;

public class DriverProgram
{
  public static void main(String args[])
  {
	try
	{
	  final One2OneChannelInt temp = new One2OneChannelInt ();
	  final One2OneChannelInt in = new One2OneChannelInt ();
	  final One2OneChannelInt suspend = new One2OneChannelInt ();
	  final One2OneChannelInt factor = new One2OneChannelInt ();
	  final One2OneChannelInt inject = new One2OneChannelInt ();
	  final One2OneChannelInt out = new One2OneChannelInt ();
		
	  new Parallel
	  (
		new CSProcess[]
		{
		  new NumbersInt (temp),
		  new FixedDelayInt (1000, temp, in),
		  new ScaleInt (2, suspend, in, factor, inject, out),
		  new Controller (6000, suspend, inject, factor),
		  new PrinterInt (out, "--> ", "\n")
		}
	  ).run ();
	}
	catch (Exception e)
	{
		e.printStackTrace();
	}
  }
}

上面的類 ScaleInt 對應(yīng)著放大設(shè)備。正如前面提到的,這個類必須實現(xiàn) CSProcess 接口。因為上面的代碼演示了許多概念,所以我將逐個討論它的不同方面。

兩個備選項
ScaleInt 類中,我們感興趣的第一個方法是 run()。在 run() 方法中,要做的第一件事是創(chuàng)建 Alternative 類的兩個實例,每個都有一組不同的 Guard 對象。

第一個 Alternative 實例由變量 normalAlt 表示,它是為設(shè)備正常操作的時候使用的。與之關(guān)聯(lián)的警衛(wèi)列表如下所示:

  • suspendOne2OneChannelInt 的實例。正如前面提到過的,One2OneChannelInt 實現(xiàn)了單一閱讀器/寫入器整數(shù)通道,通道是零緩沖、完全同步的。這是控制器進(jìn)程向設(shè)備發(fā)送中止信號的通道。

  • timerCSTimer 的實例,它被設(shè)置成每 5 秒觸發(fā)一次,每次觸發(fā)時,設(shè)備會把乘數(shù)的當(dāng)前值加倍。

  • inOne2OneChannelInt 的實例,設(shè)備通過它接收輸入的整數(shù)。

第二個 Alternative 實例由 suspendedAlt 表示,它是供設(shè)備在已經(jīng)被 Controller 中止的情況下使用的。與之關(guān)聯(lián)的警衛(wèi)如下如示:

  • injectOne2OneChannelInt 的實例,由控制器進(jìn)程使用,用來向設(shè)備發(fā)送新的乘數(shù)(也充當(dāng)喚醒信號)。

  • in 是前面已經(jīng)看到的 One2OneChannelInt 相同的實例;設(shè)備通過這個通道接收輸入整數(shù)。

兩個 Alternative 實例被用在不同的情況下等候警衛(wèi)就緒,列表的順序是隱式的優(yōu)先級順序。例如,如果 normalAltsuspendtimer 警衛(wèi)恰好同時就緒,那么和 suspend 警衛(wèi)對應(yīng)的事件首先被處理。

警衛(wèi)就緒
下一個我們感興趣的是在每個警衛(wèi)就緒的時候,發(fā)生了什么。我首先研究 normalSelect,假設(shè)設(shè)備操作正常(也就是說,還沒有被中止):

  • 如果控制器向設(shè)備發(fā)送了 suspend 信號,那么這個事件以最高優(yōu)先級得到處理。作為響應(yīng),設(shè)備把乘數(shù)的當(dāng)前值通過叫作 factor 的通道發(fā)送給控制器。然后將叫作 suspended 的內(nèi)部標(biāo)志設(shè)置為 true,然后進(jìn)入循環(huán),等候別人發(fā)送信號,以繼續(xù)其操作。在循環(huán)內(nèi)部,設(shè)備調(diào)用第二個 Alternative 實例上的 priSelect() 方法 (suspendedAlt)。

    這個 Alternative 實例包含兩個警衛(wèi):第一個表示控制器向設(shè)備發(fā)送乘數(shù)的事件,第二個表示整數(shù)到達(dá)設(shè)備的輸入通道。在前一種情況下,設(shè)備用從 inject 通道讀取的值來更新乘數(shù)(保存在變量 s 中),并將 suspended 標(biāo)志設(shè)置回 false (這樣就保證了在下一次迭代時可以退出內(nèi)部循環(huán)),用當(dāng)前計時器的值作為基值重新設(shè)置鬧鐘。在后一種情況下,設(shè)備只是從它的輸入通道讀取整數(shù),并把整數(shù)寫入輸出通道(也就是說,在設(shè)備中止時,不許使用乘數(shù)的要求)。

  • 具有下一個優(yōu)先級得到處理的事件是鬧鐘到期事件。這造成設(shè)備把當(dāng)前乘數(shù)加倍,用當(dāng)前計時器的值作為基值重新設(shè)置鬧鐘,然后返回,繼續(xù)等候下一個事件。

  • 第三個可能是事件是從設(shè)備的輸入通道接收整數(shù)的事件。與之對應(yīng)的是,設(shè)備讀取整數(shù),用當(dāng)前乘數(shù) s 乘上它,并將結(jié)果寫入設(shè)備的輸出通道。

Controller 類
下一個要考慮的類是 Controller 類。請記住,控制器類的任務(wù)是周期性地(大概是基于復(fù)雜的計算)向設(shè)備進(jìn)程插入乘數(shù)值。在這個示例中,周期基礎(chǔ)只是一個計時器,該計時器按照規(guī)律的、配置好的間隔到期。每次到期時,控制器就在 suspend 上寫一個 0(也就是說,它將中止設(shè)備),并在叫作 factor 的輸入通道上讀取當(dāng)前的乘數(shù)。

這時,控制器只是把這個值加一,然后通過一對一通道 (叫作 inject,專門用于為這個目的) 將它插回設(shè)備。這就通知設(shè)備繼續(xù)工作的方式,這時計時器被重新設(shè)置成在適當(dāng)間隔后到期。

DriverProgram 類
最后剩下的類是驅(qū)動器類 DriverProgram。這個類創(chuàng)建適當(dāng)?shù)耐ǖ篮?CSProcess 實例數(shù)組。它用 JCSP 提供的類 NumbersInt 生成一系列自然數(shù),通過temp 通道傳遞給另一個叫作 FixedDelayInt 的內(nèi)置類。顧名思義,FixedDelayInt 將來自其輸入通道的值在固定延遲(在示例代碼中,該延遲是 1 秒)之后發(fā)送到它的輸出通道。

這個自然數(shù)的流每隔一秒就被發(fā)送到 ScaleInt 進(jìn)程的 in 通道。ScaleInt 進(jìn)程的 out 通道的輸出傳遞給 JCSP 提供的 PrinterInt 進(jìn)程,然后該進(jìn)程再接著把整數(shù)值輸出到 System.out

第 2 部分的結(jié)束語
在這個由三部分組成的介紹適用于 Java 程序員的 CSP 的系列文章的第 2 部分中,我解釋并演示了并發(fā)編程中的 CSP 理論。然后是對 CSP 構(gòu)造的概述,其中介紹了最流行的基于 Java 的 CSP 庫 —— JCSP。由于 Java 語言沒有對 CSP 構(gòu)造提供自帶的支持,所以 JCSP 庫內(nèi)部采 Java 支持 的自帶構(gòu)造,例如 synchronized()wait()notify()。為了幫助您正確地理解 JCSP 是如何工作的,我從 Java 構(gòu)造的角度解釋了一些 JCSP 類庫的內(nèi)部實現(xiàn),然后在幾個實際示例中演示了它們的用法。

這里所進(jìn)行的討論可以作為本系列最后一篇文章的絕好基礎(chǔ)。在最后一篇文章中,我將解釋 CSP 和 AOP 的相似性,并簡要地對 CSP 解決并發(fā)性的方法和新的 java.util.concurrent 包解決并發(fā)性的方法進(jìn)行比較,還將介紹許多用 JCSP 進(jìn)行高級同步 的技術(shù)。

致謝
非常感謝 Peter Welch 教授在我編寫這個文章系列期間給予的鼓勵。他在百忙之中抽出時間非常細(xì)致地審閱了草稿,并提供了許多寶貴的提高系列質(zhì)量和準(zhǔn)確性的建議。文章中如果還存在錯誤的話,那都是由于我的原因!我在文章中使用的示例基于或來自 JCSP 庫的 javadoc 中提供的示例,以及 JCSP Web 站點(diǎn)上提供的 Powerpoint 演示文稿。這兩個來源提供了需要探索的大量信息。

參考資料

  • 您可以參閱本文在 developerWorks 全球站點(diǎn)上的 英文原文

  • Brian Goetz 編寫的由三部分組成的 “Threading lightly”是解決 Java 平臺上同步問題的巧妙而系統(tǒng)的方法。(developerWorks,2001 年 7 月)

  • Allen Holub 撰寫的“如果我是國王:關(guān)于解決 Java編程語言線程問題的建議” (developerWorks,2000 年 10 月)是一篇啟蒙性的、至今仍有意義的、關(guān)于 Java 平臺多線程編程錯誤的概述。

  • C.A.R. Hoare 開創(chuàng)性的論文“Communicating Sequential Processes”把通信順序進(jìn)程的并行組成作為一種基本的編程結(jié)構(gòu)化方法提了出來(Communications of the ACM Archive,1978)。

  • 可以免費(fèi)獲得 PDF 格式的 C.A.R. Hoare 撰寫的 關(guān)于 CSP 的書籍

  • Bill Roscoe 撰寫的 Theory and Practice of Concurrency (Prentice Hall, 1997) 是最關(guān)于并發(fā)性和 CS 主題的最新書籍。

  • 牛津大學(xué)計算機(jī)實驗室負(fù)責(zé)的 CSP Archive 是學(xué)習(xí)更多關(guān)于 CSP 內(nèi)容的好地方,除此之外,還有 WoTUG homepage

  • Peter Welch 教授和 Jeremy Martin 合著的“Formal Analysis of Concurrent Java Systems” (IOS Press, 2000) 是在 Java 語言中實踐 CSP 的良好起點(diǎn)。

  • JCSP homepage 由英國坎特伯雷市肯特大學(xué)負(fù)責(zé)。

  • FDR2 (故障偏差求精,F(xiàn)ailures-Divergence Refinement) 是面向基于 CSP 的程序的幾個商業(yè)化模型檢測工具之一。

  • CSP 的實現(xiàn)可用于 Java 語言之外的其他語言:C++CSP 是針對 C++ 的實現(xiàn),而 J#.Net 是針對 .Net 的實現(xiàn)。

  • Occam-pi 是一個語言平臺,它期望用 pi-calculus 的移動特性擴(kuò)展 occam 語言的 CSP 想法。請從 occam-pi homepage 學(xué)習(xí)這個尖端的研究。

  • 在學(xué)習(xí) occam 時,您可能還想調(diào)查 occam 編程器的各種擴(kuò)展

  • 在 developerWorks Java 技術(shù)專區(qū) 可以找到 Java 編程各方面的文章。

  • 請參閱 Developer Bookstore,以獲得技術(shù)書籍的完整清單,其中包括數(shù)百本 Java 相關(guān)主題的書籍。

  • 還請參閱 Java 技術(shù)專區(qū)教程頁,以獲得 developerWorks 上免費(fèi)的、以 Java 為重點(diǎn)的教程。

關(guān)于作者
Abhijit Belapurkar 擁有位于印度德里市的印度理工學(xué)院(IIT)計算機(jī)科學(xué)的理工學(xué)士學(xué)位。在過去的 11 年中,他一直工作在分布式應(yīng)用程序的架構(gòu)和信息安全領(lǐng)域,他在使用 Java 平臺構(gòu)建 n 層應(yīng)用程序方面也已經(jīng)有大約 6 年的工作經(jīng)驗。他目前作為高級技術(shù)架構(gòu)師在 J2EE 領(lǐng)域工作,服務(wù)于印度班加羅爾的 Infosys 科技有限公司。