<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    I want to fly higher
    programming Explorer
    posts - 114,comments - 263,trackbacks - 0
    1.ThreadPoolExecutor#execute(Runnable command)

         public void execute(Runnable command) {
                   
    // 如果任務(wù)為空,則直接拋出空指針異常
                    if (command == null)
                        
    throw new NullPointerException();
                    
    // 1.如果線程池線程數(shù)目UnderCorePoolSize且RUNNING則直接添加worker線程并啟動
                    
    // 2.如果超過了corePoolSize或者addIfUnderCorePoolSize失敗則
                    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                    
    // 如果線程池是RUNNING狀態(tài)且可將任務(wù)command加入workQueue(即不違反容量限制)
                        if (runState == RUNNING && workQueue.offer(command)) {
                       
    // 因為是并發(fā)執(zhí)行.如果此時發(fā)現(xiàn)線程池狀態(tài)不再是RUNNING(可能執(zhí)行了類似shutdown的操作)或者線程池中已無Worker線程
                            if (runState != RUNNING || poolSize == 0)
                      
    //1.如果線程池狀態(tài)不再是RUNNING且此時command依然在隊列中,即還未執(zhí)行則直接拒絕.
                     
    // 2.否則如果線程池狀態(tài) < STOP,即可能是SHUTDOWN狀態(tài)且任務(wù)隊列中依然有任務(wù)且工作線程的數(shù)目不足corePoolSize,則額外添加一個Worker線程并啟動
                                ensureQueuedTaskHandled(command);
                        }

                        
    else if (!addIfUnderMaximumPoolSize(command))
                       
    // 如果在UnderMaximumPoolSize下增加worker線程失敗則執(zhí)行拒絕策略,直接調(diào)用RejectedExecutionHandler#rejectedExecution
                            reject(command); // is shutdown or saturated
                    }

                }


    2. addIfUnderCorePoolSize(Runnable firstTask)

    // poolSize < corePoolSize && RUNNING的情況下添加worker線程并啟動worker線程
            private boolean addIfUnderCorePoolSize(Runnable firstTask) {
                    Thread t = null;
                   
    final ReentrantLock mainLock = this.mainLock;
                        
    //
                    mainLock.lock();
                   
    try {
                       
    // 初始poolSize為0,runState為0,即RUNNING
                       
    // RUNNING = 0 / SHUTDOWN = 1 / STOP = 2
                       
    // TERMINATED = 3
                        if (poolSize < corePoolSize && runState == RUNNING)
                            t = addThread(firstTask);
                    }
    finally {
                        mainLock.unlock();
                    }

                   
    if (t == null)
                       
    return false;
                    t.start();
                   
    return true;
                }


    3.addThread(Runnable firstTask)

    private Thread addThread(Runnable firstTask) {
                   
    // 初始化Worker,傳入firstTask
                    Worker w = new Worker(firstTask);
                   
    // 利用線程工廠新建線程,注意這里傳入的參數(shù)是w
                    Thread t = threadFactory.newThread(w);
                   
    if (t != null) {
                        w.thread = t;
                   
    // 添加至workers
                        workers.add(w);
                   
    // ++poolSize
                        int nt = ++poolSize;
                       
    if (nt > largestPoolSize)
                            largestPoolSize = nt;
                    }

                   
    return t;
                }


    4.Worker

    private final class Worker implements Runnable {
           
    /**
             * The runLock is acquired and released surrounding each task
             * execution. It mainly protects against interrupts that are
             * intended to cancel the worker thread from instead
             * interrupting the task being run.
            
    */

           
    private final ReentrantLock runLock = new ReentrantLock();

           
    /**
             * Initial task to run before entering run loop. Possibly null.
            
    */

           
    private Runnable firstTask;

           
    /**
             * Per thread completed task counter; accumulated
             * into completedTaskCount upon termination.
            
    */

           
    volatile long completedTasks;

           
    /**
             * Thread this worker is running in.  Acts as a final field,
             * but cannot be set until thread is created.
            
    */

            Thread thread;

            Worker(Runnable firstTask) {
               
    this.firstTask = firstTask;
            }


           
    boolean isActive() {
               
    return runLock.isLocked();
            }


           
    /**
             * 中斷線程如果沒有正在運行任務(wù)(可能在等待任務(wù))
              * {@link ThreadPoolExecutor#interruptIdleWorkers}
              * {@link ThreadPoolExecutor#getTask}
              * {@link ThreadPoolExecutor#shutdown}
            
    */

           
    void interruptIfIdle() {
               
    final ReentrantLock runLock = this.runLock;
               
    if (runLock.tryLock()) {
                   
    try {
                       
    // 注意只有該方法是被其他線程調(diào)用才會執(zhí)行interrupt.
                       
    // 1.個人認(rèn)為如果是當(dāng)前自身線程執(zhí)行到這里的時候,說明getTask返回了null.線程就會結(jié)束了.
             
    // 2.Worker線程在自身任務(wù)的執(zhí)行中調(diào)用此方法時沒有作用的.即恰恰說明了運行時不被中斷.(因為不太可能存在這樣的類似業(yè)務(wù),內(nèi)部線程自己在運行任務(wù)的時候中斷自己.沒有任何作用.你懂的.這壓根就是錯誤的做法)
              
    // 3.還有一個很重要的原因是:這里加了運行鎖.即如果此時有任務(wù)正在運行則獨占runLock,則其他線程必須等待任務(wù)完畢釋放鎖才可以進(jìn)行interrupt.
                        if (thread != Thread.currentThread())
                            thread.interrupt();
                    }
    finally {
                        runLock.unlock();
                    }

                }

            }


           
    /**
             * Interrupts thread even if running a task.
            
    */

           
    void interruptNow() {
       
    // 直接進(jìn)行中斷,無論是內(nèi)部線程還是其他線程
       
    // 無論是否正在運行任務(wù)
       
    // 沒有獲得鎖
       
    // 此時如果線程正在等待任務(wù)或者任務(wù)執(zhí)行過程中阻塞都可以被中斷
       
    // 個人認(rèn)為該方法也肯定是由外部線程進(jìn)行調(diào)用的,而非內(nèi)部的線程,你懂的.用了也沒有作用.
                thread.interrupt();
            }


           
    /**
             * 運行任務(wù)在beforeExecute/afterExecute之間
            
    */

           
    private void runTask(Runnable task) {
               
    final ReentrantLock runLock = this.runLock;
                runLock.lock();
               
    try {
                   
    /*
                     * Ensure that unless pool is stopping, this thread
                     * does not have its interrupt set. This requires a
                     * double-check of state in case the interrupt was
                     * cleared concurrently with a shutdownNow -- if so,
                     * the interrupt is re-enabled.
                    
    */

               
    // 這段代碼乍看起來可能有些奇怪.個人認(rèn)為是因為多線程的原因,如線程池調(diào)用了shutdownNow方法.
               
    // 1.如果線程池是RUNNING/SHUTDOWN且之前被中斷過,則清除中斷狀態(tài)(interrupted)  2.再次檢查如果執(zhí)行了shutdownNow的話,則會直接interrupt thread.而此時的中斷狀態(tài)可能被清除了.->需要需要再次調(diào)用interrupt重置中斷狀態(tài).(還需要仔細(xì)考證)
                    if (runState < STOP &&
                        Thread.interrupted() &&
                        runState >= STOP)
                        thread.interrupt();

                   
    boolean ran = false;
              
    // 任務(wù)執(zhí)行前的一些業(yè)務(wù),空實現(xiàn),子類可覆蓋
              
    // 任務(wù)完成或者任務(wù)執(zhí)行出現(xiàn)異常則可通過afterExecute(空實現(xiàn))追蹤
                    beforeExecute(thread, task);
                   
    try {
                        task.run();
                        ran = true;
                        afterExecute(task, null);
                  
    // 任務(wù)計數(shù)
                        ++completedTasks;
                    }
    catch (RuntimeException ex) {
                       
    if (!ran)
                            afterExecute(task, ex);
                       
    throw ex;
                    }

                }
    finally {
                    runLock.unlock();
                }

            }


           
    /**
             * Work線程主任務(wù)循環(huán)
            
    */

           
    public void run() {
               
    try {
                    Runnable task = firstTask;
                    firstTask = null;
              
    // 1.如果第一個任務(wù)不為null則一定會執(zhí)行第一個任務(wù)
              
    // 2.如果getTask為null則線程結(jié)束.
                    while (task != null || (task = getTask()) != null) {
                        runTask(task);
                        task = null;
                    }

                }
    finally {
             
    //  跳出while,線程即結(jié)束
             
    // 1.completedTaskCount 計數(shù)
             
    // 2.workers.remove(w) 從workers移除
            
    // 3.--poolSize,如果poolSize為0則tryTerminate
                    workerDone(this);
                }

            }

        }


    5.Runnable getTask()

    Runnable getTask() {
           
    for (;;) {
               
    try {
                   
    int state = runState;
              
    // 線程池運行狀態(tài)為STOP或者TERMINATED,直接返回null,則Worker線程跳出while,終止
                    if (state > SHUTDOWN)
                       
    return null;
                    Runnable r;
              
    // 如果線程池運行狀態(tài)恰好是SHUTDOWN,則繼續(xù)從隊列獲取任務(wù)(隊列為空則返回null),也在該狀態(tài)下如果線程池不為空則一直獲取任務(wù)
                    if (state == SHUTDOWN)  // Help drain queue
                        r = workQueue.poll();
                   
    else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                  
    // RUNNING狀態(tài)下,poolSize超出了corePoolSize 或者allowCoreThreadTimeOut(允許核心線程超時) {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean value)}
                  
    // 在keepAliveTime時間內(nèi)等待可用的元素,等待時可被中斷.如果超時則返回null.
                        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                   
    else
                 
    // Running狀態(tài)下,poolSize未超出corPoolSize且不允許核心線程超時,則在元素變得可用之前一直等待,可被中斷
                        r = workQueue.take();
                   
    if (r != null)
                       
    return r;
              
    // 如果此時返回的任務(wù)為null且worker線程可退出(該方法其實是重復(fù)校驗,因為是并發(fā)執(zhí)行.所以可能任務(wù)隊列已經(jīng)有了任務(wù)等條件出現(xiàn))
                    if (workerCanExit()) {
                  
    // 如果此時線程池狀態(tài)不是RUNNING
                        if (runState >= SHUTDOWN) // Wake up others
                   
    // 喚醒可能阻塞的任務(wù),{@link Worker#interruptIfIdle}
                            interruptIdleWorkers();
                 
    // 返回null,結(jié)束任務(wù)
                        return null;
                    }

                   
    // Else retry
              
    // 繼續(xù)for-循環(huán)
                }
    catch (InterruptedException ie) {
                   
    // On interruption, re-check runState
                }

            }

        }


    6.workerCanExit()

    // 判斷worker線程是否可退出
    private boolean workerCanExit() {
           
    final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
           
    boolean canExit;
           
    try {
           
    // 運行狀態(tài)為STOP或者TERMINATED
           
    // 或者任務(wù)隊列為空
           
    // 或者池中至少有一個線程且允許核心線程超時
                canExit = runState >= STOP ||
                    workQueue.isEmpty() ||
                    (allowCoreThreadTimeOut &&
                     poolSize > Math.max(1, corePoolSize));
            }
    finally {
                mainLock.unlock();
            }

           
    return canExit;
        }


    7.tryTerminate()

    // 嘗試終止
        private void tryTerminate() {
          
    // 如果當(dāng)前池中沒有線程
            if (poolSize == 0) {
               
    int state = runState;
           
    // 如果當(dāng)前運行狀態(tài)時是Running/SHUTDOWN且任務(wù)隊列不為空
                if (state < STOP && !workQueue.isEmpty()) {
              
    // 重新設(shè)置為運行狀態(tài)
                    state = RUNNING; // disable termination check below
             
    // 添加一個firstTask為null的worker并啟動.因為隊列不為空則可以getTask
                    Thread t = addThread(null);
                   
    if (t != null)
                        t.start();
                }

      
                
    // 如果運行狀態(tài)為STOP或者SHUTDOWN則置狀態(tài)為TERMINATED并喚醒等待終止的線程 {@link #awaitTermination(long timeout, TimeUnit unit)}
                if (state == STOP || state == SHUTDOWN) {
                    runState = TERMINATED;
                    termination.signalAll();
                    terminated();// 此方法暫時未實現(xiàn)
                }

            }

        }


    8.awaitTermination(long timeout, TimeUnit unit)

    // 等待線程池終止 {@link #tryTerminate()}
            public boolean awaitTermination(long timeout, TimeUnit unit)
                   
    throws InterruptedException {
                   
    long nanos = unit.toNanos(timeout);
                   
    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                   
    try {
                   
    // 注這是一個無限循環(huán),直到線程池終止或者超時
                        for (;;) {
                           
    if (runState == TERMINATED)
                               
    return true;
                           
    if (nanos <= 0)
                               
    return false;
                   
    //  {@link Condition#long awaitNanos(long nanosTimeout)}
                   
    //  此方法返回的是一個估算(nanosTimeout - awaitTime),如果小于等于0則表示沒有剩余時間,即超時.不過如果返回值是一個正值的話且線程池未終止的話->所以由將返回值繼續(xù)傳入了參數(shù)->確??隙〞l(fā)生超時而導(dǎo)致nanos<=0而跳出循環(huán)
                            nanos = termination.awaitNanos(nanos);
                        }

                    }
    finally {
                        mainLock.unlock();
                    }

                }


    9.shutdown()

    public void shutdown() {
           
    // 檢查是否有shutdown的權(quán)限
            SecurityManager security = System.getSecurityManager();
           
    if (security != null)
                security.checkPermission(shutdownPerm);

           
    final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
           
    try {
               
    if (security != null) { // Check if caller can modify our threads
           
    // 檢查所有的worker線程是否有修改線程的權(quán)限
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }


               
    int state = runState;
           
    // 設(shè)置線程池當(dāng)前狀態(tài)是RUNNING,則設(shè)置為SHUTDOWN狀態(tài)
                if (state < SHUTDOWN)
                    runState = SHUTDOWN;

               
    try {
           
    // 嘗試打斷空閑的worker線程
                    for (Worker w : workers) {
                        w.interruptIfIdle();
                    }

                }
    catch (SecurityException se) { // Try to back out
           
    // 如果出現(xiàn)異常,則還原狀態(tài)
                    runState = state;
                   
    // tryTerminate() here would be a no-op 這個注釋的意思是出現(xiàn)了這個異常,tryTerminate是不起作用的.因為tryTerminate的條件是poolSize == 0.但是異常說明interruptIfIdle失敗則不可能poolSize == 0.
           
    // 繼續(xù)向上拋出異常,這個異常是SecurityException
                    throw se;
                }

           
    // 嘗試終止(隊列為空的時候直接終止)
                tryTerminate(); // Terminate now if pool and queue empty
            }
    finally {
                mainLock.unlock();
            }

        }


    10.shutdownNow()

    public List<Runnable> shutdownNow() {
            
    // 檢查shutdown權(quán)限以及修改工作線程的權(quán)限
            SecurityManager security = System.getSecurityManager();
            
    if (security != null)
                security.checkPermission(shutdownPerm);

            
    final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            
    try {
                
    if (security != null// Check if caller can modify our threads
                    for (Worker w : workers)
                        security.checkAccess(w.thread);
                }


                
    int state = runState;
        
    // 置狀態(tài)為STOP(可能未RUNNING或者SHUTDOWN)
                if (state < STOP)
                    runState = STOP;

                
    try {
                    
    for (Worker w : workers) {
             
    // 直接中斷
                        w.interruptNow();
                    }

                }
     catch (SecurityException se) // Try to back out
                    runState = state;
                    
    // tryTerminate() here would be a no-op
                    throw se;
                }


        
    // 將隊列中的所有可用元素添加list中并返回
                List<Runnable> tasks = drainQueue();
        
    // 嘗試終止
                tryTerminate(); // Terminate now if pool and queue empty
                return tasks;
            }
     finally {
                mainLock.unlock();
            }

        }


    11.總結(jié):
          1.corePoolSize/maximumPoolSize/keepAliveTime/workQueue/threadFactory/rejectedExecutionHandler 為線程池6大參數(shù).
         2.corePoolSize:當(dāng)線程池poolSize少于corePoolSize時,則會新增worker線程.
         3.線程池數(shù)目超過corePoolSize則向workQueue offer 任務(wù).如果offer失敗則在maximumPoolSize下新增worker線程;如果超過了maximumPoolSize,則執(zhí)行拒絕策略.
         4.keepAliveTime:poolSize超過了corePoolSize時(或者允許core thread timeout),此參數(shù)指明workQueue pool的超時時間,超時則返回null,即表示當(dāng)前線程空閑.(workerCanExit中有判斷workQueue為空的條件)然后worker線程結(jié)束(被回收).
         5.Worker有兩個方法interruptIfIdle,這個方法會先獲得運行鎖,即如果當(dāng)前有任務(wù)運行(占有鎖),則其他線程無法中斷.只有執(zhí)行完workQueue的任務(wù)才會結(jié)束并釋放鎖.(shutdown);而另一個方法interruptNow則是不管任何條件,直接interrupt.
    posted on 2013-12-26 11:43 landon 閱讀(1666) 評論(2)  編輯  收藏 所屬分類: Sources

    FeedBack:
    # re: JDK源碼筆記1-ThreadPoolExecutor
    2013-12-26 12:28 | 零柒鎖業(yè)
    對國有銀行的辦事效率深表懷疑  回復(fù)  更多評論
      
    # re: JDK源碼筆記1-ThreadPoolExecutor
    2013-12-27 12:49 | 零柒鎖業(yè)
    支持博主分享  回復(fù)  更多評論
      
    主站蜘蛛池模板: 四虎免费影院4hu永久免费| 丰满少妇作爱视频免费观看| 免费观看的毛片大全| 亚洲精品国产成人| 蜜臀98精品国产免费观看| 91大神亚洲影视在线| 亚洲大片免费观看| 亚洲国产中文在线二区三区免| 国产黄色免费网站| ASS亚洲熟妇毛茸茸PICS| 成年人免费网站在线观看| 亚洲kkk4444在线观看| 成年性生交大片免费看| 亚洲人成网站色7799| 日韩成全视频观看免费观看高清| 亚洲爆乳无码精品AAA片蜜桃| 日本免费一区尤物| 国产亚洲福利精品一区二区| gogo全球高清大胆亚洲| 一级一看免费完整版毛片| 亚洲色精品aⅴ一区区三区| a级毛片免费高清毛片视频| 亚洲av色影在线| 最新黄色免费网站| 色婷五月综激情亚洲综合| 四虎影院免费视频| 羞羞的视频在线免费观看| 国产黄色一级毛片亚洲黄片大全| 国产在线观看免费av站| 亚洲欧洲日韩不卡| 动漫黄网站免费永久在线观看| 亚洲欧美黑人猛交群| 亚洲国产成人精品女人久久久 | 亚洲av无码日韩av无码网站冲| 四虎影视永久免费观看网址| 一级成人生活片免费看| 亚洲av午夜福利精品一区人妖| 亚洲大片免费观看| 亚洲第一se情网站| 久久久久亚洲AV成人网人人网站 | 亚洲人成无码久久电影网站|