<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢蝶

    生活、程序、未來
       :: 首頁 ::  ::  :: 聚合  :: 管理

    線程任務(wù)的取消

    Posted on 2007-09-03 15:50 dennis 閱讀(1808) 評論(0)  編輯  收藏 所屬分類: java
        當(dāng)外部代碼能夠在活動自然完成之前,把它的狀態(tài)更改為完成狀態(tài),那么這個活動被稱為可取消(cancellable)。取消任務(wù)是一個很常見的需求,無論是由于用戶請求還是系統(tǒng)錯誤引起的服務(wù)關(guān)閉等等原因。最簡單的任務(wù)取消策略就是在線程中維持一個bool變量,在run方法中判斷此變量的bool值來決定是否取消任務(wù)。顯然,這個bool變量需要聲明為volatile,以保持多線程環(huán)境下可見性(所謂可見性,就是當(dāng)一個線程修改共享對象的某個狀態(tài)變量后,另一個線程可以馬上看到修改結(jié)果)。下面是一個來自《java并發(fā)編程實(shí)踐》的例子:
    package net.rubyeye.concurrency.chapter7;

    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class PrimeGenerator implements Runnable {
        
    private final List<BigInteger> primes = new ArrayList<BigInteger>();

        
    private volatile boolean cancelled;
       
    public void run() {
            BigInteger p 
    = BigInteger.ONE;
            
    while (!cancelled) {
                p 
    = p.nextProbablePrime();
                
    synchronized (this) {
                    primes.add(p);
                }
            }
        }
       
    public void cancel() {
            cancelled 
    = true;
        }
       
    public synchronized List<BigInteger> get() {
            
    return new ArrayList<BigInteger>(primes);
        }

       
    public static void main(String args[]) throws InterruptedException {
            PrimeGenerator generator 
    = new PrimeGenerator();
            
    new Thread(generator).start();
            
    try {
                TimeUnit.SECONDS.sleep(
    1);
            } 
    finally {
                generator.cancel();
            }
        }
    }
        main中啟動一個素數(shù)生成的任務(wù),線程運(yùn)行一秒就取消掉。通過線程中的cancelled變量來表征任務(wù)是否繼續(xù)執(zhí)行。既然是最簡單的策略,那么什么是例外情況?顯然,阻塞操作下(比如調(diào)用join,wait,sleep方法),這樣的策略會出問題。任務(wù)因?yàn)檎{(diào)用這些阻塞方法而被阻塞,它將不會去檢查volatile變量,導(dǎo)致取消操作失效。那么解決辦法是什么?中斷!考慮我們用BlockingQueue去保存生成的素數(shù),BlockingQueue的put方法是阻塞的(當(dāng)BlockingQueue滿的時候,put操作會阻塞直到有元素被take),讓我們看看不采用中斷,仍然采用簡單策略會出現(xiàn)什么情況:
    package net.rubyeye.concurrency.chapter7;

    import java.math.BigInteger;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class BrokenPrimeProducer extends Thread {
        
    static int i = 1000;

        
    private final BlockingQueue<BigInteger> queue;

        
    private volatile boolean cancelled = false;

        BrokenPrimeProducer(BlockingQueue
    <BigInteger> queue) {
            
    this.queue = queue;
        }

        
    public void run() {
            BigInteger p 
    = BigInteger.ONE;
            
    try {
                
    while (!cancelled) {
                    p 
    = p.nextProbablePrime();
                    queue.put(p);
                }
            } 
    catch (InterruptedException cusumed) {
            }
        }

        
    public void cancel() {
            
    this.cancelled = false;
        }

        
    public static void main(String args[]) throws InterruptedException {
            BlockingQueue
    <BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                    
    10);
            BrokenPrimeProducer producer 
    = new BrokenPrimeProducer(queue);
            producer.start();
            
    try {
                
    while (needMorePrimes())
                    queue.take();
            } 
    finally {
                producer.cancel();
            }
        }

        
    public static boolean needMorePrimes() throws InterruptedException {
            
    boolean result = true;
            i
    --;
            
    if (i == 0)
                result 
    = false;
            
    return result;
        }
    }
        我們在main中通過queue.take來消費(fèi)產(chǎn)生的素數(shù)(雖然僅僅是取出扔掉),我們只消費(fèi)了1000個素數(shù),然后嘗試取消產(chǎn)生素數(shù)的任務(wù),很遺憾,取消不了,因?yàn)楫a(chǎn)生素數(shù)的線程產(chǎn)生素數(shù)的速度大于我們消費(fèi)的速度,我們在消費(fèi)1000后就停止消費(fèi)了,那么任務(wù)將被queue的put方法阻塞,永遠(yuǎn)也不會去判斷cancelled狀態(tài)變量,任務(wù)取消不了。正確的做法應(yīng)當(dāng)是使用中斷(interrupt):
    package net.rubyeye.concurrency.chapter7;

    import java.math.BigInteger;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class PrimeProducer extends Thread {
        
    static int i = 1000;

        
    private final BlockingQueue<BigInteger> queue;

        
    private volatile boolean cancelled = false;

        PrimeProducer(BlockingQueue
    <BigInteger> queue) {
            
    this.queue = queue;
        }

        
    public void run() {
            BigInteger p 
    = BigInteger.ONE;
            
    try {
                
    while (!Thread.currentThread().isInterrupted()) {
                    p 
    = p.nextProbablePrime();
                    queue.put(p);
                }
            } 
    catch (InterruptedException cusumed) {
            }
        }

        
    public void cancel() {
            interrupt();
        }

        
    public static void main(String args[]) throws InterruptedException {
            BlockingQueue
    <BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                    
    10);
            PrimeProducer producer 
    = new PrimeProducer(queue);
            producer.start();
            
    try {
                
    while (needMorePrimes())
                    queue.take();
            } 
    finally {
                producer.cancel();
            }
        }

        
    public static boolean needMorePrimes() throws InterruptedException {
            
    boolean result = true;
            i
    --;
            
    if (i == 0)
                result 
    = false;
            
    return result;
        }
    }
       在run方法中,通過Thread的isInterrupted來判斷interrupt status是否已經(jīng)被修改,從而正確實(shí)現(xiàn)了任務(wù)的取消。關(guān)于interrupt,有一點(diǎn)需要特別說明,調(diào)用interrupt并不意味著必然停止目標(biāo)線程的正在進(jìn)行的工作,它僅僅是傳遞一個請求中斷的信號給目標(biāo)線程,目標(biāo)線程會在下一個方便的時刻中斷。而對于阻塞方法產(chǎn)生的InterruptedException的處理,兩種選擇:要么重新拋出讓上層代碼來處理,要么在catch塊中調(diào)用Thread的interrupt來保存中斷狀態(tài)。除非你確定要讓工作線程終止(如上所示代碼),否則不要僅僅是catch而不做任務(wù)處理工作(生吞了InterruptedException),更詳細(xì)可以參考這里。如果不清楚外部線程的中斷策略,生搬硬套地調(diào)用interrupt可能產(chǎn)生不可預(yù)料的后果,可參見書中7.1.4例子。

       另外一個取消任務(wù)的方法就是采用Future來管理任務(wù),這是JDK5引入的,用于管理任務(wù)的生命周期,處理異常等。比如調(diào)用ExecutorService的sumit方法會返回一個Future來描述任務(wù),而Future有一個cancel方法用于取消任務(wù)。
       那么,如果任務(wù)調(diào)用了不可中斷的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那么該怎么處理呢?簡單地,關(guān)閉它們!參考下面的例子:
    package net.rubyeye.concurrency.chapter7;

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;

    /**
     * 展示對于不可中斷阻塞的取消任務(wù) 通過關(guān)閉socket引發(fā)異常來中斷
     * 
     * 
    @author Admin
     * 
     
    */
    public abstract class ReaderThread extends Thread {
        
    private final Socket socket;

        
    private final InputStream in;

        
    public ReaderThread(Socket socket) throws IOException {
            
    this.socket = socket;
            
    this.in = socket.getInputStream();
        }

        
    // 重寫interrupt方法
        public void interrupt() {
            
    try {
                socket.close();
            } 
    catch (IOException e) {
            } 
    finally {
                
    super.interrupt();
            }
        }

        
    public void run() {
            
    try {
                
    byte[] buf = new byte[1024];
                
    while (true) {
                    
    int count = in.read(buf);
                    
    if (count < 0)
                        
    break;
                    
    else if (count > 0)
                        processBuff(buf, count);
                }
            } 
    catch (IOException e) {
            }
        }

        
    public abstract void processBuff(byte[] buf, int count);
    }
        Reader線程重寫了interrupt方法,其中調(diào)用了socket的close方法用于中斷read方法,最后,又調(diào)用了super.interrupt(),防止當(dāng)調(diào)用可中斷的阻塞方法時不能正常中斷。


    主站蜘蛛池模板: 亚洲综合视频在线观看| 91免费在线播放| 亚洲男同gay片| 久久99国产亚洲精品观看| 日本免费观看网站| 国产精彩免费视频| 国产成人免费ā片在线观看老同学 | 免费jjzz在线播放国产| 国拍在线精品视频免费观看| 日韩电影免费在线观看网站| 色妞www精品视频免费看| 国产91在线|亚洲| 久久精品蜜芽亚洲国产AV| 亚洲国产三级在线观看| 亚洲国产人成中文幕一级二级| 午夜国产精品免费观看| 美女被cao网站免费看在线看| 特黄特色的大片观看免费视频| 亚洲欧美成aⅴ人在线观看| 亚洲av成人综合网| 亚洲激情视频网站| 亚洲激情中文字幕| 国产AV无码专区亚洲A∨毛片| 久久精品国产精品亚洲| 亚洲欧洲中文日韩久久AV乱码 | 久久亚洲精品无码av| 亚洲国产综合AV在线观看| 亚洲啪啪免费视频| 亚洲欧洲日韩在线电影| 亚洲欧洲日产国码久在线观看| 亚洲成Av人片乱码色午夜| 亚洲av福利无码无一区二区| 亚洲激情在线视频| 亚洲视频手机在线| 亚洲国产成人九九综合| 亚洲伦理中文字幕| 亚洲国产熟亚洲女视频| 色偷偷噜噜噜亚洲男人| 无遮挡a级毛片免费看| 国产福利在线观看永久免费| 中国一级特黄高清免费的大片中国一级黄色片 |