DSP

线程同步之semaphore信号量,代码实现

2019-07-13 19:15发布

一般在任务处理线程池中会有一个公共任务队列m_event_list,任务监视线程有任务添加到m_event_list时,可以通过semaphore.post()增加信号量数来唤醒在semaphore信号量上的睡眠任务处理线程简要代码为:void add_event(Event &e) {     m_event_list.push_back(e);     m_semaphore.post(); //唤醒在信号量队列上处于阻塞状态的任务处理线程来处理event }在每个任务线程的线程执行体threadProc()中,通过semaphore.pend()消费信号量来接收任务并处理任务,如果任务队列中没有任务要处理,则当前线程会被阻塞简要代码为:void threadProc() { while(looping()) { if (m_event_list.empty() != false) { m_semaphore.pend(); Event e = m_event_list.front(); m_event_list.pop_front(); handle_event(e); }//if //do other thing }//while }注:此处未做锁同步处理,在实际的使用过程当中一定要对m_event_list进行加锁同步。注:还有一点,在实际项目中,主线程和子线程们之间用信号量进行同步,而用关键段来处理子线程们之间的互斥!
系统内核semaphore接口如下所示:


下面给出自定义的semaphore功能代码,如下所示,有不对地方请指教。//Semaphore.h #ifndef _SEMAPHORE_H__ #define _SEMAPHORE_H__ #include using namespace std; typedef unsigned int uint32_t; struct internal { sem_t* m_sem; }; /// class CSemaphore 信号量类 class CSemaphore { private: CSemaphore(CSemaphore const&); CSemaphore& operator=(CSemaphore const&); public: /// 构造函数,创建系统信号量,initCnt表示信号量的初始计数 explicit CSemaphore(int initCnt = 0); /// 析构函数,销毁系统信号量 ~CSemaphore(); /// 消费信号量,当信号量为0时线程再消费信号量,线程就会被阻塞,进入信号量等待队列的队尾 /// eturn pend操作后当前信号量计数 int pend(); /// 生产信号量,如果信号量为0时有线程生产信号量,会唤醒信号量其等待队列的第一个线程 /// eturn post操作后当前信号量计数 int post(); /// 消费信号量,当信号量为0时线程再消费信号量,线程就会被阻塞,直到超时(毫秒) /// eturn >=0表示当前信号量计数,-1:表示超时 int pend(uint32_t timeout); /// 尝试减少信号量,如果信号量已经为0则马上返回 /// eturn 0:信号量减少成功,-1:信号量减少失败 int tryPend(); private: struct internal* m_internal; };//class CSemaphore #endif

//Semaphore.cpp #include "semaphore.h" #include #include #include #include #include #define PRINT_LINE_INFO do { printf("%s:%d:%s ", __FILE__, __LINE__, __FUNCTION__); }while(0); #define SEM m_internal->m_sem CSemaphore::CSemaphore(int initCnt) :m_internal(new struct internal) { SEM = (sem_t*)malloc(sizeof(sem_t)); memset (SEM, 0, sizeof(sem_t)); sem_init(SEM, 0, initCnt); PRINT_LINE_INFO; } CSemaphore::~CSemaphore() { sem_destroy(SEM); free(SEM); delete m_internal, m_internal = NULL; PRINT_LINE_INFO; } int CSemaphore::pend() { int ret = 0; do { PRINT_LINE_INFO; ret = sem_wait(SEM); }while(ret != 0 && errno == EINTR); return ret; } int CSemaphore::post() { PRINT_LINE_INFO; return sem_post(SEM); } int CSemaphore::pend(uint32_t timeout) { //此处不使用sem_timedwait函数的原因是sem_timedwait使用系统时间, //假如系统时间被修改可能导致sem_timeout的超时机制混乱 int times = (timeout + 1) / 10; int ret = 0; while ((ret = tryPend() != 0) && (times-- > 0)) { PRINT_LINE_INFO; ::usleep(1000); } return ret; } int CSemaphore::tryPend() { int ret = sem_trywait(SEM); if (ret == -1 && errno == EAGAIN) return -1; if (ret == 0) return 0; return 1; }

//Main.cpp #include "Semaphore.h" int main() { CSemaphore sem(0); sem.pend(10 * 1000); sem.pend(); return 0; }
运行结果如下所示:
(完)