基于嵌入式Linux的视频采集系统13-----源程序----rtp_service.cpp

2019-07-13 04:25发布

本文来自:
http://blog.chinaunix.net/uid-23093301-id-86402.html

#include "rtp_service.h"
#define logtrace(x) printf x #define logerror(x) printf x

static void *  thread_fn(void *arg){ CRtpService* p = (CRtpService*)arg; p->process_task();; return ((void *)0); }
BaseRTPSession::BaseRTPSession():num_dest(0) { }
string rtp_error(int rtperr) { string ret; if (rtperr < 0) { ret = RTPGetErrorString(rtperr); std::cout << "ERROR: " << ret << std::endl; //exit(-1); } return ret; }

void BaseRTPSession::OnNewSource(RTPSourceData *dat) { if (dat->IsOwnSSRC()) return;
uint32_t ip; uint16_t port; if (dat->GetRTPDataAddress() != 0) { const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress()); ip = addr->GetIP(); port = addr->GetPort(); } else if (dat->GetRTCPDataAddress() != 0) { const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress()); ip = addr->GetIP(); port = addr->GetPort()-1; } else return; RTPIPv4Address dest(ip,port); AddDestination(dest);
struct in_addr inaddr; inaddr.s_addr = htonl(ip); num_dest ++; std::cout << "Adding destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl; }
void BaseRTPSession::OnBYEPacket(RTPSourceData *dat) { if (dat->IsOwnSSRC()) return; uint32_t ip; uint16_t port; if (dat->GetRTPDataAddress() != 0) { const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress()); ip = addr->GetIP(); port = addr->GetPort(); } else if (dat->GetRTCPDataAddress() != 0) { const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress()); ip = addr->GetIP(); port = addr->GetPort()-1; } else return; RTPIPv4Address dest(ip,port); DeleteDestination(dest); struct in_addr inaddr; inaddr.s_addr = htonl(ip); num_dest--; std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl; }
void BaseRTPSession::OnRemoveSource(RTPSourceData *dat) { if (dat->IsOwnSSRC()) return; if (dat->ReceivedBYE()) return; uint32_t ip; uint16_t port; if (dat->GetRTPDataAddress() != 0) { const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress()); ip = addr->GetIP(); port = addr->GetPort(); } else if (dat->GetRTCPDataAddress() != 0) { const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress()); ip = addr->GetIP(); port = addr->GetPort()-1; } else return; RTPIPv4Address dest(ip,port); DeleteDestination(dest); struct in_addr inaddr; inaddr.s_addr = htonl(ip); num_dest--; std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl; }

CRtpService::CRtpService():started(false),m_timestamp(0) { int err = pthread_mutex_init(&m_mutex,NULL); if(err) { logerror(("CRtpService::CRtpService pthread mutex init fail ")); return; } err = pthread_cond_init(&m_pthread_cond,NULL); if(err) { logerror(("CRtpService::CRtpService pthread cond init fail ")); return; } logtrace(("CRtpService::CRtpService end ok ")); }
CRtpService::~CRtpService() { logtrace(("CRtpService::~CRtpService end ok ")); if(started) stop(); }

int CRtpService::start(string port) { logtrace(("CRtpService::start begin ")); int err; if(started) return 0; started = true; RTPUDPv4TransmissionParams transparams; RTPSessionParams sessparams; sessparams.SetOwnTimestampUnit(1.0/10.0); sessparams.SetAcceptOwnPackets(true); transparams.SetPortbase(atoi(port.c_str())); int status = m_rtp_session.Create(sessparams,&transparams); if(status) { string ret = rtp_error(status); logerror(("CRtpService::start sess create fail=<%s> ",ret.c_str())); return -1; } err = pthread_create(&m_thread_id,NULL,thread_fn,this); if(err != 0){ logerror(("can't create thread: %s ",strerror(err))); return -1; } err = pthread_detach( m_thread_id ); if(err != 0){ logerror(("thread detach fail: %s ",strerror(err))); return -1; } logtrace(("CRtpService::start end ok ")); return 0; }
int CRtpService::stop() { logtrace(("CRtpService::stop begin ")); if(!started) { return 0; } started = false; pthread_mutex_lock(&m_mutex);  pthread_cond_signal(&m_pthread_cond); pthread_mutex_unlock(&m_mutex);  logtrace(("CRtpService::stop end ok ")); return 0; }

int CRtpService::send_rtp_packet(const string& data) { //logtrace(("CRtpService::send_rtp_packet begin data size=<%u> ",data.size())); pthread_mutex_lock(&m_mutex);  m_task_queue.push(data); pthread_cond_signal(&m_pthread_cond);  pthread_mutex_unlock(&m_mutex); 
//logtrace(("CRtpService::send_rtp_packet end ok ")); return 0; }
int CRtpService::process_task() { logtrace(("CRtpService::process_task begin ")); string data; int status; bool flag_begin = true; while(1) { m_rtp_session.BeginDataAccess(); //logtrace(("CRtpService::process_task BeginDataAccess ")); // check incoming packets if (m_rtp_session.GotoFirstSourceWithData()) { do { RTPPacket *pack; while ((pack = m_rtp_session.GetNextPacket()) != NULL) { // You can examine the data here //logtrace(("Got packet ! ")); // we don't longer need the packet, so // we'll delete it m_rtp_session.DeletePacket(pack); } } while (m_rtp_session.GotoNextSourceWithData()); } m_rtp_session.EndDataAccess(); status = m_rtp_session.Poll(); if(status) { logerror(("process_task::Poll error<%s>",rtp_error(status).c_str())); } if( data.empty() ){ m_timestamp = 1; flag_begin = true; pthread_mutex_lock(&m_mutex); while(m_task_queue.empty()){    pthread_cond_wait(&m_pthread_cond, &m_mutex);      if(!started)    {     m_rtp_session.BYEDestroy(RTPTime(10,0),0,0);     pthread_mutex_unlock (&m_mutex);     return 0;    } }//! end while if(!m_task_queue.empty()){    data =  m_task_queue.front();    m_task_queue.pop();        } pthread_mutex_unlock (&m_mutex); }//! end if( data.empty() ) else{ m_timestamp = 0; flag_begin = false; } m_timestamp=10; do{ int size = data.size();  if(size > 1000) { size = 1000; } //logtrace(("process_task::size=<%d> m_timestamp=<%ld> ",size,m_timestamp)); status = m_rtp_session.SendPacket(data.data(),size,0,false,m_timestamp); if(status) { logerror(("process_task::SendPacket error<%s>",rtp_error(status).c_str())); continue; } data = data.substr(size); m_timestamp = 0; usleep(1000); }while(!data.empty()); } logtrace(("CRtpService::process_task end ok ")); return 0; }