<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-124  評論-194  文章-0  trackbacks-0

    最近需要一個能根據請求數變化的線程池,JAVA有這樣的東西,可是C++下好像一般只是固定大小的線程池。所以就基于ACE寫了個,只做了初步測試。

    主要思想是:
    1. 重載ACE_Task,這相當于是個固定線程池,用一個信號量(ACE_Thread_Semaphore)來記數空閑線程數。
    2. 初始化時根據用戶的輸入,確定最少線程數minnum和最大線程數maxnum,當多個請求到來,并且無空閑線程(信號量用光),判斷總線程數小于maxnum,就開始強迫增加線程數。
    3. 當線程響應完一個請求(任務)后,如果當前任務隊列為空,且線程數大于minnum,就退出本線程。這里做了一個優化,就算滿足條件,線程也會在隊列上再等待10秒,防止線程池抖動帶來不必要的開銷。

    使用:
    重載這個類,重載service_func函數實現自己的任務處理。
    start_pool初始化線程池,之后,就可以用add_task向線程池添加任務。
    它會根據請求的數量自動控制池大小進行處理。
    已經在LINUX下測試通過。由于ACE是跨平臺的,所以這個實現也應該可以在WINDOWS下工作。

    編譯:
    帶THREAD_POOL_UNIT_TEST選項,則編譯出自測程序test
    gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl


    thread_pool.h頭文件:

    #ifndef THREAD_POOL
    #define THREAD_POOL

    #include 
    "ace/Task.h"
    #include 
    "ace/Thread_Mutex.h"
    #include 
    "ace/Thread_Semaphore.h"

    class thread_pool : public ACE_Task<ACE_MT_SYNCH>
    {
    public:
        thread_pool ();

        
    ~thread_pool ();

        
    // begin the initial threads and waiting for request
        int start_pool (
            
    int minnum = 5// min number of thread
            int maxnum = 100,  // max number of thread
            int waitsize = 1024// request queue length
            int parsize = 1024); // your parameter size


        
    // pending request in work queue
        int wait_cnt ();

        
    // add one task to thread pool
        int add_task (void *arg, int size);

        
    // user defined work thread function
        virtual int service_func (void* arg);

        
    // overide base class function for thread pool logical
        virtual int svc (void);

        
    // not use
        virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);

    private:
        
    int minnum_, maxnum_;
        
    int waitsize_, parsize_;

    //    ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;

        ACE_Thread_Semaphore 
    *pfree_thread_; // for free thread count

        
    long thread_flags_; // ace thread create flag
    }
    ;


    #endif 
    /* THREAD_POOL */




    thread_pool.cpp實現文件:
    #include "thread_pool.h"

    #define THREAD_POOL_DONOT_ACQUIRE    
    0x1001 // do not aquire again in new added thread

    thread_pool::thread_pool () 
    {
        thread_flags_ 
    = THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED;
        pfree_thread_ 
    = NULL;
    }


    thread_pool::
    ~thread_pool () {
        
    if (pfree_thread_)
            delete pfree_thread_;
    }


    int thread_pool::wait_cnt () {
        
    return this->msg_queue()->message_count ();
    }


    int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg) {
        
    return 0;
    }


    int thread_pool::start_pool (
        
    int minnum,
        
    int maxnum, 
        
    int waitsize, 
        
    int parsize) {
        minnum_ 
    = minnum;
        maxnum_ 
    = maxnum;
        waitsize_ 
    = waitsize;
        parsize_ 
    = parsize;
        
        
    this->msg_queue()->high_water_mark (waitsize * parsize);

        pfree_thread_ 
    = new ACE_Thread_Semaphore (minnum);

        
    int ret = this->activate (thread_flags_, minnum);

        
    return ret;
    }


    int thread_pool::add_task (void *arg, int size) {
        ACE_Message_Block 
    *mb = new ACE_Message_Block (parsize_);
        
        
    // test free threads condition
        if (pfree_thread_->tryacquire () == -1// acquire one free thread to do work
            printf ("free thread used up\n");

            
    if (this->thr_count () < maxnum_) {
                
    this->activate (thread_flags_, 11);
                
                printf (
    "new thread release\n");
                pfree_thread_
    ->release ();
                
                printf (
    "new one thread, now %d\n"this->thr_count ());
            }
     else {
                printf (
    "can't new more threads, queue len %d\n", wait_cnt () + 1);
            }

        }
     else {
            
    // pfree_thread_->release (); // restore cnt, let svc function do acquire work
            printf ("new task acquire\n");
            mb
    ->set_flags (THREAD_POOL_DONOT_ACQUIRE);
        }

        
        
    // create msg
        printf ("add msg\n");

        memcpy (mb
    ->wr_ptr (), (char*) arg, size);
                
        
    this->putq (mb);

        
    return 0;
    }



    int thread_pool::service_func (void* arg) {
        sleep (
    1);
        printf (
    "finished task %d in thread %02X\n"*(int*) arg, (int)ACE_Thread::self ());
        
    return 0;
    }



    int thread_pool::svc (void{
        printf (
    "thread started\n");

        
    while (1)
        
    {                
            ACE_Message_Block 
    *= 0;
            ACE_Time_Value wait 
    = ACE_OS::gettimeofday ();
            wait.sec (wait.sec () 
    + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
            
            
    if (this->getq (b, &wait) < 0{
                
    if (this->thr_count () > minnum_) {
                    printf (
    "over task acquire\n");
                    pfree_thread_
    ->acquire ();
                    printf (
    "delete one thread, now %d\n"this->thr_count ()-1);
                    
                    
    return 0;
                }
     else 
                    
    continue// I'm the one of last min number of threads
            }


            
    if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0{
                printf (
    "queue task acquire\n");
                pfree_thread_
    ->acquire (); // I'll use one free thread
            }

            
    else 
                printf (
    "no need to acquire\n");

            
    this->service_func ((void*)b->rd_ptr());
                                
            printf (
    "finished release\n");
            b
    ->release();
            
            pfree_thread_
    ->release (); // added one free thread
        }


        
    return 0;
    }



    #ifdef THREAD_POOL_UNIT_TEST 
    int main (int argc, int ** argv) {
        printf (
    "begin test:\n");
    /*
        ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
        s->release (3);
        s->acquire ();
        s->acquire ();
        s->acquire ();
        printf ("ok");
        return 0;
    */
        
        thread_pool t;
        t.start_pool (
    10100);

        
    for (int i=0; i<200; i++{
            t.add_task (
    &i, sizeof(i));
            
    if (i % 20 == 0)
                sleep (
    1);
        }


        sleep (
    1000);
        
        printf (
    "end test:\n");
        
    return 0;
    }


    #endif

    posted on 2007-08-14 17:56 我愛佳娃 閱讀(6082) 評論(4)  編輯  收藏 所屬分類: 自寫類庫

    評論:
    # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2007-08-14 21:08 | pass86
    怎么寫道了BLOGJAVA.COM,不過學ACE是好的。  回復  更多評論
      
    # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2007-08-16 23:35 | alwayscy
    嘿嘿,大部分BLOGJAVA的同學都只有一個技術博客吧,只要保證大部分與JAVA有關就好了。  回復  更多評論
      
    # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2008-01-13 14:40 | liuruigong
    編譯錯誤修改
    1#include <ace/OS.h>
    2.ACE_OS::sleep();
    3.最好把主函數的sleep(2000) 修改為
    ACE_Thread_Manager::instance()->wait();

    這個線程池寫的不錯  回復  更多評論
      
    # re: C++實現的帶最大最小線程數的線程池(基于ACE)[未登錄] 2008-01-14 22:09 | 我愛佳娃
    以前搞C++,ACE是個不錯的框架,最近接觸了不少JAVA的東西,感覺JAVA這東西琳瑯滿目。  回復  更多評論
      

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 国产精品色拉拉免费看| 老司机在线免费视频| 99在线免费观看视频| 67194熟妇在线永久免费观看| 久久精品a一国产成人免费网站| 国产裸模视频免费区无码| 中文字幕中韩乱码亚洲大片| 久久国产亚洲精品无码| 亚洲一本一道一区二区三区| 一级白嫩美女毛片免费| 免费人成视频在线观看网站| 在线精品免费视频无码的 | 两个人看的www高清免费观看| 无码人妻AV免费一区二区三区| 免费a级毛片无码a∨蜜芽试看| 免费真实播放国产乱子伦| 亚洲成AV人片在线观看ww| 亚洲中文无码a∨在线观看| 男男gay做爽爽的视频免费| 国产午夜免费高清久久影院 | 国产在线精品一区免费香蕉| ww在线观视频免费观看| 免费一级成人毛片| 亚洲国产综合91精品麻豆| 亚洲av色香蕉一区二区三区蜜桃| 99久久免费国产特黄| 成年私人影院免费视频网站| 中文字幕精品亚洲无线码一区应用| 亚洲美女激情视频| 无套内谢孕妇毛片免费看看| 亚洲黄色免费在线观看| 免费一级大黄特色大片| 亚洲美女色在线欧洲美女| 曰批全过程免费视频免费看| 91av在线免费视频| 亚洲国产成人久久综合一区77| 亚洲福利电影在线观看| 一区二区三区免费精品视频 | 久久久久久久久免费看无码| 综合亚洲伊人午夜网 | 免费无码看av的网站|