<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0
    1.ThreadPoolExecutor#execute(Runnable command)

         public void execute(Runnable command) {
                   
    // 如果任務為空,則直接拋出空指針異常
                    if (command == null)
                        
    throw new NullPointerException();
                    
    // 1.如果線程池線程數目UnderCorePoolSize且RUNNING則直接添加worker線程并啟動
                    
    // 2.如果超過了corePoolSize或者addIfUnderCorePoolSize失敗則
                    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                    
    // 如果線程池是RUNNING狀態且可將任務command加入workQueue(即不違反容量限制)
                        if (runState == RUNNING && workQueue.offer(command)) {
                       
    // 因為是并發執行.如果此時發現線程池狀態不再是RUNNING(可能執行了類似shutdown的操作)或者線程池中已無Worker線程
                            if (runState != RUNNING || poolSize == 0)
                      
    //1.如果線程池狀態不再是RUNNING且此時command依然在隊列中,即還未執行則直接拒絕.
                     
    // 2.否則如果線程池狀態 < STOP,即可能是SHUTDOWN狀態且任務隊列中依然有任務且工作線程的數目不足corePoolSize,則額外添加一個Worker線程并啟動
                                ensureQueuedTaskHandled(command);
                        }

                        
    else if (!addIfUnderMaximumPoolSize(command))
                       
    // 如果在UnderMaximumPoolSize下增加worker線程失敗則執行拒絕策略,直接調用RejectedExecutionHandler#rejectedExecution
                            reject(command); // is shutdown or saturated
                    }

                }


    2. addIfUnderCorePoolSize(Runnable firstTask)

    // poolSize < corePoolSize && RUNNING的情況下添加worker線程并啟動worker線程
            private boolean addIfUnderCorePoolSize(Runnable firstTask) {
                    Thread t = null;
                   
    final ReentrantLock mainLock = this.mainLock;
                        
    //
                    mainLock.lock();
                   
    try {
                       
    // 初始poolSize為0,runState為0,即RUNNING
                       
    // RUNNING = 0 / SHUTDOWN = 1 / STOP = 2
                       
    // TERMINATED = 3
                        if (poolSize < corePoolSize && runState == RUNNING)
                            t = addThread(firstTask);
                    }
    finally {
                        mainLock.unlock();
                    }

                   
    if (t == null)
                       
    return false;
                    t.start();
                   
    return true;
                }


    3.addThread(Runnable firstTask)

    private Thread addThread(Runnable firstTask) {
                   
    // 初始化Worker,傳入firstTask
                    Worker w = new Worker(firstTask);
                   
    // 利用線程工廠新建線程,注意這里傳入的參數是w
                    Thread t = threadFactory.newThread(w);
                   
    if (t != null) {
                        w.thread = t;
                   
    // 添加至workers
                        workers.add(w);
                   
    // ++poolSize
                        int nt = ++poolSize;
                       
    if (nt > largestPoolSize)
                            largestPoolSize = nt;
                    }

                   
    return t;
                }


    4.Worker

    private final class Worker implements Runnable {
           
    /**
             * The runLock is acquired and released surrounding each task
             * execution. It mainly protects against interrupts that are
             * intended to cancel the worker thread from instead
             * interrupting the task being run.
            
    */

           
    private final ReentrantLock runLock = new ReentrantLock();

           
    /**
             * Initial task to run before entering run loop. Possibly null.
            
    */

           
    private Runnable firstTask;

           
    /**
             * Per thread completed task counter; accumulated
             * into completedTaskCount upon termination.
            
    */

           
    volatile long completedTasks;

           
    /**
             * Thread this worker is running in.  Acts as a final field,
             * but cannot be set until thread is created.
            
    */

            Thread thread;

            Worker(Runnable firstTask) {
               
    this.firstTask = firstTask;
            }


           
    boolean isActive() {
               
    return runLock.isLocked();
            }


           
    /**
             * 中斷線程如果沒有正在運行任務(可能在等待任務)
              * {@link ThreadPoolExecutor#interruptIdleWorkers}
              * {@link ThreadPoolExecutor#getTask}
              * {@link ThreadPoolExecutor#shutdown}
            
    */

           
    void interruptIfIdle() {
               
    final ReentrantLock runLock = this.runLock;
               
    if (runLock.tryLock()) {
                   
    try {
                       
    // 注意只有該方法是被其他線程調用才會執行interrupt.
                       
    // 1.個人認為如果是當前自身線程執行到這里的時候,說明getTask返回了null.線程就會結束了.
             
    // 2.Worker線程在自身任務的執行中調用此方法時沒有作用的.即恰恰說明了運行時不被中斷.(因為不太可能存在這樣的類似業務,內部線程自己在運行任務的時候中斷自己.沒有任何作用.你懂的.這壓根就是錯誤的做法)
              
    // 3.還有一個很重要的原因是:這里加了運行鎖.即如果此時有任務正在運行則獨占runLock,則其他線程必須等待任務完畢釋放鎖才可以進行interrupt.
                        if (thread != Thread.currentThread())
                            thread.interrupt();
                    }
    finally {
                        runLock.unlock();
                    }

                }

            }


           
    /**
             * Interrupts thread even if running a task.
            
    */

           
    void interruptNow() {
       
    // 直接進行中斷,無論是內部線程還是其他線程
       
    // 無論是否正在運行任務
       
    // 沒有獲得鎖
       
    // 此時如果線程正在等待任務或者任務執行過程中阻塞都可以被中斷
       
    // 個人認為該方法也肯定是由外部線程進行調用的,而非內部的線程,你懂的.用了也沒有作用.
                thread.interrupt();
            }


           
    /**
             * 運行任務在beforeExecute/afterExecute之間
            
    */

           
    private void runTask(Runnable task) {
               
    final ReentrantLock runLock = this.runLock;
                runLock.lock();
               
    try {
                   
    /*
                     * Ensure that unless pool is stopping, this thread
                     * does not have its interrupt set. This requires a
                     * double-check of state in case the interrupt was
                     * cleared concurrently with a shutdownNow -- if so,
                     * the interrupt is re-enabled.
                    
    */

               
    // 這段代碼乍看起來可能有些奇怪.個人認為是因為多線程的原因,如線程池調用了shutdownNow方法.
               
    // 1.如果線程池是RUNNING/SHUTDOWN且之前被中斷過,則清除中斷狀態(interrupted)  2.再次檢查如果執行了shutdownNow的話,則會直接interrupt thread.而此時的中斷狀態可能被清除了.->需要需要再次調用interrupt重置中斷狀態.(還需要仔細考證)
                    if (runState < STOP &&
                        Thread.interrupted() &&
                        runState >= STOP)
                        thread.interrupt();

                   
    boolean ran = false;
              
    // 任務執行前的一些業務,空實現,子類可覆蓋
              
    // 任務完成或者任務執行出現異常則可通過afterExecute(空實現)追蹤
                    beforeExecute(thread, task);
                   
    try {
                        task.run();
                        ran = true;
                        afterExecute(task, null);
                  
    // 任務計數
                        ++completedTasks;
                    }
    catch (RuntimeException ex) {
                       
    if (!ran)
                            afterExecute(task, ex);
                       
    throw ex;
                    }

                }
    finally {
                    runLock.unlock();
                }

            }


           
    /**
             * Work線程主任務循環
            
    */

           
    public void run() {
               
    try {
                    Runnable task = firstTask;
                    firstTask = null;
              
    // 1.如果第一個任務不為null則一定會執行第一個任務
              
    // 2.如果getTask為null則線程結束.
                    while (task != null || (task = getTask()) != null) {
                        runTask(task);
                        task = null;
                    }

                }
    finally {
             
    //  跳出while,線程即結束
             
    // 1.completedTaskCount 計數
             
    // 2.workers.remove(w) 從workers移除
            
    // 3.--poolSize,如果poolSize為0則tryTerminate
                    workerDone(this);
                }

            }

        }


    5.Runnable getTask()

    Runnable getTask() {
           
    for (;;) {
               
    try {
                   
    int state = runState;
              
    // 線程池運行狀態為STOP或者TERMINATED,直接返回null,則Worker線程跳出while,終止
                    if (state > SHUTDOWN)
                       
    return null;
                    Runnable r;
              
    // 如果線程池運行狀態恰好是SHUTDOWN,則繼續從隊列獲取任務(隊列為空則返回null),也在該狀態下如果線程池不為空則一直獲取任務
                    if (state == SHUTDOWN)  // Help drain queue
                        r = workQueue.poll();
                   
    else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                  
    // RUNNING狀態下,poolSize超出了corePoolSize 或者allowCoreThreadTimeOut(允許核心線程超時) {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean value)}
                  
    // 在keepAliveTime時間內等待可用的元素,等待時可被中斷.如果超時則返回null.
                        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                   
    else
                 
    // Running狀態下,poolSize未超出corPoolSize且不允許核心線程超時,則在元素變得可用之前一直等待,可被中斷
                        r = workQueue.take();
                   
    if (r != null)
                       
    return r;
              
    // 如果此時返回的任務為null且worker線程可退出(該方法其實是重復校驗,因為是并發執行.所以可能任務隊列已經有了任務等條件出現)
                    if (workerCanExit()) {
                  
    // 如果此時線程池狀態不是RUNNING
                        if (runState >= SHUTDOWN) // Wake up others
                   
    // 喚醒可能阻塞的任務,{@link Worker#interruptIfIdle}
                            interruptIdleWorkers();
                 
    // 返回null,結束任務
                        return null;
                    }

                   
    // Else retry
              
    // 繼續for-循環
                }
    catch (InterruptedException ie) {
                   
    // On interruption, re-check runState
                }

            }

        }


    6.workerCanExit()

    // 判斷worker線程是否可退出
    private boolean workerCanExit() {
           
    final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
           
    boolean canExit;
           
    try {
           
    // 運行狀態為STOP或者TERMINATED
           
    // 或者任務隊列為空
           
    // 或者池中至少有一個線程且允許核心線程超時
                canExit = runState >= STOP ||
                    workQueue.isEmpty() ||
                    (allowCoreThreadTimeOut &&
                     poolSize > Math.max(1, corePoolSize));
            }
    finally {
                mainLock.unlock();
            }

           
    return canExit;
        }


    7.tryTerminate()

    // 嘗試終止
        private void tryTerminate() {
          
    // 如果當前池中沒有線程
            if (poolSize == 0) {
               
    int state = runState;
           
    // 如果當前運行狀態時是Running/SHUTDOWN且任務隊列不為空
                if (state < STOP && !workQueue.isEmpty()) {
              
    // 重新設置為運行狀態
                    state = RUNNING; // disable termination check below
             
    // 添加一個firstTask為null的worker并啟動.因為隊列不為空則可以getTask
                    Thread t = addThread(null);
                   
    if (t != null)
                        t.start();
                }

      
                
    // 如果運行狀態為STOP或者SHUTDOWN則置狀態為TERMINATED并喚醒等待終止的線程 {@link #awaitTermination(long timeout, TimeUnit unit)}
                if (state == STOP || state == SHUTDOWN) {
                    runState = TERMINATED;
                    termination.signalAll();
                    terminated();// 此方法暫時未實現
                }

            }

        }


    8.awaitTermination(long timeout, TimeUnit unit)

    // 等待線程池終止 {@link #tryTerminate()}
            public boolean awaitTermination(long timeout, TimeUnit unit)
                   
    throws InterruptedException {
                   
    long nanos = unit.toNanos(timeout);
                   
    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                   
    try {
                   
    // 注這是一個無限循環,直到線程池終止或者超時
                        for (;;) {
                           
    if (runState == TERMINATED)
                               
    return true;
                           
    if (nanos <= 0)
                               
    return false;
                   
    //  {@link Condition#long awaitNanos(long nanosTimeout)}
                   
    //  此方法返回的是一個估算(nanosTimeout - awaitTime),如果小于等于0則表示沒有剩余時間,即超時.不過如果返回值是一個正值的話且線程池未終止的話->所以由將返回值繼續傳入了參數->確保肯定會發生超時而導致nanos<=0而跳出循環
                            nanos = termination.awaitNanos(nanos);
                        }

                    }
    finally {
                        mainLock.unlock();
                    }

                }


    9.shutdown()

    public void shutdown() {
           
    // 檢查是否有shutdown的權限
            SecurityManager security = System.getSecurityManager();
           
    if (security != null)
                security.checkPermission(shutdownPerm);

           
    final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
           
    try {
               
    if (security != null) { // Check if caller can modify our threads
           
    // 檢查所有的worker線程是否有修改線程的權限
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }


               
    int state = runState;
           
    // 設置線程池當前狀態是RUNNING,則設置為SHUTDOWN狀態
                if (state < SHUTDOWN)
                    runState = SHUTDOWN;

               
    try {
           
    // 嘗試打斷空閑的worker線程
                    for (Worker w : workers) {
                        w.interruptIfIdle();
                    }

                }
    catch (SecurityException se) { // Try to back out
           
    // 如果出現異常,則還原狀態
                    runState = state;
                   
    // tryTerminate() here would be a no-op 這個注釋的意思是出現了這個異常,tryTerminate是不起作用的.因為tryTerminate的條件是poolSize == 0.但是異常說明interruptIfIdle失敗則不可能poolSize == 0.
           
    // 繼續向上拋出異常,這個異常是SecurityException
                    throw se;
                }

           
    // 嘗試終止(隊列為空的時候直接終止)
                tryTerminate(); // Terminate now if pool and queue empty
            }
    finally {
                mainLock.unlock();
            }

        }


    10.shutdownNow()

    public List<Runnable> shutdownNow() {
            
    // 檢查shutdown權限以及修改工作線程的權限
            SecurityManager security = System.getSecurityManager();
            
    if (security != null)
                security.checkPermission(shutdownPerm);

            
    final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            
    try {
                
    if (security != null// Check if caller can modify our threads
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }


                
    int state = runState;
        
    // 置狀態為STOP(可能未RUNNING或者SHUTDOWN)
                if (state < STOP)
                    runState = STOP;

                
    try {
                    
    for (Worker w : workers) {
             
    // 直接中斷
                        w.interruptNow();
                    }

                }
     catch (SecurityException se) // Try to back out
                    runState = state;
                    
    // tryTerminate() here would be a no-op
                    throw se;
                }


        
    // 將隊列中的所有可用元素添加list中并返回
                List<Runnable> tasks = drainQueue();
        
    // 嘗試終止
                tryTerminate(); // Terminate now if pool and queue empty
                return tasks;
            }
     finally {
                mainLock.unlock();
            }

        }


    11.總結:
          1.corePoolSize/maximumPoolSize/keepAliveTime/workQueue/threadFactory/rejectedExecutionHandler 為線程池6大參數.
         2.corePoolSize:當線程池poolSize少于corePoolSize時,則會新增worker線程.
         3.線程池數目超過corePoolSize則向workQueue offer 任務.如果offer失敗則在maximumPoolSize下新增worker線程;如果超過了maximumPoolSize,則執行拒絕策略.
         4.keepAliveTime:poolSize超過了corePoolSize時(或者允許core thread timeout),此參數指明workQueue pool的超時時間,超時則返回null,即表示當前線程空閑.(workerCanExit中有判斷workQueue為空的條件)然后worker線程結束(被回收).
         5.Worker有兩個方法interruptIfIdle,這個方法會先獲得運行鎖,即如果當前有任務運行(占有鎖),則其他線程無法中斷.只有執行完workQueue的任務才會結束并釋放鎖.(shutdown);而另一個方法interruptNow則是不管任何條件,直接interrupt.
    posted on 2013-12-26 11:43 landon 閱讀(1665) 評論(2)  編輯  收藏 所屬分類: Sources

    FeedBack:
    # re: JDK源碼筆記1-ThreadPoolExecutor
    2013-12-26 12:28 | 零柒鎖業
    對國有銀行的辦事效率深表懷疑  回復  更多評論
      
    # re: JDK源碼筆記1-ThreadPoolExecutor
    2013-12-27 12:49 | 零柒鎖業
    支持博主分享  回復  更多評論
      
    主站蜘蛛池模板: 国产又黄又爽又猛免费app| 最近2022中文字幕免费视频| 99精品视频在线观看免费| 精品免费tv久久久久久久| 国产精品免费精品自在线观看| 日韩免费a级毛片无码a∨| 国产午夜无码视频免费网站| 中文字幕人成人乱码亚洲电影| 亚洲欧洲日韩不卡| 亚洲国产精品综合久久20| 免费无码专区毛片高潮喷水| 国产拍拍拍无码视频免费| 99爱在线精品免费观看| 免费大香伊蕉在人线国产| 亚洲男人的天堂www| 亚洲AV无码成人专区| 深夜免费在线视频| 蜜桃成人无码区免费视频网站| 毛片免费全部播放一级| MM131亚洲国产美女久久| 久久久久久亚洲AV无码专区| 亚洲国产精品网站在线播放| 中文字幕手机在线免费看电影| 男女超爽刺激视频免费播放| 亚洲AV中文无码乱人伦在线视色| 亚洲人成网址在线观看 | 亚洲国产精品lv| 亚洲中文字幕无码中文| 中文字幕不卡免费高清视频| 国产91色综合久久免费分享| 亚洲色婷婷综合开心网| 亚洲国产精品综合久久2007| 免费看黄网站在线看| 国产免费女女脚奴视频网| 亚洲精品第一国产综合境外资源| 亚洲色图黄色小说| 亚洲免费一区二区| 毛片免费观看的视频在线| 国产亚洲人成网站观看| 亚洲欧美日本韩国| 日韩精品人妻系列无码专区免费|