<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    java人

    愛生活,更愛java!

      BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
      1 隨筆 :: 2 文章 :: 3 評論 :: 0 Trackbacks

            今天無意中看到PipedInputStream這個(gè)類,不知道是干嘛用的,便google了一下,字面上理解是管道流,看一下別人是怎么說的:

            PipedInputStream類與PipedOutputStream類用于在應(yīng)用程序中創(chuàng)建管道通信.一個(gè)PipedInputStream實(shí)例對象必須和一個(gè)PipedOutputStream實(shí)例對象進(jìn)行連接而產(chǎn)生一個(gè)通信管道.PipedOutputStream可以向管道中寫入數(shù)據(jù),PipedIntputStream可以讀取PipedOutputStream向管道中寫入的數(shù)據(jù).這兩個(gè)類主要用來完成線程之間的通信.一個(gè)線程的PipedInputStream對象能夠從另外一個(gè)線程的PipedOutputStream對象中讀取數(shù)據(jù).

            原來如此,不過這只是說說而已,具體怎么實(shí)現(xiàn)的呢,愛刨根問底的我可不會輕易放過這些疑惑,于是看了一下SUN源碼這兩個(gè)類的實(shí)現(xiàn),恕本人比較愚鈍,把玩了大半天,才稍微參透其中一些奧妙,故將心得體會在此寫上,供各位批判:)

            首先簡單的介紹一下這兩個(gè)類的實(shí)現(xiàn)原理,PipedInputStreamPipedOutputStream的實(shí)現(xiàn)原理類似于"生產(chǎn)者-消費(fèi)者"原理,PipedOutputStream是生產(chǎn)者,PipedInputStream是消費(fèi)者,在PipedInputStream中有一個(gè)buffer字節(jié)數(shù)組,默認(rèn)大小為1024,作為緩沖區(qū),存放"生產(chǎn)者"生產(chǎn)出來的東東.還有兩個(gè)變量,in,out,in是用來記錄"生產(chǎn)者"生產(chǎn)了多少,out是用來記錄"消費(fèi)者"消費(fèi)了多少,in為-1表示消費(fèi)完了,in==out表示生產(chǎn)滿了.當(dāng)消費(fèi)者沒東西可消費(fèi)的時(shí)候,也就是當(dāng)in為-1的時(shí)候,消費(fèi)者會一直等待,直到有東西可消費(fèi).

            因?yàn)樯a(chǎn)和消費(fèi)的方法都是synchronized的(寫到這里,我去研究了一下synchronized的用法,才知道synchronized是對對象上鎖,之前一直以為只是對這個(gè)方法上鎖,別的synchronized方法仍然可以進(jìn)入,哎,慚愧慚愧~~),所以肯定是生產(chǎn)者先生產(chǎn)出一定數(shù)量的東西,消費(fèi)者才可以開始消費(fèi),所以在生產(chǎn)的時(shí)候發(fā)現(xiàn)in==out,那一定是滿了,同理,在消費(fèi)的時(shí)候發(fā)現(xiàn)in==out,那一定是消費(fèi)完了,因?yàn)樯a(chǎn)的東西永遠(yuǎn)要比消費(fèi)來得早,消費(fèi)者最多可以消費(fèi)和生產(chǎn)的數(shù)量相等的東西,而不會超出.

            好了,介紹完之后,看看SUN高手是怎么實(shí)現(xiàn)這些功能的.由于buffer(存放產(chǎn)品的通道)這個(gè)關(guān)鍵變量在PipedInputStream消費(fèi)者這個(gè)類中,所以要想對buffer操作,只能通過PipedInputStream來操作,因此將產(chǎn)品放入通道的操作是在PipedInputStream中.

    存放產(chǎn)品的行為:

        protected synchronized void receive(int b) throws IOException {// 這里好像有些問題,因?yàn)檫@個(gè)方法是在PipedOutputStream類中調(diào)用的,而這個(gè)方法是protected的,下面另一個(gè)receive方法就不是protected,可能是我的源碼有些問題,也請大家?guī)臀铱纯?/span>
            checkStateForReceive();// 檢測通道是否連接,準(zhǔn)備好接收產(chǎn)品
            writeSide = Thread.currentThread();// 當(dāng)前線程是生產(chǎn)者
            if (in == out)
                awaitSpace();
    // 發(fā)現(xiàn)通道滿了,沒地方放東西啦,等吧~~
            if (in < 0{// in<0,表示通道是空的,將生產(chǎn)和消費(fèi)的位置都置成第一個(gè)位置
                in = 0;
                out 
    = 0;
            }

            buffer[in
    ++= (byte) (b & 0xFF);
            
    if (in >= buffer.length) {// 如果生產(chǎn)位置到達(dá)了通道的末尾,為了循環(huán)利用通道,將in置成0
                in = 0;
            }

        }


        
    synchronized void receive(byte b[], int off, int len) throws IOException {// 看,這個(gè)方法不是protected的!
            checkStateForReceive();
            writeSide 
    = Thread.currentThread();
            
    int bytesToTransfer = len;// 需要接收多少產(chǎn)品的數(shù)量
            while (bytesToTransfer > 0{
                
    if (in == out)
                    awaitSpace();
                
    int nextTransferAmount = 0;// 本次實(shí)際可以接收的數(shù)量
                if (out < in) {
                    nextTransferAmount 
    = buffer.length - in;// 如果消費(fèi)的當(dāng)前位置<生產(chǎn)的當(dāng)前位置,則還可以再生產(chǎn)buffer.length-in這么多
                }
     else if (in < out) {
                    
    if (in == -1{
                        in 
    = out = 0;// 如果已經(jīng)消費(fèi)完,則將in,out置成0,從頭開始接收
                        nextTransferAmount = buffer.length - in;
                    }
     else {
                        nextTransferAmount 
    = out - in;// 如果消費(fèi)的當(dāng)前位置>生產(chǎn)的當(dāng)前位置,而且還沒消費(fèi)完,那么至少還可以再生產(chǎn)out-in這么多,注意,這種情況是因?yàn)橥ǖ辣恢貜?fù)利用而產(chǎn)生的!
                    }

                }

                
    if (nextTransferAmount > bytesToTransfer)// 如果本次實(shí)際可以接收的數(shù)量要大于當(dāng)前傳過來的數(shù)量,
                    nextTransferAmount = bytesToTransfer;// 那么本次實(shí)際只能接收當(dāng)前傳過來的這么多了
                assert (nextTransferAmount > 0);
                System.arraycopy(b, off, buffer, in, nextTransferAmount);
    // 把本次實(shí)際接收的數(shù)量放進(jìn)通道
                bytesToTransfer -= nextTransferAmount;// 算出還剩多少需要放進(jìn)通道
                off += nextTransferAmount;
                in 
    += nextTransferAmount;
                
    if (in >= buffer.length) {// 到末尾了,該從頭開始了
                    in = 0;
                }

            }

        }

     

    消費(fèi)產(chǎn)品的行為:

        public synchronized int read() throws IOException {// 消費(fèi)單個(gè)產(chǎn)品
            if (!connected) {
                
    throw new IOException("Pipe not connected");
            }
     else if (closedByReader) {
                
    throw new IOException("Pipe closed");
            }
     else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                    
    && (in < 0)) {
                
    throw new IOException("Write end dead");
            }


            readSide 
    = Thread.currentThread();
            
    int trials = 2;
            
    while (in < 0{// in<0,表示通道是空的,等待生產(chǎn)者生產(chǎn)
                if (closedByWriter) {
                    
    /* closed by writer, return EOF */
                    
    return -1;// 返回-1表示生產(chǎn)者已經(jīng)不再生產(chǎn)產(chǎn)品了,closedByWriter為true表示是由生產(chǎn)者將通道關(guān)閉的
                }

                
    if ((writeSide != null&& (!writeSide.isAlive()) && (--trials < 0)) {
                    
    throw new IOException("Pipe broken");
                }

                
    /* might be a writer waiting */
                notifyAll();
                
    try {
                    wait(
    1000);
                }
     catch (InterruptedException ex) {
                    
    throw new java.io.InterruptedIOException();
                }

            }

            
    int ret = buffer[out++& 0xFF;
            
    if (out >= buffer.length) {
                out 
    = 0;// 如果消費(fèi)到通道的末尾了,從通道頭開始繼續(xù)循環(huán)消費(fèi)
            }

            
    if (in == out) {
                
    /* now empty */
                in 
    = -1;// 消費(fèi)的位置和生產(chǎn)的位置重合了,表示消費(fèi)完了,需要生產(chǎn)者生產(chǎn),in置為-1
            }

            
    return ret;
        }


        
    public synchronized int read(byte b[], int off, int len) throws IOException {
            
    if (b == null{
                
    throw new NullPointerException();
            }
     else if ((off < 0|| (off > b.length) || (len < 0)
                    
    || ((off + len) > b.length) || ((off + len) < 0)) {
                
    throw new IndexOutOfBoundsException();
            }
     else if (len == 0{
                
    return 0;
            }


            
    /* possibly wait on the first character */
            
    int c = read();// 利用消費(fèi)單個(gè)產(chǎn)品來檢測通道是否連接,并且通道中是否有東西可消費(fèi)
            if (c < 0{
                
    return -1;// 返回-1表示生產(chǎn)者生產(chǎn)完了,消費(fèi)者也消費(fèi)完了,消費(fèi)者可以關(guān)閉通道了
            }

            b[off] 
    = (byte) c;
            
    int rlen = 1;

            
    // 這里沒有采用receive(byte [], int ,
            
    // int)方法中System.arrayCopy()的方法,其實(shí)用System.arrayCopy()的方法也可以實(shí)現(xiàn)
            /*
             * 這是用System.arrayCopy()實(shí)現(xiàn)的方法 int bytesToConsume = len - 1; while
             * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
             * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
             * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
             * buffer.length - out; }
             * 
             * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
             * bytesToConsume; assert (nextConsumeAmount > 0);
             * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
             * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
             * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
             * buffer.length) { out = 0; } if(in == out) { in = -1; } }
             
    */


            
    while ((in >= 0&& (--len > 0)) {
                b[off 
    + rlen] = buffer[out++];
                rlen
    ++;
                
    if (out >= buffer.length) {
                    out 
    = 0;
                }

                
    if (in == out) {
                    
    /* now empty */
                    in 
    = -1;// in==out,表示滿了,將in置成-1
                }

            }

            
    return rlen;
        }


    雖說功能看似簡單,可是實(shí)現(xiàn)起來卻費(fèi)了一番功夫,在線程的調(diào)度上還是挺麻煩的,要考慮的地方很多,不過通過深入的了解這兩個(gè)類,讓我對多線程編程有了更多的認(rèn)識.我覺得要想周密的分析整個(gè)功能,得把整個(gè)流程都分析清楚,構(gòu)造好了模型再去實(shí)現(xiàn)細(xì)節(jié),只有整體構(gòu)造對了,才能把正確的實(shí)現(xiàn)局部,自己的分析能力還有待加強(qiáng)啊!

    posted on 2008-04-10 17:47 爪哇豬 閱讀(3945) 評論(3)  編輯  收藏

    評論

    # re: 使用PipedInputStream,PipedOutputStream心得體會 2011-10-20 10:14 王康
    不知道你指的protected關(guān)鍵字有什么問題???  回復(fù)  更多評論
      

    # r的 2012-11-05 16:31
    的  回復(fù)  更多評論
      

    # re: 使用PipedInputStream,PipedOutputStream心得體會[未登錄] 2015-04-03 12:35 aaron
    請問可以引用嗎?謝謝  回復(fù)  更多評論
      


    只有注冊用戶登錄后才能發(fā)表評論。


    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 亚洲av日韩av激情亚洲| 久久精品亚洲福利| 亚洲欧美成人综合久久久| 国产老女人精品免费视频| 美女被暴羞羞免费视频| 亚洲国产精品无码久久一线| 18禁无遮挡无码国产免费网站| 亚洲人成网亚洲欧洲无码| 日韩亚洲国产综合高清| 亚洲精品无码永久中文字幕| 成人免费无毒在线观看网站 | 日韩a级毛片免费视频| caoporn成人免费公开| 亚洲婷婷综合色高清在线| 亚洲国产精品成人AV无码久久综合影院| 久草免费福利资源站| 美女视频黄a视频全免费网站一区| 日韩欧美亚洲中文乱码| 久久亚洲精品成人无码网站| 深夜国产福利99亚洲视频| 亚洲精品在线免费观看视频| 成人免费激情视频| 大地资源网高清在线观看免费| 亚洲熟女综合色一区二区三区| 亚洲色偷偷av男人的天堂 | 免费毛片在线播放| 99爱在线观看免费完整版| 一区二区三区在线免费| 亚洲色www永久网站| 日韩色日韩视频亚洲网站| 亚洲激情视频图片| 亚洲人成电影在在线观看网色| 久久久久亚洲AV无码专区网站| 国产亚洲精品观看91在线| 亚洲综合一区二区精品导航 | 免费看h片的网站| 国产一区二区三区免费观在线| 国产成人亚洲精品91专区高清 | 亚洲人成网77777色在线播放| 亚洲国产精品成人网址天堂| 亚洲av午夜福利精品一区|