<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    java人

    愛生活,更愛java!

      BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
      1 隨筆 :: 2 文章 :: 3 評(píng)論 :: 0 Trackbacks

            今天無意中看到PipedInputStream這個(gè)類,不知道是干嘛用的,便google了一下,字面上理解是管道流,看一下別人是怎么說的:

            PipedInputStream類與PipedOutputStream類用于在應(yīng)用程序中創(chuàng)建管道通信.一個(gè)PipedInputStream實(shí)例對(duì)象必須和一個(gè)PipedOutputStream實(shí)例對(duì)象進(jìn)行連接而產(chǎn)生一個(gè)通信管道.PipedOutputStream可以向管道中寫入數(shù)據(jù),PipedIntputStream可以讀取PipedOutputStream向管道中寫入的數(shù)據(jù).這兩個(gè)類主要用來完成線程之間的通信.一個(gè)線程的PipedInputStream對(duì)象能夠從另外一個(gè)線程的PipedOutputStream對(duì)象中讀取數(shù)據(jù).

            原來如此,不過這只是說說而已,具體怎么實(shí)現(xiàn)的呢,愛刨根問底的我可不會(huì)輕易放過這些疑惑,于是看了一下SUN源碼這兩個(gè)類的實(shí)現(xiàn),恕本人比較愚鈍,把玩了大半天,才稍微參透其中一些奧妙,故將心得體會(huì)在此寫上,供各位批判:)

            首先簡(jiǎn)單的介紹一下這兩個(gè)類的實(shí)現(xiàn)原理,PipedInputStreamPipedOutputStream的實(shí)現(xiàn)原理類似于"生產(chǎn)者-消費(fèi)者"原理,PipedOutputStream是生產(chǎn)者,PipedInputStream是消費(fèi)者,在PipedInputStream中有一個(gè)buffer字節(jié)數(shù)組,默認(rèn)大小為1024,作為緩沖區(qū),存放"生產(chǎn)者"生產(chǎn)出來的東東.還有兩個(gè)變量,in,out,in是用來記錄"生產(chǎn)者"生產(chǎn)了多少,out是用來記錄"消費(fèi)者"消費(fèi)了多少,in為-1表示消費(fèi)完了,in==out表示生產(chǎn)滿了.當(dāng)消費(fèi)者沒東西可消費(fèi)的時(shí)候,也就是當(dāng)in為-1的時(shí)候,消費(fèi)者會(huì)一直等待,直到有東西可消費(fèi).

            因?yàn)樯a(chǎn)和消費(fèi)的方法都是synchronized的(寫到這里,我去研究了一下synchronized的用法,才知道synchronized是對(duì)對(duì)象上鎖,之前一直以為只是對(duì)這個(gè)方法上鎖,別的synchronized方法仍然可以進(jìn)入,哎,慚愧慚愧~~),所以肯定是生產(chǎn)者先生產(chǎn)出一定數(shù)量的東西,消費(fèi)者才可以開始消費(fèi),所以在生產(chǎn)的時(shí)候發(fā)現(xiàn)in==out,那一定是滿了,同理,在消費(fèi)的時(shí)候發(fā)現(xiàn)in==out,那一定是消費(fèi)完了,因?yàn)樯a(chǎn)的東西永遠(yuǎn)要比消費(fèi)來得早,消費(fèi)者最多可以消費(fèi)和生產(chǎn)的數(shù)量相等的東西,而不會(huì)超出.

            好了,介紹完之后,看看SUN高手是怎么實(shí)現(xiàn)這些功能的.由于buffer(存放產(chǎn)品的通道)這個(gè)關(guān)鍵變量在PipedInputStream消費(fèi)者這個(gè)類中,所以要想對(duì)buffer操作,只能通過PipedInputStream來操作,因此將產(chǎn)品放入通道的操作是在PipedInputStream中.

    存放產(chǎn)品的行為:

        protected synchronized void receive(int b) throws IOException {// 這里好像有些問題,因?yàn)檫@個(gè)方法是在PipedOutputStream類中調(diào)用的,而這個(gè)方法是protected的,下面另一個(gè)receive方法就不是protected,可能是我的源碼有些問題,也請(qǐng)大家?guī)臀铱纯?/span>
            checkStateForReceive();// 檢測(cè)通道是否連接,準(zhǔn)備好接收產(chǎn)品
            writeSide = Thread.currentThread();// 當(dāng)前線程是生產(chǎn)者
            if (in == out)
                awaitSpace();
    // 發(fā)現(xiàn)通道滿了,沒地方放東西啦,等吧~~
            if (in < 0{// in<0,表示通道是空的,將生產(chǎn)和消費(fèi)的位置都置成第一個(gè)位置
                in = 0;
                out 
    = 0;
            }

            buffer[in
    ++= (byte) (b & 0xFF);
            
    if (in >= buffer.length) {// 如果生產(chǎn)位置到達(dá)了通道的末尾,為了循環(huán)利用通道,將in置成0
                in = 0;
            }

        }


        
    synchronized void receive(byte b[], int off, int len) throws IOException {// 看,這個(gè)方法不是protected的!
            checkStateForReceive();
            writeSide 
    = Thread.currentThread();
            
    int bytesToTransfer = len;// 需要接收多少產(chǎn)品的數(shù)量
            while (bytesToTransfer > 0{
                
    if (in == out)
                    awaitSpace();
                
    int nextTransferAmount = 0;// 本次實(shí)際可以接收的數(shù)量
                if (out < in) {
                    nextTransferAmount 
    = buffer.length - in;// 如果消費(fèi)的當(dāng)前位置<生產(chǎn)的當(dāng)前位置,則還可以再生產(chǎn)buffer.length-in這么多
                }
     else if (in < out) {
                    
    if (in == -1{
                        in 
    = out = 0;// 如果已經(jīng)消費(fèi)完,則將in,out置成0,從頭開始接收
                        nextTransferAmount = buffer.length - in;
                    }
     else {
                        nextTransferAmount 
    = out - in;// 如果消費(fèi)的當(dāng)前位置>生產(chǎn)的當(dāng)前位置,而且還沒消費(fèi)完,那么至少還可以再生產(chǎn)out-in這么多,注意,這種情況是因?yàn)橥ǖ辣恢貜?fù)利用而產(chǎn)生的!
                    }

                }

                
    if (nextTransferAmount > bytesToTransfer)// 如果本次實(shí)際可以接收的數(shù)量要大于當(dāng)前傳過來的數(shù)量,
                    nextTransferAmount = bytesToTransfer;// 那么本次實(shí)際只能接收當(dāng)前傳過來的這么多了
                assert (nextTransferAmount > 0);
                System.arraycopy(b, off, buffer, in, nextTransferAmount);
    // 把本次實(shí)際接收的數(shù)量放進(jìn)通道
                bytesToTransfer -= nextTransferAmount;// 算出還剩多少需要放進(jìn)通道
                off += nextTransferAmount;
                in 
    += nextTransferAmount;
                
    if (in >= buffer.length) {// 到末尾了,該從頭開始了
                    in = 0;
                }

            }

        }

     

    消費(fèi)產(chǎn)品的行為:

        public synchronized int read() throws IOException {// 消費(fèi)單個(gè)產(chǎn)品
            if (!connected) {
                
    throw new IOException("Pipe not connected");
            }
     else if (closedByReader) {
                
    throw new IOException("Pipe closed");
            }
     else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                    
    && (in < 0)) {
                
    throw new IOException("Write end dead");
            }


            readSide 
    = Thread.currentThread();
            
    int trials = 2;
            
    while (in < 0{// in<0,表示通道是空的,等待生產(chǎn)者生產(chǎn)
                if (closedByWriter) {
                    
    /* closed by writer, return EOF */
                    
    return -1;// 返回-1表示生產(chǎn)者已經(jīng)不再生產(chǎn)產(chǎn)品了,closedByWriter為true表示是由生產(chǎn)者將通道關(guān)閉的
                }

                
    if ((writeSide != null&& (!writeSide.isAlive()) && (--trials < 0)) {
                    
    throw new IOException("Pipe broken");
                }

                
    /* might be a writer waiting */
                notifyAll();
                
    try {
                    wait(
    1000);
                }
     catch (InterruptedException ex) {
                    
    throw new java.io.InterruptedIOException();
                }

            }

            
    int ret = buffer[out++& 0xFF;
            
    if (out >= buffer.length) {
                out 
    = 0;// 如果消費(fèi)到通道的末尾了,從通道頭開始繼續(xù)循環(huán)消費(fèi)
            }

            
    if (in == out) {
                
    /* now empty */
                in 
    = -1;// 消費(fèi)的位置和生產(chǎn)的位置重合了,表示消費(fèi)完了,需要生產(chǎn)者生產(chǎn),in置為-1
            }

            
    return ret;
        }


        
    public synchronized int read(byte b[], int off, int len) throws IOException {
            
    if (b == null{
                
    throw new NullPointerException();
            }
     else if ((off < 0|| (off > b.length) || (len < 0)
                    
    || ((off + len) > b.length) || ((off + len) < 0)) {
                
    throw new IndexOutOfBoundsException();
            }
     else if (len == 0{
                
    return 0;
            }


            
    /* possibly wait on the first character */
            
    int c = read();// 利用消費(fèi)單個(gè)產(chǎn)品來檢測(cè)通道是否連接,并且通道中是否有東西可消費(fèi)
            if (c < 0{
                
    return -1;// 返回-1表示生產(chǎn)者生產(chǎn)完了,消費(fèi)者也消費(fèi)完了,消費(fèi)者可以關(guān)閉通道了
            }

            b[off] 
    = (byte) c;
            
    int rlen = 1;

            
    // 這里沒有采用receive(byte [], int ,
            
    // int)方法中System.arrayCopy()的方法,其實(shí)用System.arrayCopy()的方法也可以實(shí)現(xiàn)
            /*
             * 這是用System.arrayCopy()實(shí)現(xiàn)的方法 int bytesToConsume = len - 1; while
             * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
             * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
             * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
             * buffer.length - out; }
             * 
             * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
             * bytesToConsume; assert (nextConsumeAmount > 0);
             * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
             * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
             * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
             * buffer.length) { out = 0; } if(in == out) { in = -1; } }
             
    */


            
    while ((in >= 0&& (--len > 0)) {
                b[off 
    + rlen] = buffer[out++];
                rlen
    ++;
                
    if (out >= buffer.length) {
                    out 
    = 0;
                }

                
    if (in == out) {
                    
    /* now empty */
                    in 
    = -1;// in==out,表示滿了,將in置成-1
                }

            }

            
    return rlen;
        }


    雖說功能看似簡(jiǎn)單,可是實(shí)現(xiàn)起來卻費(fèi)了一番功夫,在線程的調(diào)度上還是挺麻煩的,要考慮的地方很多,不過通過深入的了解這兩個(gè)類,讓我對(duì)多線程編程有了更多的認(rèn)識(shí).我覺得要想周密的分析整個(gè)功能,得把整個(gè)流程都分析清楚,構(gòu)造好了模型再去實(shí)現(xiàn)細(xì)節(jié),只有整體構(gòu)造對(duì)了,才能把正確的實(shí)現(xiàn)局部,自己的分析能力還有待加強(qiáng)啊!

    posted on 2008-04-10 17:47 爪哇豬 閱讀(3945) 評(píng)論(3)  編輯  收藏

    評(píng)論

    # re: 使用PipedInputStream,PipedOutputStream心得體會(huì) 2011-10-20 10:14 王康
    不知道你指的protected關(guān)鍵字有什么問題???  回復(fù)  更多評(píng)論
      

    # r的 2012-11-05 16:31
    的  回復(fù)  更多評(píng)論
      

    # re: 使用PipedInputStream,PipedOutputStream心得體會(huì)[未登錄] 2015-04-03 12:35 aaron
    請(qǐng)問可以引用嗎?謝謝  回復(fù)  更多評(píng)論
      


    只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 国产成人亚洲综合无码精品| 丁香花免费完整高清观看| a在线视频免费观看在线视频三区| 国产精品亚洲专区在线播放| 亚洲av无一区二区三区| 亚洲av日韩av永久在线观看| 麻豆亚洲AV成人无码久久精品| 337p日本欧洲亚洲大胆人人 | 国产精品久免费的黄网站| 中文字幕人成无码免费视频| 91嫩草国产在线观看免费| 免费看香港一级毛片| 亚洲国产精品丝袜在线观看| 亚洲精品综合久久| 亚洲色婷婷一区二区三区| 亚洲an天堂an在线观看| 亚洲中文无码a∨在线观看| 亚洲性色精品一区二区在线| 久久精品国产亚洲AV电影网| 一级特黄aaa大片免费看| a级毛片免费在线观看| 亚洲免费观看网站| 妞干网在线免费观看| 免费人成在线观看网站视频| 国产亚洲人成A在线V网站| 亚洲综合精品一二三区在线| 亚洲伊人久久大香线蕉| 羞羞漫画登录页面免费| 美女视频黄a视频全免费网站色窝 美女被cao网站免费看在线看 | 亚洲av色福利天堂| 亚洲国产精品久久网午夜| 亚洲av无码一区二区三区天堂| 国产亚洲精品美女| 最新国产乱人伦偷精品免费网站| 18女人腿打开无遮掩免费| 影音先锋在线免费观看| 亚洲视频在线免费| 亚洲综合一区二区精品久久| 亚洲乱人伦中文字幕无码| 国产99精品一区二区三区免费| 久久国产精品成人片免费|