linux的多任务编程-线程池

2019-07-12 16:19发布

简介

在嵌入式系统环境下,由于系统资源和任务的特点,多线程成了实现多任务处理的重要方式.在一些常见的应用环境中,如Web服务器,Email服务器以及数据库服务器等都具有一个共同点:单位时间内必须处理很多并发的连接请求,但处理时间却相对较短.传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数及其频繁,那么服务器将处于不停的创建线程,销毁线程的状态.
线程池是采用多线程解决方案来提高系统性能的一个最为出 {MOD}的模型,它通过预创建一定数量的工作线程来对系统进行并发处理,使得系统的运行效率提高.因为线程池能使得系统采用较为轻量的,可控的系统资源实现系统并发处理能力的最大化,所以许多应用软件都采用了线程池模型.
除此之外,线程是能够限制创建的进程个数.通常线程池所允许的并发线程具有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将等待.而传统方案中,如果同时请求数据为200,那么最坏情况下,系统可能需要产生200个线程.尽管这不是一个很大的数目,但是也有系统达不到这个要求.
线程池模型有许多种,常见的有三种:任务队列控制的线程池模型,工作线程控制的线程池模型,主控线程控制的线程池模型.本文将给出一中任务队列控制的线程池模型的具体实现.
任务队列控制的线程池模型是通过任务队列来对线程池进行并发调度,如下图所示.线程池是由预创建的一个任务队列和一组工作线程组成,其中任务队列中存放工作对象.线程池启动后,工作线程将采用轮询的方式从任务队列中获取任务对象,由于初始化时任务队列中不存在任务对象,这时的信号量为0,所有的工作线程都处于阻塞状态.主控线程将任务对象放入任务队列中,并将信号量加1,这样信号量就会唤醒一个阻塞中的工作线程(操作系统层面决定唤醒哪个阻塞的工作线程).工作线程唤醒后从任务队列中获取一个任务对象并执行该任务,执行完后,工作线程将再次访问信号量,如果信号信号量大于0,那么工作线程将继续从任务队列中获取任务对象并执行,知道信号量等于0,这时的工作线程将再次被阻塞.

任务队列控制的线程模型主要是通过任务队列上的信号来控制线程池中的线程调度.

线程池的实现

本例程共由5个文件构成:tpool.h,tpool.c,log.c,log.h,testpool.c.其中tpool.h,tpool.c是实现线程池的核心文件,在tpool.h中定义了线程池和工作线程的数据结构及创建线程池和添加工作线程等方法,tpool.c中为具体的实现代码.log.c,log.h提供了一种记录文件的生成手段,能在文件中记录自定义的信息.testpool.c为线程池的测试程序. 该例子由多个源文件组成,编译命令如下:
gcc -g -pthread -o testpool testpool.c tpool.c log.c 下面给出这个例子的源代码,首先是线程池的定义文件tpool.h: [cpp] view plain copy  print?
  1. /*------------------------------------------------------------------------- 
  2.  * tpool.h – 线程池定义 
  3.  * ------------------------------------------------------------------------- 
  4.  */  
  5. #ifndef _TPOOL_H_  
  6. #define _TPOOL_H_  
  7. #include    
  8. #include    
  9. /*工作线程链表*/  
  10. typedef struct tpool_work  
  11. {  
  12.   void (*handler_routine)();            /*任务函数指针*/  
  13.   void *arg;                        /*任务函数参数*/  
  14.   struct tpool_work *next;              /*下一个任务链表*/  
  15. } tpool_work_t;  
  16. /*线程池结构体*/  
  17. typedef struct tpool  
  18. {  
  19.   int num_threads;                  /*最大线程数*/  
  20.   int max_queue_size;               /*最大任务链表数*/  
  21.   int do_not_block_when_full;           /*当链表满时是否阻塞*/  
  22.   pthread_t *threads;               /*线程指针*/  
  23.   int cur_queue_size;  
  24.   tpool_work_t *queue_head;         /*链表头*/  
  25.   tpool_work_t *queue_tail;         /*链表尾*/  
  26.   pthread_mutex_t queue_lock;       /*链表互斥量*/  
  27.   pthread_cond_t queue_not_full;        /*链表条件量-未满*/  
  28.   pthread_cond_t queue_not_empty;   /*链表条件量-非空*/  
  29.   pthread_cond_t queue_empty;       /*链表条件量-空*/  
  30.   int queue_closed;  
  31.   int shutdown;  
  32. } tpool_t;  
  33. /* 初始化连接池 */  
  34. extern tpool_t *tpool_init(int num_worker_threads,  
  35.     int max_queue_size, int do_not_block_when_full);  
  36. /* 添加一个工作线程 */  
  37. extern int tpool_add_work(tpool_t *pool, void  (*routine)(), void *arg);  
  38. /* 清除线程池*/  
  39. extern int tpool_destroy(tpool_t *pool, int finish);  
  40. #endif /* _TPOOL_H_ */  
在这个文件中定义了线程池的结构,工作线程函数的原型以及线程池的基本操作.线面给出的tpool.c是线程池的实现.函数tpool_init完成了线程池的初始化操作,包括对内存的设置,对线程属性的设置和池中线程的预创建.
[cpp] view plain copy  print?
  1. /* ------------------------------------------------------------------------- 
  2.  * tpool.c – 线程池的实现 
  3.  * ------------------------------------------------------------------------- 
  4.  */  
  5. #include    
  6. #include    
  7. #include     
  8. #include    
  9. #include "tpool.h"  
  10. #include "log.h"  
  11. /* 工作线程 */  
  12. void *tpool_thread(void *tpool);  
  13. /***************线程池初始化*****************************/  
  14. tpool_t *tpool_init(int num_worker_threads,    /*线程池线程个数*/  
  15.                   int max_queue_size,      /*最大任务数*/  
  16.               int do_not_block_when_full)   /*是否阻塞任务满的时候*/  
  17. {  
  18.     int i, rtn;  
  19.     tpool_t *pool;   
  20.     lprintf(log, INFO, "init pool begin ... ");  
  21.     /* 创建线程池结构体 */  
  22.     if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL)   
  23.     {  
  24.         lprintf(log, FATAL, "Unable to malloc() thread pool! ");  
  25.         return NULL;  
  26.     }  
  27.     /* 设置线程池架构体成员 */  
  28.     pool->num_threads = num_worker_threads;                      /*工作线程个数*/  
  29.     pool->max_queue_size = max_queue_size;                       /*任务链表最大长度*/  
  30.     pool->do_not_block_when_full = do_not_block_when_full;       /*任务链表满时是否等待*/  
  31.     /* 生成线程池缓存 */  
  32.     if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)   
  33.     {  
  34.         lprintf(log, FATAL,"Unable to malloc() thread info array ");  
  35.         return NULL;