一般在任务处理线程池中会有一个公共任务队列m_event_list,任务监视线程有任务添加到m_event_list时,可以通过semaphore.post()增加信号量数来唤醒在semaphore信号量上的睡眠任务处理线程简要代码为:
void add_event(Event &e)
{
m_event_list.push_back(e);
m_semaphore.post(); //唤醒在信号量队列上处于阻塞状态的任务处理线程来处理event
}
在每个任务线程的线程执行体threadProc()中,通过semaphore.pend()消费信号量来接收任务并处理任务,如果任务队列中没有任务要处理,则当前线程会被阻塞简要代码为:
void threadProc()
{
while(looping())
{
if (m_event_list.empty() != false)
{
m_semaphore.pend();
Event e = m_event_list.front();
m_event_list.pop_front();
handle_event(e);
}//if
//do other thing
}//while
}
注:此处未做锁同步处理,在实际的使用过程当中一定要对m_event_list进行加锁同步。注:还有一点,在实际项目中,主线程和子线程们之间用信号量进行同步,而用关键段来处理子线程们之间的互斥!
系统内核semaphore接口如下所示:
下面给出自定义的semaphore功能代码,如下所示,有不对地方请指教。
//Semaphore.h
#ifndef _SEMAPHORE_H__
#define _SEMAPHORE_H__
#include
using namespace std;
typedef unsigned int uint32_t;
struct internal
{
sem_t* m_sem;
};
/// class CSemaphore 信号量类
class CSemaphore {
private:
CSemaphore(CSemaphore const&);
CSemaphore& operator=(CSemaphore const&);
public:
/// 构造函数,创建系统信号量,initCnt表示信号量的初始计数
explicit CSemaphore(int initCnt = 0);
/// 析构函数,销毁系统信号量
~CSemaphore();
/// 消费信号量,当信号量为0时线程再消费信号量,线程就会被阻塞,进入信号量等待队列的队尾
///
eturn pend操作后当前信号量计数
int pend();
/// 生产信号量,如果信号量为0时有线程生产信号量,会唤醒信号量其等待队列的第一个线程
///
eturn post操作后当前信号量计数
int post();
/// 消费信号量,当信号量为0时线程再消费信号量,线程就会被阻塞,直到超时(毫秒)
///
eturn >=0表示当前信号量计数,-1:表示超时
int pend(uint32_t timeout);
/// 尝试减少信号量,如果信号量已经为0则马上返回
///
eturn 0:信号量减少成功,-1:信号量减少失败
int tryPend();
private:
struct internal* m_internal;
};//class CSemaphore
#endif
//Semaphore.cpp
#include "semaphore.h"
#include
#include
#include
#include
#include
#define PRINT_LINE_INFO
do {
printf("%s:%d:%s
", __FILE__, __LINE__, __FUNCTION__);
}while(0);
#define SEM m_internal->m_sem
CSemaphore::CSemaphore(int initCnt)
:m_internal(new struct internal)
{
SEM = (sem_t*)malloc(sizeof(sem_t));
memset (SEM, 0, sizeof(sem_t));
sem_init(SEM, 0, initCnt);
PRINT_LINE_INFO;
}
CSemaphore::~CSemaphore()
{
sem_destroy(SEM);
free(SEM);
delete m_internal, m_internal = NULL;
PRINT_LINE_INFO;
}
int CSemaphore::pend()
{
int ret = 0;
do {
PRINT_LINE_INFO;
ret = sem_wait(SEM);
}while(ret != 0 && errno == EINTR);
return ret;
}
int CSemaphore::post()
{
PRINT_LINE_INFO;
return sem_post(SEM);
}
int CSemaphore::pend(uint32_t timeout)
{
//此处不使用sem_timedwait函数的原因是sem_timedwait使用系统时间,
//假如系统时间被修改可能导致sem_timeout的超时机制混乱
int times = (timeout + 1) / 10;
int ret = 0;
while ((ret = tryPend() != 0) && (times-- > 0))
{
PRINT_LINE_INFO;
::usleep(1000);
}
return ret;
}
int CSemaphore::tryPend()
{
int ret = sem_trywait(SEM);
if (ret == -1 && errno == EAGAIN)
return -1;
if (ret == 0)
return 0;
return 1;
}
//Main.cpp
#include "Semaphore.h"
int main()
{
CSemaphore sem(0);
sem.pend(10 * 1000);
sem.pend();
return 0;
}
运行结果如下所示:
(完)