Rtp.cpp

00001 00002 /*********************************************************************** 00003 * * 00004 * ViTooKi * 00005 * * 00006 * title: Rtp.cpp * 00007 * * 00008 * * 00009 * * 00010 * ITEC institute of the University of Klagenfurt (Austria) * 00011 * http://www.itec.uni-klu.ac.at * 00012 * * 00013 * * 00014 * For more information visit the ViTooKi homepage: * 00015 * http://ViTooKi.sourceforge.net * 00016 * vitooki-user@lists.sourceforge.net * 00017 * vitooki-devel@lists.sourceforge.net * 00018 * * 00019 * This file is part of ViTooKi, a free video toolkit. * 00020 * ViTooKi is free software; you can redistribute it and/or * 00021 * modify it under the terms of the GNU General Public License * 00022 * as published by the Free Software Foundation; either version 2 * 00023 * of the License, or (at your option) any later version. * 00024 * * 00025 * This program is distributed in the hope that it will be useful, * 00026 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00027 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 00028 * GNU General Public License for more details. * 00029 * * 00030 * You should have received a copy of the GNU General Public License * 00031 * along with this program; if not, write to the Free Software * 00032 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, * 00033 * MA 02111-1307, USA. * 00034 * * 00035 ***********************************************************************/ 00036 00037 /*********************************************************************** 00038 * * 00039 * REVISION HISTORY: * 00040 * * 00041 * * 00042 * * 00043 ***********************************************************************/ 00044 00045 #include <vector> 00046 #ifndef WINCE 00047 #include <sched.h> 00048 #endif 00049 00050 #include "Rtp.hpp" 00051 #include "CompressedVideoFrame.hpp" 00052 #include "CompressedAudioFrame.hpp" 00053 #include "net/SyncLayer.hpp" 00054 #include "net/PacketizationLayer.hpp" 00055 #include "adaptors/MP4Decoder.hpp" 00056 #include "adaptors/MP4audioDecoder.hpp" 00057 #include "Statistics.hpp" 00058 #include "metadata/TerminalCapabilities.hpp" 00059 00060 /* when there is less than eg. 2.0 percent pkt loss, increase net_BW by INC_PACKETs */ 00061 #define RTP_BW_INC_LOSS 1.5 00062 #define RTP_BW_INC_PACKETS 6 00063 //#define RTP_BW_INC_FACT 1.15 00064 00065 /* when there is more than eg. 5.0 percent pkt loss, decrease net_BW by eg. 20% (0.80) */ 00066 #define RTP_BW_DEC_LOSS 10.0 00067 #define RTP_BW_DEC_FACT 0.85 00068 00069 00070 /* how may secs of video should be added to the standard sec, just to fill buffers and lose quality? 00071 * Type: SECONDS */ 00072 #define RTP_BUF_FILL_FACT 0.30 00073 00074 /* how may secs of video should be removed from the standard sec, just to drain buffers and keep quality? 00075 * Type: SECONDS */ 00076 #define RTP_BUF_DRAIN_FACT 0.30 00077 00078 /* if the buffer is 95% full, we stop adapting, just fully draining buffer... 00079 * further, we reduce network bandwidth to a bare minimum of 3 Kbyte/sec 00080 * we call this "superdrain mode" :) 00081 * Type: PERCENT */ 00082 #define RTP_BUF_SUPERHIGHWATER 90 00083 00084 /* if client buffer is drained from SUPERHIGHWATER, it will return into normal mode when the 00085 * buffer fill level falls below the RTP_BUF_SUPERHIGHWATER_LOW 00086 * Type: PERCENT */ 00087 #define RTP_BUF_SUPERHIGHWATER_LOW 70 00088 00089 00090 /* if client buffer allows it, dont reduce quality less than eg. 50% 00091 * for B-frame dropping, a pattern with 4b IBBBBPBBBBPBBBB and B-quant28 00092 * gives about 50% scalability 00093 * Type: PERCENT */ 00094 #define RTP_MAX_ADAPT 50 00095 00096 /* what is the lowest acceptable framerate? (eg. fps > 15) 00097 * Type: FPS */ 00098 #define RTP_MIN_PRIO 14 //14 or 5 00099 //#define RTP_MIN_PRIO IO_NETWORK_HIGHEST_PRIORITY 00100 00101 /* if client has less than eg. 0.3 secs of video in buffer, we force prefetching 00102 * Type: SECONDS */ 00103 #define RTP_BUF_PREFETCH 0.1 00104 00105 /* byte size of server-side preQ */ 00106 #define RTP_SERVER_PreQ_MAX 10*1024*1024 00107 00108 /* byte size of client-side preQ */ 00109 //NOT USED! OVERRIDDEN BY: RTP_CLIENT_PreQ_MAX_SECS * avgBW 00110 #define RTP_CLIENT_PreQ_MAX 4*1024*1024 00111 00112 00113 // for stupid MSVC 00114 #ifdef WIN32 00115 static const int TTL = 16; 00116 static const int MAX_PAYLOAD_SIZE = 65000; 00117 static const int RTP_PAYLOAD_TYPE = 96; 00118 #endif 00119 #define MPEG_AUDIO_HEADER_SIZE 4 00120 #define RTP_SEQ_MOD 0x10000 00121 #define RTP_BW_MINIMUM 3*1024 //dont go below that net_bw value... 00122 #define UCL_TIMEOUT 20 00123 00124 //minimum udp packet inter-sendout time 00125 #define RTP_MIN_INTERARRIVAL_TIME 6 00126 00127 //if act_wait is below this, we do busy waiting, to get better granularity 00128 #define RTP_BUSY_WAIT_MS 5 00129 00130 PortGenerator portGen; 00131 00132 /***************************************************************/ 00133 Rtp::Rtp(const char *uri, int remotePrt, const char *addr, 00134 int localPrt, ESInfo * e, 00135 PacketizationLayer * p, bool w, const TerminalCapabilities *tc, 00136 GlobalTimer *gt, Statistics *statistics,bool avtd) : IO() 00137 { 00138 assert(e); 00139 assert(p); 00140 assert(uri && addr); 00141 allowVopTimeDetection=avtd; 00142 url = new char[strlen(uri) + 1]; 00143 rtx=NULL; 00144 remotePort = remotePrt; 00145 remoteAddress = new char[strlen(addr) + 1]; 00146 localPort = localPrt; 00147 globalTimer = gt; 00148 stats = statistics; 00149 00150 writeToNetwork = w; 00151 strcpy(remoteAddress, addr); 00152 strcpy(url, uri); 00153 setState(CLOSED); 00154 session = NULL; 00155 pLayer = p; 00156 firstFrame=false; // must use open to set to true 00157 memset(&sendQ.Q,0,sizeof(sendQ.Q)); 00158 preQ.Q = preQ.Q_tail = NULL; 00159 sendQ.size = preQ.size = 0; 00160 resendFrameHeader=false; 00161 preQ.needsPrefetching=false; //initial prefetching autodetected 00162 00163 preQ.MUTEX_Q.initialize("RTP-preQ.MUTEX_Q"); 00164 00165 preQ.MUTEX_Q_DATA_AVAIL.initialize("RTP-preQ.MUTEX_Q_DATA_AVAIL"); 00166 preQ.MUTEX_Q_DATA_AVAIL.lock(); 00167 00168 preQ.MUTEX_Q_FULL.initialize("RTP-preQ.MUTEX_Q_FULL"); 00169 preQ.MUTEX_Q_DONE.initialize("RTP-preQ.MUTEX_Q_DONE"); 00170 preQ.MUTEX_Q_DONE.lock(); 00171 00172 sendQ.MUTEX_Q.initialize("RTP-sendQ.MUTEX_Q"); 00173 00174 last_rr_seq_no = -1; 00175 last_rr_sum_pkts_lost = -1; 00176 00177 last_rtp_seq=0; 00178 last_rtx_rtp_seq=0; 00179 interarrival_time=0; 00180 //net_bw_before_superdrain=-1; 00181 real_bw=0; 00182 first_rtp_seq = -1; 00183 first_rtx_rtp_seq = -1; 00184 ts_offset = 0; 00185 00186 feedbackEnabled = false; 00187 rtxEnabled = false; 00188 threadWasPaused=false; 00189 00190 this->setESInfo(e); 00191 if(tc) 00192 stats->setBaseStreamBW(tc->getNetworkCapacityInByte()); //set base, unaffected by switching 00193 else 00194 stats->setBaseStreamBW(stats->getStreamBW()); //set base, unaffected by switching 00195 00196 00197 dprintf_full("Rtp::Rtp (url=%s, remotePort=%i, remoteAddr=%s, localPort=%i, writeOnly=%i, avgBW=%i, TCbw=%i)\r\n", 00198 url, remotePort,remoteAddress,localPort,(int)writeToNetwork,stats->getStreamBW()/128, stats->getBaseStreamBW()/128 ); 00199 00200 } 00201 00202 00203 /***************************************************************/ 00204 void Rtp::setESInfo(ESInfo * new_es) { 00205 int netBW; 00206 00207 es = new_es; 00208 00209 stats->setESInfo(new_es); 00210 00211 stats->setStreamBW( (int)(es->getAvgBandwidth() / 8 * 1.15)); //add 15 % 00212 #if (defined NETBENCH) 00213 dprintf_full("Rtp compiled with NETBENCH, so simulated network behaviour!!!\n"); 00214 //override with exact avgBW 00215 stats->setStreamBW( (int)(es->getAvgBandwidth() / 8 * 1)); 00216 #endif 00217 //stats->setStreamBW(0.8*1024*1024/8); 00218 00219 dprintf_full("Rtp::setESInfo with new streamBW %i\n",stats->getStreamBW()/128); 00220 00221 endFrameNumber = 0; 00222 00223 //set adaption right for new stream 00224 netBW = stats->getStreamBW(); 00225 00226 if (state != CLOSED) { //only for stream switching 00227 calcAdaptPreQValues(netBW); 00228 adaptPreQ(netBW); 00229 } 00230 } 00231 00232 00233 /***************************************************************/ 00234 Rtp::~Rtp() { 00235 if (remoteAddress) { 00236 delete[] remoteAddress; 00237 remoteAddress = NULL; 00238 } 00239 // FIXME: pLayer needs a refCount 00240 if (pLayer) { 00241 delete pLayer; 00242 pLayer = NULL; 00243 } 00244 if (state != CLOSING || state != CLOSED) { 00245 close(true); 00246 } 00247 00248 00249 if(session) { 00250 rtp_send_bye(session); 00251 //rtp_done(session); 00252 session=NULL; 00253 } // delete happens in rtp_done(..) 00254 flushBuffer(0,-1); 00255 if(rtx) { 00256 if(rtx->session) { 00257 rtp_send_bye(rtx->session); 00258 //rtp_done(rtx->session); 00259 rtx->session=NULL; 00260 } // delete happens in rtp_done(..) 00261 delete rtx; 00262 rtx=NULL; 00263 } 00264 00265 } 00266 00267 /***************************************************************/ 00268 bool Rtp::open() { 00269 00270 dprintf_full("Rtp::open Rtp is %s, state is %i\n",(writeToNetwork) ? "write" : "read",state); 00271 if (state == OPEN) 00272 return true; 00273 setState(OPENING); 00274 00275 /* 00276 setRoundRobinScheduling(1); 00277 */ 00278 00279 currentFrameNumber=0; 00280 endFrameNumber=-0; 00281 IOFrameStat.numProcessedFrames=0; 00282 IOFrameStat.seqno = -1; 00283 IOFrameStat.act_ts = -1; //impossibly first CTS 00284 firstFrame=true; 00285 framesSeen = 0; 00286 if (remoteAddress && !session) { 00287 dprintf_full("Rtp::open init RTP channel ports %i <--> %i\n",localPort,remotePort); 00288 int p1, p2; 00289 p1=portGen.getSockFromPort(localPort); 00290 p2=portGen.getSockFromPort(localPort+1); 00291 dprintf_full("PortGenerator::dome\n"); fflush(stdout); 00292 session = ucl_rtp_init(remoteAddress, 00293 localPort, 00294 remotePort, 00295 TTL, 100, Rtp::RtpCallback, (uint8_t *) this, 00296 p1,p2); 00297 } 00298 dprintf_full("PortGenerator::\n"); fflush(stdout); 00299 if (session) { 00300 rtp_set_option(session, RTP_OPT_WEAK_VALIDATION, FALSE); 00301 rtp_set_option(session, RTP_OPT_PROMISC, TRUE); 00302 00303 #ifndef WITHOUT_RTP_EXT 00304 if (!writeToNetwork) { // DataSink 00305 if (rtx) { 00306 dprintf_full("Rtp::open enabled to recv/send feedback!\n"); 00307 feedbackEnabled = true; 00308 //should we send feedback? enable it at UCL lib! 00309 if ((rtx->state == ACK) || (rtx->state == NACKACK)) 00310 rtp_set_option(session, RTCP_OPT_FB_ACK, TRUE); 00311 if ((rtx->state == NACK) || (rtx->state == NACKACK)) 00312 rtp_set_option(session, RTCP_OPT_FB_NACK, TRUE); 00313 } 00314 } else 00315 feedbackEnabled = true; //server will always accept extended feedback 00316 #endif 00317 00318 if (rtx) { 00319 if (rtx->remotePort ==0) { 00320 dprintf_full("Rtp::open init RTP RTX channel port is ZERO! RTX disabled! still capable of recv/send feedback!\n"); 00321 } else { 00322 #ifndef WITHOUT_RTP_EXT 00323 dprintf_full("Rtp::open init RTP RTX channel ports %i <--> %i\n",rtx->localPort,rtx->remotePort); 00324 rtxEnabled = true; 00325 rtx->session = ucl_rtp_init(remoteAddress, 00326 rtx->localPort, 00327 rtx->remotePort, 00328 TTL, 100, Rtp::RtpCallback, (uint8_t *) this, 00329 portGen.getSockFromPort(rtx->localPort), 00330 portGen.getSockFromPort(rtx->localPort+1)); 00331 if (!rtx->session) { 00332 setState(STREAMERR); 00333 return false; 00334 } 00335 rtp_set_option(rtx->session, RTP_OPT_WEAK_VALIDATION, FALSE); 00336 rtp_set_option(rtx->session, RTP_OPT_PROMISC, TRUE); 00337 if (!writeToNetwork) { // DataSink 00338 //should we send rtx feedback? 00339 if ((rtx->state == ACK) || (rtx->state == NACKACK)) 00340 rtp_set_option(rtx->session, RTCP_OPT_FB_ACK, TRUE); 00341 if ((rtx->state == NACK) || (rtx->state == NACKACK)) 00342 rtp_set_option(rtx->session, RTCP_OPT_FB_NACK, TRUE); 00343 } 00344 #endif 00345 } 00346 }//if rtx 00347 00348 00349 setState(OPEN); 00350 firstFrame=true; 00351 // FIXME: on server-side this should be recv'd via RTSP/SDP! 00352 stats->setClientPreQMaxSize((RTP_CLIENT_PreQ_MAX_SECS+1) * 00353 (int) ((float)(es->getAvgBandwidth() / 8) * (1+RTP_BUF_FILL_FACT) ) ); 00354 00355 if (writeToNetwork) //server has larger Q 00356 preQ.max_size = RTP_SERVER_PreQ_MAX; 00357 else 00358 preQ.max_size = stats->getClientPreQMaxSize(); 00359 00360 preQ.size = preQ.num_elems = preQ.num_full_frames = 0; 00361 preQ.avg_bw = 0; 00362 // preQ.avg_bw = es->getAvgBandwidth(); 00363 preQ.highest_rtp_seq=0; 00364 preQ.highest_ts=0; 00365 00366 dprintf_full("Rtp::open (ports %i:%i) is starting the according RTP thread\n",localPort,remotePort); 00367 this->start(); 00368 return true; 00369 } else { 00370 setState(STREAMERR); 00371 return false; 00372 } 00373 } 00374 00375 /***************************************************************/ 00376 IO::State Rtp::play(double prefetchTime) { 00377 dprintf_full("Rtp::play with prefetch %f (old state: %i)\n",prefetchTime,state); 00378 00379 if ((getState() == IO::CLOSED) || 00380 (getState() == IO::PAUSED) || 00381 (getState() == IO::MUTED)) { 00382 if (isInput()) { 00383 if (!preQ.needsPrefetching) { 00384 preQ.needsPrefetching=true; 00385 stats->setStillToPrefetchSecs( stats->getStillToPrefetchSecs() + prefetchTime); 00386 dprintf_full("Rtp::play requesting stillToPrefetchSecs %f\n", stats->getStillToPrefetchSecs()); 00387 flushBuffer(0,-1); //delete all 00388 IOFrameStat.seqno = -1; //getFrame will accept new rtp_seq offset 00389 IOFrameStat.act_ts = -1; //impossibly first CTS 00390 } 00391 } else { 00392 stats->setPrefetchedSecs(stats->getPrefetchedSecs() + prefetchTime); 00393 } 00394 if (getState() != IO::CLOSED) { 00395 setState(IO::OPEN); 00396 while (threadWasPaused) 00397 msleep(10); 00398 } 00399 } 00400 return getState(); 00401 } 00402 00403 /***************************************************************/ 00404 IO::State Rtp::pause() { 00405 dprintf_full("Rtp::pause\n"); 00406 if (getState() == IO::OPEN || 00407 getState() == IO::OPENING || 00408 getState() == IO::CLOSING || 00409 getState() == IO::STREAMEOF || 00410 getState() == IO::MUTED) { 00411 setState(IO::PAUSED); 00412 //well, we ignore preQ locking... just read... 00413 if (preQ.Q != NULL) { // popPreQ might hang in preQ.MUTEX_Q_DATA_AVAIL 00414 while (!threadWasPaused) //wait until RtpThread is really paused! 00415 msleep(10); 00416 } 00417 } 00418 00419 return getState(); 00420 } 00421 00422 /***************************************************************/ 00423 IO::State Rtp::mute() { 00424 dprintf_full("Rtp::mute\n"); 00425 if (getState() == IO::OPEN || 00426 getState() == IO::OPENING || 00427 getState() == IO::CLOSING || 00428 getState() == IO::STREAMEOF || 00429 getState() == IO::PAUSED) 00430 return setState(IO::MUTED); 00431 else 00432 return getState();} 00433 00434 00435 00436 /***************************************************************/ 00437 char *Rtp::getSSRC() { 00438 if (state != OPEN) 00439 return NULL; 00440 00441 char *result = new char[9]; 00442 sprintf(result, "%08x", rtp_my_ssrc(session)); 00443 return result; 00444 } 00445 00446 00447 /*****************************************************************/ 00448 bool Rtp::setEndFrameNumber(u32 frameNumber) { 00449 long incr = es->getVOPTimeIncrement(); 00450 00451 if (endFrameNumber == 0) 00452 flushBuffer(frameNumber * incr, -1); 00453 else 00454 flushBuffer(frameNumber * incr, endFrameNumber * incr); 00455 00456 endFrameNumber=frameNumber; 00457 return true; 00458 } 00459 00460 /*****************************************************************/ 00461 bool Rtp::setToFrameNumber(u32 frameNumber) { 00462 00463 flushBuffer(0, -1); 00464 currentFrameNumber=frameNumber; 00465 00466 return true; 00467 } 00468 00469 /*****************************************************************/ 00470 int Rtp::flushBuffer(long from_ts, long to_ts) { 00471 NetPacket_t *p=NULL; 00472 NetPacket_t *pr=NULL; 00473 int flushed_bytes=0; 00474 00475 00476 dprintf_full("Rtp::flushBuffer preQ from TS %li to TS %li\n",from_ts,to_ts); 00477 assert(to_ts == -1 || from_ts <= to_ts); 00478 preQ.MUTEX_Q.lock(); 00479 00480 if (to_ts < -1) 00481 to_ts = -1; //fix wrong inputs 00482 00483 if (isInput()) 00484 stats->setHighestPacketTS(0); 00485 p=pr=preQ.Q; 00486 while (p != NULL) { 00487 //dprintf_full(" checking rtp seq %i TS %i (%i elems left, next %p)\n", p->rtp_seq, p->rtp_ts,preQ.num_elems,p->next); 00488 if ( (p->rtp_ts >= (unsigned)from_ts) && 00489 ((to_ts == -1) || (p->rtp_ts <= (unsigned)to_ts)) ) { 00490 preQ.size -= p->pl_size; 00491 flushed_bytes += p->pl_size; 00492 preQ.num_elems--; 00493 if (p->last_frag) 00494 preQ.num_full_frames--; 00495 00496 preQ.avg_bw = 0; 00497 if (preQ.num_elems == 0) 00498 preQ.Q_tail = NULL; 00499 else 00500 if (preQ.num_full_frames > 0) 00501 preQ.avg_bw = (int)(preQ.size/preQ.num_full_frames * 00502 ((double)es->getMediaTimeScale()) / 00503 ((double)es->getVOPTimeIncrement())); 00504 00505 if (p == preQ.Q) 00506 preQ.Q = pr = p->next; 00507 else 00508 pr->next = p->next; 00509 00510 if (p == preQ.Q_tail) { 00511 preQ.Q_tail = pr; 00512 if (pr != NULL) //very last elem deleted 00513 pr->next = NULL; 00514 } 00515 00516 //free everything! 00517 assert(p->payload != NULL); 00518 delete p->payload; 00519 p->payload=NULL; 00520 delete p;p=NULL; 00521 00522 //keep old pr 00523 if(pr==NULL) 00524 p=NULL; 00525 else if (pr == preQ.Q) 00526 p=preQ.Q; 00527 else 00528 p=pr->next; 00529 00530 } else { 00531 if (isInput() && (stats->getHighestPacketTS() < p->rtp_ts)) { 00532 dprintf_full("Rtp::flushBuffer: set new highest TS: %i\n",p->rtp_ts); 00533 stats->setHighestPacketTS(p->rtp_ts); 00534 } 00535 pr=p; 00536 p=p->next; 00537 } 00538 } 00539 00540 preQ.MUTEX_Q_FULL.release(); 00541 if (preQ.Q != NULL) { //still data available 00542 preQ.MUTEX_Q_DATA_AVAIL.release(); 00543 } 00544 00545 preQ.MUTEX_Q.release(); 00546 return flushed_bytes; 00547 } 00548 00549 00550 /***************************************************************/ 00552 int Rtp::writeFrame(Frame * frm, ESInfo *out_es) { 00553 bool result = true; 00554 SLPacketList slpl; 00555 AU au; 00556 u8* newPayload=NULL; 00557 bool last = false; 00558 int counter = 0; 00559 00560 if (!writeToNetwork || state != OPEN || !pLayer || !frm) 00561 return 0; 00562 00563 dprintf_full("Rtp::writeFrame CTS %i prio %i size %i\n",frm->getAU()->cts, frm->getAU()->prio, frm->getAU()->size); 00564 00565 if (endFrameNumber>0 && frm->getAU()->cts / es->getVOPTimeIncrement() > endFrameNumber) { 00566 //this comes from unflushed DC senqueue, and is out of range, so drop it! 00567 dprintf_full("Rtp::writeFrame nr %i TS %i exceeds endFrameNumber %i, ignoring!\n",frm->getAU()->cts / es->getVOPTimeIncrement(),frm->getAU()->cts, endFrameNumber ); 00568 return 0; 00569 } 00570 00571 //HEADER DATA 00572 if ( (!firstFrame && resendFrameHeader ) 00573 || (firstFrame && !MP4Decoder::isValidDecoderConfig(frm->getAU()->payload)) ) { //HEADER already in first frame 00574 00575 resendFrameHeader=false; 00576 firstFrame=false; 00577 00578 //send extra header! 00579 u8* esHeaderBuffer=NULL; 00580 u32 headerSize=es->getHeaders(&esHeaderBuffer); 00581 if(headerSize>0) { 00582 newPayload=new u8[headerSize + frm->getAU()->size]; 00583 memcpy(newPayload, esHeaderBuffer, headerSize); 00584 00585 au.payload=newPayload; 00586 au.size=headerSize; 00587 au.cts=frm->getAU()->cts; 00588 au.dts=frm->getAU()->dts; 00589 au.duration=0; 00590 slpl=pLayer->createSLPacketList(&au); 00591 if(!slpl.empty()) { 00592 SLPacketList::iterator li = slpl.begin(); 00593 last = false; 00594 counter = 0; 00595 00596 dprintf_full("Rtp::writeFrame: HEADER (size %i) first 4 bytes of first Packet, CTS %u: %x %x %x %x\r\n", 00597 headerSize, frm->getAU()->cts, (*li)->getPayload()[0],(*li)->getPayload()[1], 00598 (*li)->getPayload()[2],(*li)->getPayload()[3]); 00599 00600 while(!slpl.empty()) { 00601 if(slpl.size()==1) { 00602 last=true; 00603 } 00604 li = slpl.begin(); 00605 result &= insertToPreQ((*li)->getPayload(), (*li)->getPayloadSize(), 00606 frm->getType(), au.size, IO_NETWORK_HIGHEST_PRIORITY, 00607 frm->getAU()->dts, counter, last, 0,Rtp::Q_TS); 00608 (*li)->unsetPayload(); //payload will be freed when ACKd 00609 delete *li; // here, there is no payload to free any more... 00610 (*li)=NULL; 00611 slpl.pop_front(); 00612 counter++; 00613 } 00614 } 00615 } 00616 if(esHeaderBuffer) { 00617 delete[] esHeaderBuffer; 00618 esHeaderBuffer=NULL; 00619 } 00620 } 00621 00622 //FRAME DATA 00623 slpl = pLayer->createSLPacketList(frm->getAU()); 00624 firstFrame=false; 00625 00626 if (slpl.empty()) { 00627 dprintf_full("Rtp::writeFrame() SLPacketList is empty! AU: size = %u, cts = %u\n", 00628 frm->getAU()->size, frm->getAU()->cts); 00629 return 1; //fake success, since the SLPacket is buffered and not yet full MTU reached... 00630 } 00631 SLPacketList::iterator li = slpl.begin(); 00632 00633 dprintf_full("Rtp::writeFrame: FRAME first 4 bytes of Frame, CTS %u: %x %x %x %x\r\n", 00634 frm->getAU()->cts, frm->getAU()->payload[0], 00635 frm->getAU()->payload[1], 00636 frm->getAU()->payload[2],frm->getAU()->payload[3]); 00637 last=false; 00638 //counter stays to old value (might be with header) 00639 00640 while(!slpl.empty()) { 00641 if(slpl.size()==1) { 00642 last=true; 00643 } 00644 li = slpl.begin(); 00645 result &= insertToPreQ((*li)->getPayload(), (*li)->getPayloadSize(), 00646 frm->getType(), frm->getAU()->size, frm->getAU()->prio, 00647 frm->getAU()->dts, counter, last, 0, Rtp::Q_TS); 00648 (*li)->unsetPayload(); //payload will be freed when ACKd 00649 delete *li; // here, there is no payload to free any more... 00650 (*li)=NULL; 00651 slpl.pop_front(); 00652 counter++; 00653 } 00654 00655 IOFrameStat.numProcessedFrames++; 00656 #ifdef _POSIX_PRIORITY_SCHEDULING 00657 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 00658 perror("Rtp::writeFrame YIELD"); 00659 } 00660 #else 00661 // for windows pthread lib! 00662 #if (defined WIN32) && (!defined WINCE) 00663 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 00664 dprintf_full("Rtp::writeFrame YIELD"); 00665 } 00666 #endif 00667 #endif 00668 return (int)result; 00669 } 00670 00671 00672 /***************************************************************/ 00673 bool Rtp::close(bool immediate) { 00674 if (state == CLOSED) 00675 return true; 00676 dprintf_full("Rtp::close: incoming state = %i\n",state); 00677 if ((state == CLOSING) && (immediate)) { //multiple closes 00678 dprintf_full("Rtp::close: multiple close... ignoring\n"); 00679 setState(FORCE_CLOSING); 00680 preQ.MUTEX_Q_DONE.release(); //signal that all data is sent 00681 sleep(2); //workaround to let datachannel::teardown wait! 00682 return true; 00683 } 00684 00685 if((state == OPEN) || (state == PAUSED) || (state == MUTED) 00686 || (state == PREFETCHING) || (state == STREAMEOF)) { 00687 if (!immediate) { 00688 setState(CLOSING); 00689 dprintf_full("Rtp::close waiting for correct sending/displaying (NOT IMMEDIATE)\n"); 00690 } else { //drop all frames, is IMMEDIATE 00691 dprintf_full("Rtp::close IMMEDIATE close requested\n"); 00692 setState(FORCE_CLOSING); 00693 } 00694 if (immediate) { //drop all frames, is IMMEDIATE 00695 NetPacket_t *p; 00696 00697 sendQupdate(-1); 00698 do { 00699 p = popPreQ(false); 00700 if ( (p != NULL) && (p->payload != NULL) ) { 00701 dprintf_full("freeing preQ Frame seq %i ts %i:%i\n", p->rtp_seq,p->rtp_ts,p->frag_no); 00702 delete p->payload; 00703 p->payload=NULL; 00704 delete p; 00705 p=NULL; 00706 } 00707 } while (p != NULL); 00708 preQ.MUTEX_Q_DONE.tryLock(); 00709 } 00710 00711 00712 dprintf_full("Rtp::close: waiting for data to be sent out...\n"); 00713 00714 preQ.MUTEX_Q.lock(); 00715 do { 00716 if (state == OPEN) //happens after unpause or unmute 00717 setState(CLOSING); 00718 //if (preQ.Q == NULL) //make sure that other thread in popPreQ exits 00719 preQ.MUTEX_Q_DATA_AVAIL.release(); // simulate data 00720 preQ.MUTEX_Q.release(); 00721 00722 dprintf_full("Rtp::close: give the other thread a chance to work off Q (state %i)\n",state); 00723 //give the other thread a chance to work off Q 00724 sleep(1); 00725 preQ.MUTEX_Q.lock(); 00726 } while ((state == CLOSING)||(state == PAUSED)||(state == MUTED)); 00727 preQ.MUTEX_Q.release(); 00728 00729 if (state == OPEN) //happens on unpausing (hereby rewinding) a already closed inputIO 00730 return false; 00731 00732 if (state != CLOSED) { 00733 // if (preQ.Q == NULL) //make sure that other thread in popPreQ exits 00734 preQ.MUTEX_Q_DATA_AVAIL.release(); //simulate data 00735 dprintf_full("Rtp::close: waiting for MUTEX_Q_DONE\n"); 00736 preQ.MUTEX_Q_DONE.lock(); //wait until all data is sent 00737 dprintf_full("Rtp::close: all frames sent, so we are done...\n"); 00738 setState(CLOSED); 00739 00740 00741 //FIXME: this crashes the app! 00742 if(session!=NULL) { 00743 rtp_send_bye(session); 00744 rtp_done(session); 00745 session = NULL; 00746 portGen.closePortPair(localPort); 00747 } 00748 if (rtxEnabled && rtx->session) { 00749 rtp_send_bye(rtx->session); 00750 rtp_done(rtx->session); 00751 rtx->session = NULL; 00752 portGen.closePortPair(rtx->localPort); 00753 } 00754 00755 // now delete mutex 00756 preQ.MUTEX_Q.destroy(); 00757 preQ.MUTEX_Q_DATA_AVAIL.destroy(); 00758 preQ.MUTEX_Q_FULL.destroy(); 00759 preQ.MUTEX_Q_DONE.destroy(); 00760 sendQ.MUTEX_Q.destroy(); 00761 } 00762 } 00763 return true; 00764 } 00765 00770 Frame *Rtp::getFrame() { 00771 struct AU *au=NULL; 00772 NetPacket_t *p=NULL; 00773 u8 *buf=NULL; 00774 long buf_siz=0; 00775 bool lastFrag; 00776 bool first_frag=true; 00777 bool old_ts=false; 00778 00779 currentFrameNumber++; 00780 framesSeen++; 00781 00782 dprintf_full("Rtp::getFrame %i (TS %i) of still buffered %i frames (total %i of %i TS %li)... expecting rtp_seq %i (state %i)\n", 00783 IOFrameStat.numProcessedFrames, IOFrameStat.act_ts, preQ.num_full_frames, 00784 currentFrameNumber, 00785 es->getNumberOfMediaSamples(),(long)es->getDuration(), 00786 IOFrameStat.seqno+1,state); 00787 00788 //PRE-buffer some frames 00789 struct timeval tv; 00790 double start_dtime, now_dtime=0,fetch_time=0; 00791 00792 IO::State oldState; 00793 00794 00795 gettimeofday(&tv,0); 00796 start_dtime = now_dtime = tv.tv_sec + (double)tv.tv_usec / 1000000.0; 00797 00798 if (!preQ.needsPrefetching && (stats->getBufAheadSec() <= RTP_BUF_PREFETCH) 00799 && ((state == OPEN) || (state == MUTED)) ) { 00800 dprintf_full("Rtp::getFrame: further PREFETCHING needed to fill empty buffers!\n"); 00801 preQ.needsPrefetching=true; 00802 stats->setStillToPrefetchSecs( stats->getStillToPrefetchSecs() + RTP_PREFETCH_SECS); 00803 flushBuffer(0,-1); //clean up all packets, since this will lead to a DataChannel Pause() 00804 } 00805 00806 // PREFETCHING ########################################## 00807 if (preQ.needsPrefetching) { //initial or addtional prefetch is needed... 00808 bool done=false; 00809 oldState = state; 00810 setState(PREFETCHING); 00811 if (globalTimer) 00812 globalTimer->setPrefetching(true); 00813 fetch_time=0; 00814 do { 00815 //dprintf_full("...prefetching: left %f done %i needsPrefetch %i....state %i\n", 00816 // stats->getStillToPrefetchSecs(), done,preQ.needsPrefetching,state); 00817 msleep(10); 00818 gettimeofday(&tv,0); 00819 now_dtime = tv.tv_sec + (double)tv.tv_usec / 1000000.0; 00820 if (state == STREAMERR) { 00821 dprintf_err("Rtp::getFrame() STREAMERR occured during prefetching, returning NULL\n"); 00822 if (globalTimer) 00823 globalTimer->setPrefetching(false); 00824 return NULL; 00825 } 00826 if ((state != PREFETCHING) && (state != PAUSED)) { 00827 dprintf_full("Rtp::getFrame: continued PREFETCHING requested! left %f state %i\n", 00828 stats->getStillToPrefetchSecs(),state); 00829 //preQ.needsPrefetching=false; 00830 done=false; 00831 setState(PREFETCHING); // continue with prefetching... 00832 } 00833 if (firstFrame) { 00834 double fpt = stats->getFirstPacketTime(); 00835 double gstps = stats->getStillToPrefetchSecs(); 00836 if ( (fpt != 0) && //only take from real first frame 00837 (now_dtime - fpt >= gstps) ) { 00838 fetch_time = now_dtime - fpt; 00839 done=true; 00840 } 00841 } else { //intermediate prefetching 00842 //minor FIXME: pausing during intermediate Prefetching will shorten prefetching 00843 if (now_dtime - start_dtime >= stats->getStillToPrefetchSecs()) { 00844 fetch_time = now_dtime - start_dtime; 00845 done=true; 00846 } 00847 } 00848 } while ( !done && ((state == PREFETCHING) || (state == OPEN)|| 00849 (state == MUTED)|| (state == PAUSED)) ); 00850 00851 if (state == PREFETCHING) 00852 setState(oldState); 00853 stats->setPrefetchedSecs(stats->getPrefetchedSecs() + fetch_time); //fine tune spent time 00854 stats->setStillToPrefetchSecs(0); 00855 stats->setBufAheadSec(((double)(stats->getHighestPacketTS()-stats->getFirstPacketTS()) / (double)es->getMediaTimeScale())); 00856 preQ.needsPrefetching=false; 00857 dprintf_full("Rtp::getFrame: prefetching of %2.3f secs (%2.3f real data recv secs)" 00858 " brought us %2.3f secs of %s! state %i, prefetched_secs: %f\n", 00859 now_dtime - start_dtime, fetch_time, 00860 stats->getBufAheadSec(), es->isVisualStream()?"Video":"Audio",state, 00861 stats->getPrefetchedSecs()); 00862 if (globalTimer) 00863 globalTimer->setPrefetching(false); 00864 } 00865 00866 // MERGE FRAME FROM PACKETS ############################### 00867 do { 00868 p=popPreQ(); 00869 if(p==NULL) //this is the end.... my friend... 00870 return NULL; 00871 if (p==(Rtp::NetPacket_t *)-1) //FIXME: after unpause, the Q was empty! this will force PREFETCHING 00872 return this->getFrame(); 00873 00874 dprintf_full("Rtp::getFrame popped rtp_seq %i, TS %i, last_frag %i size %i\n", 00875 p->rtp_seq,p->rtp_ts,p->last_frag,p->pl_size); 00876 00877 if (IOFrameStat.seqno == -1) // init the rtp sequence 00878 IOFrameStat.seqno = p->rtp_seq; 00879 else 00880 IOFrameStat.seqno++; 00881 00882 if (first_frag) { //first AU fragment 00883 if ( IOFrameStat.act_ts > (int)p->rtp_ts) { 00884 dprintf_full("DELETING left overs from previous frames\n"); 00885 old_ts = true; //left overs from previous frames 00886 } //else 00887 //IOFrameStat.act_ts = p->rtp_ts; 00888 first_frag = false; 00889 } 00890 00891 if (old_ts || ((unsigned)IOFrameStat.seqno != p->rtp_seq)) { //old fragments or incomplete AU 00892 dprintf_err("Rtp::getFrame expected rtp_seq %i, I got %i\n",IOFrameStat.seqno, p->rtp_seq); 00893 00894 00895 //drop all other following fragments of incomplete AU 00896 //ATTN: this will read one more pkt from the NEXT frame and has to push it back! 00897 while ((IOFrameStat.act_ts > (int)p->rtp_ts) 00898 || (!old_ts && (IOFrameStat.act_ts == (int)p->rtp_ts)) ) { 00899 dprintf_full("Rtp::getFrame popped for dropping: rtp_seq %i, TS %i, last_frag %i\n", 00900 p->rtp_seq,p->rtp_ts,p->last_frag); 00901 delete p->payload; 00902 p->payload = NULL; 00903 delete p; 00904 p = NULL; 00905 p=popPreQ(); 00906 if(p==NULL) 00907 return NULL; //this is the end.... my friend... 00908 if (p==(Rtp::NetPacket_t *)-1) //FIXME: after unpause, the Q was empty! this will force PREFETCHING 00909 return this->getFrame(); 00910 } 00911 00912 if ((unsigned)IOFrameStat.seqno != p->rtp_seq) { 00913 //pushback to preQ 00914 dprintf_err("LOSS: Not all fragments of TS %i where available\n",IOFrameStat.act_ts); 00915 addToPreQ(p,Q_TS,RTP_REINSERT); 00916 IOFrameStat.seqno = p->rtp_seq - 1; //restart seq_no from here 00917 if(buf) { 00918 delete[] buf; 00919 buf=NULL; 00920 } 00921 //try to get the next frame 00922 return this->getFrame(); 00923 } 00924 } 00925 00926 dprintf_full("Rtp::getFrame CONT'd with popped rtp_seq %i, TS %i, last_frag %i\n", 00927 p->rtp_seq,p->rtp_ts,p->last_frag); 00928 IOFrameStat.act_ts = p->rtp_ts; 00929 lastFrag = p->last_frag; 00930 if(es->isAudioStream()) { //RFC2250 specific headers need to be removed 00931 //the header info is not utilized currently 00932 p->pl_size -= 4; 00933 buf=(u8*)realloc(buf, buf_siz + p->pl_size); 00934 memcpy(buf+buf_siz, p->payload + 4, p->pl_size); 00935 } 00936 else { 00937 buf=(u8*)realloc(buf,buf_siz + p->pl_size); 00938 memcpy(buf+buf_siz,p->payload,p->pl_size); 00939 } 00940 buf_siz+=p->pl_size; 00941 delete p->payload; 00942 p->payload=NULL; 00943 delete p; 00944 p=NULL; 00945 } while (lastFrag == 0); 00946 00947 dprintf_full("Rtp::getFrame DONE TS %i\n", IOFrameStat.act_ts); 00948 00949 00950 //IF VIDEO ################################################3 00951 if (es->isVisualStream()) 00952 if ((buf[3]==0xb0) && MP4Decoder::isValidDecoderConfig(buf)) { 00953 u8* tmp=buf+4; 00954 bool stop=false; 00955 // received a header! 00956 dprintf_full("Rtp::getFrame received header\n"); 00957 // some servers, like the ITECMP4Server send header+payload in one packet 00958 // catch this case 00959 while(!stop && tmp < (buf+buf_siz) && 00960 (tmp = (u8*)memchr(tmp, 0xb6, (buf+buf_siz)-tmp)) ) { 00961 if(tmp[-1] == 1 && tmp[-2] == 0 && tmp[-3] == 0) { 00962 stop=true; 00963 tmp-=3; 00964 } else 00965 tmp++; //avoid double findings 00966 } 00967 if (tmp) 00968 es->setDecoderConfig(buf,tmp-buf); 00969 else 00970 es->setDecoderConfig(buf,buf_siz); 00971 00972 00973 if(stop) { 00974 dprintf_full("Rtp::getFrame compound header %x %x %x %x (%i bytes) + frame (%li bytes) %x %x %x %x!\n" 00975 ,buf[0],buf[1],buf[2],buf[3], tmp-buf, buf_siz-(tmp-buf), 00976 tmp[0],tmp[1],tmp[2],tmp[3]); 00977 //fix everything up for final CompressedVideoFrame() creation 00978 buf_siz -= tmp-buf; 00979 u8* new_buf = new u8[buf_siz]; 00980 memcpy(new_buf,tmp,buf_siz); 00981 delete[] buf; 00982 buf = new_buf; 00983 } else { 00984 delete[] buf; buf=NULL; 00985 return this->getFrame(); //get the real frame 00986 } 00987 } else if ( !MP4Decoder::isValidVisualFrame(buf)) { 00988 dprintf_err("Rtp::getFrame LOSS: Not all frags of TS %i avail (broke header/missing first packet)\n", 00989 IOFrameStat.act_ts); 00990 delete[] buf; 00991 buf=NULL; 00992 //try to get the next frame 00993 return this->getFrame(); 00994 } 00995 00996 //IF AUDIO ################################################3 00997 if (es->isAudioStream()) 00998 if (MP4audioDecoder::isValidAudioFrame(buf)) { 00999 dprintf_full("Rtp::getFrame received audio frame header\n"); 01000 //Every audio frame consists of a 4 byte header + payload. These together comprise 01001 //the payload of the access unit. 01002 if(buf_siz == MPEG_AUDIO_HEADER_SIZE) { 01003 es->setDecoderConfig(buf,buf_siz); 01004 delete[] buf; buf=NULL; 01005 return this->getFrame(); 01006 } 01007 else { 01008 //FIXME: Decoder configuration may be required to be set for VBR audio for every frame!!! 01009 if (firstFrame) 01010 this->es->setDecoderConfig(buf,4); //"4" is for mpeg audio only ..... may be diff for aac 01011 } 01012 } else {// not valid audio 01013 dprintf_err("Rtp::getFrame LOSS: Not all audio frags of TS %i avail (broke header/missing first packet)\n", 01014 IOFrameStat.act_ts); 01015 delete[] buf; buf=NULL; 01016 //try to get the next frame 01017 return this->getFrame(); 01018 } 01019 01020 //GENERAL FULL FRAME CREATION ####################### 01021 IOFrameStat.numProcessedFrames++; 01022 01023 //fill the merged access unit 01024 au = new AU(); 01025 au->payload = buf; 01026 au->size = buf_siz; 01027 au->cts = au->dts = (u32) IOFrameStat.act_ts + (u32) ts_offset; 01028 Frame* f=NULL; 01029 if(es->isVisualStream()) 01030 f = new CompressedVideoFrame(Frame::NN_VOP, ((VideoESInfo*)es)->getWidth(), 01031 ((VideoESInfo*)es)->getHeight()); 01032 else if(es->isAudioStream()) 01033 f = new CompressedAudioFrame(Frame::NN_VOP); 01034 else 01035 f = new CompressedVideoFrame(Frame::NN_VOP, 0, 0); 01036 01037 f->setAU(au); //FrameType is set here automatically 01038 f->setMediaTimeScale(es->getMediaTimeScale()); //used by GlobalTimer Adaptor 01039 01040 dprintf_full("Rtp::getFrame got size %8i frameType %s TS %i presTime: %2.3f\n",f->getAU()->size, 01041 Frame::VOPTypeToChar(f->getType()),IOFrameStat.act_ts, 01042 (double)IOFrameStat.act_ts / es->getMediaTimeScale()); 01043 01044 if (firstFrame) { 01045 // not a valid ESHeader as first packet? 01046 // do we have one from the RTSP? 01047 if(this->es->getEncodedDecoderConfig()==NULL) { 01048 dprintf_err("FATAL: no header found!! was: 0x%x%x%x%x\n", 01049 buf[0],buf[1],buf[2],buf[3]); 01050 setState(STREAMERR); 01051 delete[] buf; buf=NULL; 01052 delete f; f=NULL; 01053 delete au; au=NULL; 01054 close(true); //immediate 01055 return NULL; //this is the end... my friend... 01056 } 01057 firstFrame = false; 01058 } 01059 #ifdef _POSIX_PRIORITY_SCHEDULING 01060 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 01061 perror("Rtp::writeFrame YIELD"); 01062 } 01063 #else 01064 // for windows pthread lib! 01065 #if (defined WIN32) && (!defined WINCE) 01066 if (sched_yield() != 0) { //this is necessary to give sendThread more realtime performance! 01067 dprintf_full("Rtp::writeFrame YIELD"); 01068 } 01069 #endif 01070 #endif 01071 01072 return f; 01073 } 01074 01075 01076 /***************************************************************/ 01077 void Rtp::ucl_update(u32 ts, u32 timeout) { 01078 struct timeval tv; 01079 01080 dprintf_full("Rtp::ucl_update TS %i\n",ts); 01081 if (session) { 01082 rtp_update(session); 01083 rtp_send_ctrl(session, ts, NULL, RTCP_NORMAL); 01084 /* Receive control and data packets */ 01085 tv.tv_sec = 0; 01086 tv.tv_usec = timeout*1000; 01087 if (rtxEnabled) 01088 tv.tv_usec = tv.tv_usec >> 1; //wait half of the time on rtp 01089 assert(session); 01090 rtp_recv(session, &tv, ts); 01091 01092 if (rtxEnabled) { 01093 // dprintf_full("Rtp::ucl_update on RTX at TS %i\n",ts); 01094 rtp_send_ctrl(rtx->session, ts, NULL, RTCP_NORMAL); 01095 /* eventually receive NACK packets again */ 01096 tv.tv_sec = 0; 01097 tv.tv_usec = timeout*1000 >> 1; //wait other half of the time on rtcp 01098 assert(rtx->session); 01099 rtp_recv(rtx->session, &tv, ts); 01100 /* do maintenance */ 01101 rtp_update(rtx->session); 01102 } 01103 bool ret; 01104 int rounds=0; 01105 do { 01106 tv.tv_sec = 0; 01107 tv.tv_usec = 1; //wait nearly no time 01108 ret=rtp_recv(session, &tv, ts); //try to get as much data as possible 01109 if (rtxEnabled) { 01110 tv.tv_sec = 0; 01111 tv.tv_usec = 1; //wait nearly no time 01112 ret=rtp_recv(rtx->session, &tv, ts); 01113 } 01114 rounds++; 01115 } while(session && ret && (rounds < 10)); //continue as long as data or rtx avail, or 10 pks where read in a row 01116 } 01117 } 01118 01119 /***************************************************************/ 01120 void Rtp::fb_print(struct rtp *session, uint32_t ssrc, rtcp_fb *fb) { 01121 int i; 01122 01123 if (ssrc == rtp_my_ssrc(session)) 01124 return; 01125 dprintf_full("RX_FB SSRC = 0x%08x ", ssrc); 01126 switch (fb->subtype) { 01127 case RTCP_FB_FMT_ACK: 01128 dprintf_err("ACK BLP from %6i: ",fb->fci.fb_ack.pid); 01129 if (fb->fci.fb_ack.r == 1) { //range 01130 dprintf_full(" to %6i",fb->fci.fb_ack.blp+fb->fci.fb_ack.pid); 01131 } else //bitmap 01132 for (i=0; i < 15; i++) { 01133 if ((fb->fci.fb_ack.blp & ((uint64_t)1 << i)) != 0) { 01134 dprintf_full("r"); 01135 } else { 01136 dprintf_full("."); 01137 } 01138 } 01139 break; 01140 case RTCP_FB_FMT_NACK: 01141 dprintf_full("NACK BLP from %6i: ",fb->fci.fb_ack.pid); 01142 for (i=0; i < 15; i++) { 01143 if ((fb->fci.fb_nack.blp & ((uint64_t)1 << i)) != 0) { 01144 dprintf_full("D"); 01145 } else { 01146 dprintf_full("."); 01147 } 01148 } 01149 break; 01150 default: 01151 dprintf_err("FATAL: unknown Feedback Format!"); 01152 ::exit(1); 01153 } 01154 dprintf_full("\n"); 01155 } 01156 01157 /************************************************************** 01158 * called by: server-side ucl_update() 01159 */ 01160 void Rtp::fb_handle(struct rtp *session, uint32_t ssrc, rtcp_fb *fb) { 01161 int i; 01162 NetPacket_t *p=NULL; 01163 Rtp *caller=NULL; 01164 struct timeval now_tv; 01165 double now_dtime; 01166 int curr_rtt; 01167 int last_nack_seq=0; 01168 01169 01170 if (ssrc == rtp_my_ssrc(session)) 01171 return; 01172 01173 caller=(Rtp*)rtp_get_userdata(session); 01174 dprintf_full("RX_FB SSRC = 0x%08x \n", ssrc); 01175 01176 if (session == caller->session) { // feedback from DATA session 01177 switch (fb->subtype) { 01178 case RTCP_FB_FMT_ACK: 01179 dprintf_full("RTCP RX_FB: ACK BLP from %6i: ",fb->fci.fb_ack.pid); 01180 p=caller->popNetPacket(fb->fci.fb_ack.pid); 01181 if (p) { 01182 p->acked=true; 01183 caller->pushNetPacket(p); 01184 } 01185 if (fb->fci.fb_ack.r == 1) { //range 01186 dprintf_full(" to %6i",fb->fci.fb_ack.blp+fb->fci.fb_ack.pid); 01187 for (i=1; i <= fb->fci.fb_ack.blp; i++) { 01188 p=caller->popNetPacket(fb->fci.fb_ack.pid+i); 01189 if (p) { 01190 p->acked=true; 01191 caller->pushNetPacket(p); 01192 } 01193 } 01194 } else //bitmap 01195 for (i=0; i < 15; i++) { 01196 if ((fb->fci.fb_ack.blp & ((uint64_t)1 << i)) != 0) { 01197 p=caller->popNetPacket(fb->fci.fb_ack.pid+i+1); 01198 if (p) { 01199 p->acked=true; 01200 caller->pushNetPacket(p); 01201 } 01202 } 01203 } 01204 break; 01205 case RTCP_FB_FMT_NACK: 01206 last_nack_seq = fb->fci.fb_nack.pid; 01207 dprintf_full("RTCP RX_FB: resending NACK BLP from %6i:\n",last_nack_seq); 01208 p=caller->popNetPacket(last_nack_seq); 01209 caller->reinsertToPreQ(p,last_nack_seq); 01210 01211 01212 //now add the bitmapped NACKs 01213 for (i=0; i < 15; i++) { 01214 if ((fb->fci.fb_nack.blp & ((uint64_t)1 << i)) != 0) { 01215 last_nack_seq = fb->fci.fb_nack.pid+i+1; 01216 dprintf_full("RTCP RX_FB: further resending NACK BLP from %6i:\n",last_nack_seq); 01217 p=caller->popNetPacket(last_nack_seq); 01218 caller->reinsertToPreQ(p,last_nack_seq); 01219 } 01220 } 01221 break; 01222 default: 01223 dprintf_err("FATAL: unknown Feedback Format!"); 01224 ::exit(1); 01225 } 01226 dprintf_full("\n"); 01227 } else /*************************************************/ 01228 if (caller->rtx && (session == caller->rtx->session)) { //lost RTX packets are also reinserted! 01229 switch (fb->subtype) { 01230 case RTCP_FB_FMT_ACK: 01231 dprintf_full("RTCP RX_FB: RTX ACK BLP from %6i: NOT SUPPORTED",fb->fci.fb_ack.pid); 01232 break; 01233 case RTCP_FB_FMT_NACK: 01234 last_nack_seq = fb->fci.fb_nack.pid; 01235 dprintf_full("RTCP RX_FB: resending RTX NACK BLP from %6i:\n",last_nack_seq); 01236 p=caller->popNetPacketByRTXseq(last_nack_seq); 01237 caller->reinsertToPreQ(p,last_nack_seq); 01238 01239 //now add the bitmapped NACKs 01240 for (i=0; i < 15; i++) { 01241 if ((fb->fci.fb_nack.blp & ((uint64_t)1 << i)) != 0) { 01242 last_nack_seq = fb->fci.fb_nack.pid+i+1; 01243 01244 dprintf_full("RTCP RX_FB: further resending RTX NACK BLP from %6i:\n",last_nack_seq); 01245 p=caller->popNetPacketByRTXseq(last_nack_seq); 01246 caller->reinsertToPreQ(p,last_nack_seq); 01247 } 01248 } 01249 break; 01250 default: 01251 dprintf_err("FATAL: unknown Feedback Format!"); 01252 ::exit(1); 01253 } 01254 } else { /*************************************************/ 01255 dprintf_err("FATAL: unknown session 0x%08x is sending feedback!", ssrc); 01256 ::exit(1); 01257 } 01258 01259 //use only LAST nack'd packet for RTT! 01260 //pick out the NEXT packet, which was definitely recv'd by the client, this one 01261 //is the best candidate for correct RTT calculations! 01262 if (caller->rtx && session == caller->rtx->session) //lost RTX packets are also reinserted! 01263 p=caller->popNetPacketByRTXseq(last_nack_seq+1); 01264 else 01265 p=caller->popNetPacket(last_nack_seq+1); 01266 01267 if (p!=NULL) { 01268 gettimeofday(&now_tv,NULL); 01269 now_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 01270 if (p->resent_dtime == 0) //fb for first try 01271 curr_rtt = (int)((now_dtime - p->sent_dtime)*1000); 01272 else //fb of already RTX'd packet 01273 curr_rtt = (int)((now_dtime - p->resent_dtime)*1000); 01274 01275 caller->stats->setAvgRTT(curr_rtt); 01276 dprintf_full("RTCP RX_FB: RTT curr: %i avg: %i\n", curr_rtt, caller->stats->getAvgRTT()); 01277 } 01278 } 01279 01280 /************************************************************** 01281 * called by reader-side ucl_update() 01282 */ 01283 void Rtp::rtp_data_handle(struct rtp *session, uint32_t ssrc, rtp_packet *packet) { 01284 char *buf=NULL; 01285 int data_len; 01286 Rtp *caller=NULL; 01287 uint16_t rtp_seq=0; 01288 uint16_t rtx_rtp_seq=0; 01289 struct timeval now_tv; 01290 01291 if (ssrc == rtp_my_ssrc(session)) 01292 return; 01293 01294 caller=(Rtp*)rtp_get_userdata(session); 01295 // printf("RX_RTP DATA SSRC = 0x%08x \n", ssrc); 01296 01297 if (caller->getState() == PAUSED) //ignore last packet, which are sent shortly before pause! 01298 return; 01299 01300 gettimeofday(&now_tv,NULL); 01301 caller->interarrival_time = (int)(1000* (now_tv.tv_sec + now_tv.tv_usec/1000000.0 01302 - caller->stats->getLastPacketTime())); 01303 caller->stats->setAvgInterarrivalTime(caller->interarrival_time); 01304 01305 caller->stats->setLastPacketTime(now_tv.tv_sec + now_tv.tv_usec/1000000.0); 01306 if (caller->stats->getFirstPacketTime() == 0) { 01307 caller->stats->setFirstPacketTime(caller->stats->getLastPacketTime()); 01308 caller->interarrival_time = 0; //reset to zero 01309 //if the server doesnt start at TS=0, but later, fix the offset.... 01310 caller->ts_offset = packet->ph.ph_ts; //FIXME: problematic, if first packet comes out of order 01311 caller->stats->setFirstPacketTS(0); // as packet->ph.ph_ts will be reduced by ts_offset below 01312 //FIXME: stats->avgInterarrivalTime cannot be reset... 01313 } 01314 packet->ph.ph_ts-= caller->ts_offset; //always adjust TS to first detected offset 01315 01316 int this_frame_sec = (int)floor((double)packet->ph.ph_ts / caller->es->getMediaTimeScale()); 01317 int highest_ts_sec = (int)floor((double)caller->stats->getHighestPacketTS() 01318 / caller->es->getMediaTimeScale()); 01319 if (caller->stats->getHighestPacketTS() < packet->ph.ph_ts) { 01320 caller->stats->checkForNewPlayoutSec(this_frame_sec, highest_ts_sec); 01321 01322 //dprintf_full("Rtp::rtp_data_handle: set new highest TS: %i\n",packet->ph.ph_ts); 01323 caller->stats->setHighestPacketTS(packet->ph.ph_ts); 01324 } 01325 01326 if (this_frame_sec > (signed)highest_ts_sec - RTP_STAT_SECS) { //not out of range 01327 if (packet->ph.ph_m) 01328 caller->stats->incPlayoutSecNetFPS(this_frame_sec); 01329 } 01330 01331 //make copy of rtp payload, since rtp pkt will be freed later 01332 if (caller->session == session) { // normal DATA packet 01333 caller->stats->incStreamoutSecDataBW(caller->stats->getStreamoutFullSec(), 01334 sizeof(rtp_packet_header) + packet->pd.rtp_pd_data_len); 01335 caller->stats->incStreamoutSecDataPkts(caller->stats->getStreamoutFullSec()); 01336 01337 if (this_frame_sec > (signed)highest_ts_sec - RTP_STAT_SECS) { //not out of range 01338 caller->stats->incPlayoutSecDataBW(this_frame_sec, 01339 sizeof(rtp_packet_header) + packet->pd.rtp_pd_data_len); 01340 caller->stats->incPlayoutSecDataPkts(this_frame_sec); 01341 } 01342 01343 data_len = packet->pd.rtp_pd_data_len; 01344 buf=(char*)malloc(data_len); 01345 assert(buf != NULL); 01346 memcpy(buf,packet->pd.rtp_pd_data, data_len); 01347 rtp_seq = packet->ph.ph_seq; 01348 dprintf_full("Rtp::rtp_data_handle: received rtp packet seq no %i)\n",rtp_seq); 01349 01350 if (caller->first_rtp_seq < 0) { //initialize 01351 caller->first_rtp_seq = rtp_seq; 01352 caller->last_rtp_seq = rtp_seq; 01353 } else 01354 if (++caller->last_rtp_seq != rtp_seq) { // FB only on NACK 01355 if (caller->feedbackEnabled) { 01356 // send immediate rtcp feedback packet 01357 dprintf_err("sending NACK for rtp seq %i up to seq %i\n", caller->last_rtp_seq, rtp_seq-1); 01358 rtp_send_ctrl(caller->session, packet->ph.ph_ts, NULL, RTCP_IMMEDIATE); 01359 } 01360 caller->last_rtp_seq = rtp_seq; 01361 } 01362 } else // RTX packet 01363 if ( (caller->rtxEnabled) && (caller->rtx->session == session)) { 01364 uint16_t* seq_p=NULL; 01365 01366 caller->stats->incStreamoutSecRtxBW(caller->stats->getStreamoutFullSec(), 01367 sizeof(rtp_packet_header) + packet->pd.rtp_pd_data_len); 01368 caller->stats->incStreamoutSecRtxPkts(caller->stats->getStreamoutFullSec()); 01369 01370 if (this_frame_sec > (signed)highest_ts_sec - RTP_STAT_SECS) { //not out of range 01371 caller->stats->incPlayoutSecRtxPkts(this_frame_sec); 01372 caller->stats->incPlayoutSecRtxBW(this_frame_sec, 01373 sizeof(rtp_packet_header) + packet->pd.rtp_pd_data_len); 01374 } 01375 01376 rtx_rtp_seq = packet->ph.ph_seq; 01377 seq_p = (uint16_t*)packet->pd.rtp_pd_data; 01378 rtp_seq = ntohs(*seq_p); 01379 01380 dprintf_full("RECV'd retransmitted RTX packet %i (RTX seq no %i) size %i\n", 01381 rtp_seq,rtx_rtp_seq,packet->pd.rtp_pd_data_len); 01382 data_len = packet->pd.rtp_pd_data_len - sizeof(uint16_t); 01383 buf=(char*)malloc(data_len); 01384 assert(buf != NULL); 01385 memcpy(buf,packet->pd.rtp_pd_data + sizeof(uint16_t), data_len); 01386 01387 if (caller->first_rtx_rtp_seq < 0) { //initialize 01388 caller->first_rtx_rtp_seq = rtx_rtp_seq; 01389 caller->last_rtx_rtp_seq = rtx_rtp_seq; 01390 } else 01391 if (++caller->last_rtx_rtp_seq != rtx_rtp_seq) { // FB only on NACK 01392 if (caller->feedbackEnabled) { 01393 // send immediate rtcp feedback packet 01394 dprintf_err("sending NACK for RTX rtp seq %i up to seq %i\n", 01395 caller->last_rtx_rtp_seq,rtx_rtp_seq-1); 01396 rtp_send_ctrl(caller->rtx->session, packet->ph.ph_ts, NULL, RTCP_IMMEDIATE); 01397 } 01398 caller->last_rtx_rtp_seq = rtx_rtp_seq; 01399 } 01400 } else { 01401 dprintf_err("FATAL: recv'd data from unknown RTP session!"); 01402 ::exit(1); 01403 } 01404 01405 01406 //add pkt sorted by TS (and frag_no==rtp_seq), insert with unknown VOP_type, vop_size and prio 01407 caller->insertToPreQ((u8*)buf, data_len, Frame::NN_VOP, -1, -1, 01408 packet->ph.ph_ts, rtp_seq, packet->ph.ph_m, rtp_seq, Rtp::Q_TS); 01409 01410 } 01411 01412 /***************************************************************/ 01413 void Rtp::RtpCallback(struct rtp *session, rtp_event * e) { 01414 Rtp *caller = NULL; 01415 rtp_packet *rtp_pkt=NULL; 01416 rtcp_fb *fb=NULL; 01417 rtcp_rr *rr=NULL; 01418 NetPacket_t *p=NULL; 01419 int ts; 01420 double this_rr_lost; 01421 01422 if (e->ssrc == rtp_my_ssrc(session)) 01423 return; 01424 01425 switch (e->type) { 01426 case RX_RTP: 01427 //dprintf_full("RX_RTP SSRC = 0x%08x\n", e->ssrc); 01428 rtp_pkt = (rtp_packet*)e->data; 01429 caller=(Rtp*)rtp_get_userdata(session); 01430 if (caller->isInput()) { //only client handles incoming data! 01431 Rtp::rtp_data_handle(session,e->ssrc, rtp_pkt); 01432 if (caller->session == session) 01433 caller->sender_ssrc = e->ssrc; //set sender of normal RTP channel 01434 } else 01435 dprintf_err("Rtp::RtpCallback incoming data on Server-side???? *wonders*\n"); 01436 free(e->data); 01437 break; 01438 case RX_SDES: 01439 //dprintf_full("RX_SDES SSRC = 0x%08x\n", e->ssrc); 01440 break; 01441 case RX_BYE: 01442 dprintf_full("RX_BYE SSRC = 0x%08x\n", e->ssrc); 01443 break; 01444 case SOURCE_CREATED: 01445 //dprintf_full("New source created, SSRC = 0x%08x\n", e->ssrc); 01446 break; 01447 case SOURCE_DELETED: 01448 // dprintf_full("Source deleted, SSRC = 0x%08x state=%i\n", e->ssrc,caller->getState()); 01449 caller=(Rtp*)rtp_get_userdata(session); 01450 dprintf_full("Source deleted, SSRC = 0x%08x... state %i\n", e->ssrc,caller->state); 01451 if ( (caller->state != CLOSED) && (caller->state != CLOSING) 01452 && (caller->state != FORCE_CLOSING) ) { 01453 if (caller->isOutput()) 01454 caller->setState(STREAMERR); 01455 else 01456 caller->setState(STREAMEOF); 01457 caller->preQ.MUTEX_Q_DATA_AVAIL.release(); //simulate data 01458 } 01459 break; 01460 case RX_SR: 01461 //dprintf_full("RX_SR SSRC = 0x%08x\n", e->ssrc); 01462 caller=(Rtp*)rtp_get_userdata(session); 01463 break; 01464 case RX_RR: 01465 rr = (rtcp_rr*)e->data; 01466 caller=(Rtp*)rtp_get_userdata(session); 01467 01468 if (caller->session == session) { //only data channel, not RTX 01469 this_rr_lost = (double)(rr->fract_lost<<8)/(1<<16) * 100; 01470 if (caller->last_rr_sum_pkts_lost > -1) { 01471 int sum_iv=rr->last_seq - caller->last_rr_seq_no; 01472 int lost_iv=rr->total_lost - caller->last_rr_sum_pkts_lost; 01473 caller->last_rr_lost_fract = (double)lost_iv / (double)sum_iv * 100; 01474 } else //use total values for first lost_fract 01475 caller->last_rr_lost_fract = (double)rr->total_lost 01476 / (double)(rr->last_seq - caller->first_rtp_seq) * 100; 01477 caller->last_rr_sum_pkts_lost = rr->total_lost; 01478 caller->last_rr_seq_no = rr->last_seq; 01479 01480 dprintf_full("RX_RR SSRC = 0x%08x Delay to last RR: %2.3f secs, " 01481 "PacketLoss fract %3.3f%% myFract %3.3f%% total loss: %i pkts of %lli pkts\n", 01482 e->ssrc, 01483 caller->stats->getStreamoutFloatSec() - caller->last_rr_received, 01484 this_rr_lost, 01485 caller->last_rr_lost_fract, rr->total_lost, 01486 caller->last_rr_seq_no - caller->first_rtp_seq ); 01487 01488 caller->last_rr_received = caller->stats->getStreamoutFloatSec(); 01489 01490 //grab packet, get TS, push back... 01491 p = caller->popNetPacket(rr->last_seq); 01492 if (p) { 01493 ts = p->rtp_ts; 01494 caller->pushNetPacket(p); 01495 dprintf_full("RX_RR SSRC = 0x%08x RR from rtp seq %i (TS: %i)\n", 01496 e->ssrc, rr->last_seq, ts); 01497 } else 01498 dprintf_full("RX_RR SSRC = 0x%08x from rtp seq %i, which is allready off sendQ\n", 01499 e->ssrc,rr->last_seq); 01500 } 01501 break; 01502 case RX_RR_EMPTY: 01503 //dprintf_full("RX_RR_EMPTY SSRC = 0x%08x\n", e->ssrc); 01504 break; 01505 case RX_RTCP_START: 01506 //printf("RX_RTCP_START\n"); 01507 break; 01508 case RX_RTCP_FINISH: 01509 //printf("RX_RTCP_FINISH\n"); 01510 break; 01511 case RR_TIMEOUT: 01512 //dprintf_full("RR_TIMEOUT SSRC = 0x%08x\n", e->ssrc); 01513 break; 01514 case RX_APP: 01515 //dprintf_full("RX_APP SSRC = 0x%08x\n", e->ssrc); 01516 break; 01517 case RX_FB: 01518 //dprintf_full("RX_FB SSRC = 0x%08x\n", e->ssrc); 01519 fb = (rtcp_fb*)e->data; 01520 //Rtp::fb_print(session,e->ssrc, fb); 01521 Rtp::fb_handle(session,e->ssrc, fb); 01522 free(e->data); 01523 break; 01524 default: 01525 break; 01526 } 01527 } 01528 01529 /***************************************************************/ 01530 bool Rtp::pushNetPacket(Rtp::NetPacket_t *p) { 01531 int slot; 01532 01533 slot = p->rtp_seq % RTP_SENDQ_PRIME; 01534 01535 sendQ.MUTEX_Q.lock(); 01536 sendQ.size += p->pl_size; 01537 sendQ.num_elems++; 01538 p->next = sendQ.Q[slot]; 01539 sendQ.Q[slot] = p; 01540 01541 //DEBUG PRINTOUT QUEUE 01542 /* 01543 for (int i=0;i<RTP_SENDQ_PRIME;i++) { 01544 NetPacket_t *d = sendQ.Q[i]; 01545 dprintf_full("sendQ slot %i\n",i); 01546 while (d != NULL) { 01547 dprintf_full(" RTP seq %i\n",d->rtp_seq); 01548 d = d->next; 01549 } 01550 } 01551 */ 01552 sendQ.MUTEX_Q.release(); 01553 return true; 01554 } 01555 01556 /***************************************************************/ 01557 Rtp::NetPacket_t *Rtp::popNetPacket(uint16_t rtp_seq) { 01558 Rtp::NetPacket_t *p=NULL, *pprev=NULL; 01559 int slot; 01560 01561 slot = rtp_seq % RTP_SENDQ_PRIME; 01562 sendQ.MUTEX_Q.lock(); 01563 p = sendQ.Q[slot]; 01564 pprev = NULL; 01565 while ( (p != NULL) && (p->rtp_seq != rtp_seq) ) { 01566 pprev = p; 01567 p = p->next; 01568 } 01569 01570 if (p) { 01571 if (pprev) 01572 pprev->next = p->next; 01573 else 01574 sendQ.Q[slot] = p->next; 01575 p->next = NULL; 01576 sendQ.size -= p->pl_size; 01577 sendQ.num_elems--; 01578 } 01579 01580 sendQ.MUTEX_Q.release(); 01581 return p; 01582 } 01583 01584 /***************************************************************/ 01585 Rtp::NetPacket_t *Rtp::popNetPacketByRTXseq(uint16_t rtx_rtp_seq) { 01586 Rtp::NetPacket_t *p=NULL, *pprev=NULL; 01587 int slot; 01588 01589 sendQ.MUTEX_Q.lock(); 01590 for (slot=0;((slot<RTP_SENDQ_PRIME) && (p==NULL));slot++) { 01591 p = sendQ.Q[slot]; 01592 pprev = NULL; 01593 while ( (p != NULL) && (p->rtx_rtp_seq != rtx_rtp_seq) ) { 01594 pprev = p; 01595 p = p->next; 01596 } 01597 01598 if (p) { 01599 if (pprev) 01600 pprev->next = p->next; 01601 else 01602 sendQ.Q[slot] = p->next; 01603 p->next = NULL; 01604 sendQ.size -= p->pl_size; 01605 sendQ.num_elems--; 01606 } 01607 pprev = NULL; 01608 } 01609 sendQ.MUTEX_Q.release(); 01610 return p; 01611 } 01612 01613 /***************************************************************/ 01614 void Rtp::sendQupdate(long streamout_sec) { 01615 Rtp::NetPacket_t *p=NULL, *pprev=NULL; 01616 int slot; 01617 long ssec_ticks, threshold; 01618 01619 ssec_ticks = es->getMediaTimeScale() * streamout_sec; 01620 threshold = (long)(ssec_ticks - ((u64)ceil(stats->getPrefetchedSecs()) * es->getMediaTimeScale())); 01621 01622 if ((streamout_sec >= 0) && (threshold < 0)) { //not "flushAll" and initial prefetching 01623 dprintf_full("Rtp::sendQupdate(ssec %li): ignoring since initial prefetching (threshold TS %li)\n", streamout_sec,threshold); 01624 return; 01625 } 01626 01627 dprintf_full("Rtp::sendQupdate(ssec %li): drop all older than TS %li\n",streamout_sec, threshold); 01628 01629 sendQ.MUTEX_Q.lock(); 01630 for (slot=0; slot<RTP_SENDQ_PRIME;slot++) { 01631 p = sendQ.Q[slot]; 01632 pprev = NULL; 01633 while (p != NULL) { 01634 if ( (streamout_sec == -1) || //-1 == drop all 01635 (p->rtp_ts + (u64)ceil(stats->getPrefetchedSecs()) * es->getMediaTimeScale() 01636 < (unsigned)ssec_ticks) || 01637 (p->acked) ) { 01638 //dprintf_full("NOW TS: %li Frame seq %i ts %i:%i is outdated or acked... now dropped\n", 01639 // streamout_sec*es->getMediaTimeScale(),p->rtp_seq, p->rtp_ts,p->frag_no); 01640 sendQ.size -= p->pl_size; 01641 sendQ.num_elems--; 01642 01643 if (pprev) 01644 pprev->next = p->next; 01645 else 01646 sendQ.Q[slot]=p->next; 01647 01648 assert(p->payload != NULL); 01649 delete p->payload; 01650 p->payload=NULL; 01651 delete p;p=NULL; 01652 01653 if (pprev) 01654 p = pprev; //to continue while 01655 else 01656 p = sendQ.Q[slot]; 01657 } 01658 01659 if (sendQ.Q[slot] == NULL) 01660 break; 01661 01662 pprev = p; 01663 p = p->next; 01664 } 01665 } 01666 sendQ.MUTEX_Q.release(); 01667 } 01668 01669 01670 /***************************************************************/ 01671 void Rtp::preQupdate() { 01672 Rtp::NetPacket_t *p=NULL; 01673 int act_seq = -1; 01674 int prev_seq = -1; 01675 int ret; 01676 01677 if (!rtxEnabled) { 01678 dprintf_full("Rtp::preQupdate: no rtxEnabled... ignoring\n"); 01679 return; 01680 } 01681 01682 dprintf_full("Rtp::preQupdate\n"); 01683 preQ.MUTEX_Q.lock(); 01684 p = preQ.Q; 01685 while (p != NULL) { 01686 act_seq=p->rtp_seq; 01687 // dprintf_full("Rtp::preQupdate checks on %i ts %i (prev %i)\n",act_seq,p->rtp_ts,prev_seq); 01688 if (prev_seq != -1) { 01689 while (++prev_seq < act_seq) { //missing packet 01690 dprintf_full("Rtp::preQupdate: requesting %i (remaining: %i, final will be %i), sess %p ssrc %x\n", 01691 prev_seq, act_seq-1-prev_seq, act_seq-1, session, sender_ssrc); 01692 ret=rtp_send_explicit_nack(session, sender_ssrc, prev_seq, stats->getHighestPacketTS(), NULL); 01693 assert(act_seq > prev_seq); 01694 //assert(act_seq-1 - prev_seq < 30 && "YOUR SYSTEM IS TOO SLOW! LOST TOO MANY PACKETS IN A ROW!!!"); 01695 if (ret == -1) { 01696 dprintf_err("Rtp::preQupdate ERROR! rtp session wrong!!!!\n"); 01697 break; 01698 } 01699 } 01700 } 01701 prev_seq = act_seq; 01702 p = p->next; 01703 } 01704 preQ.MUTEX_Q.release(); 01705 } 01706 01707 01708 01709 01710 /***************************************************************/ 01711 bool Rtp::insertToPreQ(u8 *payload, long pl_size, Frame::FrameType type, long vop_size, int prio, 01712 u32 rtpTimestamp, u32 fragCounter, bool lastFrag, u32 rtp_seq, QOrder qo) { 01713 NetPacket_t *p=NULL; 01714 int diff; 01715 int rtp_cycle; 01716 01717 p = new NetPacket_t; 01718 memset(p,0,sizeof(NetPacket_t)); 01719 p->rtp_ts = rtpTimestamp; 01720 p->vop_type = type; 01721 p->frag_no = fragCounter; 01722 p->last_frag = lastFrag; 01723 p->payload = payload; 01724 p->pl_size = pl_size; 01725 p->vop_size = vop_size; 01726 p->drop_me = false; 01727 p->prio = prio; 01728 p->sent_dtime = p->resent_dtime = 0; 01729 01730 preQ.MUTEX_Q.lock(); 01731 /* to let this work, we assume no packets which are off more than 01732 * 64K in seq_no space.... */ 01733 rtp_cycle = (preQ.highest_rtp_seq / RTP_SEQ_MOD); 01734 if (preQ.highest_rtp_seq != 0) { 01735 diff=(int)(preQ.highest_rtp_seq % RTP_SEQ_MOD) - (int)(rtp_seq); 01736 if (diff > 32768) //new cycle 01737 rtp_cycle++; 01738 else 01739 if (diff < -32768) //previous cycle 01740 rtp_cycle--; 01741 } 01742 p->rtp_seq = rtp_seq + rtp_cycle*RTP_SEQ_MOD; 01743 if (isInput()) { // clientSide fix for 32bit rtp_seq 01744 assert(rtp_seq == fragCounter); 01745 p->frag_no = p->rtp_seq; 01746 } 01747 01748 preQ.MUTEX_Q.release(); 01749 01750 //now put it onto the PreQ 01751 return addToPreQ(p,qo); 01752 } 01753 01754 01755 /*************************************************************** 01756 * called by: server-side fb_handle() 01757 */ 01758 bool Rtp::reinsertToPreQ(NetPacket_t *p, uint rtp_seq) { 01759 01760 if (p!=NULL) { 01761 int this_frame_sec = (int)floor((double)p->rtp_ts / es->getMediaTimeScale()); 01762 int highest_ts_sec = (int)floor((double)stats->getHighestPacketTS() / es->getMediaTimeScale()); 01763 01764 // too late for this packet .... 01765 if (this_frame_sec < stats->getPlayoutSec()) { 01766 dprintf_full("Rtp::reinsertToPreQ too late for TS %i seq %i\n",p->rtp_ts,rtp_seq); 01767 if (p->resent_counter == 0) { //first RTX, adjust 01768 if (this_frame_sec > highest_ts_sec - RTP_STAT_SECS) { //in stats range 01769 if (p->last_frag) // is a full frame 01770 stats->decPlayoutSecNetFPS(this_frame_sec); 01771 //general stats 01772 stats->decPlayoutSecDataPkts(this_frame_sec); 01773 stats->decPlayoutSecDataBW(this_frame_sec, p->pl_size + sizeof(rtp_packet_header)); 01774 } 01775 } 01776 return false; 01777 } 01778 01779 // when we recv this NACK, it might be for a previous streamout sec, 01780 // so fix up this previous streamout sec statistic with more nacked_bw 01781 int nack_sec = (int)floor(p->sent_dtime - stats->getFirstPacketTime()); 01782 if (nack_sec >= MAX(0,(int)stats->getStreamoutFullSec() - RTP_STAT_SECS)) // within stat array 01783 stats->incStreamoutSecNackedBW(nack_sec, sizeof(rtp_packet_header) + p->pl_size); 01784 01785 if (this_frame_sec > (signed)highest_ts_sec - RTP_STAT_SECS) //within stat array range 01786 stats->incPlayoutSecNackedBW(this_frame_sec, sizeof(rtp_packet_header) + p->pl_size); 01787 01788 p->resent_counter++; 01789 dprintf_full(" adding to RESEND Q for %i%s time: seq %i ts %i:%i prio %i\n", 01790 p->resent_counter, p->resent_counter==1?"st":p->resent_counter==2?"nd": 01791 p->resent_counter==3?"rd":"th", 01792 p->rtp_seq, p->rtp_ts, p->frag_no,p->prio); 01793 return addToPreQ(p,Q_TS,RTP_REINSERT); //this will add to front, but keep RTX order 01794 // return false; //DISABLE RTX 01795 01796 } 01797 else 01798 dprintf_err(" resend of rtp_seq %i too late... already off sendQ!\n",rtp_seq); 01799 01800 return false; 01801 } 01802 01803 01804 /***************************************************************/ 01805 /* @param qo: the QueueOrder 01806 * for the server: Q_TAIL for normal read from disk, 01807 * Q_HEAD for retransmissions, to put them to the immediate front; 01808 * for the client: Q_SEQ in RtpHandler, if new data arrives, insert with RTP seqNo, 01809 * Q_SEQ in getFrame, if too many packets where read, for pushback; 01810 * now, everything is done via Q_TS, which is more fair to RTX 01811 * @param reinsert: is used in Rtp::getFrame if there was popped too far in the Q... 01812 * or if a NACK'd packed is going to be added to the Q 01813 */ 01814 bool Rtp::addToPreQ(NetPacket_t *p, QOrder qo, bool reinsert) { 01815 NetPacket_t *pprev=NULL, *pold=NULL; 01816 01817 preQ.MUTEX_Q.lock(); 01818 p->next = NULL; 01819 if (reinsert) 01820 dprintf_full("Rtp::addtoPreQ reinsert!\n"); 01821 dprintf_full("Rtp::addToPreQ (stored: %i full frames) packet seq %i ts %i:%i size %i (%s) type: %s prio: %i\n", 01822 preQ.num_full_frames,p->rtp_seq,p->rtp_ts, 01823 p->frag_no,p->pl_size,!p->last_frag?"intermediate":"last", 01824 Frame::VOPTypeToChar(p->vop_type), p->prio); 01825 01826 dprintf_full(" PreQ has %i packets and %li bytes (avg bw: %i Kbit/sec)\n", 01827 preQ.num_elems,preQ.size,preQ.avg_bw*8/1024); 01828 dprintf_full(" now its %2.3f, PreQ is at %2.3f secs, ClPlay sec is %2.3f, so it's %2.3f secs ahead\n", 01829 stats->getStreamoutFloatSec(),(float)preQ.highest_ts/es->getMediaTimeScale(), 01830 stats->getPlayoutSec(), stats->getBufAheadSec()); 01831 01832 if (!writeToNetwork && !reinsert) 01833 dprintf_full(" PreQ: last frame arrival@%6.3f secs, interarrival time: %i msecs\n", 01834 stats->getLastPacketTime() - stats->getFirstPacketTime(), interarrival_time); 01835 01836 //for convenience, resends are always enqueued, even if the preQ is full! 01837 //all others are bound to preQ MAX size 01838 if (!reinsert) 01839 while (preQ.size+p->pl_size > preQ.max_size) { 01840 preQ.MUTEX_Q_FULL.lock(); 01841 preQ.MUTEX_Q.release(); 01842 01843 if (this->isInput()) 01844 dprintf_full("Rtp::addToPreQ: preQ BUFFER FULL SO BLOCKED! ... we also might lose arriving packets!\n"); 01845 //wait until run thread has sent some data 01846 msleep(200); 01847 // preQ.MUTEX_Q_FULL.lock(); 01848 preQ.MUTEX_Q_FULL.release(); 01849 preQ.MUTEX_Q.lock(); 01850 } 01851 01852 preQ.isDirty=true; 01853 preQ.size += p->pl_size; 01854 preQ.num_elems++; 01855 /* add to tail of preQ */ 01856 if (preQ.Q == NULL) { 01857 preQ.Q = preQ.Q_tail = p; 01858 //dprintf_full("preQ was NULL, now DATA AVAIL!\n"); 01859 preQ.MUTEX_Q_DATA_AVAIL.release(); 01860 } else { 01861 if (qo == Q_HEAD) { 01862 p->next = preQ.Q; 01863 preQ.Q = p; 01864 } else 01865 if (qo == Q_TAIL) { //add to tail 01866 preQ.Q_tail->next = p; 01867 preQ.Q_tail = p; 01868 } else 01869 if (qo == Q_TS) { //insert according to timestamp 01870 pprev = pold = preQ.Q; 01871 01872 //also frag_no is a key for sorting! 01873 //SERVER: frag_no is set correctly, CLIENT: frag_no is the RTP_seq, also monotonic! 01874 while((pold!=NULL) && (pold->rtp_ts < p->rtp_ts)) { 01875 pprev = pold; 01876 pold = pold->next; 01877 } 01878 while ((pold!=NULL) && 01879 (pold->rtp_ts == p->rtp_ts) && 01880 (pold->frag_no < p->frag_no)) { 01881 pprev = pold; 01882 pold = pold->next; 01883 } 01884 01885 if (pold!=NULL) { 01886 if (pold->rtp_ts > p->rtp_ts) { 01887 p->next = pold; 01888 if (pold == preQ.Q) { 01889 preQ.Q = p; 01890 } else 01891 pprev->next = p; 01892 } else 01893 if (pold->frag_no == p->frag_no) { 01894 dprintf_err("MULTIPLE RECEPTION of rtp_seq %i TS %i:%i\n",p->rtp_seq,p->rtp_ts,p->frag_no); 01895 delete p->payload;p->payload=NULL; 01896 delete p;p=NULL; 01897 preQ.MUTEX_Q.release(); 01898 return false; 01899 } else 01900 if (pold->frag_no > p->frag_no) { 01901 p->next = pold; 01902 if (pold == preQ.Q) 01903 preQ.Q = p; 01904 else 01905 pprev->next = p; 01906 } else { 01907 if (pold == preQ.Q_tail) { 01908 preQ.Q_tail->next = p; 01909 preQ.Q_tail = p; 01910 } else { 01911 if (pprev == pold) { //first elem 01912 preQ.Q=p; 01913 p->next = pold; 01914 } else { 01915 p->next=pprev->next; 01916 pprev->next=p; 01917 } 01918 } 01919 } 01920 } else {//add to tail 01921 //preQ.Q_tail->next = p; //redundant 01922 pprev->next = p; 01923 preQ.Q_tail = p; 01924 } 01925 } else 01926 if (qo == Q_SEQ) { //insert according to RTP sequence number 01927 pprev = pold = preQ.Q; 01928 while((pold!=NULL) && (pold->rtp_seq < p->rtp_seq)) { 01929 pprev = pold; 01930 pold = pold->next; 01931 } 01932 if (pold!=NULL) { //insert in between 01933 if (pold->rtp_seq == p->rtp_seq) { 01934 dprintf_err("MULTIPLE RECEPTION of rtp_seq %i TS %i:%i\n",p->rtp_seq,p->rtp_ts,p->frag_no); 01935 delete p->payload;p->payload=NULL; 01936 delete p;p=NULL; 01937 preQ.MUTEX_Q.release(); 01938 return false; 01939 } else { 01940 p->next = pold; 01941 if (preQ.Q == pold) 01942 preQ.Q = p; 01943 else 01944 pprev->next = p; 01945 } 01946 } else { //add to tail 01947 preQ.Q_tail->next = p; 01948 preQ.Q_tail = p; 01949 } 01950 } 01951 } 01952 01953 /* VITOOKI_DEBUG PRINT OUTPUT */ 01954 /* 01955 NetPacket_t *dummy=p; 01956 NetPacket_t *pr; 01957 int64_t monotonic_seq; 01958 dprintf_full("preQ (%li bytes, %i frms, %i KBps) elem: ", 01959 preQ.size,preQ.num_elems,(int)(preQ.avg_bw*8/1024.0)); 01960 // printf(" %i:%i#%i\n",p->rtp_seq,p->rtp_ts,p->frag_no); 01961 p=pr=preQ.Q; 01962 monotonic_seq=-1; 01963 while (p != NULL) { 01964 printf(" %i:%i#%i(%i)",p->rtp_seq, p->rtp_ts, p->frag_no, p->pl_size); 01965 fflush(stdout); 01966 // if (!writeToNetwork) 01967 // assert(monotonic_seq < p->rtp_seq); 01968 monotonic_seq=p->rtp_seq; 01969 pr=p; 01970 p=p->next; 01971 } 01972 printf("\n"); 01973 p=dummy; 01974 assert(preQ.Q_tail == pr); 01975 assert(pr->next == NULL); 01976 */ 01977 01978 if (!reinsert && (p->resent_counter == 0)) { 01979 if (preQ.highest_ts < p->rtp_ts) { 01980 if (preQ.highest_ts == 0) { //initial jump size (eg. 3000==30fps or 3600=25fps) 01981 //FIXME: dirty hack to determine FPS even if one frame lost 01982 if(allowVopTimeDetection) { 01983 if (p->rtp_ts == 7200) 01984 es->setVOPTimeIncrement(3600); 01985 else if (p->rtp_ts == 6000) 01986 es->setVOPTimeIncrement(3000); 01987 else 01988 es->setVOPTimeIncrement(p->rtp_ts); 01989 // es->setVOPTimeIncrement(90000); //MP3 01990 } 01991 } 01992 preQ.highest_ts = p->rtp_ts; 01993 } 01994 if (preQ.highest_rtp_seq < p->rtp_seq) 01995 preQ.highest_rtp_seq = p->rtp_seq; 01996 01997 if (p->last_frag) { 01998 preQ.num_full_frames++; 01999 if (preQ.num_full_frames > 0) 02000 preQ.avg_bw = (int)(preQ.size/preQ.num_full_frames * 02001 ((double)es->getMediaTimeScale())/((double)es->getVOPTimeIncrement())); 02002 else 02003 preQ.avg_bw = 0; 02004 } 02005 } 02006 02007 preQ.MUTEX_Q.release(); 02008 //dprintf_full("insertQ unlocked\n"); 02009 02010 return true; 02011 } 02012 02013 /***************************************************************/ 02014 int Rtp::getNextPreQ_TS() { 02015 int ts; 02016 02017 preQ.MUTEX_Q.lock(); 02018 if (preQ.Q_tail != NULL) 02019 ts = preQ.Q_tail->rtp_ts; 02020 else 02021 ts = -1; 02022 preQ.MUTEX_Q.release(); 02023 02024 return ts; 02025 } 02026 02027 /***************************************************************/ 02028 Rtp::NetPacket_t *Rtp::popPreQ(bool blocking) { 02029 NetPacket_t *p=NULL; 02030 int counter = 0; 02031 02032 //dprintf_full("Rtp::popPreQ %s\n",blocking?"blocking":"non-blocking"); 02033 02034 //FIXME: WHY !firstframe?? if (!firstFrame && blocking && (state != STREAMEOF)) 02035 if (blocking && (state != STREAMEOF)) 02036 preQ.MUTEX_Q_DATA_AVAIL.lock(); //wait for data 02037 //dprintf_full("Rtp::popPreQ data avail\n"); 02038 02039 // preQ.MUTEX_Q.lock(); 02040 if ((state == FORCE_CLOSING) || (state == CLOSED)) { 02041 // preQ.MUTEX_Q.release(); 02042 if (writeToNetwork) 02043 preQ.MUTEX_Q_DONE.release(); 02044 dprintf_full("Rtp::popPreQ is quitting because of %s\n",(state==CLOSED?"CLOSED":"FORCE_CLOSING")); 02045 return NULL; 02046 } 02047 // preQ.MUTEX_Q.release(); 02048 02049 02050 preQ.MUTEX_Q.lock(); 02051 if ((state!=OPEN) && blocking && (preQ.Q == NULL)) { // if other thread is closing... 02052 dprintf_full("Rtp::popPreQ was blocking, but bailing out! empty Q and state %i!\n",state); 02053 preQ.MUTEX_Q.release(); 02054 return NULL; 02055 } 02056 if (preQ.Q == NULL) { 02057 dprintf_full("erroneously recv'd preQ.MUTEX_Q unlock.... no data after unpausing and state %i\n",state); 02058 preQ.MUTEX_Q.release(); 02059 return (Rtp::NetPacket_t *)-1; //FIXME: THIS IS A HACK!!!! 02060 } 02061 02062 while (preQ.Q == NULL) { 02063 dprintf_full("Rtp::popPreQ has empty Q! (%ith try)!\n",counter); 02064 msleep(100); 02065 if (counter++ > 10) { 02066 preQ.MUTEX_Q.release(); 02067 dprintf_full("Rtp::popPreQ is quitting because of empty Q and state %i!\n",state); 02068 if (firstFrame) //this is needed for a special error case 02069 firstFrame=false; //during prefetching! 02070 return NULL; 02071 } 02072 } 02073 p=preQ.Q; 02074 preQ.Q = p->next; 02075 p->next = NULL; 02076 preQ.size -= p->pl_size; 02077 preQ.num_elems--; 02078 if ( (p->last_frag) && (p->resent_counter == 0) ) 02079 preQ.num_full_frames--; 02080 if (preQ.num_elems == 0) { 02081 // assert(preQ.num_full_frames == 0); 02082 preQ.Q_tail = NULL; 02083 preQ.avg_bw = 0; 02084 } else 02085 preQ.avg_bw = (int)(preQ.size/preQ.num_elems * ((double)es->getMediaTimeScale())/((double)es->getVOPTimeIncrement())); 02086 02087 preQ.MUTEX_Q_FULL.release(); 02088 if (preQ.Q != NULL) { //still data available 02089 preQ.MUTEX_Q_DATA_AVAIL.release(); 02090 } 02091 preQ.MUTEX_Q.release(); 02092 return p; 02093 } 02094 02095 /***************************************************************/ 02096 void Rtp::run() { 02097 dprintf_full("Rtp::run (ports %i:%i) is starting the according RTP thread\n",localPort,remotePort); 02098 02099 #ifndef QT_THREAD_SUPPORT 02100 int ret = setRoundRobinScheduling(2); 02101 if (ret == 0) { 02102 dprintf_full("Rtp::sendThread switched to RT-RR SCHEDULING!\n"); 02103 } else { 02104 dprintf_full("Rtp::sendThread couldnt change to RT-RR SCHEDULING! (got root?) : err %i\n",ret); 02105 } 02106 #endif 02107 02108 if (writeToNetwork) //Sender 02109 sendThread(); 02110 else 02111 readThread(); 02112 } 02113 02114 02115 /***************************************************************/ 02116 void Rtp::readThread() { 02117 struct timeval now_tv; 02118 double now_dtime; 02119 double paused_dtime=0.0; 02120 IO::State myState; 02121 bool new_streamout_sec; 02122 02123 dprintf_full("Rtp::ReadThread started\n"); 02124 dprintf_full("RTP_PREFETCH_SECS: %i Avg BW: %i kbit/sec BufferSize: %li KByte\n", 02125 RTP_PREFETCH_SECS, es->getAvgBandwidth()/1024, preQ.max_size/1024); 02126 //is enough buffer avail for selected prefetching? 02127 if (preQ.max_size < RTP_PREFETCH_SECS * es->getAvgBandwidth()/8) { 02128 dprintf_err("WARNING: maybe not enough buffer to fit client prefetching!!!!\n"); 02129 //::exit(1); 02130 } 02131 02132 while(1) { 02133 myState = state; // disable race conds without large locking... 02134 02135 if ((myState == CLOSING) || (myState == FORCE_CLOSING)) { 02136 dprintf_full("Rtp::readThread DONE!\n"); 02137 preQ.MUTEX_Q_DONE.release(); 02138 return; 02139 } 02140 02141 gettimeofday(&now_tv,NULL); 02142 now_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02143 dprintf_full("Rtp::readThread() loop now_dtime %lf\n",now_dtime); 02144 if (!threadWasPaused && (myState == PAUSED)) { 02145 dprintf_full("Rtp::readThread set to PAUSED! waiting for PLAY again...\n"); 02146 threadWasPaused=true; 02147 paused_dtime = now_dtime; 02148 } 02149 if ( (threadWasPaused) && (myState != PAUSED)) { 02150 threadWasPaused=false; 02151 dprintf_full("Rtp::readThread returns from paused to OPEN after %6.3f secs! adjusting first_packet_dtime!\n", 02152 now_dtime - paused_dtime); 02153 if (stats->getFirstPacketTime() != 0.0) { // PAUSED at very beginning 02154 //set sendout base timing right 02155 stats->setLastPacketTime(now_dtime); 02156 stats->setFirstPacketTime(stats->getFirstPacketTime() + (now_dtime - paused_dtime)); 02157 } 02158 } 02159 02160 if (myState != PAUSED) { //normal code in OPEN or MUTED or PREFETCHING 02161 new_streamout_sec = stats->checkForNewStreamoutSec(now_dtime); 02162 02163 if (new_streamout_sec) { 02164 dprintf_full("Rtp::readThread: NEW streamout %2.3f " 02165 " ClPlaySec %2.3f ClBuf ahead %2.3f\n", 02166 stats->getStreamoutFloatSec(), 02167 stats->getPlayoutSec(), stats->getBufAheadSec()); 02168 02169 stats->writeStreamoutSecClientStats(preQ.size); 02170 02171 //this does (maybe) senseless requests for B-VOPs too!! 02172 preQupdate(); //re-request still missing packets 02173 } 02174 02175 if ((!firstFrame) && (now_dtime - stats->getLastPacketTime() > 1.0)) // TIMEOUT 02176 //ignore, if state is PAUSED (dont use myState, since the real state might have changed!) 02177 if ((state == OPEN) || (state == MUTED) || (state == PREFETCHING)) { 02178 dprintf_err("Rtp::readThread: now its %2.3f no more data arriving! EOF? state=%i buf ahead %2.3f secs\n",stats->getStreamoutFloatSec(),myState,stats->getBufAheadSec()); 02179 setState(STREAMEOF); 02180 preQ.MUTEX_Q_DATA_AVAIL.release(); //simulate data 02181 } 02182 } // !PAUSED 02183 02184 //always try to get packets, even if PAUSED... 02185 if ((state != CLOSED) && (state != CLOSING) && (state != FORCE_CLOSING)) 02186 ucl_update(stats->getHighestPacketTS(), UCL_TIMEOUT); //blocking for 20 msecs 02187 }//while 02188 } 02189 02190 02191 /***************************************************************/ 02192 int Rtp::doSend(NetPacket_t *p) { 02193 int ret=0; 02194 struct timeval now_tv; 02195 double now_dtime; 02196 if(!p) 02197 return -1; 02198 02199 gettimeofday(&now_tv,NULL); 02200 interarrival_time = (int)(1000* (now_tv.tv_sec + now_tv.tv_usec/1000000.0 02201 - stats->getLastPacketTime())); 02202 stats->setAvgInterarrivalTime(interarrival_time); 02203 now_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02204 02205 if (interarrival_time > 0) //not first frame... 02206 dprintf_full("Rtp::doSend: last sent: %3.3f secs, now %3.3f secs, interarrival time: %i msecs\n", 02207 stats->getLastPacketTime() - stats->getFirstPacketTime(), 02208 now_dtime - stats->getFirstPacketTime(), 02209 interarrival_time); 02210 02211 if (stats->getFirstPacketTime() == 0.0) { 02212 stats->setFirstPacketTime(now_dtime); 02213 stats->setFirstPacketTS(p->rtp_ts); 02214 } 02215 stats->setLastPacketTime(now_dtime); 02216 if (globalTimer) //adjust not to TS but to packet sendout time 02217 globalTimer->adjustToTS((int)((now_dtime - stats->getFirstPacketTime()) * es->getMediaTimeScale()), 02218 es->getMediaTimeScale()); 02219 02220 int this_frame_sec = (int)floor((double)p->rtp_ts / es->getMediaTimeScale()); 02221 int highest_ts_sec = (int)floor((double)stats->getHighestPacketTS() / es->getMediaTimeScale()); 02222 if (stats->getHighestPacketTS() < p->rtp_ts) { //find highest TS ever sent... 02223 dprintf_full("Rtp::doSend: set new highest TS: %i\n",p->rtp_ts); 02224 stats->setHighestPacketTS(p->rtp_ts); 02225 02226 stats->checkForNewPlayoutSec(this_frame_sec,highest_ts_sec); 02227 } 02228 02229 //put packet into sendQ, only if not a retransmission 02230 //FIRST TIME SEND ############## 02231 if (p->resent_counter == 0) { 02232 stats->incStreamoutSecDataBW(stats->getStreamoutSec(), sizeof(rtp_packet_header) + p->pl_size); 02233 stats->incStreamoutSecDataPkts(stats->getStreamoutSec()); 02234 02235 if (!p->last_frag) //only if there are other packets of this frame 02236 markFullFramePartially(false,true,p->rtp_ts); 02237 p->rtp_seq = rtp_get_act_seqno(session); 02238 p->sent_dtime = now_dtime; 02239 dprintf_full("data available for sending! (%6i bytes, rtp_seq %6i cts %6i:%i prio %i %s)\n", 02240 p->pl_size,p->rtp_seq,p->rtp_ts,p->frag_no,p->prio,Frame::VOPTypeToChar(p->vop_type)); 02241 02242 if (this_frame_sec > highest_ts_sec - RTP_STAT_SECS) { //in stats range 02243 if (p->last_frag && (p->vop_type != Frame::HEADER_VOP)) // is a full frame 02244 stats->incPlayoutSecNetFPS(this_frame_sec); 02245 // general stats on this packet 02246 stats->incPlayoutSecDataPkts(this_frame_sec); 02247 stats->incPlayoutSecDataBW(this_frame_sec,p->pl_size + sizeof(rtp_packet_header)); 02248 } 02249 02250 if (first_rtp_seq < 0) //initialize 02251 first_rtp_seq = p->rtp_seq; 02252 last_rtp_seq = p->rtp_seq; 02253 02254 //ret holds the number of sent bytes or -1 on error 02255 ret=rtp_send_data(session,p->rtp_ts, RTP_PAYLOAD_TYPE, p->last_frag, 0, NULL, 02256 (uint8_t *) p->payload,p->pl_size, NULL, 0, 0); 02257 } else { // RETRANSMISSION ############### 02258 if (rtxEnabled) { 02259 uint8_t *buf=NULL; 02260 uint16_t *seq_p=NULL; 02261 02262 stats->incStreamoutSecRtxBW(stats->getStreamoutSec(), sizeof(rtp_packet_header) + p->pl_size); 02263 stats->incStreamoutSecRtxPkts(stats->getStreamoutSec()); 02264 stats->incPlayoutSecRtxPkts(this_frame_sec); 02265 stats->incPlayoutSecRtxBW(this_frame_sec,p->pl_size + sizeof(rtp_packet_header)); 02266 02267 p->rtx_rtp_seq = rtp_get_act_seqno(rtx->session); 02268 p->resent_dtime = now_dtime; 02269 dprintf_full("RESENT packet seq %i rtx_seq %i ts %i:%i size %6i bytes\n", 02270 p->rtp_seq,p->rtx_rtp_seq, p->rtp_ts,p->frag_no,p->pl_size); 02271 // add original RTP seq_no to front of payload 02272 // according to draft-ietf-avt-rtp-retransmission-04 02273 buf=(uint8_t*)malloc(p->pl_size + sizeof(uint16_t)); 02274 assert(buf!=NULL); 02275 seq_p = (uint16_t*)buf; 02276 *seq_p = htons(p->rtp_seq); //store real rtp seqNo 02277 assert(p); 02278 assert(p->payload); 02279 memcpy(buf+sizeof(uint16_t),(uint8_t *) p->payload,p->pl_size); 02280 02281 if (first_rtx_rtp_seq < 0) //initialize 02282 first_rtx_rtp_seq = p->rtp_seq; 02283 last_rtx_rtp_seq = p->rtp_seq; 02284 02285 //ret holds the number of sent bytes or -1 on error 02286 ret=rtp_send_data(rtx->session,p->rtp_ts, RTP_PAYLOAD_TYPE, p->last_frag, 0, NULL, 02287 buf,p->pl_size+sizeof(uint16_t), NULL, 0, 0); 02288 free(buf); 02289 } else 02290 dprintf_err("no RTX initialized... NO resend of packet seq %i ts %i:%i\n", 02291 p->rtp_seq,p->rtp_ts,p->frag_no); 02292 } 02293 02294 if (ret == -1) // problem with sending data 02295 close(true); 02296 return ret; 02297 } 02298 02299 /***************************************************************/ 02300 void Rtp::sendThread() { 02301 NetPacket_t *p=NULL; 02302 struct timeval timeout; 02303 struct timeval now_tv; 02304 double now_dtime=0, now2_dtime=0; 02305 double byte_msec_fact; 02306 bool new_streamout_sec; 02307 int act_wait=0; 02308 int net_bw; 02309 int inc_in_a_row = 0; 02310 int dec_in_a_row = 0; 02311 int bke=15; // estimation of needed time for bookkeeping 02312 int awe=10; // estimation of needed time for avg. wait time 02313 IO::State myState; 02314 int sent_pkts = 0; 02315 02316 #ifdef NETBENCH 02317 int NETBENCH_BASE_streamBW = stats->getStreamBW(); 02318 int NETBENCH_lastSec = 0; 02319 int NETBENCH_highSec = 0; 02320 bool NETBENCH_go_down=true; 02321 #endif 02322 02323 //this is for gdb'd client or QuickTime to come up receiving! 02324 sleep(RTP_SENDER_PRESLEEP); 02325 //stream_bw is set in setESInfo()!!! 02326 // setESInfo(es); 02327 02328 net_bw_before_superdrain = -1; 02329 // net_bw = (int)(stats->getStreamBW() * (1+RTP_BUF_FILL_FACT)); //initially assume max. avail BW (optimum) 02330 net_bw = stats->getStreamBW(); 02331 02332 byte_msec_fact = 1000.0 / net_bw; 02333 dprintf_full("Rtp::sendThread started with avgBW %i streamBW %i netBW %i KBps\n", 02334 es->getAvgBandwidth() / 1024, stats->getStreamBW()/128, net_bw/128); 02335 02336 while (1) { 02337 new_streamout_sec = false; 02338 myState = state; //prevent race conds and avoid large locking 02339 02340 gettimeofday(&now_tv,NULL); 02341 now_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02342 02343 if (!threadWasPaused && (myState == PAUSED)) { 02344 dprintf_full("Rtp::sendThread set to PAUSED! waiting for PLAY again...\n"); 02345 threadWasPaused=true; 02346 } 02347 if (threadWasPaused) 02348 if (myState != PAUSED) { 02349 threadWasPaused=false; 02350 dprintf_full("Rtp::sendThread returns from paused to OPEN! adjusting first_packet_dtime!\n"); 02351 if (stats->getFirstPacketTime() != 0) { // PAUSED not at very beginning 02352 //set sendout base timing right 02353 stats->setFirstPacketTime(stats->getFirstPacketTime() + now_dtime - stats->getLastPacketTime()); 02354 stats->setLastPacketTime(now_dtime); 02355 } 02356 } else { //keep alive 02357 stats->incStreamoutSecNumBKeepChecks(stats->getStreamoutSec()); 02358 // does all rtp, rtcp updates & callbacks, blocking 20msec 02359 ucl_update(stats->getHighestPacketTS(), UCL_TIMEOUT); 02360 } 02361 02362 if (myState != PAUSED) { //normal code in OPEN or MUTED, but also CLOSE et al. 02363 02364 02365 new_streamout_sec = stats->checkForNewStreamoutSec(now_dtime); 02366 02367 if (new_streamout_sec) { 02368 float pkt_loss; 02369 int oldSec = stats->getStreamoutFullSec()-2; 02370 02371 //SUPERHACK TO FORCE ADAPTATION 02372 #ifndef NETBENCH 02373 if (stats->getStreamoutFullSec() >= 2) 02374 #endif 02375 { 02376 //ADJUST AVAIL BW 02377 if (stats->getStreamoutFullSec() % 2) { //every two secs we see the results of adjustment 02378 if (feedbackEnabled) { 02379 //adjust bw to network state of 2 secs ago 02380 pkt_loss = stats->getStreamoutSecLossPercent(oldSec); 02381 //dprintf_full("Rtp::sendThread calcing pkt_loss from rtp feedback: data %i rtx %i nack %i == %3.3f pkt loss\n",stats->getStreamoutSecDataBW(oldSec),stats->getStreamoutSecRtxBW(oldSec),stats->getStreamoutSecNackedBW(oldSec),pkt_loss ); 02382 } else { 02383 pkt_loss = last_rr_lost_fract; //use RTCP Receiver report 02384 last_rr_lost_fract = 0; //reset, 0 means later increase! 02385 dprintf_full("Rtp::sendThread calcing pkt_loss from RTCP RX_RR: %3.3f pkt loss\n",pkt_loss ); 02386 } 02387 02388 #ifndef NO_ADJUST 02389 02390 if (pkt_loss > RTP_BW_DEC_LOSS) { //high packet loss! 02391 dec_in_a_row++; 02392 #ifdef NETBENCH 02393 net_bw = (int)((double)net_bw / 100 * (100 - pkt_loss)); //greedy adjustment 02394 #else 02395 net_bw = (int)((float)net_bw * RTP_BW_DEC_FACT); 02396 02397 //TCP-friendly sawtooth: multiplicative decrease 02398 //net_bw = (int)((float)net_bw - (dec_in_a_row * 7 * pLayer->getMaxPacketPayloadSize())); 02399 #endif 02400 if (net_bw < RTP_BW_MINIMUM) 02401 net_bw = RTP_BW_MINIMUM; 02402 inc_in_a_row = 0; //reset bw inc counter 02403 } else 02404 dec_in_a_row = 0; 02405 //dont be greedy: only allow a little more netBW than streamBW... 02406 //at least enough to fill up empty buffers.... 02407 if (pkt_loss < RTP_BW_INC_LOSS) { // virtually no packet loss! 02408 if ( ((stats->getStreamoutSecClientBufFill(stats->getStreamoutFullSec()-1)*100) 02409 / stats->getClientPreQMaxSize() ) > RTP_BUF_SUPERHIGHWATER) { 02410 //buffer fill exceeds SUPERHIGHWATER: this shouldnt happen! 02411 dprintf_full("Rtp::sendThread: superdrain mode: ACTIVELY reducing net BW" 02412 " to drain overfull buffers (%i %%)!\n", 02413 (stats->getStreamoutSecClientBufFill(stats->getStreamoutFullSec() 02414 -1)*100) 02415 / stats->getClientPreQMaxSize() ); 02416 if (net_bw_before_superdrain == -1) 02417 net_bw_before_superdrain=net_bw; 02418 net_bw = RTP_BW_MINIMUM; // _REALLY_ reduce BW 02419 } else { 02420 if ( ( ((stats->getStreamoutSecClientBufFill(stats->getStreamoutFullSec()-1)*100) 02421 / stats->getClientPreQMaxSize() ) < RTP_BUF_SUPERHIGHWATER_LOW) && 02422 (net_bw_before_superdrain != -1) ) { 02423 //buffer falls back from SUPER to normal HIGHWATER... reset old net_bw 02424 net_bw = net_bw_before_superdrain; //reset old state 02425 net_bw_before_superdrain = -1; 02426 } else { //slowly upscale 02427 02428 dprintf_full("Rtp::sendThread: STEP UP: old netBW: %i getbaseBW %i\n",net_bw/128, 02429 stats->getBaseStreamBW() ); 02430 02431 inc_in_a_row++; 02432 int new_bw = (int)((float)net_bw // * RTP_BW_INC_FACT); 02433 //+ (RTP_BW_INC_PACKETS * pLayer->getMaxPacketPayloadSize() )); 02434 // + (RTP_BW_INC_PACKETS * pLayer->getMaxPacketPayloadSize() / ((inc_in_a_row % 5)+1) ) ); 02435 + (inc_in_a_row * pLayer->getMaxPacketPayloadSize() ) ); 02436 02437 net_bw = new_bw; 02438 //force to MAX BaseStreamBW, wich is not affected by switching 02439 if (new_bw < stats->getBaseStreamBW() )// * (1+RTP_BUF_FILL_FACT))//a little more... 02440 net_bw = new_bw; 02441 else //exactly find top level 02442 net_bw = stats->getBaseStreamBW(); 02443 02444 } 02445 } 02446 } else { //plateau: keep netBW steady 02447 } 02448 #endif 02449 02450 //SUPERHACK TO FORCE ADAPTATION to eg. 30% 02451 #ifdef NETBENCH 02452 // net_bw = (int)(stats->getStreamBW() * 0.85); 02453 02454 //implements oscillating bandwidth adjustments 02455 //stepwise go down and then up again 02456 #define NETBENCH_MIN 0.4 02457 if (NETBENCH_go_down) 02458 net_bw = (int)( NETBENCH_BASE_streamBW - //degrade BW step by step 02459 NETBENCH_lastSec * (pLayer->getMaxPacketPayloadSize()*2)); 02460 else 02461 net_bw = (int)( NETBENCH_BASE_streamBW - //upgrade BW step by step 02462 (NETBENCH_highSec - NETBENCH_lastSec) * (pLayer->getMaxPacketPayloadSize()*2)); 02463 NETBENCH_lastSec++; 02464 02465 if (net_bw > NETBENCH_BASE_streamBW && !NETBENCH_go_down) { //top reached 02466 dprintf_full("net_bw > NETBENCH_BASE_stream: top reached\n"); 02467 NETBENCH_go_down = true; 02468 NETBENCH_lastSec = 0; 02469 NETBENCH_highSec = 0; 02470 } else 02471 if (net_bw < NETBENCH_BASE_streamBW * NETBENCH_MIN && NETBENCH_go_down) { //bottom reached 02472 dprintf_full("net_bw < NETBENCH_BASE_streamBW * NETBENCH_MIN: bottom reached\n"); 02473 NETBENCH_go_down = false; 02474 NETBENCH_lastSec = 1; 02475 } 02476 else { 02477 if (NETBENCH_go_down) 02478 NETBENCH_highSec++; 02479 } 02480 #endif 02481 02482 byte_msec_fact = 1000.0 / net_bw; 02483 02484 calcAdaptPreQValues(net_bw); 02485 02486 02487 dprintf_full("Rtp::sendThread: NEW streamout %2.3f ADJUST BW: really measured netBW %i" 02488 " (after %2.2f %% pkt loss) assumedBW %i, adapt %i %% to send %2.2f secs" 02489 " (ClBuffer %i KBits of %i (%i %%): %s)\n", 02490 stats->getStreamoutFloatSec(), 02491 (stats->getStreamoutSecDataBW(oldSec) 02492 + stats->getStreamoutSecRtxBW(oldSec) 02493 - stats->getStreamoutSecNackedBW(oldSec)) /128, 02494 pkt_loss, 02495 net_bw*8/1024, 02496 stats->getAdaptRate(), 02497 stats->getAdaptSecs(), 02498 stats->getStreamoutSecClientBufFill(stats->getStreamoutFullSec()-1)/128, 02499 stats->getClientPreQMaxSize()/128, 02500 (stats->getStreamoutSecClientBufFill(stats->getStreamoutFullSec()-1)*100) 02501 /stats->getClientPreQMaxSize(), 02502 stats->getAdaptSecs()>1.0?"FILL":stats->getAdaptSecs()==1.0?"STABLE":"DRAIN"); 02503 02504 } 02505 02506 stats->writeStreamoutSecServerStats(); 02507 02508 //delete outdated packets (no RTX feasible any more) 02509 //still, keep 2 outdated secs, to get better playsec_stat 02510 sendQupdate(stats->getStreamoutFullSec()-2); 02511 } // streamout_sec >= 2 02512 02513 stats->setStreamoutSecNetBW(stats->getStreamoutSec(), net_bw); 02514 dprintf_full("Rtp::sendThread: NEW streamout %2.3f (est.BW %i KBps fact %f)" 02515 " ClPlaySec %2.3f ClBuf ahead %2.3f\n", 02516 stats->getStreamoutFloatSec(),net_bw*8/1024, byte_msec_fact, 02517 stats->getPlayoutSec(), stats->getBufAheadSec()); 02518 02519 } //new streamout sec 02520 02521 adaptPreQ(net_bw); //this will only happen if really necessary! 02522 02523 //each popPreQ will block until data is avail 02524 while ( (p = popPreQ()) ) { //actually execute adaptation 02525 if (p==(Rtp::NetPacket_t *)-1) 02526 continue; //FIXME: after unpause, the Q was empty! this will force PREFETCHING 02527 if (p->drop_me || p->partially_dropped || (!rtxEnabled && p->resent_counter > 0)) { 02528 dprintf_full("Rtp::sendThread ADAPTING (dropping) prio %i VOP_type %s rtpSeq %i TS %i:%i\n", 02529 p->prio,Frame::VOPTypeToChar(p->vop_type),p->rtp_seq, 02530 p->rtp_ts,p->frag_no); 02531 if (!p->partially_dropped) //dont search again, work already done 02532 markFullFramePartially(true,false,p->rtp_ts); //partially_dropped 02533 int this_frame_sec = (int)floor((double)p->rtp_ts / es->getMediaTimeScale()); 02534 int highest_ts_sec = (int)floor((double)stats->getHighestPacketTS() / es->getMediaTimeScale()); 02535 if (this_frame_sec > highest_ts_sec - RTP_STAT_SECS) { //in stats range 02536 if (p->resent_counter >= 1) //RTX 02537 if (p->last_frag) // is a full frame 02538 stats->decPlayoutSecNetFPS(this_frame_sec); 02539 } 02540 //free everything! 02541 assert(p->payload != NULL); 02542 delete p->payload; 02543 p->payload=NULL; 02544 delete p;p=NULL; 02545 } else 02546 break; 02547 } 02548 02549 /* what happens, if this is blocked and we get an immediate close? 02550 * mutexes will be deleted, popPreQ will return and p==NULL, 02551 * so just return... */ 02552 if ((p==NULL) || (myState == STREAMERR)) { 02553 myState = state; //get new state after blocking... 02554 //if ((myState == CLOSED) || (myState == CLOSING) ||(myState == FORCE_CLOSING)) { 02555 dprintf_full("Rtp::writeThread DONE!\n"); 02556 setState(STREAMEOF); 02557 preQ.MUTEX_Q_DONE.release(); 02558 02559 return; 02560 } 02561 02562 doSend(p); // also modifies p->rtp_seq! 02563 pushNetPacket(p); // put the sent packet on the sendQ 02564 if (p->last_frag) { 02565 currentFrameNumber++; 02566 framesSeen++; 02567 } 02568 sent_pkts++; 02569 02570 /* 02571 //force fast sending of always TWO packets in a row 02572 if (sent_pkts % 2) { 02573 ucl_update(stats->getHighestPacketTS(), 0); 02574 //busy wait 02575 double timeout_dtime; 02576 //this hopefully prevents UDP pkt loss by the kernel 02577 // minimum with msleep(), nanosleep() and pthread_cond_timedwait() is 20 msecs !!! 02578 //minimum with select() is still 10 msecs :(((( 02579 gettimeofday(&now_tv,NULL); 02580 now2_dtime = timeout_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02581 timeout_dtime += ((float)(RTP_MIN_INTERARRIVAL_TIME) / 1000.0); 02582 while (now2_dtime < timeout_dtime) { // BUSY WAITING LOOP 02583 gettimeofday(&now_tv,NULL); 02584 now2_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02585 } 02586 02587 continue; //jump back to big while(1) 02588 } 02589 */ 02590 02591 gettimeofday(&now_tv,NULL); 02592 now_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02593 02594 02595 if ((stats->getStreamoutSecDataBW(stats->getStreamoutSec()) 02596 +stats->getStreamoutSecRtxBW(stats->getStreamoutSec()) ) 02597 > (signed)(net_bw - pLayer->getMaxPacketPayloadSize())) { 02598 act_wait = (int)(((float)stats->getStreamoutFullSec() + 1.0 02599 - (now_dtime - stats->getFirstPacketTime()))*1000) + 2; 02600 dprintf_full("Rtp::sendThread: ALL DONE FOR THIS SEC: act_wait %i\n",act_wait); 02601 } else 02602 act_wait=(int) (stats->getStreamoutFullSec()*1000.0 02603 + (int)( (stats->getStreamoutSecDataBW(stats->getStreamoutSec()) 02604 +stats->getStreamoutSecRtxBW(stats->getStreamoutSec()) ) 02605 * byte_msec_fact) 02606 - (int)((now_dtime - stats->getFirstPacketTime())*1000.0)); 02607 02608 if (now_dtime - stats->getLastPacketTime() + MAX(0,act_wait)/1000.0 02609 < RTP_MIN_INTERARRIVAL_TIME/1000.0) { 02610 dprintf_full("Rtp::sendThread: act_wait %i (intrtime %3.3f) would break " 02611 "MIN_INTERARRIVAL_TIME. increasing...\n",act_wait, 02612 now_dtime - stats->getLastPacketTime() + MAX(0,act_wait)/1000.0); 02613 act_wait = (int)((now_dtime - stats->getLastPacketTime() 02614 + RTP_MIN_INTERARRIVAL_TIME/1000.0)*1000); 02615 } 02616 } // normal code in OPEN or MUTED 02617 02618 // BOOKKEEPING 02619 // either a new streamout sec began or we have time left... 02620 // PAUSE is specially handled above! 02621 // use this time for useful things like receiving NACKs for RTX requests 02622 // and other rtp maintenace 02623 if ( ((new_streamout_sec) || (act_wait >= bke)) && !threadWasPaused) { 02624 stats->incStreamoutSecNumBKeepChecks(stats->getStreamoutSec()); 02625 // do all rtp, rtcp updates & callbacks, non-blocking! 02626 ucl_update(stats->getHighestPacketTS(), 0); 02627 gettimeofday(&now_tv,NULL); 02628 now2_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02629 bke = (int) (bke * 0.7 + ((now2_dtime - now_dtime)*1000 * 0.3)); 02630 dprintf_full("I spent %i msecs for bookkeeping... new est. bke %i msecs\n", 02631 (int) ((now2_dtime - now_dtime)*1000), bke); 02632 } 02633 02634 int real_wait; 02635 int now_wait; 02636 while(1) { 02637 gettimeofday(&now_tv,NULL); 02638 now2_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02639 real_wait = (int)(((now2_dtime - now_dtime)*1000000.0)/1000.0); 02640 if (real_wait >= act_wait) 02641 break; //great, the time is over... 02642 now_wait = MAX(0,act_wait) - real_wait; 02643 dprintf_full("Rtp::sendThread entering act_wait %i loop: now_wait: %i\n",act_wait,now_wait); 02644 /******* SLEEP UNTIL NEXT ROUND ********/ 02645 if (now_wait > RTP_BUSY_WAIT_MS) { //if we want to have bad sleep granularity 02646 //if (1) { //always do select 02647 //we only wait if we will have to wait longer than the avg 02648 //by this, we prefer pushing out netPackets than being late... 02649 timeout.tv_sec = 0; 02650 timeout.tv_usec = (now_wait>0)?now_wait * 1000:1000; //at least wait 1 msec 02651 select(0,NULL,NULL,NULL,&timeout); 02652 02653 } else // do busy waiting 02654 { 02655 double timeout_dtime; 02656 //this hopefully prevents UDP pkt loss by the kernel 02657 // minimum with msleep(), nanosleep() and pthread_cond_timedwait() is 20 msecs !!! 02658 //minimum with select() is still 10 msecs :(((( 02659 gettimeofday(&now_tv,NULL); 02660 now2_dtime = timeout_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02661 timeout_dtime += ((float)RTP_BUSY_WAIT_MS / 1000.0); 02662 while (now2_dtime < timeout_dtime) { // BUSY WAITING LOOP 02663 gettimeofday(&now_tv,NULL); 02664 now2_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02665 } 02666 } 02667 } 02668 02669 if (act_wait > 0) // if we had to wait, get avg wait time... 02670 //awe = (int)(awe * 0.7 + ((now2_dtime - now_dtime)*1000000.0)/1000.0 * 0.3); 02671 awe = (int)(awe * 0.7 + act_wait * 0.3); 02672 02673 gettimeofday(&now_tv,NULL); 02674 now2_dtime = now_tv.tv_sec + now_tv.tv_usec/1000000.0; 02675 real_wait = (int)(((now2_dtime - now_dtime)*1000000.0)/1000.0); 02676 dprintf_full("time %2.3f -> %2i:%03i(should be %03i) 'act_wait'ed for %i msec" 02677 " (really was %i, awe %i), %6i KBit of %6i\n", 02678 stats->getStreamoutFloatSec(), 02679 stats->getStreamoutFullSec(), 02680 (int)(((now2_dtime - stats->getFirstPacketTime()) 02681 - stats->getStreamoutFullSec())*1000.0), //now offset 02682 (int)((stats->getStreamoutSecDataBW(stats->getStreamoutSec()) 02683 +stats->getStreamoutSecRtxBW(stats->getStreamoutSec())) 02684 * byte_msec_fact), //should be 02685 act_wait, 02686 real_wait, 02687 awe, 02688 (stats->getStreamoutSecDataBW(stats->getStreamoutSec()) 02689 +stats->getStreamoutSecRtxBW(stats->getStreamoutSec()))/128, 02690 net_bw/128); 02691 } //while(1) 02692 } 02693 02694 02695 /*************************************************************** 02696 * @param net_bw: what is the actual network capability? normally less :( 02697 */ 02698 void Rtp::calcAdaptPreQValues(int net_bw) { 02699 CompareNetPackets cmpFnc; 02700 priority_queue<NetPacket_t*,std::vector<NetPacket_t*>,CompareNetPackets> pq(cmpFnc); 02701 02702 02703 if (stats->getStreamoutSec() < RTP_PREFETCH_SECS) 02704 return; 02705 stats->setAdaptSecs(1.0); //normal adaptation 02706 if (stats->getBufAheadSec() < RTP_BUF_LOWWATER) { 02707 stats->setAdaptSecs(stats->getAdaptSecs() + (float)RTP_BUF_FILL_FACT); //fill buf (eg. 1.10) 02708 } 02709 02710 if (net_bw_before_superdrain != -1) { //we are in super draining phase 02711 dprintf_full("Rtp::calcAdaptPreQValues turning off adaptation, totally use buffer, we have sooo much of it...\n"); 02712 real_bw = net_bw; //turn off adaptation, totally use buffer, we have sooo much of it... 02713 } else { 02714 real_bw = (int)((float)stats->getStreamBW() * stats->getAdaptSecs()); 02715 if (real_bw < net_bw) //if buf_drain more than net_bw 02716 real_bw = net_bw; //no need to spoil net bw! :) 02717 } 02718 //if (stats->getAdaptRate(0)) //smoother adapt changes 02719 stats->setAdaptRate(net_bw*100 / real_bw); 02720 //else 02721 // stats->setAdaptRate((int)((float)stats->getAdaptRate()*(1 - 0.3) + (net_bw*100 / real_bw)*0.3) + 1); 02722 02723 if (stats->getBufAheadSec() > RTP_BUF_HIGHWATER) { 02724 if (net_bw <= stats->getStreamBW()) { //so adaptSecs is < 1.0 02725 stats->setAdaptSecs((float)net_bw / (float)stats->getStreamBW()); 02726 real_bw = (int)((float)stats->getStreamBW() * stats->getAdaptSecs()); 02727 stats->setAdaptRate(net_bw*100 / real_bw); 02728 dprintf_full("Rtp::calcAdaptPreQValues: superdrain mode: TURNING OFF ADAPTATION since we have so much buffer!\n"); 02729 } 02730 } 02731 02732 if ((stats->getBufAheadSec() > RTP_BUF_LOWWATER) && (stats->getAdaptRate() < (float)RTP_MAX_ADAPT/100.0)) { 02733 //we have enough buffers to reduce necessary adaptation to B-frames only 02734 //stats->setAdaptSecs((float)net_bw / ((float)RTP_MAX_ADAPT/100.0 * (float)stats->getStreamBW())); 02735 stats->setAdaptSecs(stats->getAdaptSecs() - (float)RTP_BUF_DRAIN_FACT); //drain buf (eg. 0.70) 02736 02737 real_bw = (int)((float)stats->getStreamBW() * stats->getAdaptSecs()); 02738 stats->setAdaptRate(net_bw*100 / real_bw); 02739 dprintf_full("Rtp::calcAdaptPreQValues: superdrain mode: ACTIVELY DRAINING BUFFER to preserve quality!\n"); 02740 } 02741 02742 dprintf_full("Rtp:calcAdaptPreQValues: keeping %2i%% reach netBW %i KBits of " 02743 "streamBW %i KBits for %2.3f secs == realBW %i (ClBuf: %2.3f)\n", 02744 stats->getAdaptRate(), net_bw*8/1024, stats->getStreamBW()*8/1024, 02745 stats->getAdaptSecs(),real_bw/128, stats->getBufAheadSec()); 02746 02747 } 02748 02749 02750 02751 /***************************************************************/ 02752 void Rtp::markFullFramePartially(bool dropped, bool sent, u32 ts) { 02753 NetPacket_t *p=NULL; 02754 02755 dprintf_full("Rtp::markFullFramePartially%s TS %i\n",dropped?"DROPPED":"SENT",ts); 02756 preQ.MUTEX_Q.lock(); 02757 02758 p=preQ.Q; 02759 while (p != NULL) { 02760 if (p->rtp_ts == ts) { 02761 assert ( (!p->partially_dropped && !p->partially_sent) || 02762 (p->partially_dropped && !p->partially_sent) || 02763 (!p->partially_dropped && p->partially_sent)); 02764 02765 p->partially_dropped = dropped; 02766 p->partially_sent = sent; 02767 //dprintf_full("Rtp::markFullFramePartially%s TS %i:%i\n",dropped?"DROPPED":"SENT",ts,p->frag_no); 02768 } 02769 p=p->next; 02770 } 02771 02772 preQ.MUTEX_Q.release(); 02773 return; 02774 } 02775 02776 02777 /*************************************************************** 02778 * @param net_bw: what is the actual network capability? normally less :( 02779 */ 02780 void Rtp::adaptPreQ(int net_bw) { 02781 CompareNetPackets cmpFnc; 02782 priority_queue<NetPacket_t*,std::vector<NetPacket_t*>,CompareNetPackets> pq(cmpFnc); 02783 int size=0; 02784 NetPacket_t *p=NULL, *pp=NULL; 02785 02786 dprintf_full("Rtp:adaptPreQ: keeping %2i%% reach netBW %i KBits of streamBW" 02787 " %i KBits for %2.3f secs == realBW %i (ClBuf: %2.3f)\n", 02788 stats->getAdaptRate(), net_bw*8/1024, stats->getStreamBW()*8/1024, 02789 stats->getAdaptSecs(),real_bw/128, stats->getBufAheadSec()); 02790 02791 // dprintf_full("Rtp::adaptPreQ waiting for preQ mutex\n"); 02792 preQ.MUTEX_Q.lock(); 02793 //dprintf_full("Rtp::adaptPreQ got preQ mutex\n"); 02794 p=preQ.Q; 02795 02796 // read in full streamout block and mark it to non-drop 02797 // this includes all partially dropped or sent frame-packets, I_VOPs, P_VOPS.... and RTX 02798 // because all of them shall be streamed in the next sec. 02799 while (p && (size+p->pl_size <= (unsigned)real_bw)) { 02800 pq.push(p); 02801 size += p->pl_size; 02802 p->drop_me = false; 02803 p=p->next; 02804 } 02805 02806 // chop off the packets to meet the net_bw 02807 // we have to ignore partially sent frames and RTX, we force dropping of partially dropped ones 02808 while (!pq.empty() && (size > net_bw)) { 02809 p=pq.top(); 02810 pq.pop(); 02811 02812 // if (p->prio == IO_NETWORK_HIGHEST_PRIORITY) { 02813 if (p->prio <= RTP_MIN_PRIO) { //keep at least 10 fps 02814 //FIXME: what now? switching? 02815 dprintf_full("Rtp::adaptPreQ BAILING OUT because below RTP_MIN_PRIO!! (prio %i VOP_type %s TS %i:%i)\n" 02816 ,p->prio,Frame::VOPTypeToChar(p->vop_type),p->rtp_ts,p->rtp_seq); 02817 break; 02818 } 02819 02820 if ( ((p->resent_counter == 0) && (!p->partially_sent)) || (p->partially_dropped) ) { 02821 //BVOP for first send, so drop it! 02822 size -= p->pl_size; //dont reduce size for RTX, so we will adapt more to fit RTX in network 02823 p->drop_me = true; 02824 dprintf_full("Rtp::adaptPreQ marking prio %i VOP_type %s TS %i:%i for dropping... partially_dropped %i\n",p->prio,Frame::VOPTypeToChar(p->vop_type),p->rtp_ts,p->rtp_seq,p->partially_dropped); 02825 } else 02826 dprintf_full("Rtp::adaptPreQ NOT marking since its a RTX or partially sent (prio %i VOP_type %s TS %i:%i)\n" 02827 ,p->prio,Frame::VOPTypeToChar(p->vop_type),p->rtp_ts,p->rtp_seq); 02828 } 02829 02830 // DELETE the rest of FULL FRAME 02831 // only if B-VOP for first send, 02832 if (p && !pq.empty() && (p->prio > RTP_MIN_PRIO) 02833 && (p->resent_counter == 0) && (!p->partially_sent) ) { 02834 do { 02835 pp = pq.top(); 02836 pq.pop(); 02837 if (pp->rtp_ts == p->rtp_ts) { //no checking on RTX is needed, since list is sorted by TS:seqNo 02838 pp->drop_me = true; 02839 dprintf_full("Rtp::adaptPreQ rest-marking prio %i VOP_type %s TS %i:%i for dropping...\n",pp->prio,Frame::VOPTypeToChar(p->vop_type),pp->rtp_ts,p->rtp_seq); 02840 } else 02841 break; 02842 } while (!pq.empty() ); 02843 } 02844 02845 preQ.isDirty = false; 02846 preQ.MUTEX_Q.release(); 02847 } 02848 02849 02850 02851 /*************************************************************** 02852 * sets a shallow copy of @param rtx at the ESInfo object */ 02853 void Rtp::setRtxInfo(const rtx_info* rt) { 02854 assert(rt); 02855 dprintf_full("Rtp::setRtxInfo \n"); 02856 if(rtx) { 02857 delete rtx; 02858 } 02859 rtx= new rtx_info(rt); 02860 };