Rtp Class Reference

<class description="" goes="" here=""> <short description=""> More...

#include <Rtp.hpp>

Inheritance diagram for Rtp:

IO, VThread

List of all members.

Public Member Functions

 Rtp (const char *url, int remotePrt, const char *address, int localPrt, ESInfo *es, PacketizationLayer *packetization, bool writeOnly, const TerminalCapabilities *tc, GlobalTimer *globalTimer, Statistics *statistics, bool allowVopTimeDetection=true)
 creates an RTP object
Statistics * getStatistics ()
void setESInfo (ESInfo *new_es)
ESInfo * getESInfo ()
Frame * getFrame ()
 returns a frame if one complete frame is available, otherwise null is returned.
int writeFrame (Frame *frm, ESInfo *out_es=NULL)
 just adds a frame to the send buffer.
void resendHeader ()
 if this is called, the next sent frame will also include a decoder header. This could be used for multicast sessions.
void run ()
 this thread works on the read buffer, using the estimated avail.
bool open ()
 opens the IO connection.
State play (double prefetchTime)
State pause ()
State mute ()
bool close (bool immediate=false)
 closes the IO class.
int getLocalPort () const
int getBufferFillLevelInPercent () const
 returns preQ buffer fill in percent
int getBufferFillLevel () const
 returns a value from 0..100 indicating buffer usage
int getBufferFillLevelInBytes () const
 returns preQ buffer fill in Bytes
int flushBuffer (long from_ts=0, long to_ts=-1)
 flushes the preQ buffer from starting TimeStamp to end TS
bool isInput ()
bool isOutput ()
bool destroy ()
 you can't delete a remote URL :-) so this will always return true
const char * getURL () const
 returns a pointer to the local file name or a URL
char * getSSRC ()
 returns NULL if the session was not opened; otherwise, a char* containing an 08x-formatted hex value
void setRtxInfo (const rtx_info *rtx)
 sets a shallow copy of rtx
const rtx_info * getRtxInfo () const
bool setToFrameNumber (u32 frameNumber)
 repositions the IO class to the given frame.
bool setEndFrameNumber (u32 frameNumber)
 sets the last frame that should be sent.

Protected Types

typedef Rtp::NetPacket_t NetPacket_t
enum  QOrder { Q_HEAD, Q_TAIL, Q_TS, Q_SEQ }

Protected Member Functions

bool insertToPreQ (u8 *payload, long pl_size, Frame::FrameType type, long vop_size, int prio, u32 rtpTimestamp, u32 fragCounter, bool lastFrag, u32 rtp_seq, QOrder qo)
 inserts an SL packet to the preQ in the given order
bool reinsertToPreQ (NetPacket_t *p, uint rtp_seq)
 reinserts an SL packet for retransmission to the head of the preQ
bool addToPreQ (NetPacket_t *p, QOrder qo, bool pushback=false)
 adds a NetPacket to the head or tail of the preQ (this is used from insertToPreQ or for retransmissions) or sorted by a frame's timestamp (used for receiving RTP)
NetPacket_t * popPreQ (bool blocking=true)
 blocks for or gets the first packet off the preQ. This frees neither the packet nor the packet payload!
void calcAdaptPreQValues (int net_bw)
 decides how many stream seconds should be brought through in this second (this is needed to fill up the buffers or to send a better quality even if the network does not allow it --> only if client buffers are full!). It also sets the percentage of adaptation.
void markFullFramePartially (bool dropped, bool sent, u32 ts)
 searches all packets with the same TS (so they belong to one single frame) and marks them all as partially_dropped or partially_sent. This is used for better adaptPreQ() results (it has to adapt full frames, not packets).
void adaptPreQ (int net_bw)
 marks unimportant packets for the next streamout sec, according to their priority. Marks packets for the next streamout second starting from the preQ head.
int getNextPreQ_TS ()
bool pushNetPacket (NetPacket_t *p)
NetPacket_t * popNetPacket (uint16_t rtp_seq)
NetPacket_t * popNetPacketByRTXseq (uint16_t rtx_rtp_seq)
void sendQupdate (long streamout_sec)
 scans the SendQ for already ack'd or out-dated frames and releases them.
void preQupdate ()
 scans the preQ for missing packets and requests their retransmission. Since the frame type is not known in the client, EVERY missing packet is requested...
int doSend (NetPacket_t *p)
void sendThread ()
void readThread ()
void ucl_update (u32 ts, u32 timeout)
 calls UCL lib for RTP,RTCP updates and callback functions (recv data)

Static Protected Member Functions

void RtpCallback (struct rtp *session, rtp_event *e)
void fb_print (struct rtp *session, uint32_t ssrc, rtcp_fb *fb)
void fb_handle (struct rtp *session, uint32_t ssrc, rtcp_fb *fb)
void rtp_data_handle (struct rtp *session, uint32_t ssrc, rtp_packet *packet)

Protected Attributes

Rtp::PreQ_t preQ
Rtp::SendQ_t sendQ
Statistics * stats
rtp * session
uint32_t sender_ssrc
int remotePort
int localPort
char * remoteAddress
bool writeToNetwork
bool firstFrame
struct {
   int   numProcessedFrames
   int   seqno
   int   act_ts
} IOFrameStat
GlobalTimer * globalTimer
PacketizationLayer * pLayer
ESInfo * es
char * url
rtx_info * rtx
bool feedbackEnabled
bool rtxEnabled
int64_t last_rr_seq_no
int64_t last_rr_sum_pkts_lost
double last_rr_lost_fract
double last_rr_received
int64_t first_rtp_seq
uint16_t last_rtp_seq
int ts_offset
int64_t first_rtx_rtp_seq
uint16_t last_rtx_rtp_seq
int interarrival_time
int net_bw_before_superdrain
int real_bw
bool threadWasPaused
bool allowVopTimeDetection

Static Protected Attributes

const int TTL = 16
const int MAX_PAYLOAD_SIZE = 65000
const int RTP_PAYLOAD_TYPE = 96

Detailed Description

<class description="" goes="" here=""> <short description="">

Author:
Michael Kropfberger and Peter Schojer
Version:
Id
Rtp.hpp,v 1.30 2006/01/26 11:42:00 mkropfbe Exp

Definition at line 108 of file Rtp.hpp.


Constructor & Destructor Documentation

Rtp::Rtp (const char *  url,
int  remotePrt,
const char *  address,
int  localPrt,
ESInfo *  es,
PacketizationLayer *  packetization,
bool  writeOnly,
const TerminalCapabilities *  tc,
GlobalTimer *  globalTimer,
Statistics *  statistics,
bool  allowVopTimeDetection = true
)

creates an RTP object

Parameters:
url: the source/dest for this IO class, deep-copied
remotePrt: the remote port where data is sent to
address: the ip address of the remote host, deep-copied
localPrt: the local port, where we are binding and receiving
es: the ElementaryStream sent, shallow-copied
packetization: specifies the transport mode: can be RFC3016, multiSL or flexmux
writeOnly: specifies whether the RTP connection is for sending (writeOnly is set to true) or for reading
tc: the TerminalCapabilities, shallow-copied (eg. RTX on/off)
allowVopTimeDetection: if set to true (default), allows rtp to detect and CHANGE the VOPTIMEincrement for the ESInfo object
Definition at line 133 of file Rtp.cpp.

References Statistics::getBaseStreamBW(), TerminalCapabilities::getNetworkCapacityInByte(), Statistics::getStreamBW(), Statistics::setBaseStreamBW(), and IO::setState().

