<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    java人

    愛生活,更愛java!

      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      1 隨筆 :: 2 文章 :: 3 評論 :: 0 Trackbacks

            今天無意中看到PipedInputStream這個類,不知道是干嘛用的,便google了一下,字面上理解是管道流,看一下別人是怎么說的:

            PipedInputStream類與PipedOutputStream類用于在應用程序中創建管道通信.一個PipedInputStream實例對象必須和一個PipedOutputStream實例對象進行連接而產生一個通信管道.PipedOutputStream可以向管道中寫入數據,PipedIntputStream可以讀取PipedOutputStream向管道中寫入的數據.這兩個類主要用來完成線程之間的通信.一個線程的PipedInputStream對象能夠從另外一個線程的PipedOutputStream對象中讀取數據.

            原來如此,不過這只是說說而已,具體怎么實現的呢,愛刨根問底的我可不會輕易放過這些疑惑,于是看了一下SUN源碼這兩個類的實現,恕本人比較愚鈍,把玩了大半天,才稍微參透其中一些奧妙,故將心得體會在此寫上,供各位批判:)

            首先簡單的介紹一下這兩個類的實現原理,PipedInputStreamPipedOutputStream的實現原理類似于"生產者-消費者"原理,PipedOutputStream是生產者,PipedInputStream是消費者,在PipedInputStream中有一個buffer字節數組,默認大小為1024,作為緩沖區,存放"生產者"生產出來的東東.還有兩個變量,in,out,in是用來記錄"生產者"生產了多少,out是用來記錄"消費者"消費了多少,in為-1表示消費完了,in==out表示生產滿了.當消費者沒東西可消費的時候,也就是當in為-1的時候,消費者會一直等待,直到有東西可消費.

            因為生產和消費的方法都是synchronized的(寫到這里,我去研究了一下synchronized的用法,才知道synchronized是對對象上鎖,之前一直以為只是對這個方法上鎖,別的synchronized方法仍然可以進入,哎,慚愧慚愧~~),所以肯定是生產者先生產出一定數量的東西,消費者才可以開始消費,所以在生產的時候發現in==out,那一定是滿了,同理,在消費的時候發現in==out,那一定是消費完了,因為生產的東西永遠要比消費來得早,消費者最多可以消費和生產的數量相等的東西,而不會超出.

            好了,介紹完之后,看看SUN高手是怎么實現這些功能的.由于buffer(存放產品的通道)這個關鍵變量在PipedInputStream消費者這個類中,所以要想對buffer操作,只能通過PipedInputStream來操作,因此將產品放入通道的操作是在PipedInputStream中.

    存放產品的行為:

        protected synchronized void receive(int b) throws IOException {// 這里好像有些問題,因為這個方法是在PipedOutputStream類中調用的,而這個方法是protected的,下面另一個receive方法就不是protected,可能是我的源碼有些問題,也請大家幫我看看
            checkStateForReceive();// 檢測通道是否連接,準備好接收產品
            writeSide = Thread.currentThread();// 當前線程是生產者
            if (in == out)
                awaitSpace();
    // 發現通道滿了,沒地方放東西啦,等吧~~
            if (in < 0{// in<0,表示通道是空的,將生產和消費的位置都置成第一個位置
                in = 0;
                out 
    = 0;
            }

            buffer[in
    ++= (byte) (b & 0xFF);
            
    if (in >= buffer.length) {// 如果生產位置到達了通道的末尾,為了循環利用通道,將in置成0
                in = 0;
            }

        }


        
    synchronized void receive(byte b[], int off, int len) throws IOException {// 看,這個方法不是protected的!
            checkStateForReceive();
            writeSide 
    = Thread.currentThread();
            
    int bytesToTransfer = len;// 需要接收多少產品的數量
            while (bytesToTransfer > 0{
                
    if (in == out)
                    awaitSpace();
                
    int nextTransferAmount = 0;// 本次實際可以接收的數量
                if (out < in) {
                    nextTransferAmount 
    = buffer.length - in;// 如果消費的當前位置<生產的當前位置,則還可以再生產buffer.length-in這么多
                }
     else if (in < out) {
                    
    if (in == -1{
                        in 
    = out = 0;// 如果已經消費完,則將in,out置成0,從頭開始接收
                        nextTransferAmount = buffer.length - in;
                    }
     else {
                        nextTransferAmount 
    = out - in;// 如果消費的當前位置>生產的當前位置,而且還沒消費完,那么至少還可以再生產out-in這么多,注意,這種情況是因為通道被重復利用而產生的!
                    }

                }

                
    if (nextTransferAmount > bytesToTransfer)// 如果本次實際可以接收的數量要大于當前傳過來的數量,
                    nextTransferAmount = bytesToTransfer;// 那么本次實際只能接收當前傳過來的這么多了
                assert (nextTransferAmount > 0);
                System.arraycopy(b, off, buffer, in, nextTransferAmount);
    // 把本次實際接收的數量放進通道
                bytesToTransfer -= nextTransferAmount;// 算出還剩多少需要放進通道
                off += nextTransferAmount;
                in 
    += nextTransferAmount;
                
    if (in >= buffer.length) {// 到末尾了,該從頭開始了
                    in = 0;
                }

            }

        }

     

    消費產品的行為:

        public synchronized int read() throws IOException {// 消費單個產品
            if (!connected) {
                
    throw new IOException("Pipe not connected");
            }
     else if (closedByReader) {
                
    throw new IOException("Pipe closed");
            }
     else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                    
    && (in < 0)) {
                
    throw new IOException("Write end dead");
            }


            readSide 
    = Thread.currentThread();
            
    int trials = 2;
            
    while (in < 0{// in<0,表示通道是空的,等待生產者生產
                if (closedByWriter) {
                    
    /* closed by writer, return EOF */
                    
    return -1;// 返回-1表示生產者已經不再生產產品了,closedByWriter為true表示是由生產者將通道關閉的
                }

                
    if ((writeSide != null&& (!writeSide.isAlive()) && (--trials < 0)) {
                    
    throw new IOException("Pipe broken");
                }

                
    /* might be a writer waiting */
                notifyAll();
                
    try {
                    wait(
    1000);
                }
     catch (InterruptedException ex) {
                    
    throw new java.io.InterruptedIOException();
                }

            }

            
    int ret = buffer[out++& 0xFF;
            
    if (out >= buffer.length) {
                out 
    = 0;// 如果消費到通道的末尾了,從通道頭開始繼續循環消費
            }

            
    if (in == out) {
                
    /* now empty */
                in 
    = -1;// 消費的位置和生產的位置重合了,表示消費完了,需要生產者生產,in置為-1
            }

            
    return ret;
        }


        
    public synchronized int read(byte b[], int off, int len) throws IOException {
            
    if (b == null{
                
    throw new NullPointerException();
            }
     else if ((off < 0|| (off > b.length) || (len < 0)
                    
    || ((off + len) > b.length) || ((off + len) < 0)) {
                
    throw new IndexOutOfBoundsException();
            }
     else if (len == 0{
                
    return 0;
            }


            
    /* possibly wait on the first character */
            
    int c = read();// 利用消費單個產品來檢測通道是否連接,并且通道中是否有東西可消費
            if (c < 0{
                
    return -1;// 返回-1表示生產者生產完了,消費者也消費完了,消費者可以關閉通道了
            }

            b[off] 
    = (byte) c;
            
    int rlen = 1;

            
    // 這里沒有采用receive(byte [], int ,
            
    // int)方法中System.arrayCopy()的方法,其實用System.arrayCopy()的方法也可以實現
            /*
             * 這是用System.arrayCopy()實現的方法 int bytesToConsume = len - 1; while
             * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
             * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
             * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
             * buffer.length - out; }
             * 
             * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
             * bytesToConsume; assert (nextConsumeAmount > 0);
             * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
             * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
             * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
             * buffer.length) { out = 0; } if(in == out) { in = -1; } }
             
    */


            
    while ((in >= 0&& (--len > 0)) {
                b[off 
    + rlen] = buffer[out++];
                rlen
    ++;
                
    if (out >= buffer.length) {
                    out 
    = 0;
                }

                
    if (in == out) {
                    
    /* now empty */
                    in 
    = -1;// in==out,表示滿了,將in置成-1
                }

            }

            
    return rlen;
        }


    雖說功能看似簡單,可是實現起來卻費了一番功夫,在線程的調度上還是挺麻煩的,要考慮的地方很多,不過通過深入的了解這兩個類,讓我對多線程編程有了更多的認識.我覺得要想周密的分析整個功能,得把整個流程都分析清楚,構造好了模型再去實現細節,只有整體構造對了,才能把正確的實現局部,自己的分析能力還有待加強啊!

    posted on 2008-04-10 17:47 爪哇豬 閱讀(3945) 評論(3)  編輯  收藏

    評論

    # re: 使用PipedInputStream,PipedOutputStream心得體會 2011-10-20 10:14 王康
    不知道你指的protected關鍵字有什么問題???  回復  更多評論
      

    # r的 2012-11-05 16:31
    的  回復  更多評論
      

    # re: 使用PipedInputStream,PipedOutputStream心得體會[未登錄] 2015-04-03 12:35 aaron
    請問可以引用嗎?謝謝  回復  更多評論
      


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 亚洲欧洲国产精品你懂的| 亚洲人成影院午夜网站| 免费看搞黄视频网站| 91亚洲精品麻豆| 亚洲成?v人片天堂网无码| 无码av免费网站| 黄色片网站在线免费观看| 亚洲欧洲免费视频| 哒哒哒免费视频观看在线www| 一个人免费视频观看在线www| 亚洲香蕉在线观看| 亚洲午夜福利AV一区二区无码| 野花高清在线电影观看免费视频| 青青草国产免费国产是公开| 精品亚洲成AV人在线观看| 四虎永久免费地址在线网站| 中文字幕在线观看免费视频| 特级毛片爽www免费版| 亚洲精品国产国语| 亚洲av无码不卡| 亚洲成av人片天堂网老年人| 国产曰批免费视频播放免费s | 在线观看免费视频网站色| 亚洲日本一线产区和二线产区对比| 亚洲理论电影在线观看| 国产婷婷高清在线观看免费| 最近2018中文字幕免费视频| 国产黄在线观看免费观看不卡| 中文字幕乱码亚洲精品一区| 亚洲av永久无码精品秋霞电影影院| 啊v在线免费观看| 免费看的成人yellow视频| 久久成人国产精品免费软件| 免费观看91视频| 亚欧乱色国产精品免费视频| 亚洲国产精品成人综合色在线| 亚洲精品午夜久久久伊人| 亚洲成a人片在线观看无码专区| 亚洲视频人成在线播放| 国产乱子伦精品免费无码专区| 啦啦啦高清视频在线观看免费|