Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members
ServerSession.cpp
00001 /*********************************************************************** 00002 * * 00003 * ViTooKi * 00004 * * 00005 * title: ServerSession.cpp * 00006 * * 00007 * * 00008 * * 00009 * ITEC institute of the University of Klagenfurt (Austria) * 00010 * http://www.itec.uni-klu.ac.at * 00011 * * 00012 * * 00013 * For more information visit the ViTooKi homepage: * 00014 * http://ViTooKi.sourceforge.net * 00015 * vitooki-user@lists.sourceforge.net * 00016 * vitooki-devel@lists.sourceforge.net * 00017 * * 00018 * This file is part of ViTooKi, a free video toolkit. * 00019 * ViTooKi is free software; you can redistribute it and/or * 00020 * modify it under the terms of the GNU General Public License * 00021 * as published by the Free Software Foundation; either version 2 * 00022 * of the License, or (at your option) any later version. * 00023 * * 00024 * This program is distributed in the hope that it will be useful, * 00025 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00026 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 00027 * GNU General Public License for more details. * 00028 * * 00029 * You should have received a copy of the GNU General Public License * 00030 * along with this program; if not, write to the Free Software * 00031 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, * 00032 * MA 02111-1307, USA. 
* 00033 * * 00034 ***********************************************************************/ 00035 00036 /*********************************************************************** 00037 * * 00038 * REVISION HISTORY: * 00039 * * 00040 * * 00041 * * 00042 ***********************************************************************/ 00043 #ifndef WIN32 00044 #include <sys/select.h> 00045 #endif 00046 #include "ServerSession.hpp" 00047 #include "RTSP.hpp" 00048 #include "DataChannel.hpp" 00049 #include "DataSink.hpp" 00050 #include "IO.hpp" 00051 #include "RFC3016.hpp" 00052 #include "RFC2250.hpp" 00053 #include "BitField.hpp" 00054 #include "Statistics.hpp" 00055 00056 #include "cache/MetaObject.hpp" 00057 #include "cache/AdmissionControl.hpp" 00058 #include "io/Rtp.hpp" 00059 #include "io/SimpleRtp.hpp" 00060 #include "io/IOCreator.hpp" 00061 #include "io/FilteredIO.hpp" 00062 #include "io/MPGStreamIO.hpp" 00063 #include "metadata/MP21.hpp" 00064 #include "adaptors/TemporalAdaptor.hpp" 00065 #include "adaptors/SharedAdaptor.hpp" 00066 #include "container/Video4LinuxContainerFile.hpp" 00067 00068 00069 /***********************************************************************/ 00070 ServerSession::ServerSession(int socket, const struct sockaddr_in *clint, 00071 CacheManager * c, AdmissionControl* am, 00072 bool enableRTX,SessionCounter* counter,TerminalCapabilities* tc):Session(),enableRtx(enableRTX) 00073 { 00074 assert(c); 00075 assert(am); 00076 this->counter = counter; 00077 termCap=tc; 00078 admControl=am; 00079 state = SESSION_CLOSED; 00080 cm = c; 00081 if (clint) { 00082 client = new sockaddr_in(); 00083 memcpy(client, clint, sizeof(sockaddr_in)); 00084 clientId=(u32)client->sin_addr.s_addr; 00085 } else { 00086 client = NULL; 00087 clientId=0; 00088 } 00089 00090 sessionControlChannel=socket; 00091 sockaddr_in localaddr; 00092 canAdmitRequest=true; 00093 00094 #ifdef WIN32 00095 int slen = sizeof(localaddr); 00096 #else /* */ 00097 socklen_t slen = sizeof(localaddr); 
00098 #endif /* */ 00099 00100 memset(&localaddr, 0, sizeof(localaddr)); 00101 getsockname(socket, (sockaddr *) & localaddr, &slen); 00102 const char* tmp=inet_ntoa(localaddr.sin_addr); 00103 localAddress=new char[strlen(tmp)+1]; 00104 strcpy(localAddress,tmp); 00105 prot = NULL; 00106 url = NULL; 00107 00108 userPref=NULL; 00109 suggestedAdaptor=NULL; 00110 // clientObj=NULL; 00111 mp4Stream=NULL; 00112 playerId=NULL; 00113 serverID=NULL; 00114 00115 gtRefCount = new ReferenceCounter(); 00116 globalTimer = new GlobalTimer(); 00117 //this will prevent deletion on DataChannel closes: 00118 gtSharedAdapt = new SharedAdaptor(gtRefCount, (Adaptor*&)globalTimer); 00119 resUsage.cpu=resUsage.disk=resUsage.network=0.0; 00120 } 00121 00122 /***********************************************************************/ 00123 ServerSession::~ServerSession() 00124 { 00125 admControl->freeRequest(&resUsage); 00126 #ifdef WIN32 00127 closesocket(sessionControlChannel); 00128 #else 00129 ::close(sessionControlChannel); 00130 #endif 00131 if(mp4Stream) 00132 delete mp4Stream; 00133 00134 if (prot) { 00135 delete prot; 00136 prot = NULL; 00137 } 00138 if (client) { 00139 delete client; 00140 client = NULL; 00141 } 00142 if (url) { 00143 delete url; 00144 url = NULL; 00145 } 00146 if(localAddress) 00147 delete localAddress; 00148 00149 if(playerId) 00150 delete playerId; 00151 if(serverID) 00152 delete serverID; 00153 list < DataChannel * >::iterator i; 00154 while(!channels.empty()) { 00155 i=channels.begin(); 00156 (*i)->teardown(TEARDOWN_ALL,true); 00157 delete (*i); 00158 channels.pop_front(); 00159 } 00160 list < rtx_group* >::iterator j; 00161 while(!rtxInfo.empty()) { 00162 j=rtxInfo.begin(); 00163 delete *j; 00164 rtxInfo.pop_front(); 00165 } 00166 if(termCap) 00167 delete termCap; 00168 if(userPref) 00169 delete userPref; 00170 00171 delete gtSharedAdapt; //this will also del gtRefCount and globalTimer 00172 00173 } 00174 00175 
/***********************************************************************/
/**
 * Handles an OPTIONS request: only advances the protocol sequence number.
 * Both parameters are unused.
 *
 * @return always true.
 */
bool ServerSession::options(const Url* fileName, const char* remaining)
{
  prot->incSeqNr();
  return true;
}


/***********************************************************************/
/**
 * Handles a DESCRIBE request.
 *
 * Terminal capabilities found in the request replace the stored ones and
 * discard the stored user preferences; if the session has no terminal
 * capabilities at all, user preferences found in the request replace the
 * stored ones. Afterwards the stream is selected via setUrl() and the reply
 * is copied into `buffer`.
 *
 * @param fileName   requested URL (must not be NULL, asserted).
 * @param remaining  rest of the request text (must not be NULL, asserted).
 * @return result of generateCMDDescribeReply(), or true when the
 *         not-enough-bandwidth reply was generated instead.
 */
bool ServerSession::connect(const Url* fileName, const char* remaining)
{
  assert(fileName && remaining);
  // keep existing termcap when invalid ones are found
  TerminalCapabilities* dummyTermCap=MP21::createTerminalCapability(remaining);
  if(dummyTermCap) {
    // overwrite the default with the new ones
    dprintf_full("ServerSession::connect Found TermCaps\n");
    delete termCap;
    // if termcaps are specified, they take precedence over default UserPrefs!
    delete userPref;
    userPref=NULL;
    termCap=dummyTermCap;
  }

  if(!termCap) {
    // maybe UserPreferences?
    // keep existing UserPref
    UserPreferences* upDummy=MP21::createUserPreferences(remaining);
    dprintf_full("ServerSession::connect Found UserPreferences %p in XX%sXX\n",upDummy,remaining);

    if(upDummy) {
      delete userPref;
      userPref=upDummy;
    }
  }
  setUrl(fileName); //find perfect fitting mp4stream
  // The parse result is overwritten below in every path; the call is kept
  // because it fills in playerId.
  bool ret=prot->parseCMDDescribe(this, fileName, remaining,&playerId);
  if(!playerId) {
    // HACK: assume it's the stupid Real player
    playerId=new char[strlen(REAL_PLAYER_ID)+1];
    strcpy(playerId,REAL_PLAYER_ID);
  }
  dprintf_full("ServerSession::connect Playerid \"%s\"\n",playerId);


  /*if(resUsage.network < 0.000001) {
    canAdmitRequest=false;
    state=SESSION_ERR;
  }
  */

  if (this->getContainerInfo() == NULL)
    printf("ServerSession::connect : vontinfo NULL!!!\n");
  if(canAdmitRequest)
    ret=prot->generateCMDDescribeReply(this,fileName,localAddress,playerId);
  else {// only way to indicate a server overload is with notenoughbandwith
    dprintf_full("ServerSession::connect NOT ENOUGH BANDWIDTH!!!\n");
    prot->generateNotEnoughBandwidth();
    ret=true;
  }


  const char* buf=prot->getBuffer();
  dprintf_full("ServerSession::connect %s\n",buf);
  // NOTE(review): unbounded copy; assumes `buffer` is at least as large as
  // the protocol's reply buffer -- confirm against the Session declaration.
  strcpy(buffer,buf);
  prot->incSeqNr();
  return ret;
}


/***********************************************************************/
/**
 * Handles a SETUP request: parses the transport parameters, creates the
 * input for the requested elementary stream, the RTP output, the DataSink
 * for this client and the DataChannel, and copies the reply into `buffer`.
 *
 * Every path, including the error paths, leaves the reply to be sent in
 * `buffer` and advances the protocol sequence number.
 *
 * @param fileName   requested URL (must not be NULL, asserted).
 * @param remaining  rest of the request text (must not be NULL, asserted).
 * @return true when the channel was set up, false on a parse error, an
 *         unsupported transport or a missing stream.
 */
bool ServerSession::setup(const Url* fileName, const char* remaining)
{
  assert(fileName && remaining);
  // url is only set by setUrl(); guard the log in case SETUP arrives first.
  dprintf_full("ServerSession::setup %s\n",url ? url->toString() : "(null)");
  bool hasSessionKey=false,hasRtxPort=false;
  char* protocol=NULL;
  char* unicast=NULL;
  char* clientAddress=NULL;

  int sesKey=0,esId=0,rPort=0,rPort2=0,lPort=0;
  u32 clientRtxPort=0, serverRtxPort=0;

  bool err=!prot->parseCMDSetup(this,fileName,remaining,&hasSessionKey,&sesKey,&protocol,
                                &unicast,&esId,&rPort,&rPort2,&hasRtxPort,&clientRtxPort,
                                &clientAddress);
  hasRtxPort&=enableRtx; //disable rtx if it is switched off
  if(!enableRtx)
    clientRtxPort=0;

  if(!err) {
    if(!prot->isSupportedTransport(protocol)) {
      prot->generateIllegalTransport();
      err=true;
    }
  }
  else {
    prot->generateBadCMD();
  }

  if(err) {
    dprintf_err("ServerSession::setup error in parsing setup request\n");
    const char* buf=prot->getBuffer();
    strcpy(buffer,buf);
    // NOTE(review): scalar delete kept as in the original; how parseCMDSetup
    // allocates these strings is not visible here.
    if(protocol)
      delete protocol;
    if(unicast)
      delete unicast;
    prot->incSeqNr();
    return false;
  }
  if(hasSessionKey==false) {
    sesKey=prot->generateSessionKey();
  }
  lPort=portGen.getNextFreePortPair();
  if(hasRtxPort && clientRtxPort>0) {
    serverRtxPort=portGen.getNextFreePortPair();
  }

  // now that we have all the info, do the following:
  // create an RTP with RFC3016 output channel
  // create DataChannel
  // open RTP session

  //char *clientAddress = inet_ntoa(client->sin_addr); // points to a static array in inet..
  // NOTE(review): the constructor allows client == NULL; this function
  // dereferences client here and when creating the DataSink below.
  if (clientAddress == NULL) clientAddress = inet_ntoa(client->sin_addr);

  ESInfo* es=NULL;
  IO* input=NULL;

  if(mp4Stream)
    es=mp4Stream->getES(esId);
  if(es) {
    input=IOCreator::createInput(es, false);
  }
  if (!(this->mp4Stream!=NULL && es!=NULL) || input==NULL) {
    prot->generateFileNotFound();
    // Copy the reply like every other exit path does; without this run()
    // would send whatever was left in `buffer` from the previous request.
    const char* buf=prot->getBuffer();
    strcpy(buffer,buf);
    portGen.closePortPair(lPort);
    if(hasRtxPort && clientRtxPort>0) {
      portGen.closePortPair(serverRtxPort);
    }
    // Release the parsed strings here too (they are freed on all other exits).
    if(protocol)
      delete protocol;
    if(unicast)
      delete unicast;
    prot->incSeqNr();

    return false;
  }

  PacketizationLayer * p;
  if (es->isAudioStream())
    p = new RFC2250();
  else
    p = new RFC3016();

  // no probs with inet_ntoa: rtp deep-copies the clientaddress
  IO* output=NULL;
  Statistics *outStats=NULL;
  if(suggestedAdaptor){
    suggestedAdaptor->setESInfo(es);
    suggestedAdaptor->initialize();
  }

  // only use Rtp when retransmission is requested, otherwise SimpleRtp is cheaper
  // if(hasRtxPort && clientRtxPort>0) {
  if(enableRtx) {
    outStats = new Statistics(es,true);
    dprintf_full("ServerSession::setup: created RTP with retransmission\n");
    output = new Rtp(fileName->getPath(), rPort, clientAddress, lPort,
                     es, p, true, termCap, globalTimer, outStats, false);
  }
  else {
#ifndef WINCE
    dprintf_full("ServerSession::setup: created SimpleRtp without retransmission\n");
    char logName[MAX_STR_LEN];
    struct timeval t;
    gettimeofday(&t,NULL);
    sprintf(logName,"log%10i.%3i_server.out",(int)t.tv_sec,(int)(t.tv_usec/1000));
    output=new SimpleRtp(fileName->getPath(), rPort, lPort,
                         clientAddress, es, p, true,logName,true,SimpleRtp::MIXED_CTS_BITRATE);
#else
    // NOTE(review): output stays NULL on this path and is used below.
    dprintf_err("\nServersession::setup() RTX MUST BE ENABLED !!!");
#endif
  }
  /*
  if( playerId && suggestedAdaptor==NULL && es && es->isVisualStream() &&
      strstr(playerId, QT_PLAYER_ID) &&
      strstr(playerId, OLD_BFRAMES_DISABLED_QT_PLAYER) ) {
    // adaptor changes esInfo!!! no prob here, we already have made a clone in setUrl
    suggestedAdaptor=new TemporalAdaptor((VideoESInfo*)es);
  }
  */
  // sin_port is a 16-bit value in network byte order, so it has to be
  // converted with ntohs(); ntohl() yields a wrong port on little-endian hosts.
  DataSink* clientObj = new DataSink((u32)client->sin_addr.s_addr,output, clientAddress,
                                     ntohs(client->sin_port),
                                     this->suggestedAdaptor);

  suggestedAdaptor=NULL; //unlink Adaptor, will be freed in DataSink
  this->prot->generateSetupReply(this,fileName,sesKey,protocol,unicast,clientAddress,
                                 esId,rPort,rPort2,lPort,clientRtxPort,serverRtxPort);
  const char* buf=prot->getBuffer();
  strcpy(buffer,buf);

  rtx_group* group=getRtxGroup(esId);
  if(group && group->rtx) {
    group->rtx->remotePort=clientRtxPort;
    group->rtx->localPort=serverRtxPort;
    output->setRtxInfo(group->rtx);
  }
  else {
    // size() returns an unsigned size type; cast to match the %i conversion.
    dprintf_err("ServerSession::setup: Session has stored %i groups \n",(int)rtxInfo.size());
    dprintf_err("ServerSession::setup: Failed to get rtx_port for es %i\n", esId);
  }

  // now create the DataChannel and set the outstats of the DataSink IO (RTP)
  DataChannel * d = new DataChannel(input, es);
  d->setOutputStatistics(outStats);

  d->setSessionId(sesKey);
  d->insert(clientObj); // add the client to the DC
  // set the DataChannel at the Session
  setDataChannel(d);
  if(protocol)
    delete protocol;
  if(unicast)
    delete unicast;

  state = SESSION_INITIALIZED;
  prot->incSeqNr();
  return true;
}

/***********************************************************************/
/**
 * Positions and starts every DataChannel that belongs to a session.
 *
 * @param sessionId  session to start; a negative value selects all channels.
 * @param startTime  start position in seconds.
 * @param endTime    end position in seconds; values <= 0.00001 are replaced
 *                   by the duration of the first visual ES when there is one.
 * @param prefetch   passed through to DataChannel::play().
 * @param backwardsClosestIFrame  passed through to IO::setToClosestIFrame().
 * @return true if at least one matching channel with an ESInfo was started.
 */
bool ServerSession::startDCs(int sessionId, double startTime, double endTime,
                             double prefetch, bool backwardsClosestIFrame) {
  u32 startFrame=0, endFrame=0;
  bool res = false;
  long ivop_align = -9999;

  if(endTime <= 0.00001) {
    // Neither the container nor a visual ES is guaranteed to exist here.
    VideoESInfo* firstVisual = mp4Stream ? mp4Stream->getFirstVisualES() : NULL;
    if(firstVisual)
      endTime = firstVisual->getDurationInMs() / 1000.0;
    else
      dprintf_err("ServerSession::startDCs: no visual ES available to derive the end time\n");
  }

  for (list < DataChannel * >::iterator i = this->channels.begin(); i != channels.end(); i++) {
    dprintf_full("ServerSession::startDCs: inspecting ES with session %u\n", (*i)->getSessionId());
    if ((sessionId >= 0) && ((*i)->getSessionId() != (u32)sessionId))
      continue;

    const ESInfo* es=(*i)->getESInfo();
    if(!es) {
      dprintf_err("ServerSession::startDCs: RTSP::handleCMDPlay Missing ES for request? PANIC!\r\n");
      continue;
    }

    startFrame=es->getFrameNumber(startTime);
    endFrame=es->getFrameNumber(endTime);
    dprintf_full("ServerSession::startDCs startFrame %u (@sec %6.3f) endFrame %u (@sec %6.3f)\n",
                 startFrame,startTime,endFrame,endTime);

    IO* input=(*i)->getInput();
    //input->setToFrameNumber(startFrame);
    dprintf_full("ServerSession::startDCs find real I-VOP\n");
    if( //input->getState()==IO::OPEN &&
        ( -1 == (ivop_align = input->setToClosestIFrame(startFrame, backwardsClosestIFrame))) ) {
      dprintf_err("ServerSession::startDCs Failed to set input to frame %i (state %i)\r\n",
                  startFrame,input->getState());
    }
    dprintf_full("ServerSession::startDCs real I-VOP startFrame %li \n",ivop_align);
    input->setEndFrameNumber(endFrame);
    DataSink* clientOut=(*i)->getDataSink(clientId);
    IO* clientOutput=NULL;
    if(clientOut && (clientOutput=clientOut->getOutput())) {
      // flush the Rtp output buffer !!!
      clientOutput->setToFrameNumber(ivop_align); //new startFrame
    }
    (*i)->play(clientId,prefetch);


    // if not yet started -> start it
    if (!(*i)->running() || (*i)->finished() ) {
      dprintf_full("ServerSession::startDCs starting inputIO thread\n");
      (*i)->start();
    }
    res=true;
  }
  return res;
}

/***********************************************************************/
/**
 * Handles a PLAY request: parses session id and time range, starts the
 * matching DataChannels via startDCs() and copies the reply into `buffer`.
 *
 * @param fileName   requested URL (must not be NULL, asserted).
 * @param remaining  rest of the request text (must not be NULL, asserted).
 * @return false if parsing failed or no channel was started, true otherwise.
 */
bool ServerSession::play(const Url* fileName, const char* remaining)
{
  int sessionId;

  assert(fileName && remaining);

  double startTime=0.0, endTime=0.0, prefetch=0.0;
  if(!prot->parseCMDPlay(this,fileName,remaining,&sessionId,&startTime,&endTime,&prefetch)) {
    dprintf_err("ServerSession::play RTSP::parseCmdPlay failed\n");
    prot->generateSessionNotFound();
    const char* buf=prot->getBuffer();
    strcpy(buffer,buf);
    prot->incSeqNr();
    return false;
  }

  //start up all DCs with their inputs and outputs, backwardsClosestIFrame
  dprintf_err("ServerSession::play START OK\n");
  bool res = startDCs(sessionId, startTime, endTime, prefetch, true );
  if(res)
    prot->generateOk();
  else {
    dprintf_full("ServerSession::play didn't find any ES for the sessionId\n");
    prot->generateSessionNotFound();
  }
  const char* buf=prot->getBuffer();
  strcpy(buffer,buf);

  // NOTE(review): the state becomes SESSION_ACTIVE even when res is false.
  state = SESSION_ACTIVE;
  prot->incSeqNr();
  return res;
}


/***********************************************************************/
/**
 * Handles a PAUSE request: pauses this client on every DataChannel whose
 * session id matches the key found in the request and copies the reply
 * into `buffer`.
 *
 * @param fileName   requested URL (must not be NULL, asserted).
 * @param remaining  rest of the request text (must not be NULL, asserted).
 * @return true if at least one channel matched the session key.
 */
bool ServerSession::pause(const Url* fileName, const char* remaining)
{
  bool found=false;
  assert(fileName && remaining);
  int sessionId=prot->extractSessionKeyFromCMD(remaining);

  if (sessionId > 0 && (unsigned)sessionId!=INVALID_SESSIONID) {
    // search for the session and pause it
    for (list < DataChannel * >::iterator i = this->channels.begin();
         i != this->channels.end(); i++) {
      if ((*i)->getSessionId() == (u32)sessionId) {
        found=true;
        (*i)->pause(clientId);
      }
    }
  }

  // Always generate a reply: a well-formed key that matches no channel must
  // also answer "session not found" instead of re-sending a stale buffer.
  if(found)
    prot->generateOk();
  else
    prot->generateSessionNotFound();
  const char* buf=prot->getBuffer();
  strcpy(buffer,buf);

  // NOTE(review): the state becomes SESSION_PAUSED even when nothing matched.
  state = SESSION_PAUSED;
  prot->incSeqNr();
  return found;
}


/***********************************************************************/
/**
 * Tears down one session or all sessions of this client.
 *
 * When both fileName and remaining are given, the session id is taken from
 * the request text and the sessionId argument is ignored. A DataChannel is
 * only waited for and deleted when it has no active DataSinks left; in
 * every case it is removed from this session's channel list.
 *
 * @param sessionId  session to tear down, or TEARDOWN_ALL.
 * @param immediate  passed through to DataChannel::teardown().
 * @param fileName   optional request URL.
 * @param remaining  optional rest of the request text.
 * @return TEARDOWN_ALL: true if a protocol object exists to build the reply;
 *         single session: true if a matching channel was found.
 */
bool ServerSession::tearDown(int sessionId,bool immediate, const Url* fileName, const char* remaining)
{
  dprintf_full("ServerSession::tearDown(%i, %i)\r\n",sessionId,immediate);
  if(fileName && remaining) {
    sessionId=RTSP::extractSessionId(remaining);
  }

  if((u32)sessionId==TEARDOWN_ALL) {
    // teardown all
    state = SESSION_CLOSED;
    while (!channels.empty()) {
      DataChannel* channel = channels.front();
      channel->teardown(clientId,immediate);
      // check if we have to delete it, might be shared with other sessions
      if(channel->getNumberOfActiveDataSinks()==0) {
        channel->wait(); // wait for thread to finish
        delete channel;
      }
      channels.pop_front();
    }
    if(!prot) // can happen that no prot is set when teardown called first
      return false;
    prot->generateOk();
    const char* buf=prot->getBuffer();
    strcpy(buffer,buf);
    prot->incSeqNr();
    return true;
  }

  // teardown one session
  bool sesFound=false;
  list < DataChannel * >::iterator dcI = channels.begin();
  while (dcI != channels.end()) {
    if ((*dcI)->getSessionId()== (unsigned)sessionId) {
      sesFound=true;
      (*dcI)->teardown(clientId,immediate);
      // check if we have to delete it, might be shared with other sessions
      if((*dcI)->getNumberOfActiveDataSinks()==0) {
        (*dcI)->wait(); // wait for thread to finish
        delete (*dcI);
      }
      dcI = channels.erase(dcI);
    } else
      ++dcI;
  }
  // Removing the last channel closes the session.
  if(sesFound && channels.empty())
    state = SESSION_CLOSED;

  // Same guard as in the TEARDOWN_ALL path: without a protocol object there
  // is no reply to build.
  if(!prot)
    return sesFound;

  if(sesFound) {
    prot->generateOk();
  }
  else {
    prot->generateSessionNotFound();
  }
  const char* buf=prot->getBuffer();
  strcpy(buffer,buf);
  prot->incSeqNr();
  return sesFound;
}

//is teardown on streams or sessions ???


/***********************************************************************/
/**
 * Session main loop.
 *
 * Waits up to 40 ms for data on the control channel. Incoming requests are
 * handed to the protocol object and the content of `buffer` is sent back.
 * When the wait times out, the URL is not "v4l", the session is active and
 * the MetaObject offers more than one variation, the output statistics are
 * inspected once per stream-out second to decide whether to switch to a
 * different variation of the stream.
 *
 * The loop ends when the state becomes SESSION_ERR or SESSION_CLOSED.
 */
