<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Vincent

    Vicent's blog
    隨筆 - 74, 文章 - 0, 評論 - 5, 引用 - 0
    數據加載中……

    一個線程池的實現

    設計目標
    ?????提供一個線程池的組件,具有良好的伸縮性,當線程夠用時,銷毀不用線程,當線程不夠用時,自動增加線程數量;
    ?????提供一個工作任務接口和工作隊列,實際所需要的任務都必須實現這個工作任務接口,然后放入工作隊列中;
    ?????線程池中的線程從工作隊列中,自動取得工作任務,執行任務。
    主要控制類和功能接口設計
    線程池管理器?ThreadPoolManager?的功能:
    ?????管理線程池中的各個屬性變量
    ü????最大工作線程數
    ü????最小工作線程數
    ü????激活的工作線程總數
    ü????睡眠的工作線程總數
    ü????工作線程總數?(即:激活的工作線程總數+睡眠的工作線程總數)
    ?????創建工作線程
    ?????銷毀工作線程
    ?????啟動處于睡眠的工作線程
    ?????睡眠處于激活的工作線程
    ?????縮任務:當工作線程總數小于或等于最小工作線程數時,銷毀多余的睡眠的工作線程,使得現有工作線程總數等于最小工作任務總數
    ?????伸任務:當任務隊列任務總數大于工作線程數時,增加工作線程總數至最大工作線程數
    ?????提供線程池啟動接口
    ?????提供線程池銷毀接口
    工作線程?WorkThread??的功能:
    ?????從工作隊列取得工作任務
    ?????執行工作任務接口中的指定任務
    工作任務接口?ITask???的功能:
    ?????提供指定任務動作
    工作隊列?IWorkQueue??的功能:
    ?????提供獲取任務接口,并刪除工作隊列中的任務;
    ?????提供加入任務接口;
    ?????提供刪除任務接口;
    ?????提供取得任務總數接口;
    ?????提供自動填任務接口;(當任務總數少于或等于默認總數的25%時,自動裝填)
    ?????提供刪除所有任務接口;


    Code


    ThreadPoolManager:
    =====================================
    CODE:
    package test.thread.pool1;
    import java.util.ArrayList;
    import java.util.List;
    import test.thread.pool1.impl.MyWorkQueue;
    
    /**
     * <p>Title: 線程池管理器</p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public class ThreadPoolManager {
      /*最大線程數*/
      private int threads_max_num;
    
      /*最小線程數*/
      private int threads_min_num;
      
      /* 線程池線程增長步長 */
      private int threads_increase_step = 5;
    
      /* 任務工作隊列 */
      private IWorkQueue queue;
      
      /* 線程池監視狗 */
      private PoolWatchDog poolWatchDog ;
      
      /* 隊列線程 */
      private Thread queueThread ;
      
      /* 線程池 封裝所有工作線程的數據結構 */
      private List pool = new ArrayList();
      
      /* 線程池中 封裝所有鈍化后的數據結構*/
      private List passivePool = new ArrayList();
      
      /* 空閑60秒 */
      private static final long IDLE_TIMEOUT = 60000L;
      
      /* 關閉連接池標志位 */
      private boolean close = false;
      
      /**
       * 線程池管理器
       * @param queue 任務隊列
       * @param threads_min_num 工作線程最小數
       * @param threads_max_num 工作線程最大數
       */
      public ThreadPoolManager(int threads_max_num
                              ,int threads_min_num
                              ,IWorkQueue queue){
        this.threads_max_num = threads_max_num;
        this.threads_min_num = threads_min_num;
        this.queue = queue;    
      }
    
      /**
       * 線程池啟動
       */
      public void startPool(){
        System.out.println("=== startPool..........");
        poolWatchDog = new PoolWatchDog("PoolWatchDog");
        poolWatchDog.setDaemon(true);
        poolWatchDog.start();
        System.out.println("=== startPool..........over");
      }
    
      /**
       * 線程池銷毀接口
       */
      public void destoryPool(){
        System.out.println("==========================DestoryPool starting ...");
        this.close = true;
        int pool_size = this.pool.size();
        
        //中斷隊列線程
        System.out.println("===Interrupt queue thread ... ");
        queueThread.interrupt();
        queueThread = null;
        
        System.out.println("===Interrupt thread pool ... ");
        Thread pool_thread = null;
        for(int i=0; i<pool_size; i++){
          pool_thread = (Thread)pool.get(i);
          if(pool_thread !=null 
          && pool_thread.isAlive() 
          && !pool_thread.isInterrupted()){
            pool_thread.interrupt();
            System.out.println("Stop pool_thread:"
                              +pool_thread.getName()+"[interrupt] "
                              +pool_thread.isInterrupted());
          }
        }//end for
        
        if(pool != null){
          pool.clear();
        }
        if(passivePool != null){
          pool.clear();
        }
        
        try{
          System.out.println("=== poolWatchDog.join() starting ...");
          poolWatchDog.join();
          System.out.println("=== poolWatchDog.join() is over ...");
        }
        catch(Throwable ex){
          System.out.println("###poolWatchDog ... join method throw a exception ... "
                              +ex.toString());
        }
        
        poolWatchDog =null;
        System.out.println("==============================DestoryPool is over ...");    
      }
      
      
      public static void main(String[] args) throws Exception{
        ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
        
        threadPoolManager1.startPool();
        Thread.sleep(60000);
        threadPoolManager1.destoryPool();
      }
      
      /**
       * 線程池監視狗
       */
      private class PoolWatchDog extends Thread{
        public PoolWatchDog(String name){
          super(name);
        }
      
        public void run(){
          Thread workThread = null;
          Runnable run = null;
          
          //開啟任務隊列線程,獲取數據--------
          System.out.println("===QueueThread starting ... ... ");
          queueThread = new Thread(new QueueThread(),"QueueThread");
          queueThread.start();
          
          System.out.println("===Initial thread Pool ... ...");
          //初始化線程池的最小線程數,并放入池中
          for(int i=0; i<threads_min_num; i++){
            run = new WorkThread();
            workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
            workThread.start();
            if(i == threads_min_num -1){
              workThread = null;
              run = null;
            }
          }
          System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());
    
          //線程池線程動態增加線程算法--------------
          while(!close){
          
            //等待5秒鐘,等上述線程都啟動----------
            synchronized(this){          
              try{
                System.out.println("===Wait the [last time] threads starting ....");
                this.wait(15000);
              }
              catch(Throwable ex){
                System.out.println("###PoolWatchDog invoking is failure ... "+ex);
              }
            }//end synchronized
              
            //開始增加線程-----------------------spread動作
            int queue_size = queue.getTaskSize();
            int temp_size = (queue_size - threads_min_num);
            
            if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
              System.out.println("================Spread thread pool starting ....");
              for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
                System.out.println("=== Spread thread num : "+i);
                run = new WorkThread();
                workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
                workThread.start();
              }//end for
              
              workThread = null;
              run = null;    
              System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
            }//end if
              
            //刪除已經多余的睡眠線程-------------shrink動作
            int more_sleep_size = pool.size() - threads_min_num;//最多能刪除的線程數
            int sleep_threads_size = passivePool.size();
            if(more_sleep_size >0 && sleep_threads_size >0){
              System.out.println("================Shrink thread pool starting ....");        
              for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
                System.out.println("=== Shrink thread num : "+i);
                Thread removeThread = (Thread)passivePool.get(0);
                if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
                  removeThread.interrupt();
                }
              }
              System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());          
            }
    
            System.out.println("===End one return [shrink - spread operator] ....");    
          }//end while
        }//end run 
      }//end private class
      
      /**
       * 工作線程
       */
      class WorkThread implements Runnable{
      
        public WorkThread(){
        }
      
        public void run(){
          String name = Thread.currentThread().getName();
          System.out.println("===Thread.currentThread():"+name);
          pool.add(Thread.currentThread());    
        
          while(true){
          
            //獲取任務---------
            ITask task = null;
            try{
              System.out.println("===Get task from queue is starting ... ");
              //看線程是否被中斷,如果被中斷停止執行任務----
              if(Thread.currentThread().isInterrupted()){
                System.out.println("===Breaking current thread and jump whlie [1] ... ");
                break;
              }
              task = queue.getTask();
            }
            catch(Throwable ex){
              System.out.println("###No task in queue:"+ex);
            }//end tryc
            
            if(task != null){
              //執行任務---------
              try{
                System.out.println("===Execute the task is starting ... ");
                //看線程是否被中斷,如果被中斷停止執行任務----
                if(Thread.currentThread().isInterrupted()){
                  System.out.println("===Breaking current thread and jump whlie [1] ... ");
                  break;
                }     
                task.executeTask();
                //任務執行完畢-------
                System.out.println("===Execute the task is over ... ");
              }
              catch(Throwable ex){
                System.out.println("###Execute the task is failure ... "+ex);
              }//end tryc
              
            }else{
              //沒有任務,則鈍化線程至規定時間--------
              synchronized(this){
                try{
                  System.out.println("===Passivate into passivePool ... ");
                  
                  //看線程是否被中斷,如果被中斷停止執行任務----
                  boolean isInterrupted = Thread.currentThread().isInterrupted();
                  if(isInterrupted){
                    System.out.println("===Breaking current thread and jump whlie [1] ... ");
                    break;
                  }
    //              passivePool.add(this);
                passivePool.add(Thread.currentThread());
    
                  
                  //準備睡眠線程-------
                  isInterrupted = Thread.currentThread().isInterrupted();
                  if(isInterrupted){
                    System.out.println("===Breaking current thread and jump whlie [2] ... ");
                    break;
                  }              
                  this.wait(IDLE_TIMEOUT);
                }
                catch(Throwable ex1){
                  System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
                  break;
                }
              }          
            }        
          }//end while--------
          
          if(pool.contains(passivePool)){
            pool.remove(this);
          }
          if(passivePool.contains(passivePool)){
            passivePool.remove(this);
          }
          System.out.println("===The thread execute over ... "); 
        }//end run----------
      }
      
      
      class QueueThread implements Runnable{
      
        public QueueThread(){
        }
      
        public void run(){
          while(true){
            //自動裝在任務--------
            queue.autoAddTask();
            System.out.println("===The size of queue's task is "+queue.getTaskSize());
          
            synchronized(this){
              if(Thread.currentThread().isInterrupted()){
                break;
              }else{
                  try{
                    this.wait(queue.getLoadDataPollingTime());
                  }
                  catch(Throwable ex){
                    System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
                    break;
                  }
              }//end if
            }//end synchr
            
          }//end while
        }//end run
      } 
    }
    






    WorkQueue
    =====================================
    CODE:
    package test.thread.pool1;
    
    import java.util.LinkedList;
    import test.thread.pool1.impl.MyTask;
    
    /**
     * <p>Title: 工作隊列對象 </p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public abstract class WorkQueue implements IWorkQueue{
      /* 預計裝載量 */
      private int load_size;
      
      /* 數據裝載輪循時間 */
      private long load_polling_time;
      
      /* 隊列 */
      private LinkedList queue = new LinkedList();
      
      /**
       * 
       * @param load_size 預計裝載量
       * @param load_polling_time 數據裝載輪循時間
       */
      public WorkQueue(int load_size,long load_polling_time){
        this.load_size = (load_size <= 10) ? 10 : load_size;
        this.load_polling_time = load_polling_time;
      }
    
      /* 數據裝載輪循時間 */
      public long getLoadDataPollingTime(){
        return this.load_polling_time;
      }
    
    
      /*獲取任務,并刪除隊列中的任務*/
      public synchronized ITask getTask(){
        ITask task = (ITask)queue.getFirst();
        queue.removeFirst();
        return task;
      }
    
      /*加入任務*/
      public void  addTask(ITask task){
        queue.addLast(task);
      }
    
      /*刪除任務*/
      public synchronized void removeTask(ITask task){
        queue.remove(task);
      }
    
      /*任務總數*/
      public synchronized int getTaskSize(){
        return queue.size();
      }
    
      /*自動裝填任務*/
      public synchronized void autoAddTask(){
      
        synchronized(this){
          float load_size_auto = load_size - getTaskSize() / load_size;
          System.out.println("===load_size_auto:"+load_size_auto);
          
          if(load_size_auto > 0.25){        
            autoAddTask0();
          }
          else {
            System.out.println("=== Not must load new work queue ... Now! ");
          }    
        }
      }
    
      /*刪除所有任務*/
      public synchronized void clearAllTask(){
        queue.clear();
      }
      
      /**
       * 程序員自己實現該方法
       */
      protected abstract void autoAddTask0();
    }
    





    MyWorkQueue
    =====================================
    CODE:
    package test.thread.pool1.impl;
    
    import java.util.LinkedList;
    import test.thread.pool1.WorkQueue;
    
    /**
     * <p>Title: 例子工作隊列對象 </p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public class MyWorkQueue extends WorkQueue{
    
      /**
       * @param load_size 預計裝載量
       * @param load_polling_time 數據裝載輪循時間
       */
      public MyWorkQueue(int load_size,long load_polling_time){
        super(load_size,load_polling_time);
      }
    
      /**
       * 自動加載任務
       */
      protected synchronized void autoAddTask0(){
        //-------------------
        System.out.println("===MyWorkQueue ...  invoked autoAddTask0() method ...");
        for(int i=0; i<10; i++){
          System.out.println("===add task :"+i);
          this.addTask(new MyTask());
        }    
        //-------------------
      }
    }
    





    MyTask
    =====================================
    CODE:
    package test.thread.pool1.impl;
    import test.thread.pool1.ITask;
    
    /**
     * <p>Title: 工作任務接口 </p>
     * <p>Description: </p>
     * <p>Copyright: Copyright (c) 2005</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    
    public class MyTask implements ITask {
    
      /**
       * 執行的任務
       * @throws java.lang.Throwable
       */
      public void executeTask() throws Throwable{
        System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
      }
    }
    

    posted on 2006-08-24 16:55 Binary 閱讀(3731) 評論(2)  編輯  收藏 所屬分類: j2se

    評論

    # re: 一個線程池的實現  回復  更多評論   

    編譯時,
    ITask出錯
    IWorkQueue找不到.
    是不是弄少了段代碼?
    2007-03-05 23:29 | Peng

    # re: 一個線程池的實現  回復  更多評論   

    文章不錯,但是缺代碼了,作者能補全嗎?
    2008-05-25 23:30 | lindily
    主站蜘蛛池模板: 美女视频免费看一区二区| 亚洲国产福利精品一区二区| 国产亚洲漂亮白嫩美女在线| 67194成是人免费无码| 亚洲精品mv在线观看 | EEUSS影院WWW在线观看免费| 免费观看国产精品| 免费人成动漫在线播放r18| 在线免费观看韩国a视频| 亚洲乱码无人区卡1卡2卡3| 性一交一乱一视频免费看| 伊人久久五月丁香综合中文亚洲| 免费电视剧在线观看| 亚洲高清乱码午夜电影网| 精品无码国产污污污免费| 亚洲AV色无码乱码在线观看| 日韩免费福利视频| 国产亚洲午夜精品| 国产亚洲精品a在线观看| 成在线人视频免费视频| 亚洲尹人九九大色香蕉网站| 91免费国产在线观看| 亚洲国产日韩精品| 国产成人无码免费视频97| 天天综合亚洲色在线精品| 久久乐国产精品亚洲综合| 午夜免费啪视频在线观看| 亚洲中文字幕人成乱码| 国产成人免费福利网站| 中文字幕在线视频免费观看| 亚洲综合网美国十次| 免费国产高清视频| 久久亚洲免费视频| 老司机午夜在线视频免费观| 亚洲AV综合色区无码一区| 亚洲中文无码永久免费| 美女裸体无遮挡免费视频网站| 亚洲AV无码码潮喷在线观看| 人妻丰满熟妇无码区免费| 青娱乐在线视频免费观看| 久久精品国产精品亚洲人人|