<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    大鳥的學(xué)習(xí)樂園
    路漫漫其修遠(yuǎn)兮,吾將上下而求索
    posts - 26,comments - 27,trackbacks - 0

    BlockingQueue
    支持兩個(gè)附加操作的 Queue,這兩個(gè)操作是:檢索元素時(shí)等待隊(duì)列變?yōu)榉强眨约按鎯?chǔ)元素時(shí)等待空間變得可用。

    BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個(gè) null 元素時(shí),某些實(shí)現(xiàn)會(huì)拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。

    BlockingQueue 可以是限定容量的。它在任意給定時(shí)間都可以有一個(gè) remainingCapacity,超出此容量,便無法無阻塞地 put 額外的元素。
    沒有任何內(nèi)部容量約束的 BlockingQueue 總是報(bào)告 Integer.MAX_VALUE 的剩余容量。

    BlockingQueue 實(shí)現(xiàn)主要用于生產(chǎn)者-使用者隊(duì)列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊(duì)列中移除任意一個(gè)元素是有可能的。
    然而,這種操作通常不 會(huì)有效執(zhí)行,只能有計(jì)劃地偶爾使用,比如在取消排隊(duì)信息時(shí)。

    BlockingQueue 實(shí)現(xiàn)是線程安全的。所有排隊(duì)方法都可以使用內(nèi)部鎖定或其他形式的并發(fā)控制來自動(dòng)達(dá)到它們的目的。
    然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動(dòng)執(zhí)行,除非在實(shí)現(xiàn)中特別說明。
    因此,舉例來說,在只添加了 c 中的一些元素后,addAll(c) 有可能失敗(拋出一個(gè)異常)。

    BlockingQueue 實(shí)質(zhì)上不 支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項(xiàng)。
    這種功能的需求和使用有依賴于實(shí)現(xiàn)的傾向。例如,一種常用的策略是:對(duì)于生產(chǎn)者,插入特殊的 end-of-stream 或 poison 對(duì)象,并根據(jù)使用者獲取這些對(duì)象的時(shí)間來對(duì)它們進(jìn)行解釋。

    下面的例子演示了這個(gè)阻塞隊(duì)列的基本功能。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class MyBlockingQueue extends Thread {
    public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

    private int index;

    public MyBlockingQueue(int i) {
       this.index = i;
    }

    public void run() {
       try {
        queue.put(String.valueOf(this.index));
        System.out.println("{" + this.index + "} in queue!");
       } catch (Exception e) {
        e.printStackTrace();
       }
    }

    public static void main(String args[]) {
       ExecutorService service = Executors.newCachedThreadPool();
       for (int i = 0; i < 10; i++) {
        service.submit(new MyBlockingQueue(i));
       }
       Thread thread = new Thread() {
        public void run() {
         try {
          while (true) {
           Thread.sleep((int) (Math.random() * 1000));
           if(MyBlockingQueue.queue.isEmpty())
            break;
           String str = MyBlockingQueue.queue.take();
           System.out.println(str + " has take!");
          }
         } catch (Exception e) {
          e.printStackTrace();
         }
        }
       };
       service.submit(thread);
       service.shutdown();
    }
    }
    ---------------------執(zhí)行結(jié)果-----------------
    {0} in queue!
    {1} in queue!
    {2} in queue!
    {3} in queue!
    0 has take!
    {4} in queue!
    1 has take!
    {6} in queue!
    2 has take!
    {7} in queue!
    3 has take!
    {8} in queue!
    4 has take!
    {5} in queue!
    6 has take!
    {9} in queue!
    7 has take!
    8 has take!
    5 has take!
    9 has take!

    -----------------------------------------


    CompletionService

    將生產(chǎn)新的異步任務(wù)與使用已完成任務(wù)的結(jié)果分離開來的服務(wù)。生產(chǎn)者 submit 執(zhí)行的任務(wù)。使用者 take 已完成的任務(wù),
    并按照完成這些任務(wù)的順序處理它們的結(jié)果。例如,CompletionService 可以用來管理異步 IO ,執(zhí)行讀操作的任務(wù)作為程序或系統(tǒng)的一部分提交,
    然后,當(dāng)完成讀操作時(shí),會(huì)在程序的不同部分執(zhí)行其他操作,執(zhí)行操作的順序可能與所請(qǐng)求的順序不同。

    通常,CompletionService 依賴于一個(gè)單獨(dú)的 Executor 來實(shí)際執(zhí)行任務(wù),在這種情況下,
    CompletionService 只管理一個(gè)內(nèi)部完成隊(duì)列。ExecutorCompletionService 類提供了此方法的一個(gè)實(shí)現(xiàn)。


    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class MyCompletionService implements Callable<String> {
    private int id;

    public MyCompletionService(int i){
       this.id=i;
    }
    public static void main(String[] args) throws Exception{
       ExecutorService service=Executors.newCachedThreadPool();
       CompletionService<String> completion=new ExecutorCompletionService<String>(service);
       for(int i=0;i<10;i++){
        completion.submit(new MyCompletionService(i));
       }
       for(int i=0;i<10;i++){
        System.out.println(completion.take().get());
       }
       service.shutdown();
    }
    public String call() throws Exception {
       Integer time=(int)(Math.random()*1000);
       try{
        System.out.println(this.id+" start");
        Thread.sleep(time);
        System.out.println(this.id+" end");
       }
       catch(Exception e){
        e.printStackTrace();
       }
       return this.id+":"+time;
    }
    }


    CountDownLatch


    一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。

    用給定的計(jì)數(shù) 初始化 CountDownLatch。由于調(diào)用了 countDown() 方法,所以在當(dāng)前計(jì)數(shù)到達(dá)零之前,await 方法會(huì)一直受阻塞。
    之后,會(huì)釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。這種現(xiàn)象只出現(xiàn)一次——計(jì)數(shù)無法被重置。如果需要重置計(jì)數(shù),請(qǐng)考慮使用 CyclicBarrier。

    CountDownLatch 是一個(gè)通用同步工具,它有很多用途。將計(jì)數(shù) 1 初始化的 CountDownLatch 用作一個(gè)簡單的開/關(guān)鎖存器,
    或入口:在通過調(diào)用 countDown() 的線程打開入口前,所有調(diào)用 await 的線程都一直在入口處等待。
    用 N 初始化的 CountDownLatch 可以使一個(gè)線程在 N 個(gè)線程完成某項(xiàng)操作之前一直等待,或者使其在某項(xiàng)操作完成 N 次之前一直等待。

    CountDownLatch 的一個(gè)有用特性是,它不要求調(diào)用 countDown 方法的線程等到計(jì)數(shù)到達(dá)零時(shí)才繼續(xù),
    而在所有線程都能通過之前,它只是阻止任何線程繼續(xù)通過一個(gè) await。
    一下的例子是別人寫的,非常形象。

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TestCountDownLatch {
    public static void main(String[] args) throws InterruptedException {
       // 開始的倒數(shù)鎖
       final CountDownLatch begin = new CountDownLatch(1);
       // 結(jié)束的倒數(shù)鎖
       final CountDownLatch end = new CountDownLatch(10);
       // 十名選手
       final ExecutorService exec = Executors.newFixedThreadPool(10);
      
       for (int index = 0; index < 10; index++) {
        final int NO = index + 1;
        Runnable run = new Runnable() {
         public void run() {
          try {
           begin.await();//一直阻塞
           Thread.sleep((long) (Math.random() * 10000));
           System.out.println("No." + NO + " arrived");
          } catch (InterruptedException e) {
          } finally {
           end.countDown();
          }
         }
        };
        exec.submit(run);
       }
       System.out.println("Game Start");
       begin.countDown();
       end.await();
       System.out.println("Game Over");
       exec.shutdown();
    }
    }
    CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數(shù)一次,后者是等待倒數(shù)到0,如果沒有到達(dá)0,就只有阻塞等待了。


    CyclicBarrier

    一個(gè)同步輔助類,它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)。
    在涉及一組固定大小的線程的程序中,這些線程必須不時(shí)地互相等待,此時(shí) CyclicBarrier 很有用。因?yàn)樵?barrier 在釋放等待線程后可以重用,所以稱它為循環(huán) 的 barrier。

    CyclicBarrier 支持一個(gè)可選的 Runnable 命令,在一組線程中的最后一個(gè)線程到達(dá)之后(但在釋放所有線程之前),
    該命令只在每個(gè)屏障點(diǎn)運(yùn)行一次。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用。

    示例用法:下面是一個(gè)在并行分解設(shè)計(jì)中使用 barrier 的例子,很經(jīng)典的旅行團(tuán)例子:
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    public class TestCyclicBarrier {
    // 徒步需要的時(shí)間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
    private static int[] timeWalk = { 5, 8, 15, 15, 10 };
    // 自駕游
    private static int[] timeSelf = { 1, 3, 4, 4, 5 };
    // 旅游大巴
    private static int[] timeBus = { 2, 4, 6, 6, 7 };

    static String now() {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(new Date()) + ": ";
    }

    static class Tour implements Runnable {
        private int[] times;
        private CyclicBarrier barrier;
        private String tourName;
        public Tour(CyclicBarrier barrier, String tourName, int[] times) {
          this.times = times;
          this.tourName = tourName;
          this.barrier = barrier;
        }
        public void run() {
          try {
            Thread.sleep(times[0] * 1000);
            System.out.println(now() + tourName + " Reached Shenzhen");
            barrier.await();
            Thread.sleep(times[1] * 1000);
            System.out.println(now() + tourName + " Reached Guangzhou");
            barrier.await();
            Thread.sleep(times[2] * 1000);
            System.out.println(now() + tourName + " Reached Shaoguan");
            barrier.await();
            Thread.sleep(times[3] * 1000);
            System.out.println(now() + tourName + " Reached Changsha");
            barrier.await();
            Thread.sleep(times[4] * 1000);
            System.out.println(now() + tourName + " Reached Wuhan");
            barrier.await();
          } catch (InterruptedException e) {
          } catch (BrokenBarrierException e) {
          }
        }
    }

    public static void main(String[] args) {
        // 三個(gè)旅行團(tuán)
        CyclicBarrier barrier = new CyclicBarrier(3);
        ExecutorService exec = Executors.newFixedThreadPool(3);
        exec.submit(new Tour(barrier, "WalkTour", timeWalk));
        exec.submit(new Tour(barrier, "SelfTour", timeSelf));
    //當(dāng)我們把下面的這段代碼注釋后,會(huì)發(fā)現(xiàn),程序阻塞了,無法繼續(xù)運(yùn)行下去。
        exec.submit(new Tour(barrier, "BusTour", timeBus));
        exec.shutdown();
    }
    }

    CyclicBarrier最重要的屬性就是參與者個(gè)數(shù),另外最要方法是await()。當(dāng)所有線程都調(diào)用了await()后,就表示這些線程都可以繼續(xù)執(zhí)行,否則就會(huì)等待。

    Future

    Future 表示異步計(jì)算的結(jié)果。它提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成,并檢索計(jì)算的結(jié)果。
    計(jì)算完成后只能使用 get 方法來檢索結(jié)果,如有必要,計(jì)算完成前可以阻塞此方法。取消則由 cancel 方法來執(zhí)行。
    還提供了其他方法,以確定任務(wù)是正常完成還是被取消了。一旦計(jì)算完成,就不能再取消計(jì)算。
    如果為了可取消性而使用 Future但又不提供可用的結(jié)果,則可以聲明 Future<?> 形式類型、并返回 null 作為基礎(chǔ)任務(wù)的結(jié)果。

    這個(gè)我們?cè)谇懊鍯ompletionService已經(jīng)看到了,這個(gè)Future的功能,而且這個(gè)可以在提交線程的時(shí)候被指定為一個(gè)返回對(duì)象的。


    ScheduledExecutorService

    一個(gè) ExecutorService,可安排在給定的延遲后運(yùn)行或定期執(zhí)行的命令。

    schedule 方法使用各種延遲創(chuàng)建任務(wù),并返回一個(gè)可用于取消或檢查執(zhí)行的任務(wù)對(duì)象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法創(chuàng)建并執(zhí)行某些在取消前一直定期運(yùn)行的任務(wù)。

    用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通過所請(qǐng)求的 0 延遲進(jìn)行安排。
    schedule 方法中允許出現(xiàn) 0 和負(fù)數(shù)延遲(但不是周期),并將這些視為一種立即執(zhí)行的請(qǐng)求。

    所有的 schedule 方法都接受相對(duì) 延遲和周期作為參數(shù),而不是絕對(duì)的時(shí)間或日期。將以 Date 所表示的絕對(duì)時(shí)間轉(zhuǎn)換成要求的形式很容易。
    例如,要安排在某個(gè)以后的日期運(yùn)行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
    但是要注意,由于網(wǎng)絡(luò)時(shí)間同步協(xié)議、時(shí)鐘漂移或其他因素的存在,因此相對(duì)延遲的期滿日期不必與啟用任務(wù)的當(dāng)前 Date 相符。
    Executors 類為此包中所提供的 ScheduledExecutorService 實(shí)現(xiàn)提供了便捷的工廠方法。

    一下的例子也是網(wǎng)上比較流行的。

    import static java.util.concurrent.TimeUnit.SECONDS;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;

    public class TestScheduledThread {
    public static void main(String[] args) {
       final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
       final Runnable beeper = new Runnable() {
        int count = 0;

        public void run() {
         System.out.println(new Date() + " beep " + (++count));
        }
       };
       // 1秒鐘后運(yùn)行,并每隔2秒運(yùn)行一次
       final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
       // 2秒鐘后運(yùn)行,并每次在上次任務(wù)運(yùn)行完后等待5秒后重新運(yùn)行
       final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
       // 30秒后結(jié)束關(guān)閉任務(wù),并且關(guān)閉Scheduler
       scheduler.schedule(new Runnable() {
        public void run() {
         beeperHandle.cancel(true);
         beeperHandle2.cancel(true);
         scheduler.shutdown();
        }
       }, 30, SECONDS);
    }
    }

    這樣我們就把concurrent包下比較重要的功能都已經(jīng)總結(jié)完了,希望對(duì)我們理解能有幫助。

    posted on 2009-09-14 17:07 大鳥 閱讀(294) 評(píng)論(0)  編輯  收藏 所屬分類: JAVA
    主站蜘蛛池模板: 永久免费观看的毛片的网站| 一个人免费日韩不卡视频| 一二三四免费观看在线视频中文版| 日韩va亚洲va欧洲va国产| 国产免费一区二区三区免费视频 | 亚洲乱码一二三四区麻豆| 在线成人精品国产区免费| 亚洲一区二区三区AV无码| free哆拍拍免费永久视频| 全免费一级毛片在线播放| 亚洲人成未满十八禁网站| 四虎成人精品一区二区免费网站| 亚洲伊人久久大香线蕉结合| 成人毛片免费视频| 亚洲精品无码永久在线观看男男| 日本最新免费不卡二区在线| 黄网站在线播放视频免费观看| 国产精品免费看久久久久 | 精品免费久久久久国产一区 | 99精品国产免费久久久久久下载| 久久精品国产亚洲αv忘忧草| 在线观看人成视频免费| 精品一区二区三区无码免费直播| 亚洲人午夜射精精品日韩| 皇色在线免费视频| 中文字幕亚洲综合久久2| 亚洲免费综合色在线视频| 羞羞视频免费网站含羞草| 国产黄色一级毛片亚洲黄片大全 | a级毛片在线免费看| 噜噜噜亚洲色成人网站∨| 成人人观看的免费毛片| 色老头综合免费视频| 亚洲国产一区在线| 大学生一级特黄的免费大片视频 | 亚洲无吗在线视频| 亚洲欧洲国产成人综合在线观看 | 亚洲另类激情专区小说图片| baoyu777永久免费视频 | 午夜网站在线观看免费完整高清观看| 亚洲成AV人片久久|