00136 : IO() 00137 { 00138 assert(e); 00139 assert(p); 00140 assert(uri && addr); 00141 allowVopTimeDetection=avtd; 00142 url = new char[strlen(uri) + 1]; 00143 rtx=NULL; 00144 remotePort = remotePrt; 00145 remoteAddress = new char[strlen(addr) + 1]; 00146 localPort = localPrt; 00147 globalTimer = gt; 00148 stats = statistics; 00149 00150 writeToNetwork = w; 00151 strcpy(remoteAddress, addr); 00152 strcpy(url, uri); 00153 setState(CLOSED); 00154 session = NULL; 00155 pLayer = p; 00156 firstFrame=false; // must use open to set to true 00157 memset(&sendQ.Q,0,sizeof(sendQ.Q)); 00158 preQ.Q = preQ.Q_tail = NULL; 00159 sendQ.size = preQ.size = 0; 00160 resendFrameHeader=false; 00161 preQ.needsPrefetching=false; //initial prefetching autodetected 00162 00163 preQ.MUTEX_Q.initialize("RTP-preQ.MUTEX_Q"); 00164 00165 preQ.MUTEX_Q_DATA_AVAIL.initialize("RTP-preQ.MUTEX_Q_DATA_AVAIL"); 00166 preQ.MUTEX_Q_DATA_AVAIL.lock(); 00167 00168 preQ.MUTEX_Q_FULL.initialize("RTP-preQ.MUTEX_Q_FULL"); 00169 preQ.MUTEX_Q_DONE.initialize("RTP-preQ.MUTEX_Q_DONE"); 00170 preQ.MUTEX_Q_DONE.lock(); 00171 00172 sendQ.MUTEX_Q.initialize("RTP-sendQ.MUTEX_Q"); 00173 00174 last_rr_seq_no = -1; 00175 last_rr_sum_pkts_lost = -1; 00176 00177 last_rtp_seq=0; 00178 last_rtx_rtp_seq=0; 00179 interarrival_time=0; 00180 //net_bw_before_superdrain=-1; 00181 real_bw=0; 00182 first_rtp_seq = -1; 00183 first_rtx_rtp_seq = -1; 00184 ts_offset = 0; 00185 00186 feedbackEnabled = false; 00187 rtxEnabled = false; 00188 threadWasPaused=false; 00189 00190 this->setESInfo(e); 00191 if(tc) 00192 stats->setBaseStreamBW(tc->getNetworkCapacityInByte()); //set base, unaffected by switching 00193 else 00194 stats->setBaseStreamBW(stats->getStreamBW()); //set base, unaffected by switching 00195 00196 00197 dprintf_full("Rtp::Rtp (url=%s, remotePort=%i, remoteAddr=%s, localPort=%i, writeOnly=%i, avgBW=%i, TCbw=%i)\r\n", 00198 url, remotePort,remoteAddress,localPort,(int)writeToNetwork,stats->getStreamBW()/128, 
stats->getBaseStreamBW()/128 ); 00199 00200 }


Member Function Documentation

void Rtp::adaptPreQ (int  net_bw)  [protected]
 

marks unimportant packets for the next streamout sec, according to their priority. Marks packets for the next streamout second starting from the preQ head.

If new data or RTX makes the preQ dirty, a new marking is done, starting exactly from the new preQ head. RTX is never marked for adaptation. It has to adapt full frames, not packets; to do so, it looks at the partially_sent and partially_dropped markers of packets.

Parameters:
net_bw: sets the available bandwidth of the network for the next streamout second
Definition at line 2780 of file Rtp.cpp.

References Statistics::getAdaptRate(), Statistics::getAdaptSecs(), Statistics::getBufAheadSec(), and Statistics::getStreamBW().

