Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members
ProxySession.cpp
00001 /*********************************************************************** 00002 * * 00003 * ViTooKi * 00004 * * 00005 * title: ProxySession.cpp * 00006 * * 00007 * * 00008 * * 00009 * ITEC institute of the University of Klagenfurt (Austria) * 00010 * http://www.itec.uni-klu.ac.at * 00011 * * 00012 * * 00013 * For more information visit the ViTooKi homepage: * 00014 * http://ViTooKi.sourceforge.net * 00015 * vitooki-user@lists.sourceforge.net * 00016 * vitooki-devel@lists.sourceforge.net * 00017 * * 00018 * This file is part of ViTooKi, a free video toolkit. * 00019 * ViTooKi is free software; you can redistribute it and/or * 00020 * modify it under the terms of the GNU General Public License * 00021 * as published by the Free Software Foundation; either version 2 * 00022 * of the License, or (at your option) any later version. * 00023 * * 00024 * This program is distributed in the hope that it will be useful, * 00025 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00026 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 00027 * GNU General Public License for more details. * 00028 * * 00029 * You should have received a copy of the GNU General Public License * 00030 * along with this program; if not, write to the Free Software * 00031 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, * 00032 * MA 02111-1307, USA. * 00033 * * 00034 ***********************************************************************/ 00035 00036 /*********************************************************************** 00037 * * 00038 * REVISION HISTORY: * 00039 * * 00040 * * 00041 * * 00042 ***********************************************************************/ 00043 00044 #include "ProxySession.hpp" 00045 #include "RTSP.hpp" 00046 #include "DataChannel.hpp" 00047 #include "DataSink.hpp" 00048 #include "../cache/MetaObject.hpp" 00049 #include "VideoESInfo.hpp" 00050 #include "../cache/AdmissionControl.hpp" 00051 #include "IO.hpp" 00052 #include "ServerSession.hpp" 00053 #include "ClientSession.hpp" 00054 #include "Protocol.hpp" 00055 #include "Url.hpp" 00056 #include "RFC3016.hpp" 00057 #include "../cache/CacheManager.hpp" 00058 #include "BitField.hpp" 00059 #include "ReferenceCounter.hpp" 00060 #include "Statistics.hpp" 00061 00062 #include "../metadata/MP21.hpp" 00063 #include "../io/Rtp.hpp" 00064 #include "../io/SimpleRtp.hpp" 00065 #include "../io/MPGStreamIO.hpp" 00066 #include "../io/IncompleteIO.hpp" 00067 #include "../io/IOCreator.hpp" 00068 #include "../adaptors/TemporalAdaptor.hpp" 00069 #include "../adaptors/DataDump.hpp" 00070 #include "../adaptors/MP4Decoder.hpp" 00071 #include "../adaptors/MP4audioDecoder.hpp" 00072 #include "AudioAdaptorChain.hpp" 00073 #include "VideoAdaptorChain.hpp" 00074 00075 #ifndef WIN32 00076 #include <arpa/inet.h> 00077 #include <sys/types.h> 00078 #include <sys/socket.h> 00079 #include <netdb.h> 00080 #include <netinet/in.h> 00081 #else 00082 #include <winsock2.h> 00083 #include <ws2tcpip.h> 00084 #ifndef WINCE 00085 #include <direct.h> 00086 #include <Wspiapi.h> 00087 #endif 00088 #endif 00089 00090 00091 ProxyStatistics prxStat; 00092 00093 ProxySession::ProxySession(int socket, const struct sockaddr_in *clint, 00094 CacheManager * c, AdmissionControl* adm, 00095 bool allowRTX,SessionCounter* counter, 00096 TerminalCapabilities* defTC):Session(), 00097 rtxEnabled(allowRTX),neverForwardMetaData(true) 00098 { 00099 assert(adm); 00100 dprintf_full("ProxySession::ProxySession\n"); 00101 prxStat.requests++; 00102 this->counter = counter; 00103 admControl=adm; 00104 state = SESSION_CLOSED; 00105 rtxEnabledToServer=false; 00106 rtxEnabledToClient=false; 00107 delMp4Stream=false; 00108 dualMiss=false; 00109 canAdmitRequest=true; 00110 dontCacheDualMiss=false; 00111 cm = c; 00112 if (clint) { 00113 client = new sockaddr_in(); 00114 memcpy(client, clint, sizeof(sockaddr_in)); 00115 clientId=(u32)client->sin_addr.s_addr; 00116 } else { 00117 client = NULL; 00118 clientId=0; 00119 } 00120 serverSession=NULL; 00121 server=NULL; 00122 sessionControlChannel=socket; 00123 prot = NULL; 00124 url = NULL; 00125 termCap=defTC; 00126 userPref=NULL; 00127 suggestedAdaptor=NULL; 00128 mp4Stream=NULL; 00129 playerId=NULL;serverID=NULL; 00130 insertMp4StreamIntoCM=false; 00131 noDescribeSeenYet=true; 00132 resUsage.cpu=resUsage.disk=resUsage.network=0.0; 00133 origSrc=NULL; 00134 mp4StreamReserved = 0; 00135 origSrcReserved = 0; 00136 } 00137 00138 ProxySession::~ProxySession() 00139 { 00140 dprintf_full("ProxySession::~ProxySession()\n"); 00141 prxStat.endedRequests++; 00142 admControl->freeRequest(&resUsage); 00143 if(origSrc) 00144 origSrc->getUsageCounter()->decrease(); 00145 if(mp4Stream) 00146 mp4Stream->getUsageCounter()->decrease(); 00147 00148 if(dualMiss && origSrc && !dontCacheDualMiss) { 00149 cm->lockCache(); 00150 cm->putVideo(origSrc,origSrcReserved); 00151 cm->unlockCache(); 00152 origSrc=NULL;origSrcReserved=0; 00153 } 00154 if( insertMp4StreamIntoCM && mp4Stream) { 00155 cm->lockCache(); 00156 cm->putVideo(mp4Stream,mp4StreamReserved); 00157 cm->unlockCache(); 00158 mp4StreamReserved=0; 00159 delMp4Stream=false; 00160 mp4Stream=NULL; 00161 } 00162 //TEST 00163 //assert(mp4StreamReserved == 0 && origSrcReserved == 0); 00164 00165 if(mp4StreamReserved > 0) 00166 { 00167 cm->freeDiskSpace (mp4StreamReserved); 00168 mp4StreamReserved = 0; 00169 } 00170 if(origSrcReserved > 0) 00171 { 00172 cm->freeDiskSpace (origSrcReserved); 00173 origSrcReserved = 0; 00174 } 00175 00176 00177 if(origSrc && dualMiss && dontCacheDualMiss) { 00178 delete origSrc; 00179 } 00180 origSrc=NULL; 00181 #ifdef WIN32 00182 closesocket(sessionControlChannel); 00183 #else 00184 ::close(sessionControlChannel); 00185 #endif 00186 tearDown(TEARDOWN_ALL,true,NULL,NULL); 00187 if (prot) { 00188 delete prot; 00189 prot = NULL; 00190 } 00191 if (client) { 00192 delete client; 00193 client = NULL; 00194 } 00195 if (url) { 00196 delete url; 00197 url = NULL; 00198 } 00199 if(serverSession) { 00200 delete serverSession; 00201 } 00202 if(playerId) 00203 delete playerId; 00204 00205 std::list < rtx_group* >::iterator j; 00206 while(!rtxInfo.empty()) { 00207 j=rtxInfo.begin(); 00208 delete *j; 00209 rtxInfo.pop_front(); 00210 } 00211 std::list < rtx_info* >::iterator k; 00212 00213 while(!rtxInfoServer.empty()) { 00214 k=rtxInfoServer.begin(); 00215 delete *k; 00216 rtxInfoServer.pop_front(); 00217 } 00218 if(serverID) 00219 delete serverID; 00220 00221 if(delMp4Stream && mp4Stream) { 00222 mp4Stream->getUsageCounter()->decrease(); 00223 delete mp4Stream; 00224 } 00225 mp4Stream=NULL; 00226 if(termCap) 00227 delete termCap; 00228 if(userPref) 00229 delete userPref; 00230 if(server) 00231 delete server; 00232 00233 if(suggestedAdaptor) 00234 delete suggestedAdaptor; 00235 } 00236 00237 00238 /***********************************************************************/ 00239 bool ProxySession::options(const Url* fileName, const char* remaining) 00240 { 00241 prot->incSeqNr(); 00242 return true; 00243 } 00244 00245 00246 /* 00247 * -parse cmd describe, detect if client supports retransmission 00248 * -check with CM: if not or only partially cached forward the command to the server 00249 * and create a ServerSession object 00250 * -read the reply from the server, detect if server supports retransmission 00251 */ 00252 bool ProxySession::connect(const Url* fileName, const char* remaining) 00253 { 00254 assert(fileName && remaining && prot); 00255 bool ret=prot->parseCMDDescribe(this, fileName, remaining,&playerId); 00256 // illegal describe?? 00257 if(!ret) { 00258 prot->generateBadCMD(); 00259 const char* buf=prot->getBuffer(); 00260 strcpy(buffer,buf); 00261 state=SESSION_ERR; 00262 prxStat.rejectedDueToErr++; 00263 } 00264 // problem: a 2nd describe arrives, don't allow changing termcaps in the same session! 00265 else { 00266 // we have a valid DESCRIBE 00267 // is this the first describe? then mp4Stream should be NULL 00268 TerminalCapabilities* dummyTermCap=MP21::createTerminalCapability(remaining); 00269 if(dummyTermCap) { 00270 // overwrite the default with the new ones 00271 dprintf_full("ProxySession::connect Found TermCaps\n"); 00272 if(termCap) 00273 delete termCap; 00274 // if termcaps are specified, they take precedence over default UserPrefs! 00275 if(userPref) 00276 delete userPref; 00277 userPref=NULL; 00278 } 00279 if(dummyTermCap) 00280 termCap=dummyTermCap; 00281 00282 if(!termCap) { 00283 // maybe UserPreferences? 00284 // keep existing UserPref 00285 UserPreferences* upDummy=MP21::createUserPreferences(remaining); 00286 dprintf_full("ProxySession::connect Found UserPreferences %p in XX%sXX\n",upDummy,remaining); 00287 00288 if(userPref && upDummy) 00289 delete userPref; 00290 if(upDummy) { 00291 userPref=upDummy; 00292 } 00293 } 00294 setUrl(fileName); // sets insertMP4.. to true if adaptation is needed 00295 00296 if(!mp4Stream) // no resUsage known at this time point 00297 insertMp4StreamIntoCM=true; 00298 else if(resUsage.network < 0.0000001) { 00299 // mp4stream is set 00300 canAdmitRequest=false; 00301 fprintf(stderr,"ProxySession::connect: Request rejected due to AdmissionControl\n"); 00302 state=SESSION_ERR; 00303 prot->generateNotEnoughBandwidth(); 00304 prot->incSeqNr(); 00305 insertMp4StreamIntoCM=false; 00306 if(origSrc && mp4Stream) { 00307 delete mp4Stream; // clone created in setUrl 00308 } 00309 origSrc=mp4Stream=NULL; 00310 return true; 00311 } 00312 00313 if(origSrc) 00314 origSrc->getUsageCounter()->increase(); 00315 if(mp4Stream) 00316 mp4Stream->getUsageCounter()->increase(); 00317 if(!mp4Stream) { 00318 // serverConnection needed 00319 dprintf_full("ProxySession::connect with url %s fileName %s\n",url->toString(),fileName->toString()); 00320 if(!establishConnectionToServer()) { 00321 dprintf_full("ProxySession::connect with url %s fileName %s\n",url->toString(),fileName->toString()); 00322 // failed to connect to server 00323 prot->generateDestUnreachable(); 00324 strcpy(buffer,prot->getBuffer()); 00325 ret=false; 00326 } 00327 else{ 00328 dprintf_full("ProxySession::connect with url %s fileName %s\n",url->toString(),fileName->toString()); 00329 ret=handleConnectMiss(fileName); //handle miss comm. 00330 00331 if(resUsage.network < 0.00001) { 00332 // mp4stream is set 00333 canAdmitRequest=false; 00334 fprintf(stderr,"ProxySession::connect: Request rejected due to AdmissionControl\n"); 00335 state=SESSION_ERR; 00336 insertMp4StreamIntoCM=false; 00337 if(origSrc) 00338 delete origSrc; 00339 origSrc=NULL; 00340 if(mp4Stream) 00341 delete mp4Stream; 00342 mp4Stream=NULL; 00343 if(mp4StreamReserved > 0) 00344 { 00345 cm->freeDiskSpace (mp4StreamReserved); 00346 mp4StreamReserved = 0; 00347 } 00348 if(origSrcReserved > 0) 00349 { 00350 cm->freeDiskSpace (origSrcReserved); 00351 origSrcReserved = 0; 00352 } 00353 prot->generateNotEnoughBandwidth(); 00354 prot->incSeqNr(); 00355 const char* buf=prot->getBuffer(); 00356 strcpy(buffer,buf); 00357 return true; 00358 } 00359 if(mp4Stream) 00360 mp4Stream->getUsageCounter()->increase(); 00361 if(origSrc) 00362 origSrc->getUsageCounter()->increase(); 00363 if(ret) 00364 ret=handleConnectHit(fileName); 00365 } 00366 } 00367 else { 00368 ret=handleConnectHit(fileName); 00369 } 00370 detectRtxCapabilities(); 00371 if(!mp4Stream) { 00372 ret=false; 00373 } 00374 else if(ret) { 00375 if(noDescribeSeenYet) { 00376 /* 00377 // insert video into cm! we do this in delete! 00378 if(insertMp4StreamIntoCM && mp4Stream) { 00379 mp4Stream->getUsageCounter()->increase(); 00380 cm->putVideo(mp4Stream); 00381 } 00382 */ 00383 ; 00384 } 00385 if(state!=SESSION_ERR) 00386 state=SESSION_INITIALIZED; 00387 noDescribeSeenYet=false; 00388 } 00389 } 00390 if(!ret) { 00391 state=SESSION_ERR; 00392 } 00393 prot->incSeqNr(); 00394 return ret; 00395 }; 00396 00397 bool ProxySession::handleConnectHit(const Url* fileName) 00398 { 00399 // no serverconnection is needed, no request forwarding is needed 00400 assert(mp4Stream); 00401 dprintf_full("ProxySession::handleConnectHit\n"); 00402 00403 bool ret=prot->generateCMDDescribeReply(this,fileName,fileName->getServerName(),playerId); 00404 const char* buf=prot->getBuffer(); 00405 strcpy(buffer,buf); 00406 return ret; 00407 }; 00408 00409 bool ProxySession::handleConnectMiss(const Url* fileName) 00410 { 00411 // we need a serverconnection 00412 assert(serverSession); 00413 dprintf_full("ProxySession::handleConnectMiss fn%p (%s) s%p\n", fileName,fileName->toString(),serverSession); 00414 if(!serverSession->getProtocol()) 00415 serverSession->determineProtocol(buffer); 00416 if(serverID) 00417 delete serverID; 00418 // reset rtx_groups 00419 std::list < rtx_group* >::iterator j; 00420 while(!rtxInfo.empty()) { 00421 j=rtxInfo.begin(); 00422 delete *j; 00423 rtxInfo.pop_front(); 00424 } 00425 00426 // if playerId is not our player and the server is our server 00427 // regenerate the describe, so that we can enable rtx 00428 // Problem: we don't know with which kind of server we are dealing at the moment 00429 // so regenerate the DESCRIBE always!!! 00430 // replace the User Agent string with our id 00431 // no foreign player supports termcaps so far, set to NULL 00432 if(neverForwardMetaData) 00433 serverSession->getProtocol()->generateDescribeForUrl(url->toString(),PLAYER_ID,NULL); 00434 else { 00435 if(this->userPref) 00436 serverSession->getProtocol()->generateDescribeForUrl(url->toString(),PLAYER_ID,NULL,userPref); 00437 else if(this->termCap) 00438 serverSession->getProtocol()->generateDescribeForUrl(url->toString(),PLAYER_ID,termCap); 00439 else 00440 serverSession->getProtocol()->generateDescribeForUrl(url->toString(),PLAYER_ID,NULL); 00441 } 00442 strcpy(buffer,serverSession->getProtocol()->getBuffer()); 00443 00444 //fwd the cmd to the server 00445 dprintf_full("ProxySession::handleConnectMiss: responding...\n"); 00446 serverSession->sendResponse(buffer,strlen(buffer)); 00447 serverSession->readRequest(buffer,MSG_BUFFER_SIZE); 00448 dprintf_full("ProxySession::handleConnectMiss: after request forwarding %p\n",serverSession->getProtocol()); 00449 serverSession->getProtocol()->incSeqNr(); 00450 dprintf_full("ProxySession::handleConnectMiss: after incSeqNr \n"); 00451 ContainerInfo* dummy=NULL; 00452 if(mp4Stream) { 00453 //FIXME? filename->toString or url->toString.... why fileName not url??? seems the same.... 00454 dummy=prot->parseResponseDescribe(buffer, fileName->toString(),&serverID, &this->rtxInfo); 00455 assert(dummy); 00456 delete dummy; //just use the old mp4stream 00457 } 00458 else 00459 //FIXME? filename->toString or url->toString.... why fileName not url??? seems the same.... 00460 mp4Stream=prot->parseResponseDescribe(buffer, fileName->toString(),&serverID, &this->rtxInfo); 00461 dprintf_full("ProxySession::handleConnectMiss connect to server >%s<, url #%s# fileName #%s#\n",serverID,url->toString(), fileName->toString()); 00462 if(!mp4Stream) { 00463 // forward the buffer from the server 00464 return false; 00465 } 00466 00467 //FIXME: THESE TWO BLOCKS ARE HACKS! 00468 // correct voptimeincrement 00469 VideoESInfo* ves=mp4Stream->getFirstVisualES(); 00470 if(ves) { 00471 MP4Decoder* mp4d=new MP4Decoder(ves,true); 00472 mp4d->initialize(); 00473 delete mp4d; 00474 } 00475 00476 AudioESInfo* aes=mp4Stream->getFirstAudioES(); 00477 if(aes) { 00478 MP4audioDecoder* mp4a=new MP4audioDecoder(aes,true); 00479 mp4a->initialize(); 00480 delete mp4a; 00481 } 00482 00483 // check for transcoding!!!!! 00484 prxStat.bytesRequested+=mp4Stream->getEstimatedNetworkFileSize(); 00485 ContainerInfo* pCI=mp4Stream->clone(); 00486 MetaObject* meta=new MetaObject(pCI); 00487 if(userPref) { 00488 // FIXME: hardcode delay values to 100ms 00489 dprintf_full("ServerSession::handleConnectMiss meta && userPref\n"); 00490 if(suggestedAdaptor) 00491 delete suggestedAdaptor; 00492 suggestedAdaptor=admControl->makeBestMatch(userPref,100,meta,&pCI,&resUsage); 00493 00494 } 00495 else if(termCap) { 00496 // bandwidth should be the minimum of network and decodeBandwidth 00497 // if one of them is zero, ignore the zero param 00498 u32 bandwidth=termCap->getNetworkCapacityInByte()*8; 00499 if(bandwidth==0) 00500 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00501 else if(termCap->getMaxDecoderBitRateInBit() && 00502 termCap->getMaxDecoderBitRateInBit() < bandwidth) { 00503 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00504 } 00505 // the returned mp4stream is equal to the previous mp4Stream here 00506 // we have only one in MetaObject 00507 bool exactMatch=false; 00508 // ContainerInfo* dummyMp4Stream = 00509 meta->findMp4Stream(0,termCap->getDisplayWidth(),0,termCap->getDisplayHeight(), 00510 0,bandwidth,(termCap->getColorDisplay()==0), 00511 (float)termCap->getDisplayRefreshRate() ,&exactMatch); 00512 if(!exactMatch) { 00513 insertMp4StreamIntoCM=true; 00514 suggestedAdaptor=admControl->makeExactMatch(termCap->getDisplayWidth(), 00515 termCap->getDisplayHeight(), 00516 bandwidth, 00517 (termCap->getColorDisplay()==0), 00518 (float)termCap->getDisplayRefreshRate(), 00519 mp4Stream,&resUsage); 00520 } 00521 00522 } 00523 00524 if (!suggestedAdaptor) {// no transcoding 00525 VideoESInfo* vis=mp4Stream->getFirstVisualES(); 00526 if(vis) { 00527 Feature src(vis); 00528 src.isCached=false; 00529 ResourceUsage* tmpRU=CostFunction::calcResourceUsage(&src,&src,admControl->getResourceLimit()); 00530 if(tmpRU) { 00531 resUsage.network=tmpRU->network; 00532 resUsage.disk=tmpRU->disk; 00533 resUsage.cpu=tmpRU->cpu; 00534 delete tmpRU; 00535 } 00536 } 00537 insertMp4StreamIntoCM=true; 00538 } 00539 00540 if(suggestedAdaptor && mp4Stream) 00541 suggestedAdaptor->setESInfo(mp4Stream->getFirstVisualES()); 00542 00543 delete meta; 00544 00545 // check for admission but not for userPrefs+termcaps which are already inserted! 00546 if(!userPref && !termCap && !admControl->insertRequest(&resUsage)) { 00547 resUsage.cpu=resUsage.disk=resUsage.network=0.0; 00548 if(mp4Stream) 00549 delete mp4Stream; 00550 mp4Stream=NULL; 00551 if(suggestedAdaptor) 00552 delete suggestedAdaptor; 00553 suggestedAdaptor=NULL; 00554 } 00555 00556 if(mp4Stream && suggestedAdaptor && 00557 strcmp(suggestedAdaptor->getName(),"Forwarder")!=0 && 00558 !strstr(suggestedAdaptor->getName(),"Temporal")) { // transcoding 00559 // we must insert two ContainerInfos! 00560 dualMiss=true; 00561 origSrc=mp4Stream; 00562 mp4Stream=mp4Stream->clone(); 00563 insertMp4StreamIntoCM=true; 00564 } else if(mp4Stream && suggestedAdaptor && 00565 (strcmp(suggestedAdaptor->getName(),"Forwarder")==0 || 00566 strstr(suggestedAdaptor->getName(),"Temporal"))) { 00567 insertMp4StreamIntoCM=false; 00568 } 00569 00570 if(insertMp4StreamIntoCM && mp4Stream) { 00571 u64 durInMS=0; 00572 if(mp4Stream->getFirstVisualES()) 00573 durInMS=mp4Stream->getFirstVisualES()->getDurationInMs(); 00574 else if(mp4Stream->getFirstAudioES()) 00575 durInMS=mp4Stream->getFirstAudioES()->getDurationInMs(); 00576 u32 byteSize=0; 00577 if(userPref) { 00578 byteSize=userPref->bitrate.best/8*durInMS/1000; 00579 } 00580 else if(termCap) { 00581 u32 bandwidth=termCap->getNetworkCapacityInByte()*8 -1; 00582 if(bandwidth==0) 00583 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00584 else if(termCap->getMaxDecoderBitRateInBit() && 00585 termCap->getMaxDecoderBitRateInBit() < bandwidth) { 00586 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00587 } 00588 byteSize=bandwidth/8*durInMS/1000; 00589 if(byteSize > mp4Stream->getEstimatedNetworkFileSize()) 00590 byteSize=mp4Stream->getEstimatedNetworkFileSize(); 00591 } 00592 else 00593 byteSize=mp4Stream->getEstimatedNetworkFileSize(); 00594 cm->lockCache(); 00595 assert(mp4StreamReserved == 0); 00596 if(cm->reserveDiskSpace(byteSize)) 00597 mp4StreamReserved+=byteSize; 00598 else { 00599 insertMp4StreamIntoCM=false; 00600 } 00601 cm->unlockCache(); 00602 } 00603 00604 if(dualMiss && origSrc) { 00605 dprintf_full("ProxySession::handleConnectMiss dualMiss!\n"); 00606 cm->lockCache(); 00607 assert(origSrcReserved == 0); 00608 if(cm->reserveDiskSpace(origSrc->getEstimatedNetworkFileSize())) 00609 origSrcReserved=origSrc->getEstimatedNetworkFileSize(); 00610 else { 00611 origSrcReserved=0; 00612 // we can't cache the original but maybe we can cache the transcoded! 00613 // DON't change insertmp4streamintocm!!! 00614 dontCacheDualMiss=true; 00615 /*insertMp4StreamIntoCM=false; 00616 if(mp4StreamReserved > 0) 00617 { 00618 cm->freeDiskSpace (mp4StreamReserved); 00619 mp4StreamReserved = 0; 00620 } */ 00621 } 00622 cm->unlockCache(); 00623 } 00624 00625 return true; 00626 }; 00627 00628 00629 void ProxySession::detectRtxCapabilities() 00630 { 00631 if(serverID && strstr(serverID,SERVER_ID)) { 00632 rtxEnabledToServer=true&rtxEnabled; 00633 } 00634 else { 00635 rtxEnabledToServer=false; 00636 } 00637 if(playerId && strstr(playerId,PLAYER_ID)) { 00638 rtxEnabledToClient=true&rtxEnabled; 00639 } 00640 else { 00641 rtxEnabledToClient=false; 00642 } 00643 dprintf_full("ProxySession::detectRtxCapabilities() rtxToClient %i rtxToServer %i\n",rtxEnabledToClient,rtxEnabledToServer); 00644 }; 00645 00648 IO* ProxySession::handleSetupMiss(const Url* fileName, int esId, int* sessionId) 00649 { 00650 assert(mp4Stream && serverSession && serverSession->getProtocol() && sessionId); 00651 // always regenerate the setup request to catch scenarios where the client supports 00652 // retransmission and the server doesn't 00653 ESInfo* es=mp4Stream->getES((u32)esId); 00654 if(!es) { 00655 // stream not found 00656 prot->generateFileNotFound(); 00657 strcpy(buffer,prot->getBuffer()); 00658 return NULL; 00659 } 00660 RTSP* serverProt=serverSession->getProtocol(); 00661 IO *input=NULL; 00662 int proxyPortToServer=0,serverPortToProxy=0, proxyRtxPortToServer=0; 00663 int retProxyPortToServer=0; 00664 bool bUnicast=true,retBUnicast=false; 00665 char* ssrc=NULL; 00666 rtx_info* esRtxInfo=new rtx_info(); 00667 proxyPortToServer=portGen.getNextFreePortPair(); 00668 if(rtxEnabledToServer) { 00669 dprintf_full("ProxySession::setup rtx to server is enabled\n"); 00670 proxyRtxPortToServer=portGen.getNextFreePortPair(); 00671 } 00672 serverProt->generateSetup(fileName->toString(),esId,proxyPortToServer,bUnicast,proxyRtxPortToServer,*sessionId); 00673 00674 const char* serverBuffer=serverProt->getBuffer(); 00675 00676 //fwd the SETUP to the server 00677 serverSession->sendResponse(serverBuffer,strlen(serverBuffer)); 00678 serverSession->readRequest(buffer,MSG_BUFFER_SIZE); // reply from server is stored in buffer 00679 serverSession->getProtocol()->incSeqNr(); 00680 // parse the reply from the server 00681 int retVal=prot->parseResponseSetup(buffer,sessionId,&retProxyPortToServer, 00682 &serverPortToProxy,&retBUnicast,&ssrc,esRtxInfo); 00683 if(retVal<2 || retProxyPortToServer!=proxyPortToServer || retProxyPortToServer<=0 || 00684 serverPortToProxy<=0 || retBUnicast!=bUnicast || retBUnicast==false) { 00685 // generate error 00686 prot->generateBadCMD(); 00687 portGen.closePortPair(proxyPortToServer); 00688 if(rtxEnabledToServer) 00689 portGen.closePortPair(proxyRtxPortToServer); 00690 00691 strcpy(buffer,prot->getBuffer()); 00692 input= NULL; 00693 } 00694 else { 00695 // succeeded in parsing reply 00696 // we can now create the Input IO class 00697 // but first check if the server returned any rtx info 00698 if(rtxEnabledToServer && (esRtxInfo->localPort<=0 || esRtxInfo->remotePort<=0)) { 00699 portGen.closePortPair(proxyRtxPortToServer); 00700 esRtxInfo->localPort=0; 00701 esRtxInfo->remotePort=0; 00702 } 00703 // IncompleteIO is too difficult to handle 00704 // for now just assume, that it's a full miss 00705 00706 ESInfo* es=mp4Stream->getES((u32)esId); 00707 // build Rtp as input 00708 PacketizationLayer* pl=new RFC3016(); 00709 00710 const char* urlServerName=url->getServerName(); 00711 00712 #ifndef WINCE 00713 if(rtxEnabled) { 00714 #endif 00715 Statistics *stats = new Statistics(es,false); 00716 input=new Rtp(fileName->getPath(),serverPortToProxy,urlServerName, 00717 proxyPortToServer,es,pl,false,termCap,NULL,stats); 00718 #ifndef WINCE 00719 } 00720 else { 00721 char *logName = new char[MAX_STR_LEN]; 00722 struct timeval t; 00723 gettimeofday(&t,NULL); 00724 sprintf(logName,"log%10i.%3i_proxy.in",(int)t.tv_sec,(int)(t.tv_usec/1000)); 00725 input=new SimpleRtp(fileName->getPath(),serverPortToProxy,proxyPortToServer, 00726 urlServerName,es,pl,false,logName,true,SimpleRtp::MIXED_CTS_BITRATE,true,true); 00727 delete [] logName; 00728 } 00729 #endif 00730 00731 00732 if(rtxEnabledToServer) { 00733 dprintf_full("Proxysession::setup rtx for input is enabled\n"); 00734 input->setRtxInfo(esRtxInfo); 00735 } 00736 rtxInfoServer.push_front(esRtxInfo); // save it for garbage collection 00737 esRtxInfo=NULL; 00738 } 00739 // cleanup 00740 if(ssrc) 00741 delete ssrc; 00742 if(esRtxInfo) 00743 delete esRtxInfo; 00744 return input; 00745 }; 00748 DataSink* ProxySession::handleSetupHit(const Url* fileName, int esId, 00749 int remoteClientPort, int remoteUpperClientPort, 00750 int remoteClientRtxPort, 00751 int sesKey, const char* protocol, const char* unicast) 00752 { 00753 int localPortToClient=portGen.getNextFreePortPair(); 00754 int proxyRtxPortToClient=0; 00755 if(remoteClientRtxPort>0 && rtxEnabledToClient) { 00756 proxyRtxPortToClient=portGen.getNextFreePortPair(); 00757 } 00758 else { 00759 remoteClientRtxPort=proxyRtxPortToClient=0; 00760 } 00761 00762 // create an RTP with RFC3016 output channel 00763 // create DataChannel 00764 // open RTP session 00765 00766 ESInfo* es=mp4Stream->getES((u32)esId); 00767 if(!es) { 00768 // stream not found 00769 prot->generateFileNotFound(); 00770 strcpy(buffer,prot->getBuffer()); 00771 return NULL; 00772 } 00773 00774 PacketizationLayer * p = new RFC3016(); 00775 char *clientAddress = inet_ntoa(client->sin_addr); // points to a static array in inet.. 00776 Statistics *stats = NULL; 00777 IO* output=NULL; 00778 // no probs with inet_ntoa: rtp deep-copies the clientaddress 00779 00780 #ifndef WINCE 00781 if(rtxEnabled) { 00782 #endif 00783 stats = new Statistics(es,true); 00784 output = new Rtp(fileName->getPath(), remoteClientPort, clientAddress, localPortToClient, 00785 es, p, true,termCap,NULL,stats); 00786 #ifndef WINCE 00787 } 00788 else { 00789 char *logName = new char[MAX_STR_LEN]; 00790 struct timeval t; 00791 gettimeofday(&t,NULL); 00792 sprintf(logName,"log%10i.%3i_proxy.out",(int)t.tv_sec,(int)(t.tv_usec/1000)); 00793 output=new SimpleRtp(fileName->getPath(),remoteClientPort,localPortToClient, 00794 clientAddress,es,p,true,logName,true,SimpleRtp::MIXED_CTS_BITRATE,false,false); 00795 delete [] logName; 00796 } 00797 #endif 00798 00799 this->prot->generateSetupReply(this,fileName,sesKey,protocol,unicast,clientAddress, 00800 esId,remoteClientPort,remoteUpperClientPort,localPortToClient,remoteClientRtxPort,proxyRtxPortToClient); 00801 00802 strcpy(buffer,prot->getBuffer()); 00803 00804 rtx_group* group=getRtxGroup(esId); 00805 if(rtxEnabledToClient && proxyRtxPortToClient>0 && group && group->rtx) { 00806 dprintf_full("Proxysession::setup rtx for output is enabled\n"); 00807 group->rtx->remotePort=remoteClientRtxPort; 00808 group->rtx->localPort=proxyRtxPortToClient; 00809 output->setRtxInfo(group->rtx); 00810 } 00811 else { 00812 dprintf_err("Ses has stored %i groups \n",rtxInfo.size()); 00813 dprintf_full("ProxySession::setup: No rtx_port for es %i [%u,%u] g:%p\n", 00814 esId,remoteClientRtxPort,proxyRtxPortToClient,group); 00815 } 00816 output->open(); 00817 /* 00818 if( playerId && suggestedAdaptor==NULL && es && es->isVisualStream() && 00819 strstr(playerId, QT_PLAYER_ID) && 00820 strstr(playerId, OLD_BFRAMES_DISABLED_QT_PLAYER) ) { 00821 suggestedAdaptor=new TemporalAdaptor((VideoESInfo*)es); 00822 } 00823 */ 00824 00825 DataSink* clientObj = 00826 new DataSink((u32)client->sin_addr.s_addr,output, clientAddress, 00827 ntohl(client->sin_port)); 00828 return clientObj; 00829 }; 00830 00831 bool ProxySession::setup(const Url* fileName, const char* remaining) 00832 { 00833 assert(fileName && remaining && prot); 00834 dprintf_full("ProxySession::setup %s\n",url->toString()); 00835 bool hasSessionKey=false,hasRtxPort=false; 00836 char* protocol=NULL; 00837 char* unicast=NULL; 00838 00839 int sesKey=0,esId=0,rPort=0,rPort2=0; 00840 u32 clientRtxPort=0; 00841 //u32 proxyRtxPortToClient=0, proxyRtxPortToServer=0; 00842 00843 bool err=!prot->parseCMDSetup(this,fileName,remaining,&hasSessionKey,&sesKey,&protocol, 00844 &unicast,&esId,&rPort,&rPort2,&hasRtxPort,&clientRtxPort); 00845 if(!err) { 00846 if(!prot->isSupportedTransport(protocol)) { 00847 prot->generateIllegalTransport(); 00848 err=true; 00849 } 00850 } 00851 else { 00852 prot->generateBadCMD(); 00853 } 00854 00855 if(err) { 00856 dprintf_err("ProxySession::setup error in parsing setup request\n"); 00857 const char* buf=prot->getBuffer(); 00858 strcpy(buffer,buf); 00859 if(protocol) 00860 delete protocol; 00861 if(unicast) 00862 delete unicast; 00863 prot->incSeqNr(); 00864 return false; 00865 } 00866 if(hasSessionKey==false) { 00867 sesKey=prot->generateSessionKey(); 00868 } 00869 00870 // now check: is this ES cached? 00871 ESInfo* es=mp4Stream->getES((u32)esId); 00872 IO* input=NULL; 00873 DataSink* clientObj=NULL; 00874 // we accept as soon as we have more than 90% 00875 00876 u32 uiSize=es->getCurrentDemuxedMediaSize(); 00877 if(uiSize>0) { 00878 // cache hit, hurrahhh! 00879 // build an MPGStreamIO as input 00880 dprintf_full("ProxySession::setup HIT\n"); 00881 input=IOCreator::createInput(es, true); 00882 clientObj=handleSetupHit(fileName,esId,rPort,rPort2,clientRtxPort,sesKey,protocol,unicast); 00883 } 00884 else { 00885 // cache miss 00886 dprintf_full("ProxySession::setup MISS\n"); 00887 int sessionId=-1; // the sessionid from the server 00888 if(sessionTranslator.find(sesKey)!=sessionTranslator.end()) { 00889 // we already have a key 00890 sessionId=sessionTranslator[sesKey]; 00891 } 00892 int bakSessionId=sessionId; 00893 input=handleSetupMiss(fileName,esId,&sessionId); 00894 if(input) { 00895 if(bakSessionId!=sessionId) // a new one was generated 00896 sessionTranslator[sesKey]=sessionId; 00897 clientObj=handleSetupHit(fileName,esId,rPort,rPort2,clientRtxPort,sesKey,protocol,unicast); 00898 } 00899 } 00900 if(input && clientObj) { 00901 // now create the DataChannel 00902 // make sure we ve got the right es! 00903 if(suggestedAdaptor && es->isVisualStream()) 00904 suggestedAdaptor->setESInfo(es); 00905 if(dualMiss && origSrc && !dontCacheDualMiss) { 00906 ESInfo* esFullQ=origSrc->getES((u32)esId); 00907 00908 if(esFullQ) { 00909 00910 // save the full quality to a file and set the new input at origSrc 00911 // add a DataDumper to suggestedAdaptor 00912 char* pFileName=Url::transformUrlToLocalFile(origSrc->getUrl()); 00913 FILE* fp=NULL; 00914 char *out = new char[4096]; 00915 strcpy(out,"demux"); 00916 #ifdef WIN32 00917 strcat(out,"\\"); 00918 #else 00919 strcat(out,"/"); 00920 #endif 00921 strcat(out,pFileName); 00922 delete[] pFileName; 00923 while((fp=fopen(out,"r")) != 0) { 00924 // merde! file exists 00925 sprintf(out,"%s%u",out,(unsigned int)(rand()%10)); 00926 fclose(fp);fp=NULL; 00927 } 00928 if(fp) 00929 fclose(fp); 00930 // create a writer, use esFullQ to get correct FrameStatistic 00931 MPGStreamIO* output = new MPGStreamIO(esFullQ, out, true, false); 00932 output->open(); 00933 AdaptorChain* pAc; 00934 if(es->isAudioStream()) 00935 pAc=new AudioAdaptorChain(); 00936 else if(es->isVisualStream()) 00937 pAc=new VideoAdaptorChain(); 00938 if(es->isAudioStream()||es->isVisualStream()) { 00939 pAc->addAdaptor(new DataDump(es,output)); 00940 pAc->addAdaptor(suggestedAdaptor); 00941 suggestedAdaptor=pAc; 00942 esFullQ->setInput(out); 00943 } 00944 00945 delete [] out; 00946 } 00947 } 00948 DataChannel * d = new DataChannel(input, es,(es->isVisualStream()?this->suggestedAdaptor:NULL)); 00949 suggestedAdaptor=NULL; 00950 d->setSessionId(sesKey); 00951 d->insert(clientObj); // add the client to the DC 00952 00953 if(insertMp4StreamIntoCM) { 00954 char* outCacheName=d->enableCaching(false,"demux",true,fileName); 00955 // we must delete the old file and set the new one but without incompleteio there is no old file :-) 00956 // set the new one here because other clients could use the same ESInfo 00957 dprintf_full("ProxySession.setup: prior calling setInput\n"); 00958 es->setInput(outCacheName); 00959 delete[] outCacheName; 00960 } 00961 00962 // set the DataChannel at the Session 00963 setDataChannel(d); 00964 d=NULL; 00965 clientObj=NULL; 00966 input=NULL; 00967 err=false; 00968 } 00969 else { 00970 err=true; 00971 prot->generateInternalServerError(); 00972 strcpy(buffer,prot->getBuffer()); 00973 } 00974 00975 //cleanup 00976 if(protocol) 00977 delete protocol; 00978 if(unicast) 00979 delete unicast; 00980 if(input) 00981 delete input; 00982 if(clientObj) 00983 delete clientObj; 00984 prot->incSeqNr(); 00985 return !err; 00986 } 00987 00988 bool ProxySession::play(const Url* fileName, const char* remaining) 00989 { 00990 assert(fileName && remaining); 00991 00992 int sessionId=0;u32 startFrame=0, endFrame=0; 00993 bool res=false; 00994 bool remote=false; 00995 double start=0.0, end=0.0; 00996 double prefetch=0.0; 00997 00998 if(!prot->parseCMDPlay(this,fileName,remaining,&sessionId,&start,&end,&prefetch)) { 00999 prot->generateSessionNotFound(); 01000 const char* buf=prot->getBuffer(); 01001 strcpy(buffer,buf); 01002 prot->incSeqNr(); 01003 return false; 01004 } 01005 if(end<=0.00001) { 01006 end=mp4Stream->getFirstVisualES()->getDurationInMs()/1000.0; 01007 } 01008 // if we can find an entry in sessiontranslator then we have to fwd the play 01009 remote=(sessionTranslator.find(sessionId)!=sessionTranslator.end()); 01010 if(remote) { 01011 // fwd,don't allow proxy to introduce such a high delay, reduce prefetch 01012 serverSession->getProtocol()->generatePlay(url->toString(),sessionTranslator[sessionId],start,end,prefetch/2); 01013 const char* playForServer=serverSession->getProtocol()->getBuffer(); 01014 serverSession->sendResponse(playForServer,strlen(playForServer)); 01015 01016 // get reply 01017 serverSession->readRequest(bufferBackup,MSG_BUFFER_SIZE); 01018 serverSession->getProtocol()->incSeqNr(); 01019 if(!RTSP::isCmdOk(bufferBackup)) { 01020 dprintf_err("ProxySession::play: Server sent an error\n"); 01021 prot->incSeqNr(); 01022 strcpy(buffer,bufferBackup); 01023 return false; 01024 } 01025 } 01026 // if we came that far we have an okay 01027 const ESInfo* es=NULL; 01028 for (list < DataChannel * >::iterator i = this->channels.begin(); i != channels.end(); 01029 i++) { 01030 dprintf_full("ProxySession::play: Looking for sesId %i, found %i\n",sessionId,(*i)->getSessionId()); 01031 if ((*i)->getSessionId() == (u32)sessionId) { 01032 es=(*i)->getESInfo(); 01033 01034 if(es) { 01035 startFrame=es->getFrameNumber(start); 01036 endFrame=es->getFrameNumber(end); 01037 DataSink* clientOut=(*i)->getDataSink(clientId); 01038 IO* clientOutput=NULL; 01039 01040 if(clientOut && (clientOutput=clientOut->getOutput())) { 01041 // flush the Rtp output buffer!!! 01042 clientOutput->setToFrameNumber(startFrame); 01043 } 01044 if(!remote) 01045 (*i)->play(clientId,prefetch); 01046 else 01047 (*i)->play(clientId,prefetch/2); 01048 IO* tmpInput=(*i)->getInput(); 01049 dprintf_full("ProxySession::play: got input %p\n",tmpInput); 01050 if(tmpInput->getState()==IO::OPEN && !tmpInput->setToFrameNumber(startFrame)) { 01051 dprintf_err("ProxySession::play Failed to set input to frame %i\r\n",startFrame); 01052 } 01053 else 01054 res=true; 01055 01056 // if not yet started -> start it 01057 if (!(*i)->running()) { 01058 (*i)->start(); 01059 } 01060 } 01061 else { 01062 startFrame=0; 01063 dprintf_err("RTSP::handleCMDPlay Missing ES for request? PANIC!\r\n"); 01064 } 01065 } 01066 } 01067 if(remote) 01068 strcpy(buffer,bufferBackup); 01069 else { 01070 if(res) 01071 prot->generateOk(); 01072 else 01073 prot->generateSessionNotFound(); 01074 01075 const char* buf=prot->getBuffer(); 01076 strcpy(buffer,buf); 01077 } 01078 prot->incSeqNr(); 01079 return res; 01080 } 01081 01082 bool ProxySession::pause(const Url* fileName, const char* remaining) 01083 { 01084 bool found=false; 01085 assert(fileName && remaining); 01086 int sessionId=prot->extractSessionKeyFromCMD(remaining); 01087 // if we can find an entry in sessiontranslator then we have to fwd the pause 01088 bool remote=(sessionTranslator.find(sessionId)!=sessionTranslator.end()); 01089 if(remote) { 01090 serverSession->getProtocol()->generatePause(url->toString(),sessionTranslator[sessionId]); 01091 const char* pauseForServer=serverSession->getProtocol()->getBuffer(); 01092 serverSession->sendResponse(pauseForServer,strlen(pauseForServer)); 01093 // get reply 01094 serverSession->readRequest(bufferBackup,MSG_BUFFER_SIZE); 01095 serverSession->getProtocol()->incSeqNr(); 01096 } 01097 if (sessionId > 0 && ((u32)sessionId)!=INVALID_SESSIONID) { 01098 // search for the session 01099 // start it 01100 for (list < DataChannel * >::iterator i = this->channels.begin(); 01101 i != this->channels.end(); i++) { 01102 if ((*i)->getSessionId() == (unsigned)sessionId) { 01103 found=true; 01104 (*i)->pause(clientId); 01105 01106 } 01107 } 01108 } 01109 if(!remote && !found) { 01110 prot->generateSessionNotFound(); 01111 dprintf_small("ProxySession::pause session %i not found\n",sessionId); 01112 strcpy(buffer,prot->getBuffer()); 01113 } 01114 else { 01115 if(RTSP::isCmdOk(bufferBackup)) 01116 found=true; 01117 strcpy(buffer,bufferBackup); 01118 } 01119 prot->incSeqNr(); 01120 return found; 01121 } 01122 01123 01124 bool ProxySession::tearDown(int sessionId, bool immediate, const Url* fileName, const char* remaining) 01125 { 01126 dprintf_full("ProxySession::tearDown(%i, %i)\r\n",sessionId,immediate); 01127 if(fileName && remaining) { 01128 sessionId=RTSP::extractSessionId(remaining); 01129 } 01130 // serverSession->tearDown(sessionId,immediate,fileName,remaining); 01131 if( ((u32)sessionId)==TEARDOWN_ALL) { 01132 // teardown all 01133 state = SESSION_CLOSED; 01134 list < DataChannel * >::iterator dcI; 01135 while (!channels.empty()) { 01136 dcI=channels.begin(); 01137 // pause the caching client 01138 (*dcI)->pause(0); 01139 DataSink* ds=(*dcI)->getDataSink(0); 01140 if(ds) { 01141 IO* dsOut=ds->getOutput(); 01142 if(dsOut) { 01143 ESInfo* es=(ESInfo*) (*dcI)->getESInfo(); 01144 01145 float completeStatePercent=es->getFrameStatistic()->getPercentageSetBits(); //expensive 01146 if(completeStatePercent>0.75) { 01147 es->setCompleteState(true); 01148 dprintf_full("ProxySession::teardown: stream %s is complete with %f%%\r\n",es->getInput(),completeStatePercent); 01149 } 01150 dprintf_full("ProxySession::teardown: stream %s is cached with %f%%\r\n",es->getInput(),completeStatePercent); 01151 es->getUsageCounter()->decrease(); 01152 } 01153 } 01154 (*dcI)->teardown(clientId,immediate); 01155 // check if we have to delete it, might be shared with other sessions 01156 if((*dcI)->getNumberOfActiveDataSinks()==0) { 01157 (*dcI)->wait(); // wait for thread to finish 01158 delete (*dcI); // decrease RefCounter 01159 } 01160 channels.pop_front(); 01161 } 01162 // generate reply cmd 01163 prot->generateOk(); 01164 strcpy(buffer,prot->getBuffer()); 01165 prot->incSeqNr(); 01166 return true; 01167 } 01168 else { 01169 // teardown one session 01170 list < DataChannel * >::iterator dcI; 01171 if (!channels.empty()) { 01172 dcI = channels.begin(); 01173 while (dcI != channels.end()) { 01174 if ((*dcI)->getSessionId()== (unsigned)sessionId) { 01175 (*dcI)->pause(0); 01176 DataSink* ds=(*dcI)->getDataSink(0); 01177 if(ds) { 01178 IO* dsOut=ds->getOutput(); 01179 if(dsOut) { 01180 ESInfo* es=(ESInfo*) (*dcI)->getESInfo(); 01181 01182 float completeStatePercent=es->getFrameStatistic()->getPercentageSetBits(); //expensive 01183 if(completeStatePercent>0.75) { 01184 es->setCompleteState(true); 01185 dprintf_full("ProxySession::teardown: stream %s is complete with %f%%\r\n",es->getInput(),completeStatePercent); 01186 } 01187 dprintf_full("ProxySession::teardown: stream %s is cached with %f%%\r\n",es->getInput(),completeStatePercent); 01188 es->getUsageCounter()->decrease(); 01189 } 01190 } 01191 (*dcI)->teardown(clientId,immediate); 01192 // check if we have to delete it, might be shared with other sessions 01193 if((*dcI)->getNumberOfActiveDataSinks()==0) { 01194 (*dcI)->wait(); // wait for thread to finish 01195 delete (*dcI); 01196 } 01197 channels.erase(dcI); 01198 if(channels.empty()) 01199 state = SESSION_CLOSED; 01200 prot->generateOk(); 01201 strcpy(buffer,prot->getBuffer()); 01202 prot->incSeqNr(); 01203 return true; 01204 } 01205 else { 01206 ++dcI; 01207 } 01208 } 01209 } 01210 01211 } 01212 // session was not found 01213 prot->generateSessionNotFound(); 01214 dprintf_small("ProxySession::teardown: session %i not found\n",sessionId); 01215 strcpy(buffer,prot->getBuffer()); 01216 prot->incSeqNr(); 01217 return false; 01218 }; 01219 01220 //is teardown on streams or sessions ??? 01221 01222 void ProxySession::run() 01223 { 01224 // while not end of session 01225 //check for control command 01226 // do control command 01227 state = SESSION_NEW; 01228 /* the buffer for incoming rtsp/http commands */ 01229 if(this->counter != NULL) 01230 this->counter->increment(); 01231 while (state!=SESSION_ERR && state!=SESSION_CLOSED) { 01232 buffer[0] = '\0'; 01233 bufferBackup[0]='\0'; 01234 dprintf_full("ProxySession::run: Listening for data on ControlChannel\r\n"); 01235 int bytesRead = readRequest(buffer, MSG_BUFFER_SIZE); 01236 strcpy(bufferBackup, buffer); 01237 dprintf_full("ProxySession::run: readBlock ended. read %i bytes\r\n",bytesRead); 01238 if (bytesRead == 0) { 01239 //no data read 01240 // the other side has closed the connection, teardown all 01241 dprintf_full("ProxySession::run: No data read from ControlChannel\r\n"); 01242 tearDown(TEARDOWN_ALL,true, NULL, NULL); 01243 } else if (bytesRead < 0) { 01244 // error: 01245 dprintf_err("ProxySession::run Error reading from controlchannel"); 01246 tearDown(TEARDOWN_ALL,true,NULL, NULL); 01247 state = SESSION_ERR; 01248 } else { // we got a command 01249 dprintf_full("ProxySession::run: got command\r\n"); 01250 // check for protocol if not yet set 01251 if (!prot) { 01252 if ( determineProtocol(buffer) == PROTO_UNKNOWN) { 01253 dprintf_err("ProxySession::run: Unknown protocol: %s\n", 01254 buffer); 01255 } 01256 } 01257 if (prot) { 01258 switch (prot->getID()) { 01259 case PROTO_RTSP: 01260 case PROTO_HTTP: 01261 if (bytesRead >= MSG_BUFFER_SIZE) { 01262 dprintf_err("received command exceeded buffer size"); 01263 break; 01264 } 01265 break; 01266 default: 01267 dprintf_err("ProxySession::run: Unknown protocol: %s\n",buffer); 01268 break; 01269 } 01270 01271 if (!prot->applyReqToSession(buffer, bytesRead, this)) { 01272 dprintf_err("ProxySession::run: Failed to parse request >%s<\n", buffer); 01273 } 01274 if(strlen(buffer)==0) { 01275 prot->generateInternalServerError(); 01276 strcpy(buffer,prot->getBuffer()); 01277 } 01278 sendResponse(buffer,strlen(buffer)); 01279 } 01280 } 01281 } // while valid 01282 if(this->counter != NULL) 01283 this->counter->decrement(); 01284 this->exit(); 01285 01286 }; 01287 01288 01289 void ProxySession::setUrl(const Url *uri, bool makeExactMatch) 01290 { 01291 bool insertResUsage=false; 01292 // problem: a 2nd describe arrives, don't allow that in the same session! 01293 if (cm && uri && !mp4Stream) { 01294 if (url) { 01295 delete url; 01296 } 01297 dprintf_full("ProxySession::setUrl: %s\r\n", uri->toString()); 01298 url = new Url(uri->toString()); 01299 cm->lockCache(); 01300 MetaObject* meta=cm->findMetaObject(url); 01301 01302 01303 bool exactMatch=false; 01304 if( (!termCap && !userPref)) { 01305 mp4Stream = cm->getVideo(url); 01306 01307 // calc resUsage 01308 VideoESInfo* vis=NULL; 01309 if(mp4Stream) 01310 vis=mp4Stream->getFirstVisualES(); 01311 01312 if(vis) { 01313 Feature src(vis); 01314 src.isCached=true; 01315 ResourceUsage* tmpRU=CostFunction::calcResourceUsage(&src,&src,admControl->getResourceLimit()); 01316 if(tmpRU) { 01317 resUsage.network=tmpRU->network; 01318 resUsage.disk=tmpRU->disk; 01319 resUsage.cpu=tmpRU->cpu; 01320 delete tmpRU; 01321 insertResUsage=true; 01322 } 01323 } 01324 insertMp4StreamIntoCM=false; 01325 dprintf_full("ProxySession::setUrl found video (no termcaps or userpref)\n"); 01326 } 01327 else if(meta && userPref) { 01328 // FIXME: hardcode delay values to 100ms 01329 dprintf_full("ProxySession::setUrl meta && userPref\n"); 01330 suggestedAdaptor=admControl->makeBestMatch(userPref,100,meta,&mp4Stream,&resUsage); 01331 dprintf_full("ProxySession::setUrl (N=%f,C=%f,D=%f)\n", 01332 resUsage.network,resUsage.cpu,resUsage.disk); 01333 01334 if(mp4Stream && suggestedAdaptor && resUsage.network>=0.0000001 && 01335 strcmp(suggestedAdaptor->getName(),"Forwarder")!=0 && 01336 !strstr(suggestedAdaptor->getName(),"Temporal")){ // transcoding 01337 // a hit with transcoding! 01338 origSrc=mp4Stream; 01339 mp4Stream=mp4Stream->clone(); 01340 insertMp4StreamIntoCM=true; 01341 01342 u64 durInMS=0; 01343 if(mp4Stream->getFirstVisualES()) 01344 durInMS=mp4Stream->getFirstVisualES()->getDurationInMs(); 01345 else if(mp4Stream->getFirstAudioES()) 01346 durInMS=mp4Stream->getFirstAudioES()->getDurationInMs(); 01347 u32 byteSize=0; 01348 if(userPref) { 01349 byteSize=userPref->bitrate.best/8*durInMS/1000; 01350 } 01351 assert (mp4StreamReserved == 0); 01352 if(cm->reserveDiskSpace(byteSize)) 01353 mp4StreamReserved+=byteSize; 01354 else { 01355 insertMp4StreamIntoCM=false; 01356 } 01357 prxStat.bytesHit+=mp4Stream->getFileSize(); 01358 prxStat.bytesRequested+=mp4Stream->getFileSize(); 01359 prxStat.objHits++; 01360 } 01361 else if(mp4Stream && resUsage.network>=0.0000001 ) { 01362 // never cache temporal transcoding or no transcoding 01363 origSrc=mp4Stream; 01364 insertMp4StreamIntoCM=false; 01365 mp4Stream=mp4Stream->clone(); 01366 this->delMp4Stream=true; 01367 prxStat.bytesHit+=mp4Stream->getFileSize(); 01368 prxStat.bytesRequested+=mp4Stream->getFileSize(); 01369 prxStat.objHits++; 01370 prxStat.qualHits++; 01371 } 01372 else if(mp4Stream) { 01373 // happens in MISS! prxStat.bytesRequested+=mp4Stream->getFileSize(); 01374 mp4Stream=NULL; 01375 } 01376 } 01377 else if(meta && termCap) { 01378 // bandwidth should be the minimum of network and decodeBandwidth 01379 // if one of them is zero, ignore the zero param 01380 u32 bandwidth=termCap->getNetworkCapacityInByte()*8; 01381 if(bandwidth==0) 01382 bandwidth=termCap->getMaxDecoderBitRateInBit(); 01383 else if(termCap->getMaxDecoderBitRateInBit() && 01384 termCap->getMaxDecoderBitRateInBit() < bandwidth) { 01385 bandwidth=termCap->getMaxDecoderBitRateInBit(); 01386 } 01387 mp4Stream=meta->findMp4Stream(0,termCap->getDisplayWidth(), 01388 0,termCap->getDisplayHeight(), 01389 0,bandwidth, 01390 (termCap->getColorDisplay()==0),(float)termCap->getDisplayRefreshRate() ,&exactMatch); 01391 01392 if(makeExactMatch && !exactMatch) { 01393 // clone the mp4Stream 01394 origSrc=mp4Stream; 01395 mp4Stream=mp4Stream->clone(); 01396 mp4Stream->getUsageCounter()->increase(); 01397 insertMp4StreamIntoCM=true; 01398 suggestedAdaptor=admControl->makeExactMatch(termCap->getDisplayWidth(), 01399 termCap->getDisplayHeight(), 01400 bandwidth, 01401 (termCap->getColorDisplay()==0), 01402 (float)termCap->getDisplayRefreshRate(), 01403 mp4Stream,&resUsage); 01404 // check if we have a TemporalAdaptor, do not cache results of temporaladaptor 01405 if(strstr(suggestedAdaptor->getName(),"Temporal") || 01406 strcmp(suggestedAdaptor->getName(),"Forwarder")) { 01407 insertMp4StreamIntoCM=false; 01408 delMp4Stream=true; 01409 } 01410 prxStat.bytesHit+=mp4Stream->getFileSize(); 01411 prxStat.bytesRequested+=mp4Stream->getFileSize(); 01412 prxStat.objHits++; 01413 } 01414 else { 01415 // calc the resource usage for the non transcoding case 01416 origSrc=mp4Stream; 01417 mp4Stream=mp4Stream->clone(); 01418 mp4Stream->getUsageCounter()->increase(); 01419 insertMp4StreamIntoCM=false; 01420 delMp4Stream=true; 01421 if(mp4Stream) { 01422 VideoESInfo* vis=mp4Stream->getFirstVisualES(); 01423 if(vis) { 01424 Feature src(vis); 01425 src.isCached=true; 01426 ResourceUsage* tmpRU=CostFunction::calcResourceUsage(&src,&src,admControl->getResourceLimit()); 01427 if(tmpRU) { 01428 resUsage.network=tmpRU->network; 01429 resUsage.disk=tmpRU->disk; 01430 resUsage.cpu=tmpRU->cpu; 01431 delete tmpRU; 01432 insertResUsage=true; 01433 } 01434 } 01435 } 01436 } 01437 } 01438 cm->unlockCache(); 01439 } 01440 // now insert the resource usage for those other cases (all non-transcoding!) 01441 if(insertResUsage) { 01442 if(!admControl->insertRequest(&resUsage)) { 01443 resUsage.cpu=resUsage.disk=resUsage.network=0.0; 01444 prxStat.bytesRequested+=mp4Stream->getFileSize(); 01445 if(delMp4Stream) { 01446 mp4Stream->getUsageCounter()->decrease(); 01447 delete mp4Stream; 01448 mp4Stream=NULL; 01449 delMp4Stream=false; 01450 } 01451 if(origSrc) 01452 origSrc->getUsageCounter()->decrease(); 01453 origSrc=NULL; 01454 insertMp4StreamIntoCM=false; 01455 if(suggestedAdaptor) 01456 delete suggestedAdaptor; 01457 suggestedAdaptor=NULL; 01458 } 01459 else { 01460 prxStat.bytesHit+=mp4Stream->getFileSize(); 01461 prxStat.bytesRequested+=mp4Stream->getFileSize(); 01462 prxStat.objHits++; 01463 prxStat.qualHits++; 01464 } 01465 } 01466 if(!mp4Stream) 01467 prxStat.objMiss++; 01468 if(suggestedAdaptor && mp4Stream) 01469 suggestedAdaptor->setESInfo(mp4Stream->getFirstVisualES()); 01470 }; 01471 01472 01473 bool ProxySession::establishConnectionToServer() 01474 { 01475 // get the url 01476 // extract the ip+port 01477 // connect and set the sessionControlChannel 01478 // extract the url 01479 assert(url!=NULL); 01480 dprintf_full("ProxySession::establishConnectionToServer: url %s\n",url->toString()); 01481 const char* ip=url->getServerName(); 01482 01483 if(!url || !ip || !prot) { 01484 dprintf_err("ProxySession::establishConnectionToServer() failed\r\n"); 01485 state=SESSION_ERR; 01486 return false; 01487 } 01488 01489 int port=-1; 01490 char *realIP = NULL; 01491 struct hostent *ipAddr; 01492 01493 /* 01494 //depricated gethostbyname 01495 ipAddr=gethostbyname(ip); 01496 if (!ipAddr) { 01497 dprintf_err("ProxySession::establishConnectionToServer: DNS resolution failed for host %s\r\n",ip); 01498 state=SESSION_ERR; 01499 return false; 01500 } else { 01501 realIP = new char[strlen(inet_ntoa( *((struct in_addr*)ipAddr->h_addr)))+1]; 01502 strcpy(realIP, inet_ntoa( *((struct in_addr*)ipAddr->h_addr))); 01503 dprintf_full("ProxySession::establishConnectionToServer: DNS returned %s\r\n", realIP); 01504 } 01505 */ 01506 01507 struct addrinfo ai, *res_ai=NULL; 01508 int ret =0; 01509 01510 memset(&ai,0,sizeof(struct addrinfo)); 01511 ai.ai_flags = AI_CANONNAME; 01512 ai.ai_family = PF_UNSPEC; 01513 ai.ai_socktype = 0; 01514 ai.ai_protocol = 0; 01515 dprintf_full("ProxySession::establishConnectionToServer: getaddrinfo\n"); 01516 if ((ret = getaddrinfo(ip, NULL, &ai, &res_ai)) < 0) 01517 { 01518 dprintf_err("ERROR %s\n",gai_strerror(ret)); 01519 ::exit(1); 01520 } 01521 assert(res_ai->ai_canonname); 01522 realIP = new char[strlen(res_ai->ai_canonname)+1]; 01523 strcpy(realIP, res_ai->ai_canonname); 01524 freeaddrinfo(res_ai); 01525 assert((ipAddr=gethostbyname(ip))); 01526 dprintf_full("ProxySession::establishConnectionToServer: real server name: #%s#\n",ip); 01527 01528 if(url->hasPort()) 01529 port=url->getPort(); 01530 else 01531 port=prot->getDefaultPort(); 01532 01533 int socketToServer=::socket(AF_INET,SOCK_STREAM,0); 01534 if(socketToServer==-1) { 01535 dprintf_full("ProxySession::establishConnectionToServer: Couldn't create socket\r\n"); 01536 state=SESSION_ERR; 01537 return false; 01538 } 01539 01540 if(!server) 01541 server=new sockaddr_in(); 01542 01543 server->sin_family=AF_INET; 01544 // server->sin_port=htons(((u16)port)); 01545 server->sin_port=htons(port); 01546 server->sin_addr.s_addr=inet_addr(ipAddr->h_addr); 01547 memset(&(server->sin_zero),'\0',8); 01548 bcopy(ipAddr->h_addr, (char *)&server->sin_addr, ipAddr->h_length); 01549 // bcopy(ipAddr->h_addr, realIP, strlen(realIP)); 01550 01551 if(::connect(socketToServer,(struct sockaddr*)server,sizeof(struct sockaddr)) < 0) { 01552 dprintf_full("ProxySession::establishConnectionToServer: Failed to connect to orig. server %s\r\n",realIP); 01553 state=SESSION_ERR; 01554 ::exit(2); 01555 return false; 01556 } 01557 01558 dprintf_full("ProxySession::establishConnectionToServer: Could connect to server\r\n"); 01559 01560 01561 if(serverSession) 01562 delete serverSession; 01563 serverSession=new ServerSession(socketToServer,NULL,cm,admControl,rtxEnabled); 01564 dprintf_full("ProxySession::establishConnectionToServer: Successfully created serverSession to %s\n",ip); 01565 return true; 01566 }; 01567 01568 bool ProxySession::videoFullyCached(ContainerInfo* stream) const 01569 { 01570 if(!stream) 01571 return false; 01572 if(stream->getESList()->empty()) 01573 return false; 01574 01575 for (list < ESInfo * >::iterator i = stream->getESList()->begin(); i != stream->getESList()->end();++i) { 01576 if(!esFullyCached(*i)) 01577 return false; 01578 } 01579 return true; 01580 }; 01581 01582 bool ProxySession::esFullyCached(const ESInfo* es) const 01583 { 01584 if(!es) 01585 return false; 01586 01587 const char* esInput=NULL; 01588 FILE* esFP=NULL; 01589 bool ret=false; 01590 esInput=es->getInput(); 01591 ret=(esInput && (esFP=fopen(esInput,"rb")) && es->getCompleteState()); 01592 if(esFP) 01593 fclose(esFP); 01594 return ret; 01595 } 01596