void ServerSession::run()
{
  fd_set rfds;
  struct timeval tv;
  if(this->counter != NULL)
    this->counter->increment();
  int retval;
  int lastSec = 1;
  int lastSwitchSec = -99;

  // while not end of session
  //check for control command
  // do control command
  state = SESSION_NEW;
  /* the buffer for incoming rtsp/http commands */
  char *inbuffer = new char[MSG_BUFFER_SIZE + 1];
  dprintf_full("ServerSession::run() SESSION START\n");
  dprintf_full("ServerSession::run: Listening for data on ControlChannel\r\n");
  while (state!=SESSION_ERR && state!=SESSION_CLOSED) {
    inbuffer[0] = 0;
    FD_ZERO(&rfds);
    FD_SET(sessionControlChannel, &rfds);
    tv.tv_sec = 0;
    tv.tv_usec = 40 * 1000; //microsecs
    retval = select(sessionControlChannel+1, &rfds, NULL, NULL, &tv);

    if (retval == -1) {
      // NOTE(review): this terminates the whole process, also when select()
      // was merely interrupted by a signal -- consider failing only this session.
      dprintf_err("select()");
      ::exit(1);
    } else if (retval) {
      int bytesRead = readRequest(inbuffer, MSG_BUFFER_SIZE);

      dprintf_full("ServerSession::run: readBlock ended. read %i bytes\r\n",bytesRead);
      if (bytesRead == 0) {
        //no data read
        // the other side has closed the connection, teardown all
        dprintf_full("ServerSession::run: No data read from ControlChannel\r\n");
        tearDown(TEARDOWN_ALL,true,NULL,NULL);
      } else if (bytesRead < 0) {
        // error:
        dprintf_err("ServerSession::run Error reading from controlchannel");
        tearDown(TEARDOWN_ALL,true,NULL,NULL);
        state = SESSION_ERR;
      } else { // we got a command
        dprintf_full("ServerSession::run: got command\r\n");
        // check for protocol if not yet set
        if (!prot) {
          if (determineProtocol(inbuffer) == PROTO_UNKNOWN) {
            dprintf_err("ServerSession::run: Unknown protocol: %s\n",
                        inbuffer);
          }
        }
        if (prot) {
          // Diagnostics only: the request is processed in every case.
          switch (prot->getID()) {
          case PROTO_RTSP:
          case PROTO_HTTP:
            if (bytesRead >= MSG_BUFFER_SIZE) {
              dprintf_err("received command exceeded buffer size");
            }
            break;
          default:
            dprintf_err("ServerSession::run: Unknown protocol: %s\n",inbuffer);
            break;
          }

          if (!prot->applyReqToSession(inbuffer, bytesRead, this)) {
            dprintf_err("ServerSession::run: Failed to parse request >%s<\n", inbuffer);
          }
          else {
            // check if we have options
            if(strncmp("OPTIONS", inbuffer,strlen("OPTIONS"))==0) {
              const char* buf=prot->getBuffer();
              strcpy(buffer,buf);
            }
          }
          sendResponse(buffer,strlen(buffer));
        }
      }
    } else if ((url != NULL && (strcmp(url->getFileName(), "v4l")) !=0)) {
      //check for SWITCHING only if PLAYING and there is a choice for other streams
      // findMetaObject() can return NULL (setUrl handles that case), so the
      // lookup result is checked before it is used.
      MetaObject *meta = (state == SESSION_ACTIVE) ? cm->findMetaObject(url) : NULL;
      if (meta != NULL && meta->getNumberOfVariations() > 1) {

        int nowSec = (int)ceil(globalTimer->getActualSec());
        if (nowSec > lastSec) { //new streamout sec
          lastSec = nowSec;
          int sumNetBW = 0;
          bool doSwitch=false;
          for (list < DataChannel * >::iterator dc = channels.begin();
               dc != channels.end(); dc++) {
            //FIXME: completely ignores SessionIDs, resets all DCs
            //FIXME: outStats is only from first DataChannel, but sync'd by globalTimer
            if ((*dc)->tearDownFlagSet())
              continue;

            Statistics *outStats = (*dc)->getOutputStatistics();
            if (outStats && (3 * (*dc)->getESInfo()->getMediaTimeScale() + outStats->getHighestPacketTS()
                             < (*dc)->getESInfo()->getDuration() ) ) { //FIXME: global nowSec!
              // stream will continue to run for at least 3 more seconds!
              double realBWfact = (double)outStats->getBWfromStreamoutSec(nowSec-2)/(double)outStats->getStreamBW();
              dprintf_full("ServerSession::run now %6.3f sec ES-Id %llu Pkt Loss %4.2f %% netBW %li kbps streamBW %i kbps realBWfact %2.2f VSec %6.3f (of %6.3f) adaptRate %i %%\n",
                           globalTimer->getActualSec(), (*dc)->getESInfo()->getStreamId(),
                           outStats->getStreamoutSecLossPercent(nowSec-2),
                           outStats->getBWfromStreamoutSec(nowSec-2)*8 / 1024,
                           outStats->getStreamBW()*8 / 1024,
                           realBWfact,
                           (double)outStats->getHighestPacketTS() / (*dc)->getESInfo()->getMediaTimeScale(),
                           (double)(*dc)->getESInfo()->getDuration() / (double)(*dc)->getESInfo()->getMediaTimeScale(),
                           outStats->getAdaptRate());

              if ((outStats->getAdaptRate() < 70) && (outStats->getAdaptSecs() >= 1.0)) {
                // too much adaptation, switch down
                doSwitch = true;
                sumNetBW += (int)((double)outStats->getStreamBW() * 0.7);
                // sumNetBW += (int)((double)outStats->getBWfromStreamoutSec(nowSec-2) * 1.15); //add adapt range
                dprintf_full("ServerSession::run DECIDED ON STREAM SWITCHING : DOWN for adapt\n");
              } else if ((nowSec - lastSwitchSec > 5) && (outStats->getAdaptSecs() > 1.0)) {
                //buffer low, so force even lower quality stream!
                doSwitch = true;
                sumNetBW += (int)((double)outStats->getStreamBW() * 0.7);
                dprintf_full("ServerSession::run DECIDED ON STREAM SWITCHING : DOWN to fill buf\n");
              } else if ((nowSec - lastSwitchSec > 15) && (outStats->getAdaptRate() >= 100)
                         && (outStats->getAdaptSecs() <= 1.0) //force high quality if buffers high
                         && ((realBWfact > 1.25) || (outStats->getBufferFillLevelInSecs() > RTP_BUF_HIGHWATER) ) ) {
                //more than 15 secs since the last switch, no adaptation and super-high BW: SWITCH UP!
                doSwitch = true;
                sumNetBW += (int)((double)outStats->getStreamBW() * 1.4); //accept frame dropping for higher quality stream!
                dprintf_full("ServerSession::run DECIDED ON STREAM SWITCHING : UP!!\n");
              }
            }
          }

          if (doSwitch) {
            dprintf_full("ServerSession::run DECIDED ON STREAM SWITCHING out of %i variations! new overall NetworkCapacity: %i kbps\n", meta->getNumberOfVariations(), sumNetBW*8/1024);
            //sumNetBW in termcaps is needed for multi-stream containers: with Video+Audio!
            //termCap->setNetworkCapacityInByte((u32)(sumNetBW * 1.15)); //add a faked adaptation range
            // termCap is NULL for sessions that were set up from user preferences only.
            if (termCap)
              termCap->setNetworkCapacityInByte((u32)(sumNetBW));

            lastSwitchSec = nowSec;

            //exchange and set to the new input
            assert(mp4Stream);
            ContainerInfo *oldmp4 = mp4Stream;
            // setUrl() makes its own copy of the URL, so the temporary is
            // released afterwards instead of being leaked.
            Url *requestedUrl = new Url(url->toString());
            setUrl(requestedUrl, false); //re-issue optimum stream finding process, forbid transcoding
            delete requestedUrl;

            // setUrl() may leave mp4Stream NULL, and a container need not
            // have a visual ES; both cases are treated like "same stream".
            VideoESInfo *newVisual = mp4Stream ? mp4Stream->getFirstVisualES() : NULL;
            VideoESInfo *oldVisual = oldmp4->getFirstVisualES();
            if (newVisual == NULL || oldVisual == NULL
                || 0 == strcmp(newVisual->getInput(),oldVisual->getInput())) {
              // I chose the same stream, so its useless
              dprintf_err("ServerSession::run: Sorry, there was no fitting stream... I took the same, so ignoring!\n");
              delete mp4Stream;
              mp4Stream = oldmp4;
            } else {
              //set everything paused (input, output, DCs)
              //FIXME: completely ignores SessionIDs, resets all DCs
              for (list < DataChannel * >::iterator dc = channels.begin();
                   dc != channels.end(); dc++) {
                dprintf_full("DC ESid %llu newID %llu\n",(*dc)->getESInfo()->getStreamId(),
                             newVisual->getStreamId());
                ESInfo *new_es = mp4Stream->getES((*dc)->getESInfo()->getStreamId());
                assert(new_es != NULL);
                if ( (*dc)->getESInfo() != new_es ) {
                  long ivop_align;
                  double vsec_time;
                  list <u32> ids;
                  list <u32>::iterator id;

                  //pause clients and RTP to get stable HighestPacketTS
                  ids = (*dc)->getListOfActiveClientIds();
                  for(id = ids.begin(); id != ids.end(); ++id) {
                    (*dc)->pause((*id));
                  }

                  //FIXME: only first DataSink
                  DataSink* clientOut=(*dc)->getDataSink(clientId);
                  // getDataSink() may return NULL (startDCs checks for it as well).
                  IO* clientOutput=clientOut ? clientOut->getOutput() : NULL;
                  IO *input = IOCreator::createInput(new_es, false);
                  assert(input);
                  input->setEndFrameNumber((*dc)->getInput()->getEndFrameNumber());
                  vsec_time=(double)(*dc)->getOutputStatistics()->getHighestPacketTS()
                            / (double)(*dc)->getESInfo()->getMediaTimeScale();
                  u32 startFrame = new_es->getFrameNumber(vsec_time) + 1;
                  ivop_align = input->setToClosestIFrame(startFrame, false);
                  assert (ivop_align > 0);

                  //force DC to continue until switchPoint is reached
                  // (normally already far after that.... but still...)
                  (*dc)->setFinalFrameNumber(ivop_align - 1);

                  //ATTENTION: RTP preQ might (will) be _far_ ahead to switchPoint
                  //so chop of unneeded data
                  if (clientOutput)
                    clientOutput->setEndFrameNumber(ivop_align); //new startFrame

                  //restart clients and RTP to flush out data until switchPoint
                  // we have to wait until all data is really sent out to the network!
                  ids = (*dc)->getListOfPausedClientIds();
                  for(id = ids.begin(); id != ids.end(); ++id) {
                    (*dc)->play((*id), 0);
                  }

                  //wait until DC-thread reaches switchPoint and stops
                  (*dc)->initiateThreadStop(false);
                  //reset DC to stream all data which is further arriving with the new_input
                  (*dc)->setFinalFrameNumber(-1);

                  long actFrame;
                  do {//wait until RTP is done
                    actFrame = (*dc)->getOutputStatistics()->getHighestPacketTS()
                               / (*dc)->getESInfo()->getVOPTimeIncrement();
                    dprintf_full("ServerSession::run waiting for RTP to reach switchPoint %li (now at frame %li)\n",
                                 ivop_align-1, actFrame);
                    msleep(40);
                  } while (actFrame < ivop_align-1);

                  //pause all clients and change to new ESs and input
                  ids = (*dc)->getListOfActiveClientIds();
                  for(id = ids.begin(); id != ids.end(); ++id) {
                    (*dc)->pause((*id));
                    //change RTP-ES, automatically also changes Statistics-ES and adaptPreQ
                    (*dc)->getDataSink((*id))->getOutput()->setESInfo(new_es);
                  }
                  (*dc)->setESInfo(new_es);

                  //switch to new input with all clients paused
                  (*dc)->setInput(input);

                  dprintf_full("ServerSession::run STREAM SWITCHING DONE! now restarting on new stream !\n");

                  //restart with no prefetch and continue with the new stream
                  ids = (*dc)->getListOfPausedClientIds();
                  for(id = ids.begin(); id != ids.end(); ++id) {
                    (*dc)->play((*id), 0);
                  }
                  //DC was stopped in setInput, so restart it
                  (*dc)->start();
                }
              }
              delete oldmp4; //also deletes ESes!
            }//got the same stream
          }//SWITCH NOW!!!
        }

      }//enough variations?
    }
  } // while valid
  if(this->counter != NULL)
    this->counter->decrement();
  dprintf_full("ServerSession::run() end!\n");

  delete [] inbuffer;
  this->exit();

}