02780 { 02781 CompareNetPackets cmpFnc; 02782 priority_queue<NetPacket_t*,std::vector<NetPacket_t*>,CompareNetPackets> pq(cmpFnc); 02783 int size=0; 02784 NetPacket_t *p=NULL, *pp=NULL; 02785 02786 dprintf_full("Rtp:adaptPreQ: keeping %2i%% reach netBW %i KBits of streamBW" 02787 " %i KBits for %2.3f secs == realBW %i (ClBuf: %2.3f)\n", 02788 stats->getAdaptRate(), net_bw*8/1024, stats->getStreamBW()*8/1024, 02789 stats->getAdaptSecs(),real_bw/128, stats->getBufAheadSec()); 02790 02791 // dprintf_full("Rtp::adaptPreQ waiting for preQ mutex\n"); 02792 preQ.MUTEX_Q.lock(); 02793 //dprintf_full("Rtp::adaptPreQ got preQ mutex\n"); 02794 p=preQ.Q; 02795 02796 // read in full streamout block and mark it to non-drop 02797 // this includes all partially dropped or sent frame-packets, I_VOPs, P_VOPS.... and RTX 02798 // because all of them shall be streamed in the next sec. 02799 while (p && (size+p->pl_size <= (unsigned)real_bw)) { 02800 pq.push(p); 02801 size += p->pl_size; 02802 p->drop_me = false; 02803 p=p->next; 02804 } 02805 02806 // chop off the packets to meet the net_bw 02807 // we have to ignore partially sent frames and RTX, we force dropping of partially dropped ones 02808 while (!pq.empty() && (size > net_bw)) { 02809 p=pq.top(); 02810 pq.pop(); 02811 02812 // if (p->prio == IO_NETWORK_HIGHEST_PRIORITY) { 02813 if (p->prio <= RTP_MIN_PRIO) { //keep at least 10 fps 02814 //FIXME: what now? switching? 02815 dprintf_full("Rtp::adaptPreQ BAILING OUT because below RTP_MIN_PRIO!! (prio %i VOP_type %s TS %i:%i)\n" 02816 ,p->prio,Frame::VOPTypeToChar(p->vop_type),p->rtp_ts,p->rtp_seq); 02817 break; 02818 } 02819 02820 if ( ((p->resent_counter == 0) && (!p->partially_sent)) || (p->partially_dropped) ) { 02821 //BVOP for first send, so drop it! 02822 size -= p->pl_size; //dont reduce size for RTX, so we will adapt more to fit RTX in network 02823 p->drop_me = true; 02824 dprintf_full("Rtp::adaptPreQ marking prio %i VOP_type %s TS %i:%i for dropping... 
partially_dropped %i\n",p->prio,Frame::VOPTypeToChar(p->vop_type),p->rtp_ts,p->rtp_seq,p->partially_dropped); 02825 } else 02826 dprintf_full("Rtp::adaptPreQ NOT marking since its a RTX or partially sent (prio %i VOP_type %s TS %i:%i)\n" 02827 ,p->prio,Frame::VOPTypeToChar(p->vop_type),p->rtp_ts,p->rtp_seq); 02828 } 02829 02830 // DELETE the rest of FULL FRAME 02831 // only if B-VOP for first send, 02832 if (p && !pq.empty() && (p->prio > RTP_MIN_PRIO) 02833 && (p->resent_counter == 0) && (!p->partially_sent) ) { 02834 do { 02835 pp = pq.top(); 02836 pq.pop(); 02837 if (pp->rtp_ts == p->rtp_ts) { //no checking on RTX is needed, since list is sorted by TS:seqNo 02838 pp->drop_me = true; 02839 dprintf_full("Rtp::adaptPreQ rest-marking prio %i VOP_type %s TS %i:%i for dropping...\n",pp->prio,Frame::VOPTypeToChar(p->vop_type),pp->rtp_ts,p->rtp_seq); 02840 } else 02841 break; 02842 } while (!pq.empty() ); 02843 } 02844 02845 preQ.isDirty = false; 02846 preQ.MUTEX_Q.release(); 02847 }

void Rtp::calcAdaptPreQValues (int  net_bw)  [protected]
 

It decides how many stream seconds should be brought through in this second (this is needed to fill up the buffers or to send a better quality even if the network does not allow it --> only if client buffers are full!). It also sets the percentage of adaptation.

Parameters:
net_bw: sets the available bandwidth of the network for the next streamout second
Definition at line 2698 of file Rtp.cpp.

References Statistics::getAdaptRate(), Statistics::getAdaptSecs(), Statistics::getBufAheadSec(), Statistics::getStreamBW(), Statistics::getStreamoutSec(), Statistics::setAdaptRate(), and Statistics::setAdaptSecs().

02698 { 02699 CompareNetPackets cmpFnc; 02700 priority_queue<NetPacket_t*,std::vector<NetPacket_t*>,CompareNetPackets> pq(cmpFnc); 02701 02702 02703 if (stats->getStreamoutSec() < RTP_PREFETCH_SECS) 02704 return; 02705 stats->setAdaptSecs(1.0); //normal adaptation 02706 if (stats->getBufAheadSec() < RTP_BUF_LOWWATER) { 02707 stats->setAdaptSecs(stats->getAdaptSecs() + (float)RTP_BUF_FILL_FACT); //fill buf (eg. 1.10) 02708 } 02709 02710 if (net_bw_before_superdrain != -1) { //we are in super draining phase 02711 dprintf_full("Rtp::calcAdaptPreQValues turning off adaptation, totally use buffer, we have sooo much of it...\n"); 02712 real_bw = net_bw; //turn off adaptation, totally use buffer, we have sooo much of it... 02713 } else { 02714 real_bw = (int)((float)stats->getStreamBW() * stats->getAdaptSecs()); 02715 if (real_bw < net_bw) //if buf_drain more than net_bw 02716 real_bw = net_bw; //no need to spoil net bw! :) 02717 } 02718 //if (stats->getAdaptRate(0)) //smoother adapt changes 02719 stats->setAdaptRate(net_bw*100 / real_bw); 02720 //else 02721 // stats->setAdaptRate((int)((float)stats->getAdaptRate()*(1 - 0.3) + (net_bw*100 / real_bw)*0.3) + 1); 02722 02723 if (stats->getBufAheadSec() > RTP_BUF_HIGHWATER) { 02724 if (net_bw <= stats->getStreamBW()) { //so adaptSecs is < 1.0 02725 stats->setAdaptSecs((float)net_bw / (float)stats->getStreamBW()); 02726 real_bw = (int)((float)stats->getStreamBW() * stats->getAdaptSecs()); 02727 stats->setAdaptRate(net_bw*100 / real_bw); 02728 dprintf_full("Rtp::calcAdaptPreQValues: superdrain mode: TURNING OFF ADAPTATION since we have so much buffer!\n"); 02729 } 02730 } 02731 02732 if ((stats->getBufAheadSec() > RTP_BUF_LOWWATER) && (stats->getAdaptRate() < (float)RTP_MAX_ADAPT/100.0)) { 02733 //we have enough buffers to reduce necessary adaptation to B-frames only 02734 //stats->setAdaptSecs((float)net_bw / ((float)RTP_MAX_ADAPT/100.0 * (float)stats->getStreamBW())); 02735 stats->setAdaptSecs(stats->getAdaptSecs() - 
(float)RTP_BUF_DRAIN_FACT); //drain buf (eg. 0.70) 02736 02737 real_bw = (int)((float)stats->getStreamBW() * stats->getAdaptSecs()); 02738 stats->setAdaptRate(net_bw*100 / real_bw); 02739 dprintf_full("Rtp::calcAdaptPreQValues: superdrain mode: ACTIVELY DRAINING BUFFER to preserve quality!\n"); 02740 } 02741 02742 dprintf_full("Rtp:calcAdaptPreQValues: keeping %2i%% reach netBW %i KBits of " 02743 "streamBW %i KBits for %2.3f secs == realBW %i (ClBuf: %2.3f)\n", 02744 stats->getAdaptRate(), net_bw*8/1024, stats->getStreamBW()*8/1024, 02745 stats->getAdaptSecs(),real_bw/128, stats->getBufAheadSec()); 02746 02747 }

bool Rtp::close (bool  immediate = false)  [virtual]
 

closes the IO class.

Parameters:
immediate: specifies whether (optional) buffered data should be read/sent to the client (==false), or immediately dumped (==true)

Implements IO.

Definition at line 673 of file Rtp.cpp.

References PortGenerator::closePortPair(), popPreQ(), sendQupdate(), and IO::setState().

Referenced by getFrame().

00673 { 00674 if (state == CLOSED) 00675 return true; 00676 dprintf_full("Rtp::close: incoming state = %i\n",state); 00677 if ((state == CLOSING) && (immediate)) { //multiple closes 00678 dprintf_full("Rtp::close: multiple close... ignoring\n"); 00679 setState(FORCE_CLOSING); 00680 preQ.MUTEX_Q_DONE.release(); //signal that all data is sent 00681 sleep(2); //workaround to let datachannel::teardown wait! 00682 return true; 00683 } 00684 00685 if((state == OPEN) || (state == PAUSED) || (state == MUTED) 00686 || (state == PREFETCHING) || (state == STREAMEOF)) { 00687 if (!immediate) { 00688 setState(CLOSING); 00689 dprintf_full("Rtp::close waiting for correct sending/displaying (NOT IMMEDIATE)\n"); 00690 } else { //drop all frames, is IMMEDIATE 00691 dprintf_full("Rtp::close IMMEDIATE close requested\n"); 00692 setState(FORCE_CLOSING); 00693 } 00694 if (immediate) { //drop all frames, is IMMEDIATE 00695 NetPacket_t *p; 00696 00697 sendQupdate(-1); 00698 do { 00699 p = popPreQ(false); 00700 if ( (p != NULL) && (p->payload != NULL) ) { 00701 dprintf_full("freeing preQ Frame seq %i ts %i:%i\n", p->rtp_seq,p->rtp_ts,p->frag_no); 00702 delete p->payload; 00703 p->payload=NULL; 00704 delete p; 00705 p=NULL; 00706 } 00707 } while (p != NULL); 00708 preQ.MUTEX_Q_DONE.tryLock(); 00709 } 00710 00711 00712 dprintf_full("Rtp::close: waiting for data to be sent out...\n"); 00713 00714 preQ.MUTEX_Q.lock(); 00715 do { 00716 if (state == OPEN) //happens after unpause or unmute 00717 setState(CLOSING); 00718 //if (preQ.Q == NULL) //make sure that other thread in popPreQ exits 00719 preQ.MUTEX_Q_DATA_AVAIL.release(); // simulate data 00720 preQ.MUTEX_Q.release(); 00721 00722 dprintf_full("Rtp::close: give the other thread a chance to work off Q (state %i)\n",state); 00723 //give the other thread a chance to work off Q 00724 sleep(1); 00725 preQ.MUTEX_Q.lock(); 00726 } while ((state == CLOSING)||(state == PAUSED)||(state == MUTED)); 00727 preQ.MUTEX_Q.release(); 00728 00729 if (state == 
OPEN) //happens on unpausing (hereby rewinding) a already closed inputIO 00730 return false; 00731 00732 if (state != CLOSED) { 00733 // if (preQ.Q == NULL) //make sure that other thread in popPreQ exits 00734 preQ.MUTEX_Q_DATA_AVAIL.release(); //simulate data 00735 dprintf_full("Rtp::close: waiting for MUTEX_Q_DONE\n"); 00736 preQ.MUTEX_Q_DONE.lock(); //wait until all data is sent 00737 dprintf_full("Rtp::close: all frames sent, so we are done...\n"); 00738 setState(CLOSED); 00739 00740 00741 //FIXME: this crashes the app! 00742 if(session!=NULL) { 00743 rtp_send_bye(session); 00744 rtp_done(session); 00745 session = NULL; 00746 portGen.closePortPair(localPort); 00747 } 00748 if (rtxEnabled && rtx->session) { 00749 rtp_send_bye(rtx->session); 00750 rtp_done(rtx->session); 00751 rtx->session = NULL; 00752 portGen.closePortPair(rtx->localPort); 00753 } 00754 00755 // now delete mutex 00756 preQ.MUTEX_Q.destroy(); 00757 preQ.MUTEX_Q_DATA_AVAIL.destroy(); 00758 preQ.MUTEX_Q_FULL.destroy(); 00759 preQ.MUTEX_Q_DONE.destroy(); 00760 sendQ.MUTEX_Q.destroy(); 00761 } 00762 } 00763 return true; 00764 }

Frame * Rtp::getFrame ()  [virtual]
 

returns a frame if one complete frame is available, otherwise null is returned.

This function is typically blocking. Don't use a NULL return value to conclude STREAMEOF, always check with getState()!

Implements IO.

Definition at line 770 of file Rtp.cpp.

References addToPreQ(), close(), flushBuffer(), Frame::getAU(), Statistics::getBufAheadSec(), ESInfo::getDuration(), Statistics::getFirstPacketTime(), Statistics::getFirstPacketTS(), Statistics::getHighestPacketTS(), ESInfo::getMediaTimeScale(), ESInfo::getNumberOfMediaSamples(), Statistics::getPrefetchedSecs(), Statistics::getStillToPrefetchSecs(), Frame::getType(), ESInfo::isAudioStream(), ESInfo::isVisualStream(), popPreQ(), Frame::setAU(), Statistics::setBufAheadSec(), ESInfo::setDecoderConfig(), Frame::setMediaTimeScale(), Statistics::setPrefetchedSecs(), GlobalTimer::setPrefetching(), IO::setState(), and Statistics::setStillToPrefetchSecs().

00770 { 00771 struct AU *au=NULL; 00772 NetPacket_t *p=NULL; 00773 u8 *buf=NULL; 00774 long buf_siz=0; 00775 bool lastFrag; 00776 bool first_frag=true; 00777 bool old_ts=false; 00778 00779 currentFrameNumber++; 00780 framesSeen++; 00781 00782 dprintf_full("Rtp::getFrame %i (TS %i) of still buffered %i frames (total %i of %i TS %li)... expecting rtp_seq %i (state %i)\n", 00783 IOFrameStat.numProcessedFrames, IOFrameStat.act_ts, preQ.num_full_frames, 00784 currentFrameNumber, 00785 es->getNumberOfMediaSamples(),(long)es->getDuration(), 00786 IOFrameStat.seqno+1,state); 00787 00788 //PRE-buffer some frames 00789 struct timeval tv; 00790 double start_dtime, now_dtime=0,fetch_time=0; 00791 00792 IO::State oldState; 00793 00794 00795 gettimeofday(&tv,0); 00796 start_dtime = now_dtime = tv.tv_sec + (double)tv.tv_usec / 1000000.0; 00797 00798 if (!preQ.needsPrefetching && (stats->getBufAheadSec() <= RTP_BUF_PREFETCH) 00799 && ((state == OPEN) || (state == MUTED)) ) { 00800 dprintf_full("Rtp::getFrame: further PREFETCHING needed to fill empty buffers!\n"); 00801 preQ.needsPrefetching=true; 00802 stats->setStillToPrefetchSecs( stats->getStillToPrefetchSecs() + RTP_PREFETCH_SECS); 00803 flushBuffer(0,-1); //clean up all packets, since this will lead to a DataChannel Pause() 00804 } 00805 00806 // PREFETCHING ########################################## 00807 if (preQ.needsPrefetching) { //initial or addtional prefetch is needed... 
00808 bool done=false; 00809 oldState = state; 00810 setState(PREFETCHING); 00811 if (globalTimer) 00812 globalTimer->setPrefetching(true); 00813 fetch_time=0; 00814 do { 00815 //dprintf_full("...prefetching: left %f done %i needsPrefetch %i....state %i\n", 00816 // stats->getStillToPrefetchSecs(), done,preQ.needsPrefetching,state); 00817 msleep(10); 00818 gettimeofday(&tv,0); 00819 now_dtime = tv.tv_sec + (double)tv.tv_usec / 1000000.0; 00820 if (state == STREAMERR) { 00821 dprintf_err("Rtp::getFrame() STREAMERR occured during prefetching, returning NULL\n"); 00822 if (globalTimer) 00823 globalTimer->setPrefetching(false); 00824 return NULL; 00825 } 00826 if ((state != PREFETCHING) && (state != PAUSED)) { 00827 dprintf_full("Rtp::getFrame: continued PREFETCHING requested! left %f state %i\n", 00828 stats->getStillToPrefetchSecs(),state); 00829 //preQ.needsPrefetching=false; 00830 done=false; 00831 setState(PREFETCHING); // continue with prefetching... 00832 } 00833 if (firstFrame) { 00834 double fpt = stats->getFirstPacketTime(); 00835 double gstps = stats->getStillToPrefetchSecs(); 00836 if ( (fpt != 0) && //only take from real first frame 00837 (now_dtime - fpt >= gstps) ) { 00838 fetch_time = now_dtime - fpt; 00839 done=true; 00840 } 00841 } else { //intermediate prefetching 00842 //minor FIXME: pausing during intermediate Prefetching will shorten prefetching 00843 if (now_dtime - start_dtime >= stats->getStillToPrefetchSecs()) { 00844 fetch_time = now_dtime - start_dtime; 00845 done=true; 00846 } 00847 } 00848 } while ( !done && ((state == PREFETCHING) || (state == OPEN)|| 00849 (state == MUTED)|| (state == PAUSED)) ); 00850 00851 if (state == PREFETCHING) 00852 setState(oldState); 00853 stats->setPrefetchedSecs(stats->getPrefetchedSecs() + fetch_time); //fine tune spent time 00854 stats->setStillToPrefetchSecs(0); 00855 stats->setBufAheadSec(((double)(stats->getHighestPacketTS()-stats->getFirstPacketTS()) / (double)es->getMediaTimeScale())); 00856 
preQ.needsPrefetching=false; 00857 dprintf_full("Rtp::getFrame: prefetching of %2.3f secs (%2.3f real data recv secs)" 00858 " brought us %2.3f secs of %s! state %i, prefetched_secs: %f\n", 00859 now_dtime - start_dtime, fetch_time, 00860 stats->getBufAheadSec(), es->isVisualStream()?"Video":"Audio",state, 00861 stats->getPrefetchedSecs()); 00862 if (globalTimer) 00863 globalTimer->setPrefetching(false); 00864 } 00865 00866 // MERGE FRAME FROM PACKETS ############################### 00867 do { 00868 p=popPreQ(); 00869 if(p==NULL) //this is the end.... my friend... 00870 return NULL; 00871 if (p==(Rtp::NetPacket_t *)-1) //FIXME: after unpause, the Q was empty! this will force PREFETCHING 00872 return this->getFrame(); 00873 00874 dprintf_full("Rtp::getFrame popped rtp_seq %i, TS %i, last_frag %i size %i\n", 00875 p->rtp_seq,p->rtp_ts,p->last_frag,p->pl_size); 00876 00877 if (IOFrameStat.seqno == -1) // init the rtp sequence 00878 IOFrameStat.seqno = p->rtp_seq; 00879 else 00880 IOFrameStat.seqno++; 00881 00882 if (first_frag) { //first AU fragment 00883 if ( IOFrameStat.act_ts > (int)p->rtp_ts) { 00884 dprintf_full("DELETING left overs from previous frames\n"); 00885 old_ts = true; //left overs from previous frames 00886 } //else 00887 //IOFrameStat.act_ts = p->rtp_ts; 00888 first_frag = false; 00889 } 00890 00891 if (old_ts || ((unsigned)IOFrameStat.seqno != p->rtp_seq)) { //old fragments or incomplete AU 00892 dprintf_err("Rtp::getFrame expected rtp_seq %i, I got %i\n",IOFrameStat.seqno, p->rtp_seq); 00893 00894 00895 //drop all other following fragments of incomplete AU 00896 //ATTN: this will read one more pkt from the NEXT frame and has to push it back! 
00897 while ((IOFrameStat.act_ts > (int)p->rtp_ts) 00898 || (!old_ts && (IOFrameStat.act_ts == (int)p->rtp_ts)) ) { 00899 dprintf_full("Rtp::getFrame popped for dropping: rtp_seq %i, TS %i, last_frag %i\n", 00900 p->rtp_seq,p->rtp_ts,p->last_frag); 00901 delete p->payload; 00902 p->payload = NULL; 00903 delete p; 00904 p = NULL; 00905 p=popPreQ(); 00906 if(p==NULL) 00907 return NULL; //this is the end.... my friend... 00908 if (p==(Rtp::NetPacket_t *)-1) //FIXME: after unpause, the Q was empty! this will force PREFETCHING 00909 return this->getFrame(); 00910 } 00911 00912 if ((unsigned)IOFrameStat.seqno != p->rtp_seq) { 00913 //pushback to preQ 00914 dprintf_err("LOSS: Not all fragments of TS %i where available\n",IOFrameStat.act_ts); 00915 addToPreQ(p,Q_TS,RTP_REINSERT); 00916 IOFrameStat.seqno = p->rtp_seq - 1; //restart seq_no from here 00917 if(buf) { 00918 delete[] buf; 00919 buf=NULL; 00920 } 00921 //try to get the next frame 00922 return this->getFrame(); 00923 } 00924 } 00925 00926 dprintf_full("Rtp::getFrame CONT'd with popped rtp_seq %i, TS %i, last_frag %i\n", 00927 p->rtp_seq,p->rtp_ts,p->last_frag); 00928 IOFrameStat.act_ts = p->rtp_ts; 00929 lastFrag = p->last_frag; 00930 if(es->isAudioStream()) { //RFC2250 specific headers need to be removed 00931 //the header info is not utilized currently 00932 p->pl_size -= 4; 00933 buf=(u8*)realloc(buf, buf_siz + p->pl_size); 00934 memcpy(buf+buf_siz, p->payload + 4, p->pl_size); 00935 } 00936 else { 00937 buf=(u8*)realloc(buf,buf_siz + p->pl_size); 00938 memcpy(buf+buf_siz,p->payload,p->pl_size); 00939 } 00940 buf_siz+=p->pl_size; 00941 delete p->payload; 00942 p->payload=NULL; 00943 delete p; 00944 p=NULL; 00945 } while (lastFrag == 0); 00946 00947 dprintf_full("Rtp::getFrame DONE TS %i\n", IOFrameStat.act_ts); 00948 00949 00950 //IF VIDEO ################################################3 00951 if (es->isVisualStream()) 00952 if ((buf[3]==0xb0) && MP4Decoder::isValidDecoderConfig(buf)) { 00953 u8* tmp=buf+4; 
00954 bool stop=false; 00955 // received a header! 00956 dprintf_full("Rtp::getFrame received header\n"); 00957 // some servers, like the ITECMP4Server send header+payload in one packet 00958 // catch this case 00959 while(!stop && tmp < (buf+buf_siz) && 00960 (tmp = (u8*)memchr(tmp, 0xb6, (buf+buf_siz)-tmp)) ) { 00961 if(tmp[-1] == 1 && tmp[-2] == 0 && tmp[-3] == 0) { 00962 stop=true; 00963 tmp-=3; 00964 } else 00965 tmp++; //avoid double findings 00966 } 00967 if (tmp) 00968 es->setDecoderConfig(buf,tmp-buf); 00969 else 00970 es->setDecoderConfig(buf,buf_siz); 00971 00972 00973 if(stop) { 00974 dprintf_full("Rtp::getFrame compound header %x %x %x %x (%i bytes) + frame (%li bytes) %x %x %x %x!\n" 00975 ,buf[0],buf[1],buf[2],buf[3], tmp-buf, buf_siz-(tmp-buf), 00976 tmp[0],tmp[1],tmp[2],tmp[3]); 00977 //fix everything up for final CompressedVideoFrame() creation 00978 buf_siz -= tmp-buf; 00979 u8* new_buf = new u8[buf_siz]; 00980 memcpy(new_buf,tmp,buf_siz); 00981 delete[] buf; 00982 buf = new_buf; 00983 } else { 00984 delete[] buf; buf=NULL; 00985 return this->getFrame(); //get the real frame 00986 } 00987 } else if ( !MP4Decoder::isValidVisualFrame(buf)) { 00988 dprintf_err("Rtp::getFrame LOSS: Not all frags of TS %i avail (broke header/missing first packet)\n", 00989 IOFrameStat.act_ts); 00990 delete[] buf; 00991 buf=NULL; 00992 //try to get the next frame 00993 return this->getFrame(); 00994 } 00995 00996 //IF AUDIO ################################################3 00997 if (es->isAudioStream()) 00998 if (MP4audioDecoder::isValidAudioFrame(buf)) { 00999 dprintf_full("Rtp::getFrame received audio frame header\n"); 01000 //Every audio frame consists of a 4 byte header + payload. These together comprise 01001 //the payload of the access unit. 
01002 if(buf_siz == MPEG_AUDIO_HEADER_SIZE) { 01003 es->setDecoderConfig(buf,buf_siz); 01004 delete[] buf; buf=NULL; 01005 return this->getFrame(); 01006 } 01007 else { 01008 //FIXME: Decoder configuration may be required to be set for VBR audio for every frame!!! 01009 if (firstFrame) 01010 this->es->setDecoderConfig(buf,4); //"4" is for mpeg audio only ..... may be diff for aac 01011 } 01012 } else {// not valid audio 01013 dprintf_err("Rtp::getFrame LOSS: Not all audio frags of TS %i avail (broke header/missing first packet)\n", 01014 IOFrameStat.act_ts); 01015 delete[] buf; buf=NULL; 01016 //try to get the next frame 01017 return this->getFrame(); 01018 } 01019 01020 //GENERAL FULL FRAME CREATION ####################### 01021 IOFrameStat.numProcessedFrames++; 01022 01023 //fill the merged access unit 01024 au = new AU(); 01025 au->payload = buf; 01026 au->size = buf_siz; 01027 au->cts = au->dts = (u32) IOFrameStat.act_ts + (u32) ts_offset; 01028 Frame* f=NULL; 01029 if(es->isVisualStream()) 01030 f = new CompressedVideoFrame(Frame::NN_VOP, ((VideoESInfo*)es)->getWidth(), 01031 ((VideoESInfo*)es)->getHeight()); 01032 else if(es->isAudioStream()) 01033 f = new CompressedAudioFrame(Frame::NN_VOP); 01034 else 01035 f = new CompressedVideoFrame(Frame::NN_VOP, 0, 0); 01036 01037 f->setAU(au); //FrameType is set here automatically 01038 f->setMediaTimeScale(es->getMediaTimeScale()); //used by GlobalTimer Adaptor 01039 01040 dprintf_full("Rtp::getFrame got size %8i frameType %s TS %i presTime: %2.3f\n",f->getAU()->size, 01041 Frame::VOPTypeToChar(f->getType()),IOFrameStat.act_ts, 01042 (double)IOFrameStat.act_ts / es->getMediaTimeScale()); 01043 01044 if (firstFrame) { 01045 // not a valid ESHeader as first packet? 01046 // do we have one from the RTSP? 01047 if(this->es->getEncodedDecoderConfig()==NULL) { 01048 dprintf_err("FATAL: no header found!! 
was: 0x%x%x%x%x\n", 01049 buf[0],buf[1],buf[2],buf[3]); 01050 setState(STREAMERR); 01051 delete[] buf; buf=NULL; 01052 delete f; f=NULL; 01053 delete au; au=NULL; 01054 close(true); //immediate 01055 return NULL; //this is the end... my friend... 01056 } 01057 firstFrame = false; 01058 } 01059 #ifdef _POSIX_PRIORITY_SCHEDULING 01060 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 01061 perror("Rtp::writeFrame YIELD"); 01062 } 01063 #else 01064 // for windows pthread lib! 01065 #if (defined WIN32) && (!defined WINCE) 01066 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 01067 dprintf_full("Rtp::writeFrame YIELD"); 01068 } 01069 #endif 01070 #endif 01071 01072 return f; 01073 }

int Rtp::getNextPreQ_TS ()  [protected]
 

Returns:
the timestamp of the latest packet in the preQ (the one that would be popped next) or -1 if no packets were in the queue
Definition at line 2014 of file Rtp.cpp.
02014 { 02015 int ts; 02016 02017 preQ.MUTEX_Q.lock(); 02018 if (preQ.Q_tail != NULL) 02019 ts = preQ.Q_tail->rtp_ts; 02020 else 02021 ts = -1; 02022 preQ.MUTEX_Q.release(); 02023 02024 return ts; 02025 }

bool Rtp::open ( ) [virtual]
 

opens the IO connection.

State is set to OPENING. Depending on the underlying QIODevice, a network connection or a file connection is established. When the connection is ready for use, State is OPEN and the run thread is started.

Implements IO.

Definition at line 268 of file Rtp.cpp.

References ESInfo::getAvgBandwidth(), Statistics::getClientPreQMaxSize(), PortGenerator::getSockFromPort(), Statistics::setClientPreQMaxSize(), and IO::setState().

00268 { 00269 00270 dprintf_full("Rtp::open Rtp is %s, state is %i\n",(writeToNetwork) ? "write" : "read",state); 00271 if (state == OPEN) 00272 return true; 00273 setState(OPENING); 00274 00275 /* 00276 setRoundRobinScheduling(1); 00277 */ 00278 00279 currentFrameNumber=0; 00280 endFrameNumber=-0; 00281 IOFrameStat.numProcessedFrames=0; 00282 IOFrameStat.seqno = -1; 00283 IOFrameStat.act_ts = -1; //impossibly first CTS 00284 firstFrame=true; 00285 framesSeen = 0; 00286 if (remoteAddress && !session) { 00287 dprintf_full("Rtp::open init RTP channel ports %i <--> %i\n",localPort,remotePort); 00288 int p1, p2; 00289 p1=portGen.getSockFromPort(localPort); 00290 p2=portGen.getSockFromPort(localPort+1); 00291 dprintf_full("PortGenerator::dome\n"); fflush(stdout); 00292 session = ucl_rtp_init(remoteAddress, 00293 localPort, 00294 remotePort, 00295 TTL, 100, Rtp::RtpCallback, (uint8_t *) this, 00296 p1,p2); 00297 } 00298 dprintf_full("PortGenerator::\n"); fflush(stdout); 00299 if (session) { 00300 rtp_set_option(session, RTP_OPT_WEAK_VALIDATION, FALSE); 00301 rtp_set_option(session, RTP_OPT_PROMISC, TRUE); 00302 00303 #ifndef WITHOUT_RTP_EXT 00304 if (!writeToNetwork) { // DataSink 00305 if (rtx) { 00306 dprintf_full("Rtp::open enabled to recv/send feedback!\n"); 00307 feedbackEnabled = true; 00308 //should we send feedback? enable it at UCL lib! 00309 if ((rtx->state == ACK) || (rtx->state == NACKACK)) 00310 rtp_set_option(session, RTCP_OPT_FB_ACK, TRUE); 00311 if ((rtx->state == NACK) || (rtx->state == NACKACK)) 00312 rtp_set_option(session, RTCP_OPT_FB_NACK, TRUE); 00313 } 00314 } else 00315 feedbackEnabled = true; //server will always accept extended feedback 00316 #endif 00317 00318 if (rtx) { 00319 if (rtx->remotePort ==0) { 00320 dprintf_full("Rtp::open init RTP RTX channel port is ZERO! RTX disabled! 
still capable of recv/send feedback!\n"); 00321 } else { 00322 #ifndef WITHOUT_RTP_EXT 00323 dprintf_full("Rtp::open init RTP RTX channel ports %i <--> %i\n",rtx->localPort,rtx->remotePort); 00324 rtxEnabled = true; 00325 rtx->session = ucl_rtp_init(remoteAddress, 00326 rtx->localPort, 00327 rtx->remotePort, 00328 TTL, 100, Rtp::RtpCallback, (uint8_t *) this, 00329 portGen.getSockFromPort(rtx->localPort), 00330 portGen.getSockFromPort(rtx->localPort+1)); 00331 if (!rtx->session) { 00332 setState(STREAMERR); 00333 return false; 00334 } 00335 rtp_set_option(rtx->session, RTP_OPT_WEAK_VALIDATION, FALSE); 00336 rtp_set_option(rtx->session, RTP_OPT_PROMISC, TRUE); 00337 if (!writeToNetwork) { // DataSink 00338 //should we send rtx feedback? 00339 if ((rtx->state == ACK) || (rtx->state == NACKACK)) 00340 rtp_set_option(rtx->session, RTCP_OPT_FB_ACK, TRUE); 00341 if ((rtx->state == NACK) || (rtx->state == NACKACK)) 00342 rtp_set_option(rtx->session, RTCP_OPT_FB_NACK, TRUE); 00343 } 00344 #endif 00345 } 00346 }//if rtx 00347 00348 00349 setState(OPEN); 00350 firstFrame=true; 00351 // FIXME: on server-side this should be recv'd via RTSP/SDP! 00352 stats->setClientPreQMaxSize((RTP_CLIENT_PreQ_MAX_SECS+1) * 00353 (int) ((float)(es->getAvgBandwidth() / 8) * (1+RTP_BUF_FILL_FACT) ) ); 00354 00355 if (writeToNetwork) //server has larger Q 00356 preQ.max_size = RTP_SERVER_PreQ_MAX; 00357 else 00358 preQ.max_size = stats->getClientPreQMaxSize(); 00359 00360 preQ.size = preQ.num_elems = preQ.num_full_frames = 0; 00361 preQ.avg_bw = 0; 00362 // preQ.avg_bw = es->getAvgBandwidth(); 00363 preQ.highest_rtp_seq=0; 00364 preQ.highest_ts=0; 00365 00366 dprintf_full("Rtp::open (ports %i:%i) is starting the according RTP thread\n",localPort,remotePort); 00367 this->start(); 00368 return true; 00369 } else { 00370 setState(STREAMERR); 00371 return false; 00372 } 00373 }

Rtp::NetPacket_t * Rtp::popPreQ ( bool blocking = true ) [protected]
 

Blocks for, or gets, the first packet off the preQ. This frees neither the packet nor the packet payload!

Parameters:
blocking if false, popPreQ might return with a NULL NetPacket
Definition at line 2028 of file Rtp.cpp.

References ESInfo::getMediaTimeScale(), and ESInfo::getVOPTimeIncrement().

Referenced by close(), and getFrame().

02028 { 02029 NetPacket_t *p=NULL; 02030 int counter = 0; 02031 02032 //dprintf_full("Rtp::popPreQ %s\n",blocking?"blocking":"non-blocking"); 02033 02034 //FIXME: WHY !firstframe?? if (!firstFrame && blocking && (state != STREAMEOF)) 02035 if (blocking && (state != STREAMEOF)) 02036 preQ.MUTEX_Q_DATA_AVAIL.lock(); //wait for data 02037 //dprintf_full("Rtp::popPreQ data avail\n"); 02038 02039 // preQ.MUTEX_Q.lock(); 02040 if ((state == FORCE_CLOSING) || (state == CLOSED)) { 02041 // preQ.MUTEX_Q.release(); 02042 if (writeToNetwork) 02043 preQ.MUTEX_Q_DONE.release(); 02044 dprintf_full("Rtp::popPreQ is quitting because of %s\n",(state==CLOSED?"CLOSED":"FORCE_CLOSING")); 02045 return NULL; 02046 } 02047 // preQ.MUTEX_Q.release(); 02048 02049 02050 preQ.MUTEX_Q.lock(); 02051 if ((state!=OPEN) && blocking && (preQ.Q == NULL)) { // if other thread is closing... 02052 dprintf_full("Rtp::popPreQ was blocking, but bailing out! empty Q and state %i!\n",state); 02053 preQ.MUTEX_Q.release(); 02054 return NULL; 02055 } 02056 if (preQ.Q == NULL) { 02057 dprintf_full("erroneously recv'd preQ.MUTEX_Q unlock.... no data after unpausing and state %i\n",state); 02058 preQ.MUTEX_Q.release(); 02059 return (Rtp::NetPacket_t *)-1; //FIXME: THIS IS A HACK!!!! 02060 } 02061 02062 while (preQ.Q == NULL) { 02063 dprintf_full("Rtp::popPreQ has empty Q! (%ith try)!\n",counter); 02064 msleep(100); 02065 if (counter++ > 10) { 02066 preQ.MUTEX_Q.release(); 02067 dprintf_full("Rtp::popPreQ is quitting because of empty Q and state %i!\n",state); 02068 if (firstFrame) //this is needed for a special error case 02069 firstFrame=false; //during prefetching! 
02070 return NULL; 02071 } 02072 } 02073 p=preQ.Q; 02074 preQ.Q = p->next; 02075 p->next = NULL; 02076 preQ.size -= p->pl_size; 02077 preQ.num_elems--; 02078 if ( (p->last_frag) && (p->resent_counter == 0) ) 02079 preQ.num_full_frames--; 02080 if (preQ.num_elems == 0) { 02081 // assert(preQ.num_full_frames == 0); 02082 preQ.Q_tail = NULL; 02083 preQ.avg_bw = 0; 02084 } else 02085 preQ.avg_bw = (int)(preQ.size/preQ.num_elems * ((double)es->getMediaTimeScale())/((double)es->getVOPTimeIncrement())); 02086 02087 preQ.MUTEX_Q_FULL.release(); 02088 if (preQ.Q != NULL) { //still data available 02089 preQ.MUTEX_Q_DATA_AVAIL.release(); 02090 } 02091 preQ.MUTEX_Q.release(); 02092 return p; 02093 }

void Rtp::preQupdate ( ) [protected]
 

Scans the preQ for missing packets and requests their retransmission. Since the frame type is not known in the client, EVERY missing packet is requested...

Definition at line 1671 of file Rtp.cpp.

References Statistics::getHighestPacketTS().

01671 { 01672 Rtp::NetPacket_t *p=NULL; 01673 int act_seq = -1; 01674 int prev_seq = -1; 01675 int ret; 01676 01677 if (!rtxEnabled) { 01678 dprintf_full("Rtp::preQupdate: no rtxEnabled... ignoring\n"); 01679 return; 01680 } 01681 01682 dprintf_full("Rtp::preQupdate\n"); 01683 preQ.MUTEX_Q.lock(); 01684 p = preQ.Q; 01685 while (p != NULL) { 01686 act_seq=p->rtp_seq; 01687 // dprintf_full("Rtp::preQupdate checks on %i ts %i (prev %i)\n",act_seq,p->rtp_ts,prev_seq); 01688 if (prev_seq != -1) { 01689 while (++prev_seq < act_seq) { //missing packet 01690 dprintf_full("Rtp::preQupdate: requesting %i (remaining: %i, final will be %i), sess %p ssrc %x\n", 01691 prev_seq, act_seq-1-prev_seq, act_seq-1, session, sender_ssrc); 01692 ret=rtp_send_explicit_nack(session, sender_ssrc, prev_seq, stats->getHighestPacketTS(), NULL); 01693 assert(act_seq > prev_seq); 01694 //assert(act_seq-1 - prev_seq < 30 && "YOUR SYSTEM IS TOO SLOW! LOST TOO MANY PACKETS IN A ROW!!!"); 01695 if (ret == -1) { 01696 dprintf_err("Rtp::preQupdate ERROR! rtp session wrong!!!!\n"); 01697 break; 01698 } 01699 } 01700 } 01701 prev_seq = act_seq; 01702 p = p->next; 01703 } 01704 preQ.MUTEX_Q.release(); 01705 }

void Rtp::run ( ) [virtual]
 

This thread works on the read buffer, using the estimated available bandwidth for the send-out rate.

Reimplemented from IO.

Definition at line 2096 of file Rtp.cpp.

References VThread::setRoundRobinScheduling().

02096 { 02097 dprintf_full("Rtp::run (ports %i:%i) is starting the according RTP thread\n",localPort,remotePort); 02098 02099 #ifndef QT_THREAD_SUPPORT 02100 int ret = setRoundRobinScheduling(2); 02101 if (ret == 0) { 02102 dprintf_full("Rtp::sendThread switched to RT-RR SCHEDULING!\n"); 02103 } else { 02104 dprintf_full("Rtp::sendThread couldnt change to RT-RR SCHEDULING! (got root?) : err %i\n",ret); 02105 } 02106 #endif 02107 02108 if (writeToNetwork) //Sender 02109 sendThread(); 02110 else 02111 readThread(); 02112 }

void Rtp::sendQupdate ( long streamout_sec ) [protected]
 

scans the SendQ for already ack'd or out-dated frames and releases them.

Parameters:
streamout_sec determines the actual time, or if set to -1, every frame in the SendQ will be dropped
Definition at line 1614 of file Rtp.cpp.

References ESInfo::getMediaTimeScale(), and Statistics::getPrefetchedSecs().

Referenced by close().

01614 { 01615 Rtp::NetPacket_t *p=NULL, *pprev=NULL; 01616 int slot; 01617 long ssec_ticks, threshold; 01618 01619 ssec_ticks = es->getMediaTimeScale() * streamout_sec; 01620 threshold = (long)(ssec_ticks - ((u64)ceil(stats->getPrefetchedSecs()) * es->getMediaTimeScale())); 01621 01622 if ((streamout_sec >= 0) && (threshold < 0)) { //not "flushAll" and initial prefetching 01623 dprintf_full("Rtp::sendQupdate(ssec %li): ignoring since initial prefetching (threshold TS %li)\n", streamout_sec,threshold); 01624 return; 01625 } 01626 01627 dprintf_full("Rtp::sendQupdate(ssec %li): drop all older than TS %li\n",streamout_sec, threshold); 01628 01629 sendQ.MUTEX_Q.lock(); 01630 for (slot=0; slot<RTP_SENDQ_PRIME;slot++) { 01631 p = sendQ.Q[slot]; 01632 pprev = NULL; 01633 while (p != NULL) { 01634 if ( (streamout_sec == -1) || //-1 == drop all 01635 (p->rtp_ts + (u64)ceil(stats->getPrefetchedSecs()) * es->getMediaTimeScale() 01636 < (unsigned)ssec_ticks) || 01637 (p->acked) ) { 01638 //dprintf_full("NOW TS: %li Frame seq %i ts %i:%i is outdated or acked... now dropped\n", 01639 // streamout_sec*es->getMediaTimeScale(),p->rtp_seq, p->rtp_ts,p->frag_no); 01640 sendQ.size -= p->pl_size; 01641 sendQ.num_elems--; 01642 01643 if (pprev) 01644 pprev->next = p->next; 01645 else 01646 sendQ.Q[slot]=p->next; 01647 01648 assert(p->payload != NULL); 01649 delete p->payload; 01650 p->payload=NULL; 01651 delete p;p=NULL; 01652 01653 if (pprev) 01654 p = pprev; //to continue while 01655 else 01656 p = sendQ.Q[slot]; 01657 } 01658 01659 if (sendQ.Q[slot] == NULL) 01660 break; 01661 01662 pprev = p; 01663 p = p->next; 01664 } 01665 } 01666 sendQ.MUTEX_Q.release(); 01667 }

bool Rtp::setEndFrameNumber ( u32 frameNumber ) [virtual]
 

sets the last frame that should be sent.

Allows specifying, in combination with setToFrameNumber, a range of frames that should be sent.

Parameters:
frameNumber the number of the last frame to send; set it to 0 to send up to the last frame.
Returns:
true on success

Reimplemented from IO.

Definition at line 448 of file Rtp.cpp.

References flushBuffer(), and ESInfo::getVOPTimeIncrement().

00448 { 00449 long incr = es->getVOPTimeIncrement(); 00450 00451 if (endFrameNumber == 0) 00452 flushBuffer(frameNumber * incr, -1); 00453 else 00454 flushBuffer(frameNumber * incr, endFrameNumber * incr); 00455 00456 endFrameNumber=frameNumber; 00457 return true; 00458 }

void Rtp::setRtxInfo ( const rtx_info * rtx ) [virtual]
 

sets a shallow copy of rtx at the ESInfo object

Parameters:
rtx the rtx_info to copy

Reimplemented from IO.

Definition at line 2853 of file Rtp.cpp.

02853 { 02854 assert(rt); 02855 dprintf_full("Rtp::setRtxInfo \n"); 02856 if(rtx) { 02857 delete rtx; 02858 } 02859 rtx= new rtx_info(rt); 02860 };

bool Rtp::setToFrameNumber ( u32 frameNumber ) [virtual]
 

repositions the IO class to the given frame.

Will return false in the following cases:

  • an illegal frame number (too large) was specified
  • the stream is not seekable, because the underlying device is a network device and the requested frame is already out of range.
  • Seeking doesn't work on IO classes opened for writing

Reimplemented from IO.

Definition at line 461 of file Rtp.cpp.

References flushBuffer().

00461 { 00462 00463 flushBuffer(0, -1); 00464 currentFrameNumber=frameNumber; 00465 00466 return true; 00467 }

void Rtp::ucl_update ( u32 ts, u32 timeout ) [protected]
 

calls UCL lib for RTP,RTCP updates and callback functions (recv data)

Parameters:
ts is the current timestamp
timeout: if zero, non-blocking, else in msecs
Definition at line 1077 of file Rtp.cpp.
01077 { 01078 struct timeval tv; 01079 01080 dprintf_full("Rtp::ucl_update TS %i\n",ts); 01081 if (session) { 01082 rtp_update(session); 01083 rtp_send_ctrl(session, ts, NULL, RTCP_NORMAL); 01084 /* Receive control and data packets */ 01085 tv.tv_sec = 0; 01086 tv.tv_usec = timeout*1000; 01087 if (rtxEnabled) 01088 tv.tv_usec = tv.tv_usec >> 1; //wait half of the time on rtp 01089 assert(session); 01090 rtp_recv(session, &tv, ts); 01091 01092 if (rtxEnabled) { 01093 // dprintf_full("Rtp::ucl_update on RTX at TS %i\n",ts); 01094 rtp_send_ctrl(rtx->session, ts, NULL, RTCP_NORMAL); 01095 /* eventually receive NACK packets again */ 01096 tv.tv_sec = 0; 01097 tv.tv_usec = timeout*1000 >> 1; //wait other half of the time on rtcp 01098 assert(rtx->session); 01099 rtp_recv(rtx->session, &tv, ts); 01100 /* do maintenance */ 01101 rtp_update(rtx->session); 01102 } 01103 bool ret; 01104 int rounds=0; 01105 do { 01106 tv.tv_sec = 0; 01107 tv.tv_usec = 1; //wait nearly no time 01108 ret=rtp_recv(session, &tv, ts); //try to get as much data as possible 01109 if (rtxEnabled) { 01110 tv.tv_sec = 0; 01111 tv.tv_usec = 1; //wait nearly no time 01112 ret=rtp_recv(rtx->session, &tv, ts); 01113 } 01114 rounds++; 01115 } while(session && ret && (rounds < 10)); //continue as long as data or rtx avail, or 10 pks where read in a row 01116 } 01117 }

int Rtp::writeFrame ( Frame * frm, ESInfo * out_es = NULL ) [virtual]
 

just adds a frame to the send buffer.

sending per se is handled in the run thread. If the send buffer is full, writeFrame will block.

Parameters:
frm 
Returns:
1 on success (all packets of the frame were inserted into the send buffer, or the packetization layer is still buffering the data), 0 on error

Implements IO.

Definition at line 552 of file Rtp.cpp.

References PacketizationLayer::createSLPacketList(), Frame::getAU(), ESInfo::getHeaders(), Frame::getType(), ESInfo::getVOPTimeIncrement(), and insertToPreQ().

00552 { 00553 bool result = true; 00554 SLPacketList slpl; 00555 AU au; 00556 u8* newPayload=NULL; 00557 bool last = false; 00558 int counter = 0; 00559 00560 if (!writeToNetwork || state != OPEN || !pLayer || !frm) 00561 return 0; 00562 00563 dprintf_full("Rtp::writeFrame CTS %i prio %i size %i\n",frm->getAU()->cts, frm->getAU()->prio, frm->getAU()->size); 00564 00565 if (endFrameNumber>0 && frm->getAU()->cts / es->getVOPTimeIncrement() > endFrameNumber) { 00566 //this comes from unflushed DC senqueue, and is out of range, so drop it! 00567 dprintf_full("Rtp::writeFrame nr %i TS %i exceeds endFrameNumber %i, ignoring!\n",frm->getAU()->cts / es->getVOPTimeIncrement(),frm->getAU()->cts, endFrameNumber ); 00568 return 0; 00569 } 00570 00571 //HEADER DATA 00572 if ( (!firstFrame && resendFrameHeader ) 00573 || (firstFrame && !MP4Decoder::isValidDecoderConfig(frm->getAU()->payload)) ) { //HEADER already in first frame 00574 00575 resendFrameHeader=false; 00576 firstFrame=false; 00577 00578 //send extra header! 
00579 u8* esHeaderBuffer=NULL; 00580 u32 headerSize=es->getHeaders(&esHeaderBuffer); 00581 if(headerSize>0) { 00582 newPayload=new u8[headerSize + frm->getAU()->size]; 00583 memcpy(newPayload, esHeaderBuffer, headerSize); 00584 00585 au.payload=newPayload; 00586 au.size=headerSize; 00587 au.cts=frm->getAU()->cts; 00588 au.dts=frm->getAU()->dts; 00589 au.duration=0; 00590 slpl=pLayer->createSLPacketList(&au); 00591 if(!slpl.empty()) { 00592 SLPacketList::iterator li = slpl.begin(); 00593 last = false; 00594 counter = 0; 00595 00596 dprintf_full("Rtp::writeFrame: HEADER (size %i) first 4 bytes of first Packet, CTS %u: %x %x %x %x\r\n", 00597 headerSize, frm->getAU()->cts, (*li)->getPayload()[0],(*li)->getPayload()[1], 00598 (*li)->getPayload()[2],(*li)->getPayload()[3]); 00599 00600 while(!slpl.empty()) { 00601 if(slpl.size()==1) { 00602 last=true; 00603 } 00604 li = slpl.begin(); 00605 result &= insertToPreQ((*li)->getPayload(), (*li)->getPayloadSize(), 00606 frm->getType(), au.size, IO_NETWORK_HIGHEST_PRIORITY, 00607 frm->getAU()->dts, counter, last, 0,Rtp::Q_TS); 00608 (*li)->unsetPayload(); //payload will be freed when ACKd 00609 delete *li; // here, there is no payload to free any more... 00610 (*li)=NULL; 00611 slpl.pop_front(); 00612 counter++; 00613 } 00614 } 00615 } 00616 if(esHeaderBuffer) { 00617 delete[] esHeaderBuffer; 00618 esHeaderBuffer=NULL; 00619 } 00620 } 00621 00622 //FRAME DATA 00623 slpl = pLayer->createSLPacketList(frm->getAU()); 00624 firstFrame=false; 00625 00626 if (slpl.empty()) { 00627 dprintf_full("Rtp::writeFrame() SLPacketList is empty! AU: size = %u, cts = %u\n", 00628 frm->getAU()->size, frm->getAU()->cts); 00629 return 1; //fake success, since the SLPacket is buffered and not yet full MTU reached... 
00630 } 00631 SLPacketList::iterator li = slpl.begin(); 00632 00633 dprintf_full("Rtp::writeFrame: FRAME first 4 bytes of Frame, CTS %u: %x %x %x %x\r\n", 00634 frm->getAU()->cts, frm->getAU()->payload[0], 00635 frm->getAU()->payload[1], 00636 frm->getAU()->payload[2],frm->getAU()->payload[3]); 00637 last=false; 00638 //counter stays to old value (might be with header) 00639 00640 while(!slpl.empty()) { 00641 if(slpl.size()==1) { 00642 last=true; 00643 } 00644 li = slpl.begin(); 00645 result &= insertToPreQ((*li)->getPayload(), (*li)->getPayloadSize(), 00646 frm->getType(), frm->getAU()->size, frm->getAU()->prio, 00647 frm->getAU()->dts, counter, last, 0, Rtp::Q_TS); 00648 (*li)->unsetPayload(); //payload will be freed when ACKd 00649 delete *li; // here, there is no payload to free any more... 00650 (*li)=NULL; 00651 slpl.pop_front(); 00652 counter++; 00653 } 00654 00655 IOFrameStat.numProcessedFrames++; 00656 #ifdef _POSIX_PRIORITY_SCHEDULING 00657 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 00658 perror("Rtp::writeFrame YIELD"); 00659 } 00660 #else 00661 // for windows pthread lib! 00662 #if (defined WIN32) && (!defined WINCE) 00663 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 00664 dprintf_full("Rtp::writeFrame YIELD"); 00665 } 00666 #endif 00667 #endif 00668 return (int)result; 00669 }


The documentation for this class was generated from the following files: