<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-124  評論-194  文章-0  trackbacks-0

    最近需要一個能根據請求數變化的線程池,JAVA有這樣的東西,可是C++下好像一般只是固定大小的線程池。所以就基于ACE寫了個,只做了初步測試。

    主要思想是:
    1. 重載ACE_Task,這相當于是個固定線程池,用一個信號量(ACE_Thread_Semaphore)來記數空閑線程數。
    2. 初始化時根據用戶的輸入,確定最少線程數minnum和最大線程數maxnum,當多個請求到來,并且無空閑線程(信號量用光),判斷總線程數小于maxnum,就開始強迫增加線程數。
    3. 當線程響應完一個請求(任務)后,如果當前任務隊列為空,且線程數大于minnum,就退出本線程。這里做了一個優化,就算滿足條件,線程也會在隊列上再等待10秒,防止線程池抖動帶來不必要的開銷。

    使用:
    重載這個類,重載service_func函數實現自己的任務處理。
    start_pool初始化線程池,之后,就可以用add_task向線程池添加任務。
    它會根據請求的數量自動控制池大小進行處理。
    已經在LINUX下測試通過。由于ACE是跨平臺的,所以這個實現也應該可以在WINDOWS下工作。

    編譯:
    帶THREAD_POOL_UNIT_TEST選項,則編譯出自測程序test
    gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl


    thread_pool.h頭文件:

    #ifndef THREAD_POOL
    #define THREAD_POOL

    #include 
    "ace/Task.h"
    #include 
    "ace/Thread_Mutex.h"
    #include 
    "ace/Thread_Semaphore.h"

    class thread_pool : public ACE_Task<ACE_MT_SYNCH>
    {
    public:
        thread_pool ();

        
    ~thread_pool ();

        
    // begin the initial threads and waiting for request
        int start_pool (
            
    int minnum = 5// min number of thread
            int maxnum = 100,  // max number of thread
            int waitsize = 1024// request queue length
            int parsize = 1024); // your parameter size


        
    // pending request in work queue
        int wait_cnt ();

        
    // add one task to thread pool
        int add_task (void *arg, int size);

        
    // user defined work thread function
        virtual int service_func (void* arg);

        
    // overide base class function for thread pool logical
        virtual int svc (void);

        
    // not use
        virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);

    private:
        
    int minnum_, maxnum_;
        
    int waitsize_, parsize_;

    //    ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;

        ACE_Thread_Semaphore 
    *pfree_thread_; // for free thread count

        
    long thread_flags_; // ace thread create flag
    }
    ;


    #endif 
    /* THREAD_POOL */




    thread_pool.cpp實現文件:
    #include "thread_pool.h"

    #define THREAD_POOL_DONOT_ACQUIRE    
    0x1001 // do not aquire again in new added thread

    thread_pool::thread_pool () 
    {
        thread_flags_ 
    = THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED;
        pfree_thread_ 
    = NULL;
    }


    thread_pool::
    ~thread_pool () {
        
    if (pfree_thread_)
            delete pfree_thread_;
    }


    int thread_pool::wait_cnt () {
        
    return this->msg_queue()->message_count ();
    }


    int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg) {
        
    return 0;
    }


    int thread_pool::start_pool (
        
    int minnum,
        
    int maxnum, 
        
    int waitsize, 
        
    int parsize) {
        minnum_ 
    = minnum;
        maxnum_ 
    = maxnum;
        waitsize_ 
    = waitsize;
        parsize_ 
    = parsize;
        
        
    this->msg_queue()->high_water_mark (waitsize * parsize);

        pfree_thread_ 
    = new ACE_Thread_Semaphore (minnum);

        
    int ret = this->activate (thread_flags_, minnum);

        
    return ret;
    }


    int thread_pool::add_task (void *arg, int size) {
        ACE_Message_Block 
    *mb = new ACE_Message_Block (parsize_);
        
        
    // test free threads condition
        if (pfree_thread_->tryacquire () == -1// acquire one free thread to do work
            printf ("free thread used up\n");

            
    if (this->thr_count () < maxnum_) {
                
    this->activate (thread_flags_, 11);
                
                printf (
    "new thread release\n");
                pfree_thread_
    ->release ();
                
                printf (
    "new one thread, now %d\n"this->thr_count ());
            }
     else {
                printf (
    "can't new more threads, queue len %d\n", wait_cnt () + 1);
            }

        }
     else {
            
    // pfree_thread_->release (); // restore cnt, let svc function do acquire work
            printf ("new task acquire\n");
            mb
    ->set_flags (THREAD_POOL_DONOT_ACQUIRE);
        }

        
        
    // create msg
        printf ("add msg\n");

        memcpy (mb
    ->wr_ptr (), (char*) arg, size);
                
        
    this->putq (mb);

        
    return 0;
    }



    int thread_pool::service_func (void* arg) {
        sleep (
    1);
        printf (
    "finished task %d in thread %02X\n"*(int*) arg, (int)ACE_Thread::self ());
        
    return 0;
    }



    int thread_pool::svc (void{
        printf (
    "thread started\n");

        
    while (1)
        
    {                
            ACE_Message_Block 
    *= 0;
            ACE_Time_Value wait 
    = ACE_OS::gettimeofday ();
            wait.sec (wait.sec () 
    + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
            
            
    if (this->getq (b, &wait) < 0{
                
    if (this->thr_count () > minnum_) {
                    printf (
    "over task acquire\n");
                    pfree_thread_
    ->acquire ();
                    printf (
    "delete one thread, now %d\n"this->thr_count ()-1);
                    
                    
    return 0;
                }
     else 
                    
    continue// I'm the one of last min number of threads
            }


            
    if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0{
                printf (
    "queue task acquire\n");
                pfree_thread_
    ->acquire (); // I'll use one free thread
            }

            
    else 
                printf (
    "no need to acquire\n");

            
    this->service_func ((void*)b->rd_ptr());
                                
            printf (
    "finished release\n");
            b
    ->release();
            
            pfree_thread_
    ->release (); // added one free thread
        }


        
    return 0;
    }



    #ifdef THREAD_POOL_UNIT_TEST 
    int main (int argc, int ** argv) {
        printf (
    "begin test:\n");
    /*
        ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
        s->release (3);
        s->acquire ();
        s->acquire ();
        s->acquire ();
        printf ("ok");
        return 0;
    */
        
        thread_pool t;
        t.start_pool (
    10100);

        
    for (int i=0; i<200; i++{
            t.add_task (
    &i, sizeof(i));
            
    if (i % 20 == 0)
                sleep (
    1);
        }


        sleep (
    1000);
        
        printf (
    "end test:\n");
        
    return 0;
    }


    #endif

    posted on 2007-08-14 17:56 我愛佳娃 閱讀(6082) 評論(4)  編輯  收藏 所屬分類: 自寫類庫

    評論:
    # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2007-08-14 21:08 | pass86
    怎么寫道了BLOGJAVA.COM,不過學ACE是好的。  回復  更多評論
      
    # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2007-08-16 23:35 | alwayscy
    嘿嘿,大部分BLOGJAVA的同學都只有一個技術博客吧,只要保證大部分與JAVA有關就好了。  回復  更多評論
      
    # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2008-01-13 14:40 | liuruigong
    編譯錯誤修改
    1#include <ace/OS.h>
    2.ACE_OS::sleep();
    3.最好把主函數的sleep(2000) 修改為
    ACE_Thread_Manager::instance()->wait();

    這個線程池寫的不錯  回復  更多評論
      
    # re: C++實現的帶最大最小線程數的線程池(基于ACE)[未登錄] 2008-01-14 22:09 | 我愛佳娃
    以前搞C++,ACE是個不錯的框架,最近接觸了不少JAVA的東西,感覺JAVA這東西琳瑯滿目。  回復  更多評論
      

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 玖玖在线免费视频| 永久免费不卡在线观看黄网站 | 国产免费黄色无码视频| 日本精品人妻无码免费大全| 婷婷亚洲综合一区二区| 亚洲AV无码不卡在线播放| 午夜a级成人免费毛片| 国产黄色片免费看| 亚洲av乱码一区二区三区| 亚洲国产精品综合久久网络| 久久午夜伦鲁片免费无码| 久久精品亚洲日本波多野结衣| 亚洲精品你懂的在线观看| 成人最新午夜免费视频| 久久免费精品视频| 亚洲AV成人精品一区二区三区| 久久亚洲精品中文字幕无码| 看全色黄大色大片免费久久| 午夜免费福利小电影| 老司机午夜在线视频免费| 亚洲春色在线观看| 亚洲美女又黄又爽在线观看| 日韩在线看片免费人成视频播放| 最近中文字幕免费大全| 亚洲AV噜噜一区二区三区 | 亚洲成av人片在线看片| 最近免费中文字幕视频高清在线看 | 国产V亚洲V天堂无码| 99ee6热久久免费精品6| 亚洲性无码AV中文字幕| 亚洲无线一二三四区手机| 桃子视频在线观看高清免费视频 | 国产成人aaa在线视频免费观看| 无码精品人妻一区二区三区免费| 亚洲无线码在线一区观看| 亚洲精品视频免费在线观看| 苍井空亚洲精品AA片在线播放 | 亚洲精品第五页中文字幕| 在线观看免费为成年视频| 一区二区三区免费视频播放器 | 老司机亚洲精品影院在线观看|