linux c 开启线程池,Linux线程池(C语言描述)
下面的程序就是完整的线程池实现,主要采用互斥量和条件变量
创建线程或者进程的开销是很大的,为了防止频繁的创建线程,提高程序的运行效率,往往都会建立一个线程池用于多线程程序的调度 下面的程序就是完整的线程池实现,主要采用互斥量和条件变量实现同步 首先定义头文件threadpool.h 在该文件中定义了线程池的数据结构和所有的函数 #ifndef THREADPOOL_H_ #define THREADPOOL_H_ #include #include #include #include /** * 线程体数据结构 */ typedef struct runner { void (*callback)(void* arg); // 回调函数指针 void* arg; // 回调函数的参数 struct runner* next; } thread_runner; /** * 线程池数据结构 */ typedef struct { pthread_mutex_t mutex; //互斥量 pthread_cond_t cond; // 条件变量 thread_runner* runner_head; // 线程池中所有等待任务的头指针 thread_runner* runner_tail; // 线程池所有等待任务的尾指针 int shutdown; // 线程池是否销毁 pthread_t* threads; // 所有线程 int max_thread_size; //线程池中允许的活动线程数目 } thread_pool; /** * 线程体 */ void run(void *arg); /** * 初始化线程池 * 参数: * pool:指向线程池结构有效地址的动态指针 * max_thread_size:最大的线程数 */ void threadpool_init(thread_pool* pool, int max_thread_size); /** * 向线程池加入任务 * 参数: * pool:指向线程池结构有效地址的动态指针 * callback:线程回调函数 * arg:回调函数参数 */ void threadpool_add_runner(thread_pool* pool, void (*callback)(void *arg), void *arg); /** * 销毁线程池 * 参数: * ppool:指向线程池结构有效地址的动态指针地址(二级指针),销毁后释放内存,该指针为NULL */ void threadpool_destroy(thread_pool** ppool); #endif 然后是函数实现threadpool.h 该文件实现了threadpool.h的函数定义 #include "threadpool.h" #define DEBUG 1 /** * 线程体 */ void run(void *arg) { thread_pool* pool = (thread_pool*) arg; while (1) { // 加锁 pthread_mutex_lock(&(pool->mutex)); #ifdef DEBUG printf("run-> locked\n"); #endif // 如果等待队列为0并且线程池未销毁线程池linux,则处于阻塞状态 while (pool->runner_head == NULL && !pool->shutdown) { pthread_cond_wait(&(pool->cond), &(pool->mutex)); } //如果线程池已经销毁 if (pool->shutdown) { // 解锁 pthread_mutex_unlock(&(pool->mutex)); #ifdef DEBUG printf("run-> unlocked and thread exit\n"); #endif pthread_exit(NULL); } // 取出链表中的头元素 thread_runner *runner = pool->runner_head; pool->runner_head = runner->next; // 解锁 pthread_mutex_unlock(&(pool->mutex)); #ifdef DEBUG printf("run-> unlocked\n"); #endif // 调用回调函数,执行任务 (runner->callback)(runner->arg); free(runner); runner = NULL; #ifdef DEBUG printf("run-> runned and free runner\n"); #endif } pthread_exit(NULL); } /** * 初始化线程池 * 参数: * pool:指向线程池结构有效地址的动态指针 * max_thread_size:最大的线程数 */ void threadpool_init(thread_pool* pool, int max_thread_size) { // 初始化互斥量 pthread_mutex_init(&(pool->mutex), NULL); // 初始化条件变量 pthread_cond_init(&(pool->cond), NULL); pool->runner_head = NULL; pool->runner_tail = NULL; pool->max_thread_size = max_thread_size; pool->shutdown = 0; // 创建所有分离态线程 pool->threads = (pthread_t *) malloc(max_thread_size * sizeof(pthread_t)); int i = 0; for (i = 0; i < max_thread_size; i++) { pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_create(&(pool->threads[i]), &attr, (void*) run, (void*) pool); } #ifdef DEBUG printf("threadpool_init-> create %d detached thread\n", max_thread_size); #endif } /** * 向线程池加入任务 * 参数: * pool:指向线程池结构有效地址的动态指针 * callback:线程回调函数 * arg:回调函数参数 */ void threadpool_add_runner(thread_pool* pool, void (*callback)(void *arg), void *arg) { // 构造一个新任务 thread_runner *newrunner = (thread_runner *) malloc(sizeof(thread_runner)); newrunner->callback = callback; newrunner->arg = arg; newrunner->next = NULL; // 加锁 pthread_mutex_lock(&(pool->mutex)); #ifdef DEBUG printf("threadpool_add_runner-> locked\n"); #endif // 将任务加入到等待队列中 if (pool->runner_head != NULL) { pool->runner_tail->next = newrunner; pool->runner_tail = newrunner; } else { pool->runner_head = newrunner; pool->runner_tail = newrunner; } // 解锁 pthread_mutex_unlock(&(pool->mutex)); #ifdef DEBUG printf("threadpool_add_runner-> unlocked\n"); #endif // 唤醒一个等待线程 pthread_cond_signal(&(pool->cond)); #ifdef DEBUG printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n"); #endif } /** * 销毁线程池 * 参数: * ppool:指向线程池结构有效地址的动态指针地址(二级指针) */ void threadpool_destroy(thread_pool** ppool) { thread_pool *pool = *ppool; // 防止2次销毁 if (!pool->shutdown) { pool->shutdown = 1; // 唤醒所有等待线程,线程池要销毁了 pthread_cond_broadcast(&(pool->cond)); // 等待所有线程中止 sleep(1); #ifdef DEBUG printf("threadpool_destroy-> wakeup all waiting threads\n"); #endif // 回收空间 free(pool->threads); // 销毁等待队列 thread_runner *head = NULL; while (pool->runner_head != NULL) { head = pool->runner_head; pool->runner_head = pool->runner_head->next; free(head); } #ifdef DEBUG printf("threadpool_destroy-> all runners freed\n"); #endif /*条件变量和互斥量也别忘了销毁*/ pthread_mutex_destroy(&(pool->mutex)); pthread_cond_destroy(&(pool->cond)); #ifdef DEBUG printf("threadpool_destroy-> mutex and cond destoryed\n"); #endif free(pool); (*ppool) = NULL; #ifdef DEBUG printf("threadpool_destroy-> pool freed\n"); #endif } } (编辑:通辽站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |