今天無意中看到PipedInputStream這個類,不知道是干嘛用的,便google了一下,字面上理解是管道流,看一下別人是怎么說的:
PipedInputStream類與PipedOutputStream類用于在應用程序中創建管道通信.一個PipedInputStream實例對象必須和一個PipedOutputStream實例對象進行連接而產生一個通信管道.PipedOutputStream可以向管道中寫入數據,PipedIntputStream可以讀取PipedOutputStream向管道中寫入的數據.這兩個類主要用來完成線程之間的通信.一個線程的PipedInputStream對象能夠從另外一個線程的PipedOutputStream對象中讀取數據.
原來如此,不過這只是說說而已,具體怎么實現的呢,愛刨根問底的我可不會輕易放過這些疑惑,于是看了一下SUN源碼這兩個類的實現,恕本人比較愚鈍,把玩了大半天,才稍微參透其中一些奧妙,故將心得體會在此寫上,供各位批判:)
首先簡單的介紹一下這兩個類的實現原理,PipedInputStream和PipedOutputStream的實現原理類似于"生產者-消費者"原理,PipedOutputStream是生產者,PipedInputStream是消費者,在PipedInputStream中有一個buffer字節數組,默認大小為1024,作為緩沖區,存放"生產者"生產出來的東東.還有兩個變量,in,out,in是用來記錄"生產者"生產了多少,out是用來記錄"消費者"消費了多少,in為-1表示消費完了,in==out表示生產滿了.當消費者沒東西可消費的時候,也就是當in為-1的時候,消費者會一直等待,直到有東西可消費.
因為生產和消費的方法都是synchronized的(寫到這里,我去研究了一下synchronized的用法,才知道synchronized是對對象上鎖,之前一直以為只是對這個方法上鎖,別的synchronized方法仍然可以進入,哎,慚愧慚愧~~),所以肯定是生產者先生產出一定數量的東西,消費者才可以開始消費,所以在生產的時候發現in==out,那一定是滿了,同理,在消費的時候發現in==out,那一定是消費完了,因為生產的東西永遠要比消費來得早,消費者最多可以消費和生產的數量相等的東西,而不會超出.
好了,介紹完之后,看看SUN高手是怎么實現這些功能的.由于buffer(存放產品的通道)這個關鍵變量在PipedInputStream消費者這個類中,所以要想對buffer操作,只能通過PipedInputStream來操作,因此將產品放入通道的操作是在PipedInputStream中.
存放產品的行為:

protected synchronized void receive(int b) throws IOException
{// 這里好像有些問題,因為這個方法是在PipedOutputStream類中調用的,而這個方法是protected的,下面另一個receive方法就不是protected,可能是我的源碼有些問題,也請大家幫我看看
checkStateForReceive();// 檢測通道是否連接,準備好接收產品
writeSide = Thread.currentThread();// 當前線程是生產者
if (in == out)
awaitSpace();// 發現通道滿了,沒地方放東西啦,等吧~~

if (in < 0)
{// in<0,表示通道是空的,將生產和消費的位置都置成第一個位置
in = 0;
out = 0;
}
buffer[in++] = (byte) (b & 0xFF);

if (in >= buffer.length)
{// 如果生產位置到達了通道的末尾,為了循環利用通道,將in置成0
in = 0;
}
}


synchronized void receive(byte b[], int off, int len) throws IOException
{// 看,這個方法不是protected的!
checkStateForReceive();
writeSide = Thread.currentThread();
int bytesToTransfer = len;// 需要接收多少產品的數量

while (bytesToTransfer > 0)
{
if (in == out)
awaitSpace();
int nextTransferAmount = 0;// 本次實際可以接收的數量

if (out < in)
{
nextTransferAmount = buffer.length - in;// 如果消費的當前位置<生產的當前位置,則還可以再生產buffer.length-in這么多

} else if (in < out)
{

if (in == -1)
{
in = out = 0;// 如果已經消費完,則將in,out置成0,從頭開始接收
nextTransferAmount = buffer.length - in;

} else
{
nextTransferAmount = out - in;// 如果消費的當前位置>生產的當前位置,而且還沒消費完,那么至少還可以再生產out-in這么多,注意,這種情況是因為通道被重復利用而產生的!
}
}
if (nextTransferAmount > bytesToTransfer)// 如果本次實際可以接收的數量要大于當前傳過來的數量,
nextTransferAmount = bytesToTransfer;// 那么本次實際只能接收當前傳過來的這么多了
assert (nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount);// 把本次實際接收的數量放進通道
bytesToTransfer -= nextTransferAmount;// 算出還剩多少需要放進通道
off += nextTransferAmount;
in += nextTransferAmount;

if (in >= buffer.length)
{// 到末尾了,該從頭開始了
in = 0;
}
}
}
消費產品的行為:

public synchronized int read() throws IOException
{// 消費單個產品

if (!connected)
{
throw new IOException("Pipe not connected");

} else if (closedByReader)
{
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter

&& (in < 0))
{
throw new IOException("Write end dead");
}

readSide = Thread.currentThread();
int trials = 2;

while (in < 0)
{// in<0,表示通道是空的,等待生產者生產

if (closedByWriter)
{

/**//* closed by writer, return EOF */
return -1;// 返回-1表示生產者已經不再生產產品了,closedByWriter為true表示是由生產者將通道關閉的
}

if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0))
{
throw new IOException("Pipe broken");
}

/**//* might be a writer waiting */
notifyAll();

try
{
wait(1000);

} catch (InterruptedException ex)
{
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;

if (out >= buffer.length)
{
out = 0;// 如果消費到通道的末尾了,從通道頭開始繼續循環消費
}

if (in == out)
{

/**//* now empty */
in = -1;// 消費的位置和生產的位置重合了,表示消費完了,需要生產者生產,in置為-1
}
return ret;
}


public synchronized int read(byte b[], int off, int len) throws IOException
{

if (b == null)
{
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0)

|| ((off + len) > b.length) || ((off + len) < 0))
{
throw new IndexOutOfBoundsException();

} else if (len == 0)
{
return 0;
}


/**//* possibly wait on the first character */
int c = read();// 利用消費單個產品來檢測通道是否連接,并且通道中是否有東西可消費

if (c < 0)
{
return -1;// 返回-1表示生產者生產完了,消費者也消費完了,消費者可以關閉通道了
}
b[off] = (byte) c;
int rlen = 1;

// 這里沒有采用receive(byte [], int ,
// int)方法中System.arrayCopy()的方法,其實用System.arrayCopy()的方法也可以實現

/**//*
* 這是用System.arrayCopy()實現的方法 int bytesToConsume = len - 1; while
* (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
* in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
* b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
* buffer.length - out; }
*
* if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
* bytesToConsume; assert (nextConsumeAmount > 0);
* System.arraycopy(buffer, out, b, off, nextConsumeAmount);
* bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
* nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
* buffer.length) { out = 0; } if(in == out) { in = -1; } }
*/


while ((in >= 0) && (--len > 0))
{
b[off + rlen] = buffer[out++];
rlen++;

if (out >= buffer.length)
{
out = 0;
}

if (in == out)
{

/**//* now empty */
in = -1;// in==out,表示滿了,將in置成-1
}
}
return rlen;
}
雖說功能看似簡單,可是實現起來卻費了一番功夫,在線程的調度上還是挺麻煩的,要考慮的地方很多,不過通過深入的了解這兩個類,讓我對多線程編程有了更多的認識.我覺得要想周密的分析整個功能,得把整個流程都分析清楚,構造好了模型再去實現細節,只有整體構造對了,才能把正確的實現局部,自己的分析能力還有待加強啊!