<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 110, comments - 101, trackbacks - 0, articles - 7
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

     原文:http://www.iteye.com/topic/1118660

    整個ThreadPoolExecutor的任務處理有4步操作:

     

    • 第一步,初始的poolSize < corePoolSize,提交的runnable任務,會直接做為new一個Thread的參數,立馬執行
    • 第二步,當提交的任務數超過了corePoolSize,就進入了第二步操作。會將當前的runable提交到一個block queue中
    • 第三步,如果block queue是個有界隊列,當隊列滿了之后就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
    • 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。
    幾點說明:(相信這些網上一搜一大把,我這里簡單介紹下,為后面做一下鋪墊)
    • block queue有以下幾種實現:
      1. ArrayBlockingQueue :  有界的數組隊列
      2. LinkedBlockingQueue : 可支持有界/無界的隊列,使用鏈表實現
      3. PriorityBlockingQueue : 優先隊列,可以針對任務排序
      4. SynchronousQueue : 隊列長度為1的隊列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連接上來poll task。
    • RejectExecutionHandler是針對任務無法處理時的一些自保護處理:
      1. Reject 直接拋出Reject exception
      2. Discard 直接忽略該runnable,不可取
      3. DiscardOldest 丟棄最早入隊列的的任務
      4. CallsRun 直接讓原先的client thread做為worker線程,進行執行

    容易被人忽略的點:
    1.  pool threads啟動后,以后的任務獲取都會通過block queue中,獲取堆積的runnable task.

    所以建議: block size >= corePoolSize ,不然線程池就沒任何意義
    2.  corePoolSize 和 maximumPoolSize的區別, 和大家正常理解的數據庫連接池不太一樣。
      *  據dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐內存中的threads數量,maxActive代表是工作的最大線程數。
      *  這里的corePoolSize就是連接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設置keepAliveTime,超過多少時間多有任務后銷毀線程,但不會固定保持一定數量的threads)。 
      * 這里的maximumPoolSize,是一種救急措施的第一層。當threadPoolExecutor的工作threads存在滿負荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完后立馬退出。

    所以建議:  maximumPoolSize >= corePoolSize =期望的最大線程數。 (我曾經配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最后就成了單線程工作的pool。典型的配置錯誤)

    3. 善用blockqueue和reject組合. 這里要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調用者自己來運行。
    我們經常會在線上使用一些線程池做異步處理,比如我前面做的(業務層)異步并行加載技術分析和設計將原本串行的請求都變為了并行操作,但過多的并行會增加系統的負載(比如軟中斷,上下文切換)。所以肯定需要對線程池做一個size限制。但是為了引入異步操作后,避免因在block queue的等待時間過長,所以需要在隊列滿的時,執行一個callsRun的策略,并行的操作又轉為一個串行處理,這樣就可以保證盡量少的延遲影響。

    所以建議:  RejectExecutionHandler = CallsRun ,  blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務)

    Btrace容量規劃

    再提供一個btrace腳本,分析線上的thread pool容量規劃是否合理,可以運行時輸出poolSize等一些數據。

     

     

    Java代碼  
    1. import static com.sun.btrace.BTraceUtils.addToAggregation;   
    2. import static com.sun.btrace.BTraceUtils.field;   
    3. import static com.sun.btrace.BTraceUtils.get;   
    4. import static com.sun.btrace.BTraceUtils.newAggregation;   
    5. import static com.sun.btrace.BTraceUtils.newAggregationKey;   
    6. import static com.sun.btrace.BTraceUtils.printAggregation;   
    7. import static com.sun.btrace.BTraceUtils.println;   
    8. import static com.sun.btrace.BTraceUtils.str;   
    9. import static com.sun.btrace.BTraceUtils.strcat;   
    10.   
    11. import java.lang.reflect.Field;   
    12. import java.util.concurrent.atomic.AtomicInteger;   
    13.   
    14. import com.sun.btrace.BTraceUtils;   
    15. import com.sun.btrace.aggregation.Aggregation;   
    16. import com.sun.btrace.aggregation.AggregationFunction;   
    17. import com.sun.btrace.aggregation.AggregationKey;   
    18. import com.sun.btrace.annotations.BTrace;   
    19. import com.sun.btrace.annotations.Kind;   
    20. import com.sun.btrace.annotations.Location;   
    21. import com.sun.btrace.annotations.OnEvent;   
    22. import com.sun.btrace.annotations.OnMethod;   
    23. import com.sun.btrace.annotations.OnTimer;   
    24. import com.sun.btrace.annotations.Self;   
    25.   
    26. /**  
    27.  * 并行加載監控  
    28.  *   
    29.  * @author jianghang 2011-4-7 下午10:59:53  
    30.  */  
    31. @BTrace  
    32. public class AsyncLoadTracer {   
    33.   
    34.     private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);   
    35.     private static Aggregation   histogram    = newAggregation(AggregationFunction.QUANTIZE);   
    36.     private static Aggregation   average      = newAggregation(AggregationFunction.AVERAGE);   
    37.     private static Aggregation   max          = newAggregation(AggregationFunction.MAXIMUM);   
    38.     private static Aggregation   min          = newAggregation(AggregationFunction.MINIMUM);   
    39.     private static Aggregation   sum          = newAggregation(AggregationFunction.SUM);   
    40.     private static Aggregation   count        = newAggregation(AggregationFunction.COUNT);   
    41.   
    42.     @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))   
    43.     public static void executeMonitor(@Self Object self) {   
    44.         Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor""poolSize");   
    45.         Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor""largestPoolSize");   
    46.         Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor""workQueue");   
    47.   
    48.         Field countField = field("java.util.concurrent.ArrayBlockingQueue""count");   
    49.         int poolSize = (Integer) get(poolSizeField, self);   
    50.         int largestPoolSize = (Integer) get(largestPoolSizeField, self);   
    51.         int queueSize = (Integer) get(countField, get(workQueueField, self));   
    52.   
    53.         println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),   
    54.                                      str(largestPoolSize)), " queueSize : "), str(queueSize)));   
    55.     }   
    56.   
    57.     @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))   
    58.     public static void rejectMonitor(@Self Object self) {   
    59.         String name = str(self);   
    60.         if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {   
    61.             BTraceUtils.incrementAndGet(rejecctCount);   
    62.         }   
    63.     }   
    64.   
    65.     @OnTimer(1000)   
    66.     public static void rejectPrintln() {   
    67.         int reject = BTraceUtils.getAndSet(rejecctCount, 0);   
    68.         println(strcat("reject count in 1000 msec: ", str(reject)));   
    69.         AggregationKey key = newAggregationKey("rejectCount");   
    70.         addToAggregation(histogram, key, reject);   
    71.         addToAggregation(average, key, reject);   
    72.         addToAggregation(max, key, reject);   
    73.         addToAggregation(min, key, reject);   
    74.         addToAggregation(sum, key, reject);   
    75.         addToAggregation(count, key, reject);   
    76.     }   
    77.   
    78.     @OnEvent  
    79.     public static void onEvent() {   
    80.         BTraceUtils.truncateAggregation(histogram, 10);   
    81.         println("---------------------------------------------");   
    82.         printAggregation("Count", count);   
    83.         printAggregation("Min", min);   
    84.         printAggregation("Max", max);   
    85.         printAggregation("Average", average);   
    86.         printAggregation("Sum", sum);   
    87.         printAggregation("Histogram", histogram);   
    88.         println("---------------------------------------------");   
    89.     }   
    90. }  
     

    運行結果:

     

    Java代碼  
    1. poolSize : 1 , largestPoolSize = 10 , queueSize = 10  
    2. reject count in 1000 msec: 0  

     

    說明:

    1. poolSize 代表為當前的線程數

    2. largestPoolSize 代表為歷史最大的線程數

    3. queueSize 代表blockqueue的當前堆積的size

    4. reject count 代表在1000ms內的被reject的數量

     

     

    最后

      這是我對ThreadPoolExecutor使用過程中的一些經驗總結,希望能對大家有所幫助,如有描述不對的地方歡迎拍磚。


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 亚洲AV无码专区在线观看成人| 亚洲av无码专区国产乱码在线观看 | 女人体1963午夜免费视频| 亚洲精品岛国片在线观看| 精品国产亚洲第一区二区三区| 成人男女网18免费视频| 亚洲国产精品久久丫| 精品无码免费专区毛片| 亚洲日本国产精华液| 久久久久国色AV免费看图片| 亚洲一区二区三区高清不卡| 国内外成人免费视频| 国产成人综合亚洲| 综合亚洲伊人午夜网| 免费国产99久久久香蕉| 亚洲福利电影在线观看| 成年女人18级毛片毛片免费| 久久亚洲精品无码gv| 亚洲天堂中文字幕在线| 国产成人精品免费久久久久| 亚洲免费一级视频| 日韩免费a级在线观看| eeuss草民免费| 亚洲白色白色在线播放| 在线视频免费观看高清| 国产亚洲高清在线精品不卡| 国产亚洲一区二区三区在线观看| 亚洲视频在线免费观看| 亚洲欧美成人一区二区三区| 亚洲精品线路一在线观看| 日本免费污片中国特一级| 亚洲中文无码线在线观看| 国产a级特黄的片子视频免费| 和老外3p爽粗大免费视频| 亚洲小说图片视频| 又黄又爽一线毛片免费观看| 欧洲人成在线免费| 亚洲AV无码成人精品区狼人影院 | 成人av片无码免费天天看| 亚洲国产精品张柏芝在线观看 | 亚洲中文字幕久久精品无码喷水 |