使用一个东西,我们要明白为什么使用它线程池linux,如何使用它,使用它能达到什么效果
在写本文章时,我也借鉴了网上的部分资源,因为是之前很早搜索到的资料无法追踪源头,所以在此不再写来源,谨感谢各位大神。
1 使用线程池的原因
通常使用多线程都是在需要的时候创建一个新的线程,然后执行任务,完成后退出。一般情况下是完全够满足我们的程序的。
但是当我们需要创建大量的线程,并且执行一个简单的任务之后销毁,比如:在web,email,db里面的一些应用,如彩铃,或者网络通信编程,或者云计算里面后台镜像处理的时候,我们的应用在任何时候都要准备面对数目巨大的连接请求,同时,这些请求执行的任务却又比较简单,占用的时间很少,这样我们可能就会处于不停的创建线程并销毁线程的状态。虽说比起进程的创建,线程的创建时间已经大大缩短,但是如果需要频繁的创建线程,并且每个线程所占用的处理时间又非常简短,则线程创建和销毁带给处理器的额外负担也是很可观的。
线程池的作用正是在这种情况下有效的降低频繁创建销毁线程所带来的额外开销。一般来说,线程池都是采用预创建的技术,在应用启动之初便预先创建一定数目的线程。应用在运行的过程中,需要时可以从这些线程所组成的线程池里申请分配一个空闲的线程,来执行一定的任务,任务完成后,并不是将线程销毁,而是将它返还给线程池,由线程池自行管理。如果线程池中预先分配的线程已经全部分配完毕,但此时又有新的任务请求,则线程池会动态的创建新的线程去适应这个请求。当然,有可能,某些时段应用并不需要执行很多的任务,导致了线程池中的线程大多处于空闲的状态,为了节省系统资源,线程池就需要动态的销毁其中的一部分空闲线程。因此,线程池都需要一个管理者,按照一定的要求去动态的维护其中线程的数目。
当然,如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。
2 如何使用它
线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。
pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。该函数中
while(pool->cur_queue_size == 0) { pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock)); }
表示如果任务链表中没有任务,则该线程出于阻塞等待状态。否则从队列中取出任务并执行。
pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个出于阻塞状态的线程(如果有的话)。
pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出
3 使用它能达到的效果
实际上,创建太多的线程可能会导致由于过度使用系统资源而耗尽内存。为了防止资源不足,服务器应用程序可以采用线程池来限制同一时刻处理的线程数目。线程池不会占用主线程,也不会延迟后续请求的处理。一旦池中的某个线程完成任务,它将返回到等待线程队列中,等待被再次使用。这种重用使应用程序可以避免为每个任务创建新线程引起的资源和时间消耗。
4 代码实现
这是个比较简单的版本,如果想更好的话,需要再改进一点,推荐大家去网上找一下tinyftp的源码,里面有个thread pool的实现很完善很好。
"code"?class="cpp">?? - ??
- ??
- ??
- ??
- ??
- ??
- ??
- ??
- #include???
- #include???
- #include???
- #include???
- #include???
- ??
- ?
- ?
- ?
- ??
- typedef?struct?worker??
- {??
- ???????
- ????void?*(*process_interface)(void?*?arg);??
- ????void?*arg;??
- ????struct?worker?*?next;??
- ??
- }?Thread_worker;??
- ??
- typedef?struct??
- {??
- ????pthread_mutex_t?pool_mutex;??
- ????pthread_cond_t?pool_cond;??
- ??
- ??????
- ????Thread_worker?*worker_queue_head;??
- ??
- ??????
- ????int?is_destory;??
- ????pthread_t?*?thread_id;??
- ????int?thread_max_num;??
- ????int?cur_queue_size;??
- ??
- }?Thread_pool;??
- ??
- ??
- Thread_pool?*pool?=?NULL;??
- ??
- ??
- ??
- ??
- ??
- ??
- ??
- ??
- void?pool_init(int?thread_max_num);??
- ??
- ??
- ??
- ??
- ??
- ??
- ??
- ??
- int?pool_add_worker(void?*(*process)(void?*args),?void?*args);??
- ??
- void?*thread_routine(void?*?args);??
- ??
- ??
- ??
- ??
- ??
- ??
- ??
- ??
- int?pool_destory();??
- ??
- ??
- ??
- ??
- ??
- void?pool_init(int?thread_max_num)??
- {??
- ????pool?=?(Thread_pool?*)?malloc(sizeof(Thread_pool));??
- ??
- ????pthread_mutex_init(&(pool->pool_mutex),?NULL?);??
- ????pthread_cond_init(&(pool->pool_cond),?NULL?);??
- ??
- ????pool->is_destory?=?0;???
- ????pool->worker_queue_head?=?NULL;??
- ????pool->thread_max_num?=?thread_max_num;??
- ????pool->cur_queue_size?=?0;??
- ????pool->thread_id?=?(pthread_t?*)?malloc(sizeof(pthread_t)?*?thread_max_num);??
- ????int?i?=?0;??
- ????for?(;?i?<?thread_max_num;?i++)??
- ????{??
- ????????pthread_create(&(pool->thread_id[i]),?NULL,?thread_routine,?NULL?);??
- ????}??
- ??
- }??
- ??
- void?*?thread_routine(void?*args)??
- {??
- ????printf("starting?thread?0x%x\n",?pthread_self());??
- ????while?(1)??
- ????{??
- ?????????
- ??
- ????????pthread_mutex_lock(&(pool->pool_mutex));??
- ????????while?(pool->cur_queue_size?==?0?&&?!pool->is_destory)??
- ????????{??
- ????????????printf("thread?0x%x?is?waiting\n",?pthread_self());??
- ????????????pthread_cond_wait(&(pool->pool_cond),?&(pool->pool_mutex));??
- ????????}??
- ????????????
- ????????if?(pool->is_destory)??
- ????????{??
- ????????????pthread_mutex_unlock(&(pool->pool_mutex));??
- ????????????printf("thread?0x%x?will?exit\n",?pthread_self());??
- ????????????pthread_exit(NULL?);??
- ????????}??
- ????????printf("thread?0x%x?is?starting?to?work\n",?pthread_self());??
- ??
- ????????assert(pool->cur_queue_size!=0);??
- ????????assert(pool->worker_queue_head!=NULL);??
- ??
- ????????pool->cur_queue_size--;??
- ????????Thread_worker?*worker?=?pool->worker_queue_head;??
- ????????pool->worker_queue_head?=?worker->next;??
- ????????pthread_mutex_unlock(&(pool->pool_mutex));??
- ??????????
- ????????(*(worker->process_interface))(worker->arg);??
- ????????free(worker);??
- ????????worker?=?NULL;??
- ????}??
- ????pthread_exit(NULL?);??
- }??
- ??
- int?pool_add_worker(void?*(*process)(void?*?args),?void*args)??
- {??
- ????Thread_worker?*new_worker?=?(Thread_worker?*)?malloc(sizeof(Thread_worker));??
- ????new_worker->process_interface?=?process;??
- ????new_worker->arg?=?args;??
- ????new_worker->next?=?NULL;??
- ??
- ????????
- ????pthread_mutex_lock(&(pool->pool_mutex));??
- ????Thread_worker?*member?=?pool->worker_queue_head;??
- ????if?(member?!=?NULL?)??
- ????{??
- ????????while?(member->next?!=?NULL?)??
- ????????????member?=?member->next;??
- ????????member->next?=?new_worker;??
- ????}??
- ????else??
- ????{??
- ????????pool->worker_queue_head?=?new_worker;??
- ????}??
- ????assert(pool->worker_queue_head!=NULL);??
- ??
- ????pool->cur_queue_size++;??
- ????pthread_mutex_unlock(&(pool->pool_mutex));??
- ???????
- ????pthread_cond_signal(&(pool->pool_cond));??
- ????return?0;??
- }??
- ??
- int?pool_destory()??
- {??
- ????if?(pool->is_destory)??
- ????????return?-1;??
- ????pool->is_destory?=?1;??
- ???????
- ????pthread_cond_broadcast(&(pool->pool_cond));??
- ??
- ????int?i;??
- ????for?(i?=?0;?i?<?pool->thread_max_num;?i++)??
- ????????pthread_join(pool->thread_id[i],?NULL?);??
- ??????
- ????free(pool->thread_id);??
- ??
- ????Thread_worker?*temp?=?NULL;??
- ????while?(pool->worker_queue_head?!=?NULL?)??
- ????{??
- ????????temp?=?pool->worker_queue_head;??
- ????????pool->worker_queue_head?=?pool->worker_queue_head->next;??
- ????????free(temp);??
- ????}??
- ??
- ????pthread_mutex_destroy(&(pool->pool_mutex));??
- ????pthread_cond_destroy(&(pool->pool_cond));??
- ??
- ????free(pool);??
- ????pool?=?NULL;??
- ????return?0;??
- }??
- ??
- ??
- ??
- ??
- ??
- void?*?my_process(void?*arg)??
- {??
- ????printf("Thread_id?is?0x%x?,?working?on?task?%d\n",?pthread_self(),??
- ????????????*(int?*)?arg);??
- ??
- ????sleep(1);??
- ????return?NULL?;??
- }??
- ??
- int?main(int?argc,?char?*argv[])??
- {??
- ????pool_init(3);??
- ??
- ????int?*working_num?=?(int?*)?malloc(sizeof(int)?*?5);??
- ????int?i?=?0;??
- ????for?(;?i?<?5;?i++)??
- ????{??
- ????????working_num[i]?=?i;??
- ????????pool_add_worker(my_process,?&working_num[i]);??
- ????}??
- ??
- ????sleep(5);??
- ????pool_destory();??
- ????free(working_num);??
- ????return?0;??
- }??
-
(编辑:通辽站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|