Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members
SimpleRtp.cpp
00001 #ifndef WINCE 00002 00003 #include <sched.h> 00004 #include "SimpleRtp.hpp" 00005 #include "Frame.hpp" 00006 #include "global.hpp" 00007 #include "SimpleStatistics.hpp" 00008 #include "CompressedVideoFrame.hpp" 00009 #include "CompressedAudioFrame.hpp" 00010 #include "adaptors/MP4Decoder.hpp" 00011 #include "adaptors/MP4audioDecoder.hpp" 00012 00013 // for stupid MSVC 00014 #ifdef WIN32 00015 static const int TTL = 16; 00016 static const int MAX_PAYLOAD_SIZE = 65000; 00017 static const int RTP_PAYLOAD_TYPE = 96; 00018 static const int UPDATE_INTERVAL_MS=20; 00019 static const int MINI_PREBUFFER_INTERVALL=500; 00020 static const unsigned int MAX_NULL_FRAMES=5; 00021 #endif 00022 00023 /* only sleep if can sleep at least 5 ms */ 00024 #define SIMPLERTP_STAYAHEAD 5 00025 #define MAX_PAKETS_SENT_WITHOUT_SLEEP 5 00026 #define SIMPLERTP_MAX_MS_BEHIND 50 00027 #define SIMPLERTP_MAX_MS_AHEAD 10000 00028 00029 SimpleRtp::SimpleRtp(const char *uri, int clintprt, int locPort, const char *clintaddr, ESInfo * e, 00030 PacketizationLayer * p, bool w, 00031 const char* statsFileName, bool overwriteExistingStatFile, 00032 Delay delay, bool updateVopTimeIncrement,bool switching):IO(), 00033 clientPort(clintprt), writeOnly(w), updateESInfo(updateVopTimeIncrement), 00034 enableSwitching(switching), delayType(delay) 00035 { 00036 assert(clintaddr && uri && e && p); 00037 dprintf_full("SimpleRtp::SimpleRtp: clientAddress %s clientPort %i localPort %i writeOnly: %s\n", 00038 clintaddr,clintprt,locPort,w?"YES":"NO"); 00039 if(statsFileName) { 00040 // server runs in writeonly mode 00041 stats=new SimpleStatistics(statsFileName,e,w,overwriteExistingStatFile); 00042 } 00043 else 00044 stats=NULL; 00045 00046 url = new char[strlen(uri) + 1]; 00047 clientAddress = new char[strlen(clintaddr) + 1]; 00048 es = e; 00049 strcpy(clientAddress, clintaddr); 00050 strcpy(url, uri); 00051 session = NULL; 00052 pLayer = p; 00053 localPort = locPort; 00054 firstFrame=false; // must use open to set to true 00055 resendFrameHeader=true; 00056 ctsForFirstPlay=0; 00057 // buffer at most 20001 packets (a packet odyssey ;-) 00058 packetBuffer=new FastPointerHashTable<AU*>(20001); 00059 resetPlayTime=true; 00060 packetInsertedDuringLastCallback=false; 00061 prefetchTimeInMs=1000; 00062 rtpSeqNrStart=4294967295u; 00063 rtpSeqNrEnd=0; 00064 overflow=0; 00065 lastCTSReturned=0; 00066 nullFrameCounter=0; 00067 bytesTransferred=0; 00068 frameToStart = 0; 00069 lockPacketBuffer.initialize("SimpleRtp-lockPacketBuffer"); 00070 }; 00071 00072 SimpleRtp::~SimpleRtp() 00073 { 00074 if (clientAddress) { 00075 delete clientAddress; 00076 clientAddress = NULL; 00077 } 00078 00079 if (pLayer) { 00080 delete pLayer; 00081 pLayer = NULL; 00082 } 00083 if (state != CLOSING || state != CLOSED) { 00084 close(); 00085 } 00086 if(session) { 00087 rtp_send_bye(session); 00088 rtp_done(session); 00089 session=NULL; 00090 } // delete happens in rtp_done(..) 00091 lockPacketBuffer.lock(); 00092 if(packetBuffer){ 00093 delete packetBuffer; 00094 packetBuffer=NULL; 00095 } 00096 lockPacketBuffer.release(); 00097 if(url) 00098 delete url; 00099 lockPacketBuffer.destroy(); 00100 if(stats) { 00101 stats->teardown(); 00102 delete stats; 00103 } 00104 }; 00105 00106 bool SimpleRtp::open() 00107 { 00108 if (state == OPEN) 00109 return true; 00110 state = OPENING; 00111 sendPacketCounter=0; 00112 firstPacket=true; 00113 u32 avgPacketsPerSecond=es->getAvgBandwidth()/(8*this->pLayer->getMaxPacketPayloadSize()); 00114 u32 roundsPerSecond=avgPacketsPerSecond/MAX_PAKETS_SENT_WITHOUT_SLEEP; 00115 if(roundsPerSecond>0) { 00116 intermediateMiniSleep=1000/roundsPerSecond; 00117 } 00118 else 00119 intermediateMiniSleep=1; 00120 if(intermediateMiniSleep>5) 00121 intermediateMiniSleep=5; 00122 // init for read and write is the same 00123 if (clientAddress && !session) { 00124 session = ucl_rtp_init(clientAddress, 00125 localPort, 00126 clientPort, 00127 TTL, 100, SimpleRtp::RtpCallback, (uint8_t *) this, 00128 portGen.getSockFromPort(localPort), 00129 portGen.getSockFromPort(localPort+1)); 00130 00131 } 00132 resendFrameHeader=true; 00133 ctsForFirstPlay=0;nullFrameCounter=0; 00134 rtpSeqNrStart=4294967295u; 00135 rtpSeqNrEnd=0; 00136 overflow=0; 00137 resetPlayTime=true; 00138 00139 if (session) { 00140 state = OPEN; 00141 rtp_set_option(session, RTP_OPT_WEAK_VALIDATION, FALSE); 00142 rtp_set_option(session, RTP_OPT_PROMISC, TRUE); 00143 00144 firstFrame=true; 00145 update(); 00146 return true; 00147 } else { 00148 state = CLOSED; 00149 return false; 00150 } 00151 }; 00152 00153 /***************************************************************/ 00155 int SimpleRtp::writeFrame(Frame * frm, ESInfo *out_es) { 00156 bool result = true; 00157 00158 AU au; 00159 u8* newPayload=NULL; 00160 bool last = false; 00161 u8* currentPacketStart=NULL;; 00162 u8* lastPacketEnd=NULL; 00163 u32 nrBytes=0; 00164 u32 remainingBytes=0; 00165 00166 if (!writeOnly || state != OPEN || !pLayer || !frm) 00167 return 0; 00168 lastCTSReturned=frm->getAU()->cts; 00169 dprintf_full("SimpleRtp::writeFrame CTS %i prio %i size %i\n",frm->getAU()->cts, frm->getAU()->prio, frm->getAU()->size); 00170 00171 if (endFrameNumber>0 && frm->getAU()->cts / es->getVOPTimeIncrement() > endFrameNumber) { 00172 //this comes from unflushed DC senqueue, and is out of range, so drop it! 00173 dprintf_full("SimpleRtp::writeFrame nr %i TS %i exceeds endFrameNumber %i, ignoring!\n",frm->getAU()->cts / es->getVOPTimeIncrement(),frm->getAU()->cts, endFrameNumber ); 00174 return frm->getAU()->size; //FIXME: fake return value for error 00175 } 00176 00177 //HEADER DATA 00178 if ( (!firstFrame && resendFrameHeader ) 00179 || (firstFrame && 00180 !MP4Decoder::isValidDecoderConfig(frm->getAU()->payload)) ) { //HEADER already in first frame 00181 00182 if(!MP4Decoder::isValidDecoderConfig(frm->getAU()->payload) && resetPlayTime) { 00183 // set the new lastCTS only for payload! 00184 ctsForFirstPlay=frm->getAU()->cts; 00185 gettimeofday(&lastTimeStamp,NULL); 00186 resetPlayTime=false; 00187 } 00188 //send extra header! 00189 u8* esHeaderBuffer=NULL; 00190 u32 headerSize=es->getHeaders(&esHeaderBuffer); 00191 if(headerSize>0) { 00192 newPayload=new u8[headerSize]; 00193 memcpy(newPayload, esHeaderBuffer,headerSize); 00194 00195 au.payload=newPayload; 00196 au.size=headerSize; 00197 if(firstFrame) 00198 au.cts=au.dts=0; 00199 else 00200 au.cts=au.dts=frm->getAU()->cts; 00201 00202 au.duration=0; 00203 currentPacketStart=au.payload; 00204 lastPacketEnd=au.payload+au.size; 00205 nrBytes=pLayer->getMaxPacketPayloadSize(); 00206 00207 while(currentPacketStart<lastPacketEnd) { 00208 remainingBytes=lastPacketEnd-currentPacketStart; 00209 if(remainingBytes<=nrBytes) { 00210 nrBytes=remainingBytes; 00211 last=true; 00212 } 00213 00214 dprintf_full("\nSimpleRtp::writeFrame: send cts %d instead of %d\n", au.cts - (es->getVOPTimeIncrement() * frameToStart), au.cts); 00215 //result &= sendPacket(currentPacketStart,nrBytes,au.cts,last); 00216 result &= sendPacket(currentPacketStart,nrBytes,au.cts - (es->getVOPTimeIncrement() * frameToStart),last); 00217 currentPacketStart+=nrBytes; 00218 } 00219 } 00220 if(esHeaderBuffer) { 00221 delete[] esHeaderBuffer; 00222 esHeaderBuffer=NULL; 00223 } 00224 resendFrameHeader=false; 00225 firstFrame=false; 00226 } 00227 00228 //FRAME DATA 00229 firstFrame=false; 00230 00231 if (frm->getAU()->size==0) { 00232 return 0; 00233 } 00234 00235 if(!MP4Decoder::isValidDecoderConfig(frm->getAU()->payload) && resetPlayTime) { 00236 // set the new lastCTS only for payload! 00237 ctsForFirstPlay=frm->getAU()->cts; 00238 gettimeofday(&lastTimeStamp,NULL); 00239 resetPlayTime=false; 00240 } 00241 00242 dprintf_full("SimpleRtp::writeFrame: FRAME first 4 bytes of Frame, CTS %u: %x %x %x %x\r\n", 00243 frm->getAU()->cts, frm->getAU()->payload[0], 00244 frm->getAU()->payload[1], 00245 frm->getAU()->payload[2],frm->getAU()->payload[3]); 00246 last=false; 00247 int upd=0; 00248 currentPacketStart=frm->getAU()->payload; 00249 lastPacketEnd=currentPacketStart+frm->getAU()->size; 00250 nrBytes=pLayer->getMaxPacketPayloadSize(); 00251 00252 while(currentPacketStart<lastPacketEnd) { 00253 remainingBytes=lastPacketEnd-currentPacketStart; 00254 if(remainingBytes<=nrBytes) { 00255 nrBytes=remainingBytes; 00256 last=true; 00257 } 00258 upd++; 00259 dprintf_full("\nSimpleRtp::writeFrame sending cts %d instead of cts %d", frm->getAU()->cts - (es->getVOPTimeIncrement() * frameToStart), frm->getAU()->cts); 00260 //result &= sendPacket(currentPacketStart,nrBytes,frm->getAU()->cts,last); 00261 result &= sendPacket(currentPacketStart,nrBytes,frm->getAU()->cts - (es->getVOPTimeIncrement() * frameToStart),last); 00262 currentPacketStart+=nrBytes; 00263 if(upd>=MAX_PAKETS_SENT_WITHOUT_SLEEP) { 00264 msleep(intermediateMiniSleep); 00265 update(); 00266 msleep(1); 00267 upd=0; 00268 } 00269 } 00270 00271 // sync 00272 #ifdef _POSIX_PRIORITY_SCHEDULING 00273 sched_yield(); 00274 #else 00275 // for windows pthread lib! 00276 #ifdef WIN32 00277 sched_yield(); 00278 #endif 00279 #endif 00280 syncToCts(frm->getAU()->cts); 00281 update(); 00282 return (int)result; 00283 } 00284 00286 void SimpleRtp::syncToCts(u32 cts) 00287 { 00288 if(cts>ctsForFirstPlay) { 00289 long sleepMS=calcSleepTimeInMs(cts); 00290 dprintf_full("SimpleRtp::syncToCts: will sleep %li ms\n",sleepMS); 00291 if(sleepMS>SIMPLERTP_STAYAHEAD) 00292 msleep(sleepMS-SIMPLERTP_STAYAHEAD); 00293 } 00294 } 00295 00296 long SimpleRtp::calcSleepTimeInMs(u32 cts) 00297 { 00298 long delay=0; 00299 00300 switch(delayType) { 00301 case(CTS_BASED): 00302 delay=calcSleepTimeCTSBased(cts); 00303 break; 00304 case(BITRATE_BASED): 00305 delay=calcSleepTimeBitRateBased(cts,115); 00306 break; 00307 case(MIXED_CTS_BITRATE): 00308 delay=calcSleepTimeMixedBased(cts,0); 00309 break; 00310 case(AGGRESSIVE_MIXED_CTS_BITRATE): 00311 delay=calcSleepTimeMixedBased(cts,prefetchTimeInMs); 00312 break; 00313 case(NO_DELAY): default: 00314 delay=0; 00315 } 00316 dprintf_full("SimpleRtp::calcSleepTimeInMs(%u) bytesTransf %u delay %li\n",cts,(u32)bytesTransferred,delay); 00317 return delay; 00318 00319 } 00320 00321 long SimpleRtp::calcSleepTimeCTSBased(u32 cts) 00322 { 00323 u64 ctsDifferenceInMs=(u64)(cts-ctsForFirstPlay); // in ticks 00324 00325 if(ctsDifferenceInMs==0) // first frame after play 00326 return 0; 00327 00328 ctsDifferenceInMs*=1000; // in ticks per millisec 00329 ctsDifferenceInMs/=(es->getMediaTimeScale()); // in ms 00330 00331 00332 // ctsDifferenceInMs calc the SHOULD difference (the play-out time in MS) 00333 // timeDiffInMs contains the observed difference 00334 u32 timeDiffInMs=getElapsedTimeInMSSinceLastPlay(); 00335 long delay=(long)(ctsDifferenceInMs-timeDiffInMs); 00336 return delay; 00337 } 00338 long SimpleRtp::calcSleepTimeBitRateBased(u32 cts, u32 speedScaleFactor) { 00339 long delay=0; 00340 if(speedScaleFactor==0 || speedScaleFactor>400) { 00341 speedScaleFactor=115; 00342 } 00343 u32 timeDiffInMs=(u32)getElapsedTimeInMSSinceLastPlay(); 00344 // constant netbandwidth streaming 00345 u64 bitsPerSecond=es->getAvgBandwidth(); 00346 // send the video faster with a factor of 1.15 00347 // bitsPerSecond*timeDiffInMs gives bits with scale 100 00348 // divide through 8 to get byte, multiply with scale/100 00349 // divide through 1000 because we calculated with time in MS and bit rate was a per second value! 00350 // assume: max duration 3 hours=10,8x10^6ms max bit rate 10 Mbit=10^7bit, max scale is 400 00351 // largest intermediate result is approx 1,1x10^7*10^7*400=4.4*10^16 --> ceil(ld(10^17)), max 57 bits are used 00352 u64 shouldBytes=bitsPerSecond*timeDiffInMs*speedScaleFactor/800000u; 00353 u64 diffBytes=0; 00354 // transferredBytes contains the real transferred bytes for the WHOLE session 00355 if(bytesTransferred> shouldBytes) { 00356 diffBytes=bytesTransferred-shouldBytes; 00357 delay=(long)(diffBytes*8000/bitsPerSecond); 00358 } 00359 return delay; 00360 }; 00361 long SimpleRtp::calcSleepTimeMixedBased(u32 cts,u32 clientPreBufferTime) { 00362 // try to keep client buffer between SIMPLERTP_MAX_MS_BEHIND and SIMPLERTP_MAX_MS_AHEAD 00363 // adjust scale factor depending on that information 00364 // decide periodically (3 seconds) on the new scale factor (1.0 < 1.15) 00365 // for the first 10 seconds use bit rate based streaming 00366 long delay=0; 00367 u32 timeDiffInMs=(u32)getElapsedTimeInMSSinceLastPlay(); 00368 s32 clientBufferInMS=0; 00369 00370 00371 u64 bitsPerSecond=es->getAvgBandwidth(); 00372 u64 ctsDifferenceInMs=(u64)(cts-ctsForFirstPlay); // in ticks 00373 ctsDifferenceInMs*=1000; // in ticks per millisec 00374 ctsDifferenceInMs/=(es->getMediaTimeScale()); // in ms 00375 dprintf_full("SimpleRtp::calcSleepTimeMixedBased(cts=%u): timeDiff %u ctsDiffMS %u preBuffer %u\n", 00376 cts,timeDiffInMs,(u32)ctsDifferenceInMs,clientPreBufferTime); 00377 if(ctsDifferenceInMs==0) // first frame 00378 return 0; 00379 // now calculate the client's buffer in MS 00380 // take the prebuffertime, add the streamout seconds, subtract the time elapsed since play 00381 // allow faster build up of buffer 00382 if(timeDiffInMs<clientPreBufferTime) 00383 clientPreBufferTime=timeDiffInMs; 00384 clientBufferInMS=clientPreBufferTime+ctsDifferenceInMs-timeDiffInMs; 00385 dprintf_full("SimpleRtp::calcSleepTimeMixedBased: clientBuffer %i\n",clientBufferInMS); 00386 double scale=1.0; 00387 double maxScale=1.3,minScale=0.8; 00388 // a buffer of SIMPLERTP_MAX_MS_BEHIND+1000 equals to 1.3 00389 // a buffer of SIMPLERTP_MAX_MS_AHEAD-1000 equals to 0.8 00390 if(clientBufferInMS<SIMPLERTP_MAX_MS_BEHIND+1000) { 00391 scale=maxScale; 00392 } 00393 else if(clientBufferInMS>(SIMPLERTP_MAX_MS_AHEAD-1000)) { 00394 scale=minScale; 00395 } 00396 else { 00397 u32 offset=(u32)(clientBufferInMS-(SIMPLERTP_MAX_MS_BEHIND+1000)); 00398 scale=maxScale-offset*(maxScale-minScale)/(SIMPLERTP_MAX_MS_AHEAD-2000-SIMPLERTP_MAX_MS_BEHIND); 00399 } 00400 00401 00402 u32 secElapsed=timeDiffInMs/1000; 00403 u32 seconds=1; 00404 if(secElapsed<seconds) { 00405 seconds=secElapsed+1; 00406 } 00407 u64 curMS=SimpleStatistics::getCurrentMilliSecond(); 00408 00409 curMS+=((seconds-1)*1000); 00410 u64 bytesToSend=(u64)(curMS*bitsPerSecond*scale/8000); 00411 u64 bytesSent=stats->getNumBytesOutLastNSeconds(seconds); 00412 long diff=(long)(bytesSent-bytesToSend); 00413 if(diff>0) 00414 delay=(long)(diff*8000/(seconds*scale*bitsPerSecond)); 00415 else 00416 delay=0; 00417 dprintf_full("SimpleRtp::calcSleepTimeMixedBased: delay%li bytesToSend %u bytesSent %u scale %lf\n",delay,(u32)bytesToSend,(u32)bytesSent,scale); 00418 00419 return delay; 00420 } 00421 00422 long SimpleRtp::getElapsedTimeInMSSinceLastPlay() { 00423 struct timeval theTimeIsNow; 00424 gettimeofday(&theTimeIsNow,NULL); 00425 long timeDiffInMs=(theTimeIsNow.tv_sec-lastTimeStamp.tv_sec)*1000+ 00426 (theTimeIsNow.tv_usec-lastTimeStamp.tv_usec)/1000; 00427 // should never happen but better catch this case 00428 if(timeDiffInMs<=0) 00429 timeDiffInMs=0; 00430 return timeDiffInMs; 00431 }; 00432 bool SimpleRtp::sendPacket(u8* payload, u32 size,u32 rtpTimestamp, bool last) 00433 { 00434 dprintf_full("SimpleRtp::sendPacket %i bytes CTS: %i \n",size,rtpTimestamp); 00435 if(stats) 00436 stats->insertPacket(rtpTimestamp,size,sendPacketCounter++,last); 00437 bytesTransferred+=size; 00438 int rc = rtp_send_data(session, 00439 rtpTimestamp, 00440 RTP_PAYLOAD_TYPE, last, 0, NULL, 00441 (uint8_t *) payload, 00442 size, 00443 NULL, 0, 0); 00444 return rc >= 0; 00445 }; 00446 00447 bool SimpleRtp::setToFrameNumber(u32 frameNumber) 00448 { 00449 dprintf_full("SimpleRtp::setToFrameNumber(%d)\n", frameNumber); 00450 frameToStart = frameNumber; //added by klschoef on 02/03/2005 00451 resetPlayTime=true; 00452 lockPacketBuffer.lock(); 00453 if(packetBuffer) 00454 packetBuffer->freeAllElements(); 00455 lockPacketBuffer.release(); 00456 return true; 00457 }; 00458 00459 bool SimpleRtp::close(bool immediate) 00460 { 00461 if(state==OPEN || state==OPENING) { 00462 state = CLOSING; 00463 // can not do this, client can send a play again 00464 /* rtp_send_bye(session); 00465 rtp_done(session); */ 00466 state = CLOSED; 00467 // empty the buffer 00468 lockPacketBuffer.lock(); 00469 packetBuffer->freeAllElements(); 00470 lockPacketBuffer.release(); 00471 } 00472 return true; 00473 }; 00474 00482 Frame *SimpleRtp::getFrame() 00483 { 00484 if (writeOnly || state != OPEN) 00485 return NULL; 00486 if(lastCTSReturned == es->getDuration()-es->getVOPTimeIncrement()) { 00487 //EOF 00488 close(); // locks lockpacketBuffer 00489 dprintf_full("SimpleRtp::getFrame: REACHED STREAMEOF\n"); 00490 return NULL; 00491 } 00492 00493 dprintf_full("SimpleRtp::getFrame() %u\n",rtpSeqNrStart); 00494 // otherwise prebuffer! 00495 if(firstFrame) { 00496 preBufferPackets(prefetchTimeInMs); 00497 firstFrame=false; 00498 } 00499 // now check if we have a complete frame available in the buffer 00500 // we start with rtp_seq_nr 00501 if(rtpSeqNrStart>=rtpSeqNrEnd) 00502 preBufferPackets(MINI_PREBUFFER_INTERVALL); 00503 else { 00504 //fetch all cached packets which are CURRENTLY available 00505 preBufferPackets(0); 00506 } 00507 lockPacketBuffer.release(); 00508 Frame* frm=extractCompleteFrame(rtpSeqNrStart); 00509 if(frm) { 00510 // no more accesses to packetBuffer 00511 lockPacketBuffer.release(); 00512 nullFrameCounter=0; 00513 if(resetPlayTime) { 00514 ctsForFirstPlay=frm->getAU()->cts; 00515 gettimeofday(&lastTimeStamp,NULL); 00516 resetPlayTime=false; 00517 } 00518 00519 if(frm->getAU()->cts==0 && MP4Decoder::isValidDecoderConfig(frm->getAU()->payload)) { 00520 //check if we have payload too! if yes remove the header from the frame 00521 // to avoid that we have duplicate headers when writing to file! 00522 stripHeaderFromFrame(frm); 00523 } 00524 if(MP4Decoder::isValidDecoderConfig(frm->getAU()->payload)) { 00525 if(es->isVisualStream() && enableSwitching) 00526 es->setDecoderConfig(frm->getAU()->payload,frm->getAU()->size); 00527 delete frm; 00528 frm=getFrame(); 00529 } 00530 else if(MP4audioDecoder::isValidDecoderConfig(frm->getAU()->payload)) { 00531 if(es->isAudioStream() && enableSwitching) 00532 es->setDecoderConfig(frm->getAU()->payload,frm->getAU()->size); 00533 delete frm; 00534 frm=getFrame(); 00535 } 00536 } 00537 else { 00538 nullFrameCounter++; 00539 dprintf_full("SimpleRtp::getFrame() NULL %i\n",nullFrameCounter); 00540 if(lastCTSReturned > 00541 this->es->getDuration() - es->getMediaTimeScale()) { 00542 // if 2 null frames in last second close! 00543 if(nullFrameCounter>1) { 00544 lockPacketBuffer.release(); 00545 close(); // locks lockpacketBuffer 00546 dprintf_full("SimpleRtp::getFrame: REACHED STREAMEOF\n"); 00547 } 00548 else 00549 lockPacketBuffer.release(); 00550 } 00551 else { 00552 //give 2nd chance to retrieve the frame (>1) 00553 if(nullFrameCounter>1) { 00554 removeIncompleteFrame(rtpSeqNrStart); //access packetBuffer 00555 lockPacketBuffer.release(); 00556 } 00557 else if(nullFrameCounter>MAX_NULL_FRAMES){ 00558 lockPacketBuffer.release(); 00559 close(); // locks lockpacketBuffer 00560 dprintf_full("SimpleRtp::getFrame: PACKET LOSS TOO HIGH! ENDING SimpleRtp\n"); 00561 } 00562 else 00563 lockPacketBuffer.release(); 00564 } 00565 } 00566 if(frm) 00567 lastCTSReturned=frm->getAU()->cts; 00568 00569 // sync 00570 #ifdef _POSIX_PRIORITY_SCHEDULING 00571 sched_yield(); 00572 #else 00573 // for windows pthread lib! 00574 #ifdef WIN32 00575 sched_yield(); 00576 #endif 00577 #endif 00578 if(frm) 00579 syncToCts(frm->getAU()->cts); 00580 update(); 00581 return frm; 00582 } 00583 00587 void SimpleRtp::removeIncompleteFrame(u32 seqNr) 00588 { 00589 bool cont=true; 00590 bool firstPacket=true; 00591 u32 cts=0; 00592 dprintf_full("SimpleRtp::removeIncompleteFrame: start with seqNr%u (cts should be %u)\n",seqNr,lastCTSReturned); 00593 00594 while(cont && seqNr<=rtpSeqNrEnd) { 00595 00596 AU* p=packetBuffer->get(seqNr); 00597 u32 size=(p?p->size:0); 00598 if(p) { 00599 if(firstPacket) { 00600 cts=p->cts; 00601 } 00602 if(p->sampleFlags>0) //end of frame detected 00603 cont=false; 00604 if(cts==p->cts) { 00605 if(stats) { 00606 stats->dropInsertedPacket(size,seqNr,cts,firstPacket); 00607 } 00608 packetBuffer->freeElement(seqNr); 00609 p=NULL; 00610 } 00611 else { 00612 cont=false; 00613 seqNr--; // put back the last packet, belongs to next frame 00614 } 00615 if(firstPacket) 00616 firstPacket=false; 00617 } 00618 else { 00619 // a NULL packet returned -> lost!!! 00620 stats->notifyLostPacket(seqNr); 00621 } 00622 seqNr++; 00623 } 00624 00625 dprintf_full("SimpleRtp::removeIncompleteFrame: last deleted seqNr%u\n",seqNr-1); 00626 rtpSeqNrStart=seqNr; 00627 } 00628 00629 Frame* SimpleRtp::extractCompleteFrame(u32 seqNr) 00630 { 00631 std::list<AU*> packets; 00632 std::list<AU*>::iterator li; 00633 Frame* frm=NULL; 00634 dprintf_full("SimpleRtp::extractCompleteFrame(u32 seqNr=%u)\n",seqNr); 00635 00636 u32 rtpTimeStamp=0; // the cts value 00637 if(rtpSeqNrStart>=rtpSeqNrEnd) // empty buffer 00638 return NULL; 00639 // extract packets until empty packet or end of frame 00640 // or new cts! no need to care about seqNr here, HashTable guarantees 00641 // ordering! 00642 // seqNr is a unique id!!! Every list has exactly ONE packet stored 00643 bool cont=true; 00644 bool complete=false; 00645 u32 tmpSeqNr=seqNr; 00646 u32 frmSize=0; 00647 while(cont && tmpSeqNr<=rtpSeqNrEnd && !complete) { 00648 AU* p=packetBuffer->get(tmpSeqNr); 00649 if(p==NULL) { 00650 cont=false; 00651 // don't do notify here. to catch every lost packet do it in dropIncompleteFrame 00652 // stats->notifyLostPacket(tmpSeqNr); 00653 } 00654 else { 00655 if(packets.empty()) // if no other packet in the result list 00656 rtpTimeStamp=p->cts; // the cts value 00657 if(p->cts==rtpTimeStamp) { 00658 packets.push_back(p); 00659 frmSize+=p->size; 00660 complete=(p->sampleFlags>0?true:false); 00661 } 00662 else { // no endofframe but different cts, 00663 cont=false; 00664 // don't do notify here. to catch every lost packet do it in dropIncompleteFrame 00665 // stats->notifyLostPacket(tmpSeqNr); 00666 if(updateESInfo) { 00667 if(p->cts-rtpTimeStamp>0) { 00668 es->setVOPTimeIncrement(p->cts-rtpTimeStamp); 00669 updateESInfo=false; 00670 } 00671 } 00672 00673 } 00674 } 00675 tmpSeqNr++; 00676 00677 } 00678 if(complete) { 00679 // check next frame to calc voptimeincrement 00680 if(updateESInfo) { 00681 AU* tmpp=packetBuffer->get(tmpSeqNr); 00682 if(tmpp) { 00683 if(tmpp->cts-rtpTimeStamp>0) { 00684 es->setVOPTimeIncrement(tmpp->cts-rtpTimeStamp); 00685 updateESInfo=false; 00686 } 00687 } 00688 } 00689 // rebuild the frame 00690 dprintf_full("SimpleRtp::extractCompleteFrame: Using packets %u to %u to rebuild frame(%i packets)\n", 00691 seqNr,tmpSeqNr-1,packets.size()); 00692 if(es->isAudioStream()) 00693 frmSize-=packets.size()*4; 00694 AU* au=new AU(); 00695 au->size=frmSize; 00696 au->payload=new u8[frmSize]; 00697 if(stats) { 00698 stats->getFrame(rtpTimeStamp,frmSize,seqNr,tmpSeqNr-1); 00699 } 00700 // FIXME: handle header correctly 00701 // for now just concatenate the payload of the packets 00702 // free the packets from seqNr to including tmpSeqNr 00703 u32 offset=0; 00704 u32 dataOffset=0; 00705 // remove header for audio 00706 if(es->isAudioStream()) 00707 dataOffset+=4; 00708 for(li=packets.begin();li!=packets.end();++li) { 00709 if(li==packets.begin()) { 00710 // set cts for first packet 00711 // FIXME: dts is wrong 00712 au->cts=au->dts=(*li)->cts; 00713 } 00714 AU* tmp=(*li); 00715 memcpy(au->payload+offset,tmp->payload+dataOffset,tmp->size-dataOffset); 00716 offset+=tmp->size-dataOffset; 00717 } 00718 // free memory from seqNr to excl tmpSeqNr 00719 for(u32 sNr=seqNr;sNr<tmpSeqNr;sNr++) 00720 packetBuffer->freeElement(sNr); 00721 00722 if(es->isAudioStream()) 00723 frm=new CompressedAudioFrame(Frame::NN_VOP); 00724 else 00725 frm=new CompressedVideoFrame(Frame::NN_VOP,((VideoESInfo*)es)->getWidth(),((VideoESInfo*)es)->getHeight()); 00726 frm->setAU(au); 00727 rtpSeqNrStart=tmpSeqNr; // the next packet to read 00728 } 00729 else { 00730 dprintf_full("SimpleRtp::extractCompleteFrame: incomplete frame CTS %u\n",rtpTimeStamp); 00731 } 00732 return frm; 00733 }; 00734 00735 void SimpleRtp::preBufferPackets(long millisecs) 00736 { 00737 struct timeval tv; 00738 struct timeval preBufferStart; 00739 struct timeval preBufferEnd; 00740 gettimeofday(&preBufferStart,NULL); 00741 packetInsertedDuringLastCallback=true; 00742 long timeDiffInMs=0; 00743 int ret=0; 00744 dprintf_full("SimpleRtp::preBufferPackets %li\n",millisecs); 00745 while(timeDiffInMs<=millisecs) { 00746 tv.tv_sec = 0; 00747 tv.tv_usec = 1; //rtp_recv returns one single packet, wait nearly no time 00748 if(packetInsertedDuringLastCallback) { 00749 // when we read a packet the last time, try to read again 00750 packetInsertedDuringLastCallback=false; 00751 ret=rtp_recv(session, &tv, rtpSeqNrStart); // only returns one packet 00752 // will set above var. to true if data avail. 00753 // if no data is available it is still false and in case millisecs is zero 00754 // exit immediately 00755 if(!packetInsertedDuringLastCallback && millisecs==0) { 00756 update(); 00757 return; 00758 } 00759 } 00760 else if(millisecs-timeDiffInMs>=UPDATE_INTERVAL_MS) { 00761 msleep(UPDATE_INTERVAL_MS); 00762 packetInsertedDuringLastCallback=true; // read again after sleep 00763 } 00764 else if(millisecs-timeDiffInMs>0) { 00765 msleep(millisecs-timeDiffInMs); 00766 packetInsertedDuringLastCallback=true; // read again after sleep 00767 } 00768 // calc prebuffer 00769 gettimeofday(&preBufferEnd,NULL); 00770 timeDiffInMs=(preBufferEnd.tv_sec-preBufferStart.tv_sec)*1000+ 00771 (preBufferEnd.tv_usec-preBufferStart.tv_usec)/1000; 00772 00773 } 00774 dprintf_full("SimpleRtp::preBufferPackets spent %li\n",timeDiffInMs); 00775 update(); 00776 } 00777 00778 char *SimpleRtp::getSSRC() 00779 { 00780 if (state != OPEN) 00781 return NULL; 00782 00783 char *result = new char[9]; 00784 sprintf(result, "%08x", rtp_my_ssrc(session)); 00785 return result; 00786 }; 00787 00788 void SimpleRtp::update() 00789 { 00790 if(state==IO::OPEN) { 00791 if (session) { 00792 rtp_update(session); 00793 rtp_send_ctrl(session, 0, NULL,RTCP_NORMAL); 00794 } 00795 } 00796 }; 00797 00798 IO::State SimpleRtp::play(double prefetchTime) 00799 { 00800 resetPlayTime=true; 00801 prefetchTimeInMs=(long)prefetchTime*1000; 00802 // prebuffer after pause 00803 if(!writeOnly) { 00804 firstFrame=true; // prebuffer! 00805 ctsForFirstPlay=0;nullFrameCounter=0; 00806 rtpSeqNrStart=4294967295u; 00807 rtpSeqNrEnd=0; 00808 } 00809 bytesTransferred=0; 00810 if(stats) 00811 stats->play(0,true); 00812 return IO::play(prefetchTime); 00813 } 00814 00815 IO::State SimpleRtp::pause() 00816 { 00817 resetPlayTime=true; 00818 if(stats) 00819 stats->pause(); 00820 return IO::pause(); 00821 } 00822 00823 void SimpleRtp::RtpCallback(struct rtp *session, rtp_event * e) { 00824 // Currently we ignore RTCP packets 00825 // Just do our required housekeeping 00826 rtp_packet *packet; 00827 SimpleRtp *caller = NULL; 00828 00829 if (e->ssrc == rtp_my_ssrc(session)) 00830 return; 00831 00832 switch(e->type) { 00833 case RX_RTP: 00834 // we must deep-copy the packets, they are freed in the ucl lib! 00835 packet = (rtp_packet*)e->data; 00836 caller=(SimpleRtp*)rtp_get_userdata(session); 00837 if(!packet) { 00838 dprintf_err("SimpleRtp::RtpCallback NULL packet?"); 00839 break; 00840 } 00841 if(packet->pd.rtp_pd_data_len==0 || packet->pd.rtp_pd_data==NULL) { 00842 dprintf_err("SimpleRtp::RtpCallback empty packet?"); 00843 xfree(packet); 00844 break; 00845 } 00846 if (caller->isInput()) { //only client handles incoming data! 00847 AU* pClone=new AU(); 00848 pClone->cts=packet->ph.ph_ts; 00849 pClone->payload=new u8[packet->pd.rtp_pd_data_len]; 00850 pClone->size=packet->pd.rtp_pd_data_len; 00851 pClone->sampleFlags=(packet->ph.ph_m?1:0); 00852 memcpy(pClone->payload,packet->pd.rtp_pd_data,pClone->size); 00853 caller->insertPacket(pClone,packet->ph.ph_seq); 00854 } 00855 else 00856 dprintf_full("SimpleRtp::RtpCallback: Not supposed to get data?\n"); 00857 00858 xfree(packet); /* xfree() is mandatory to release RTP packet data */ 00859 break; 00860 case RX_SDES: 00861 dprintf_full("RX_SDES SSRC = 0x%08x\n", e->ssrc); 00862 break; 00863 case RX_BYE: 00864 dprintf_full("RX_BYE SSRC = 0x%08x\n", e->ssrc); 00865 break; 00866 case SOURCE_CREATED: 00867 dprintf_full("SimpleRtp::RtpCallback: New source created, SSRC = 0x%08x\n", e->ssrc); 00868 break; 00869 case SOURCE_DELETED: 00870 caller=(SimpleRtp*)rtp_get_userdata(session); 00871 caller->close(); 00872 break; 00873 case RX_APP: 00874 break; 00875 default: 00876 break; 00877 } 00878 }; 00879 00880 void SimpleRtp::insertPacket(AU* p, u32 origSeqNr) 00881 { 00882 u32 seqNr= origSeqNr+overflow*65536u; 00883 if(firstPacket) { 00884 rtpSeqNrStart=rtpSeqNrEnd=seqNr; 00885 firstPacket=false; 00886 } 00887 // howto detect overflow? 00888 // the difference between the largest and the next will exceed 60000! 00889 if((seqNr+65536)-rtpSeqNrEnd<2000u) { 00890 overflow++; 00891 dprintf_full("SimpleRtp::insertPacket detected rtpSeqNr Overflow\n"); 00892 seqNr+=65536; 00893 } 00894 // now seq Nr is correct 00895 if(stats) { 00896 stats->insertPacket(p->cts,p->size,seqNr,(p->sampleFlags==1) ); 00897 } 00898 lockPacketBuffer.lock(); 00899 if(packetBuffer) 00900 packetBuffer->put(p,seqNr); //use rtp timestamp as key 00901 lockPacketBuffer.release(); 00902 // check for a new max/min seqNr 00903 if(seqNr<rtpSeqNrStart) 00904 rtpSeqNrStart=seqNr; 00905 if(seqNr>rtpSeqNrEnd) 00906 rtpSeqNrEnd=seqNr; 00907 packetInsertedDuringLastCallback=true; 00908 dprintf_full("SimpleRtp::insertPacket(%u/%u)[%u,%u] CTS=%u Size=%u\n",origSeqNr,seqNr,rtpSeqNrStart,rtpSeqNrEnd,p->cts,p->size); 00909 }; 00910 00914 void SimpleRtp::stripHeaderFromFrame(Frame* frm) 00915 { 00916 assert(frm&&frm->getAU()); 00917 if(!frm) 00918 return; 00919 if(!frm->getAU()) 00920 return; 00921 00922 u8* payload=frm->getAU()->payload; 00923 u32 size=frm->getAU()->size; 00924 u8* payloadEnd=payload+size; 00925 if(!payload || size==0) 00926 return; 00927 if(!MP4Decoder::isValidDecoderConfig(payload)) 00928 return; 00929 00930 bool stop=false; 00931 u8* tmp=payload+4; 00932 00933 // we received a header 00934 // with or without payload? 00935 while(!stop && tmp < payloadEnd && 00936 (tmp=(u8*)memchr(tmp,0xb6,payloadEnd-tmp))) { 00937 if(tmp==NULL) 00938 tmp=payloadEnd; 00939 else if(tmp[-1] == 1 && tmp[-2] == 0 && tmp[-3] == 0) { 00940 stop=true; 00941 tmp-=3; 00942 } else 00943 tmp++; 00944 00945 } 00946 if (stop && tmp && enableSwitching) 00947 es->setDecoderConfig(payload,tmp-payload); 00948 else if(enableSwitching) 00949 es->setDecoderConfig(payload,(int)size); 00950 00951 if(stop && tmp) { 00952 // remove the header from the frame 00953 u32 newSize=size-(tmp-payload); 00954 u8* newPayload=new u8[newSize]; 00955 memcpy(newPayload,tmp,newSize); 00956 AU* tmp=frm->getAU(); 00957 delete[] tmp->payload; 00958 tmp->payload=newPayload; 00959 tmp->size=newSize; 00960 frm->detectFrameType(); 00961 } 00962 } 00963 00964 00965 #endif //wince