/***********************************************************************/ 00870 void ServerSession::setUrl(const Url *uri, bool makeExactMatch) 00871 { 00872 bool insertResUsage=false; 00873 if (cm && uri) { 00874 if (url) { 00875 delete url; 00876 } 00877 dprintf_full("ServerSession::setUrl: %s \r\n", uri->toString()); 00878 //if (url->getFileName() == NULL) 00879 // dprintf_full("ServerSession::setUrl: file NULL!!!\n"); 00880 00881 dprintf_full("ServerSession::setUrl: %s filename: %s\r\n", uri->toString(), uri->getFileName()); 00882 url = new Url(uri->toString()); 00883 bool exactMatch; 00884 MetaObject *meta = cm->findMetaObject(url); 00885 if (meta != NULL) { 00886 dprintf_full("ServerSession::setUrl metadata found\n"); 00887 } else { 00888 dprintf_full("ServerSession::setUrl metadata NOT found\n"); 00889 } 00890 00891 dprintf_full("ServerSession::setUrl termcap: %p, userpref %p meta: %p\n", termCap, userPref, meta); 00892 00893 if( (!termCap && !userPref)) { 00894 // just send ANY version 00895 dprintf_full("ServerSession::setUrl NO metaobject/metadata found!\n"); 00896 mp4Stream = cm->getVideo(url); 00897 if (mp4Stream == NULL) { 00898 fprintf(stderr,"\nERROR: demuxed stream not found or maybe you are starting the server from a different computer (hostname/IP) now? 
DELETE demux directory!\n"); 00899 //::exit(1); 00900 return; 00901 } 00902 00903 mp4Stream=mp4Stream->clone(); // CM returns the orig video, we don't want it in server 00904 // calc resUsage 00905 VideoESInfo* vis=mp4Stream->getFirstVisualES(); 00906 if(vis) { 00907 00908 dprintf_full("ServerSession::setUrl VIS found\n"); 00909 Feature src(vis); 00910 src.isCached=true; 00911 ResourceUsage* tmpRU=CostFunction::calcResourceUsage(&src,&src,admControl->getResourceLimit()); 00912 if(tmpRU) { 00913 resUsage.network=tmpRU->network; 00914 resUsage.disk=tmpRU->disk; 00915 resUsage.cpu=tmpRU->cpu; 00916 delete tmpRU; 00917 insertResUsage=true; 00918 } 00919 } 00920 } 00921 00922 else if(meta && userPref) { 00923 // FIXME: hardcode delay values to 100ms 00924 dprintf_full("ServerSession::setUrl meta && userPref\n"); 00925 suggestedAdaptor=admControl->makeBestMatch(userPref,100,meta,&mp4Stream,&resUsage); 00926 00927 if(mp4Stream) 00928 mp4Stream=mp4Stream->clone(); 00929 } 00930 else if(meta && termCap ) 00931 { //use MetaObject for best match 00932 // bandwidth should be the minimum of network and decodeBandwidth 00933 // if one of them is zero, ignore the zero param 00934 dprintf_full("ServerSession::setUrl meta && termCap\n"); 00935 u32 bandwidth=termCap->getNetworkCapacityInByte()*8; 00936 if(bandwidth==0) 00937 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00938 else if(termCap->getMaxDecoderBitRateInBit() && 00939 termCap->getMaxDecoderBitRateInBit() < bandwidth) { 00940 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00941 } 00942 dprintf_full("ServerSession::setUrl: termCap->getColorDisplay()=%u\r\n",termCap->getColorDisplay()); 00943 mp4Stream=meta->findMp4Stream(0,termCap->getDisplayWidth(), 0,termCap->getDisplayHeight(), 0,bandwidth, (termCap->getColorDisplay()==0), 00944 (float)termCap->getDisplayRefreshRate(), &exactMatch); 00945 if(mp4Stream) 00946 mp4Stream=mp4Stream->clone(); 00947 if(makeExactMatch && !exactMatch) { 00948 
suggestedAdaptor=admControl->makeExactMatch(termCap->getDisplayWidth(), 00949 termCap->getDisplayHeight(), 00950 bandwidth, 00951 (termCap->getColorDisplay()==0), 00952 (float)termCap->getDisplayRefreshRate(), 00953 mp4Stream,&resUsage); 00954 } 00955 else 00956 { 00957 // calc the resource usage for the non transcoding case 00958 if(mp4Stream) 00959 { 00960 VideoESInfo* vis=mp4Stream->getFirstVisualES(); 00961 if(vis) 00962 { 00963 Feature src(vis); 00964 src.isCached=true; 00965 ResourceUsage* tmpRU=CostFunction::calcResourceUsage(&src,&src,admControl->getResourceLimit()); 00966 if(tmpRU) 00967 { 00968 resUsage.network=tmpRU->network; 00969 resUsage.disk=tmpRU->disk; 00970 resUsage.cpu=tmpRU->cpu; 00971 delete tmpRU; 00972 insertResUsage=true; 00973 } 00974 } 00975 } 00976 00977 00978 } 00979 00980 } 00981 #ifndef WINCE 00982 else if(termCap && ((strcmp(uri->getFileName(), "v4l")) ==0)) 00983 { 00984 dprintf_full("ServerSession::setUrl v4l && termCap &&\n"); 00985 u32 bandwidth=termCap->getNetworkCapacityInByte()*8; 00986 if(bandwidth==0) 00987 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00988 else if(termCap->getMaxDecoderBitRateInBit() && termCap->getMaxDecoderBitRateInBit() < bandwidth) 00989 { 00990 bandwidth=termCap->getMaxDecoderBitRateInBit(); 00991 } 00992 dprintf_full("ServerSession::setUrl: v4l termCap->getColorDisplay()=%u\r\n", 00993 termCap->getColorDisplay()); 00994 00995 mp4Stream = Video4LinuxContainerFile::loadContainerInfo(); 00996 if(mp4Stream) 00997 { 00998 dprintf_full("ServerSession::setUrl: v4l mp4stream ok\n"); 00999 mp4Stream=mp4Stream->clone(); 01000 } 01001 else 01002 dprintf_full("ServerSession::setUrl: v4l mp4stream NULL!! 
\n"); 01003 if(makeExactMatch && !exactMatch) 01004 { 01005 suggestedAdaptor=admControl->makeExactMatch(termCap->getDisplayWidth(), termCap->getDisplayHeight(),bandwidth, (termCap->getColorDisplay()==0), (float)termCap->getDisplayRefreshRate(),mp4Stream,&resUsage); 01006 } 01007 dprintf_full("ServerSession::setUrl: v4l OK\r\n"); 01008 01009 } 01010 #endif 01011 } 01012 01013 // now insert the resource usage for those other cases 01014 if(insertResUsage) 01015 { 01016 if(!userPref && !admControl->insertRequest(&resUsage)) 01017 { 01018 resUsage.cpu=resUsage.disk=resUsage.network=0.0; 01019 if(mp4Stream) 01020 delete mp4Stream; 01021 mp4Stream=NULL; 01022 if(suggestedAdaptor) 01023 delete suggestedAdaptor; 01024 suggestedAdaptor=NULL; 01025 } 01026 } 01027 }; 01028 01029 01030 /***********************************************************************/ 01031 bool ServerSession::setOptions(const Url* fileName, const char* remaining) 01032 { 01033 01034 assert(fileName && remaining); 01035 u32 session=0; 01036 std::map<char*, char*, struct stringCompare>* param=new std::map<char*, char*, stringCompare>(); 01037 this->prot->parseSetParameter(remaining,&session,param); 01038 if(param->size()>0) { 01039 // search the DC for the session 01040 dprintf_full("ServerSession::setOptions: detected %i params\n",param->size()); 01041 DataChannel* dc=NULL; 01042 list < DataChannel * >::iterator dcI; 01043 if (!channels.empty()) { 01044 dcI = channels.begin(); 01045 while (dcI != channels.end()) { 01046 if ((*dcI)->getSessionId()== (u32)session) { 01047 dc=(*dcI); 01048 dcI=channels.end(); 01049 } 01050 } 01051 } 01052 if(dc) { 01053 dprintf_full("ServerSession::setOptions: found DataChannel\n"); 01054 // analyze params: we currently only support FRAME_FILTER 01055 char** invalid=new char*[param->size()]; 01056 int invalidCnt=0; 01057 std::map<char*, char*, stringCompare>::iterator mapI; 01058 for(mapI=param->begin();mapI!=param->end();++mapI) { 01059 if( 
!strstr((*mapI).first,"FRAME_FILTER")) { 01060 invalid[invalidCnt]=(*mapI).first; 01061 invalidCnt++; 01062 } 01063 else if(strstr((*mapI).first,"FRAME_FILTER")) { 01064 // set it at input 01065 dprintf_full("ServerSession::setOptions: detected FRAME_FILTER\n"); 01066 IO* io=dc->getInput(); 01067 #ifdef WINCE 01068 FilteredIO * fio = (FilteredIO*) io; 01069 #else 01070 FilteredIO* fio=dynamic_cast<FilteredIO*>(io); 01071 #endif 01072 if(fio) { 01073 //if( typeid(io) == typeid(FilteredIO)) { 01074 dprintf_full("ServerSession.setOptions found %s\n",(*mapI).second); 01075 BitField* b=BitField::fromHexString( (*mapI).second); 01076 01077 if(b) { 01078 // we must invert the bitfield, because a 1 there tells us what frames the proxy 01079 // already has, we need to send the 0 --> invert 01080 b->invert(); 01081 ((FilteredIO*) io)->setBitField(b); 01082 // prot->generateOk(); 01083 } 01084 else { 01085 dprintf_err("ServerSession::setOptions: failed to create BitField for String\n"); 01086 prot->generateInvalidParameter(0,NULL); 01087 delete invalid; 01088 strcpy(buffer,prot->getBuffer()); 01089 prot->incSeqNr(); 01090 RTSP::freeMapMemory(param); 01091 delete param; 01092 delete[] invalid; 01093 return true; 01094 } 01095 } 01096 else { // IO doesn't support it 01097 invalid[0]="FRAME_FILTER"; 01098 dprintf_full("ServerSession::setOptions: FRAME_FILTERing not supported by IO\n"); 01099 prot->generateInvalidParameter(1,invalid); 01100 delete invalid; 01101 strcpy(buffer,prot->getBuffer()); 01102 prot->incSeqNr(); 01103 RTSP::freeMapMemory(param); 01104 delete param; 01105 return true; 01106 } 01107 } 01108 } // for 01109 if(invalidCnt==0) 01110 prot->generateOk(); 01111 else 01112 prot->generateInvalidParameter(invalidCnt,invalid); 01113 delete[] invalid; 01114 } 01115 else { // Session not found 01116 prot->generateSessionNotFound(); 01117 } 01118 } 01119 else { //set without params 01120 prot->generateOk(); 01121 } 01122 prot->incSeqNr(); 01123 
strcpy(buffer,prot->getBuffer()); 01124 RTSP::freeMapMemory(param); 01125 delete param; 01126 return true; 01127 }; 01128 01129 01130 01131 /***********************************************************************/ 01132 bool ServerSession::getOptions(const Url* fileName, const char* remaining) 01133 { 01134 assert(fileName && remaining); 01135 u32 session=0; 01136 int numparams=0; 01137 char** params=NULL; 01138 this->prot->parseGetParameter(remaining,&session,&numparams,¶ms); 01139 // FIXME: implement 01140 01141 //handle parameters. currently, only 'position' is supported! 01142 for (int i = 0; i < numparams; i++) { 01143 01144 if (strcmp(params[i], "position") == 0) { 01145 for (list < DataChannel * >::iterator dc = channels.begin(); 01146 dc != channels.end(); dc++) { 01147 //calculate time in msecs from requested session 01148 if ((*dc)->getSessionId() == session) { 01149 if ((*dc)->getOutputStatistics()) { 01150 u32 rtptimems = (u32)((*dc)->getOutputStatistics()->getPlayoutSec() * 1000); 01151 01152 //add result into map and generate response msg 01153 std::map<char*, char*, struct stringCompare>* param=new std::map<char*, char*, stringCompare>(); 01154 char rtptimesStr[11]; 01155 sprintf(rtptimesStr,"%d", rtptimems); 01156 std::pair<char*, char*> pos("position", rtptimesStr); 01157 param->insert(param->begin(), pos); 01158 01159 prot->generateResponseGetParameter(param); 01160 const char* buf=prot->getBuffer(); 01161 strcpy(buffer,buf); 01162 } else { 01163 dprintf_err("ServerSession::setup() \nCannot get output statistics!\n"); 01164 } 01165 } 01166 } 01167 } 01168 } 01169 01170 prot->incSeqNr(); 01171 return true; 01172 }; 01173