<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 101,  comments - 29,  trackbacks - 0

    最近的工作需要統計一些復雜的報表,為了提高效率,想用多線程去實現,但要在所有線程完成統計任務后,將結果匯總。所以在思考有沒有什么辦法解決,之所以是“系列一”是因為我想記錄下我的思考過程。

    1、首先設計一個Executer,負責任務的執行和匯總:

    Java代碼  收藏代碼
    1. public class Executer {  
    2.     //計算已經派發的任務數(條件謂詞)  
    3.     public static int THREAD_COUNT = 0;  
    4.     //線程池  
    5.     private Executor pool = null;  
    6.     public Executer() {  
    7.         this(1);  
    8.     }  
    9.     public Executer(int threadPoolSize) {  
    10.         pool = Executors.newFixedThreadPool(threadPoolSize);  
    11.     }  
    12.     /** 
    13.      * 任務派發 
    14.      * @param job 
    15.      */  
    16.     public void fork(Job job){  
    17.         //將任務派發給線程池去執行  
    18.         pool.execute(job);  
    19.         THREAD_COUNT++;  
    20.     }  
    21.     /** 
    22.      * 統計任務結果 
    23.      */  
    24.     public void join(){  
    25.         while(THREAD_COUNT > 0){  
    26.             System.out.println("threadCount: "+THREAD_COUNT);  
    27.             try {  
    28.                 wait();//如果任務沒有全部完成,則掛起  
    29.             } catch (Exception e) {}//這里總是拋異常,不知道為什么,好吧!先不管它  
    30.         }  
    31.     }  
    32. }  

     2、寫一個抽象的Job類,負責執行具體的任務

    Java代碼  收藏代碼
    1. public abstract class Job implements Runnable {  
    2.   
    3.     @Override  
    4.     public void run() {  
    5.         this.execute();//執行子類具體任務  
    6.         Executer.THREAD_COUNT--;  
    7.         try{  
    8.             notifyAll();//這里總是拋異常,不知道為什么,好吧!先不管它  
    9.         }catch(Exception e){}  
    10.     }  
    11.     /** 
    12.      * 業務處理函數 
    13.      */  
    14.     public abstract void execute();  
    15.   
    16. }  

     

    3、測試,先來一個具體的任務實現。

    Java代碼  收藏代碼
    1. public class MyJob extends Job {  
    2.   
    3.     @Override  
    4.     public void execute() {  
    5.         //模擬業務需要處理1秒.  
    6.         try {Thread.sleep(1000);} catch (InterruptedException e) {}  
    7.         System.out.println("running thread id = "+Thread.currentThread().getId());  
    8.     }  
    9.   
    10. }  

     

    4、測試。

    Java代碼  收藏代碼
    1. public class Test {  
    2.     public static void main(String[] args) {  
    3.         //初始化任務池  
    4.         Executer exe = new Executer(5);  
    5.         //初始化任務  
    6.         long time = System.currentTimeMillis();  
    7.         for (int i = 0; i < 10; i++) {  
    8.             MyJob job = new MyJob();  
    9.             exe.fork(job);//派發任務  
    10.         }  
    11.         //匯總任務結果  
    12.         exe.join();  
    13.         System.out.println("time: "+(System.currentTimeMillis() - time));  
    14.     }  
    15.   
    16. }  

     

     5、好吧,看一下結果

     

    Java代碼  收藏代碼
    1. threadCount: 10  
    2. ......(表示有N多個)  
    3. threadCount: 10  
    4. running thread id = 8  
    5. running thread id = 9  
    6. running thread id = 11  
    7. running thread id = 10  
    8. running thread id = 12  
    9. threadCount: 5  
    10. ......(表示有N多個)  
    11. threadCount: 5  
    12. running thread id = 9  
    13. running thread id = 10  
    14. running thread id = 12  
    15. running thread id = 8  
    16. running thread id = 11  
    17. threadCount: 3  
    18. time: 2032  

     哈哈,看來是可以了,最后匯總任務的處理時間是2032毫秒,看來是比單個任務順序執行來的快。但是有幾個問題:

    1)如果沒有catch那個超級Exception的話,就會拋下面的異常:

    Java代碼  收藏代碼
    1. java.lang.IllegalMonitorStateException  
    2.     at java.lang.Object.wait(Native Method)  
    3.     at java.lang.Object.wait(Object.java:485)  
    4.     at com.one.Executer.join(Executer.java:38)  
    5.     at com.test.Test.main(Test.java:21)  

     

    2)為啥會打印N多個同樣值threadCount呢?

    于是和同事(河東)溝通,他說wait要放在synchronized里面才行,好吧,試一下,改進一下Executer和Job

     

    Java代碼  收藏代碼
    1. public class Executer {  
    2.     //計算已經派發的任務數(條件謂詞)  
    3.     public static int THREAD_COUNT = 0;  
    4.     //條件隊列鎖  
    5.     public static final Object LOCK = new Object();  
    6.     //線程池  
    7.     private Executor pool = null;  
    8.     public Executer() {  
    9.         this(1);  
    10.     }  
    11.     public Executer(int threadPoolSize) {  
    12.         pool = Executors.newFixedThreadPool(threadPoolSize);  
    13.     }  
    14.     /** 
    15.      * 任務派發 
    16.      * @param job 
    17.      */  
    18.     public void fork(Job job){  
    19.         //將任務派發給線程池去執行  
    20.         pool.execute(job);  
    21.         //增加線程數  
    22.         synchronized (LOCK) {  
    23.             THREAD_COUNT++;  
    24.         }  
    25.     }  
    26.     /** 
    27.      * 統計任務結果 
    28.      */  
    29.     public void join(){  
    30.         synchronized (LOCK) {  
    31.             while(THREAD_COUNT > 0){  
    32.                 System.out.println("threadCount: "+THREAD_COUNT);  
    33.                 try {  
    34.                     LOCK.wait();//如果任務沒有全部完成,則掛起  
    35.                 } catch (InterruptedException e) {  
    36.                     e.printStackTrace();  
    37.                 }  
    38.             }  
    39.         }  
    40.     }  
    41. }  

     

    Java代碼  收藏代碼
    1. public abstract class Job implements Runnable {  
    2.   
    3.     @Override  
    4.     public void run() {  
    5.         this.execute();//執行子類具體任務  
    6.         synchronized (Executer.LOCK) {  
    7.             //處理完業務后,任務結束,遞減線程數,同時喚醒主線程  
    8.             Executer.THREAD_COUNT--;  
    9.             Executer.LOCK.notifyAll();  
    10.         }  
    11.     }  
    12.     /** 
    13.      * 業務處理函數 
    14.      */  
    15.     public abstract void execute();  
    16.   
    17. }  

     6、測試一下:

    Java代碼  收藏代碼
    1. threadCount: 10  
    2. running thread id = 8  
    3. running thread id = 11  
    4. running thread id = 9  
    5. threadCount: 7  
    6. running thread id = 10  
    7. threadCount: 6  
    8. running thread id = 12  
    9. threadCount: 5  
    10. running thread id = 11  
    11. running thread id = 12  
    12. running thread id = 10  
    13. threadCount: 2  
    14. running thread id = 9  
    15. running thread id = 8  
    16. threadCount: 1  
    17. time: 2016  

     還真的行,謝謝河東哈!

    但是原因是什么呢?回去查了查書《Java并發編程實踐》,見附件!

    Java代碼  收藏代碼
    1. 14.2.1節這樣說:  
    2.   
    3. 在條件等待中存在一種重要的三元關系,包括加鎖、wait方法和一個條件謂詞。在條件謂詞中包含多個變量,而狀態變量由一個鎖來保護,因此在測試條件謂詞之前必須先持有這個鎖。鎖對象與條件隊列對象(即調用wait和notify等方法所在的對象)必須是同一個對象。  
    4.   
    5. ...  
    6.   
    7. 由于線程在條件謂詞不為真的情況下也可以反復地醒來,因此必須在一個循環中調用wait,并在每次迭代中都測試條件謂詞。  
    8.   
    9. 14.2.4節:  
    10.   
    11. 由于在調用notify或notifyAll時必須持有條件隊列對象的鎖,而如果這些等待中線程此時不能重新獲得鎖,那么無法從wait返回,因此發出通知的線程應該盡快地釋放,從而確保正在等待的線程盡可能盡快的解除阻塞。  

     

    看來之前是不會用wait和notify,哈哈~!

     

    感謝河東,和你交流收獲很大!

     

    順便測試一下java多線程情況下,多核CPU的利用率,修改上面的線程池大小和任務數(2個線程處理1000000個任務,去掉MyJob的sleep(這樣可以多搶些CPU時間),結果如下:

     

    看來window下是可以利用多核的,雖然是一個JVM進程。之前和斯亮討論的結論是錯誤的。

    posted on 2012-07-15 01:20 mixer-a 閱讀(3782) 評論(2)  編輯  收藏

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 亚洲国产精品xo在线观看| 污污视频网站免费观看| 小小影视日本动漫观看免费| 免费无遮挡无遮羞在线看| 亚洲av无码潮喷在线观看| 成人免费看片又大又黄| 中文字幕无线码中文字幕免费 | 亚洲精品乱码久久久久久| 综合在线免费视频| 亚洲精品黄色视频在线观看免费资源 | 黄色大片免费网站| 亚洲综合激情六月婷婷在线观看| 免费观看的a级毛片的网站| 免费人成在线观看视频高潮| 国产成人精品日本亚洲网址| 亚洲男人的天堂一区二区| 香蕉97超级碰碰碰免费公| 中文永久免费观看网站| 亚洲a∨国产av综合av下载| 亚洲产国偷V产偷V自拍色戒| 日韩精品视频免费观看| 99ee6热久久免费精品6| 一级做a爰片久久毛片免费陪| 亚洲乱人伦精品图片| 亚洲精品色午夜无码专区日韩| 在线播放免费播放av片| 91香蕉国产线在线观看免费| 免费夜色污私人影院网站电影 | 四虎国产精品永久免费网址| 色五月五月丁香亚洲综合网| 91午夜精品亚洲一区二区三区| 亚洲色无码专区在线观看| 国产精品成人四虎免费视频| www视频免费看| 久久国产免费观看精品| 农村寡妇一级毛片免费看视频| 亚洲gay片在线gv网站| 亚洲jjzzjjzz在线观看| 亚洲综合日韩中文字幕v在线| 国产亚洲精久久久久久无码AV| 日产乱码一卡二卡三免费|