SemiProxySession.cpp

00001 /*********************************************************************** 00002 * * 00003 * ViTooKi * 00004 * * 00005 * title: SemiProxySession.cpp * 00006 * * 00007 * * 00008 * * 00009 * ITEC institute of the University of Klagenfurt (Austria) * 00010 * http://www.itec.uni-klu.ac.at * 00011 * * 00012 * * 00013 * For more information visit the ViTooKi homepage: * 00014 * http://ViTooKi.sourceforge.net * 00015 * vitooki-user@lists.sourceforge.net * 00016 * vitooki-devel@lists.sourceforge.net * 00017 * * 00018 * This file is part of ViTooKi, a free video toolkit. * 00019 * ViTooKi is free software; you can redistribute it and/or * 00020 * modify it under the terms of the GNU General Public License * 00021 * as published by the Free Software Foundation; either version 2 * 00022 * of the License, or (at your option) any later version. * 00023 * * 00024 * This program is distributed in the hope that it will be useful, * 00025 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00026 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 00027 * GNU General Public License for more details. * 00028 * * 00029 * You should have received a copy of the GNU General Public License * 00030 * along with this program; if not, write to the Free Software * 00031 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, * 00032 * MA 02111-1307, USA. * 00033 * * 00034 ***********************************************************************/ 00035 00036 /*********************************************************************** 00037 * * 00038 * REVISION HISTORY: * 00039 * * 00040 * * 00041 * * 00042 ***********************************************************************/ 00043 00044 /*********************************************************************** 00045 * Video Session Migration System, 2005 * 00046 * Author: Klaus Schoeffmann * 00047 ************************************************************************/ 00048 00049 00050 #include "SemiProxySession.hpp" 00051 00052 vector<SemiProxySession*> SemiProxySession::sessionList; 00053 00054 00055 /***********************************************************************/ 00056 SemiProxySession::SemiProxySession(int fdclient, struct sockaddr_in* clientsock, SemiProxy *sp) : Session() 00057 { 00058 00059 fdserver = 0; 00060 this->fdclient = sessionControlChannel = fdclient; 00061 this->sp = sp; 00062 this->clientsock = clientsock; 00063 this->buffer = new char[MSG_BUFFER_SIZE]; 00064 initSemiProxySession(); 00065 00066 sessionList.push_back(this); 00067 } 00068 00069 /***********************************************************************/ 00070 SemiProxySession::~SemiProxySession() 00071 { 00072 Globals::sdebug << "\nSemiProxySession::cleanup() " << RtspSessionID << " ..."; 00073 if (buffer != NULL) delete buffer; 00074 //do not delete asess, because this object is deleted by SessionMgmt.cpp 00075 //if (asess != NULL) delete asess; 00076 if (timemsmnt != NULL) delete timemsmnt; 00077 if (SessionURI != NULL) delete SessionURI; 00078 Globals::sdebug << "\nSemiProxySession::cleanup() done!"; 00079 } 00080 00081 /***********************************************************************/ 00082 void SemiProxySession::initSemiProxySession() 00083 { 00084 bytesRead = 0; 00085 lastClientCSeq = 0; 00086 lastServerCSeq = 0; 00087 prevDuration = 0; 00088 noTermCapsFound = false; 00089 lastClientRtspMsg = RTSP_NONE; 00090 RtspSessionID = 0; 00091 lastCRtspSessionID = 0; 00092 SessionURI = NULL; 00093 asess = NULL; 00094 timemsmnt = NULL; 00095 requestedHostname = NULL; 00096 receivedServerPosition = 0; 00097 receivedClientPosition = 0; 00098 sendServerPositionRequest = false; 00099 sendClientPositionRequest = false; 00100 sessionHasBeenPaused = false; 00101 playerInfo = PI_UNKOWN; 00102 00103 //init variables of super-class 00104 state = SESSION_NEW; 00105 termCap = NULL; 00106 userPref = NULL; 00107 client = NULL; 00108 clientId = 0; 00109 prot = NULL; 00110 url = NULL; 00111 serverID = NULL; 00112 canAdmitRequest = false; 00113 globalTimer = NULL; 00114 gtRefCount = NULL; 00115 gtSharedAdapt = NULL; 00116 //resUsage = 0; 00117 sessionControlChannel = 0; 00118 mp4Stream = NULL; 00119 suggestedAdaptor = NULL; 00120 } 00121 00122 /***********************************************************************/ 00123 void SemiProxySession::resetFlags() 00124 { 00125 prevDuration = 0; 00126 noTermCapsFound = false; 00127 lastClientRtspMsg = RTSP_NONE; 00128 RtspSessionID = 0; 00129 lastCRtspSessionID = 0; 00130 receivedServerPosition = 0; 00131 receivedClientPosition = 0; 00132 sendServerPositionRequest = false; 00133 sendClientPositionRequest = false; 00134 sessionHasBeenPaused = false; 00135 } 00136 00137 00138 /***********************************************************************/ 00139 void SemiProxySession::removeSession() 00140 { 00141 closeSession(); 00142 sp->removeSession(asess); 00143 } 00144 00145 /***********************************************************************/ 00146 void SemiProxySession::closeSession() 00147 { 00148 state=SESSION_CLOSED; 00149 00150 if (asess != NULL && !asess->isClosed()) asess->setClosed(); 00151 00152 if (this->fdclient != 0) 00153 { 00154 #ifndef WIN32 00155 close(this->fdclient); 00156 #else 00157 closesocket(this->fdclient); 00158 #endif 00159 Globals::sdebug << "\n::closeSession(): client-socket CLOSED !"; 00160 } 00161 00162 if (this->fdserver != 0) 00163 { 00164 #ifndef WIN32 00165 close(this->fdserver); 00166 #else 00167 closesocket(this->fdserver); 00168 #endif 00169 Globals::sdebug << "\n::closeSession(): server-socket CLOSED !"; 00170 } 00171 } 00172 00173 /***********************************************************************/ 00174 SemiProxySession* SemiProxySession::getSemiProxySession(uint RtspSessionId) 00175 { 00176 Globals::sdebug << "\n:getSemiProxySession(): starts"; 00177 00178 vector<SemiProxySession*>::const_iterator iter = sessionList.begin(); 00179 while (iter != sessionList.end()) 00180 { 00181 SemiProxySession *sess = *iter; 00182 bool contained = sess->containsSessionID(RtspSessionId); 00183 00184 Globals::sdebug << "\n::getSemiProxySession(): " << RtspSessionId << " contained?: " << contained; 00185 if (contained == true) return sess; 00186 00187 iter++; 00188 } 00189 00190 //not found 00191 return NULL; 00192 00193 } 00194 00195 /***********************************************************************/ 00196 bool SemiProxySession::checkServerConnection(const Url* fileName) 00197 { 00198 assert(fileName); 00199 //if not already connected to server: try to connect 00200 if (this->fdserver == 0) 00201 { 00202 //if url contains a port: connect to this port 00203 //otherwise: use defaultport (554) 00204 uint port = DEFAULT_RTSPSERVER_PORT; 00205 if (fileName->getPort() > 0) { 00206 port = fileName->getPort(); 00207 Globals::sdebug << "\n::checkServerConnection: port of fileName used: " << port; 00208 } 00209 Globals::sdebug << "\ncheckServerConnection:"; 00210 if (params.getHost() != NULL) { 00211 requestedHostname = params.getHost(); 00212 Globals::sdebug << " host=" << params.getHost(); 00213 } 00214 if (params.getPort() > 0) { 00215 port = params.getPort(); 00216 Globals::sdebug << " port=" << params.getPort(); 00217 } 00218 00219 //connect to servername, specified in url 00220 //dont use servername from URL, because it is not correctly resolved sometimes 00221 //(e.g. try for "rtsp://rm1.warpnet.jet-stream.nl:554/jet-stream/mpeg4.mp4/") 00222 //this->fdserver = sp->createRtspServerSocket(fileName->getServerName(), port); 00223 this->fdserver = sp->createRtspServerSocket(requestedHostname, port); 00224 if (this->fdserver == 0) 00225 { 00226 removeSession(); 00227 return false; 00228 } 00229 } 00230 return true; 00231 } 00232 00233 00234 00235 /***********************************************************************/ 00236 char *SemiProxySession::extractStrParameter(string &strbuf, char *pname, bool removeIt) 00237 { 00238 assert(pname); 00239 Globals::sdebug << "\n::extractStrParameter(): looking for " << pname; 00240 00241 uint pstart, pend, pend1, pend2, pend3; 00242 //first look, if specified parameter is in passed string 00243 if ((pstart = strbuf.find(pname)) != string::npos) 00244 { 00245 //find end of parameter 00246 //pend1 = strbuf.find("&", pstart+strlen(pname)); //next parameter separated by '&' 00247 pend1 = strbuf.find("/", pstart+strlen(pname)); //next parameter separated by '&' 00248 pend2 = strbuf.find("/", pstart+strlen(pname)); //on setup, trackid separated by '/' 00249 pend3 = strbuf.find(" ", pstart+strlen(pname)); //last parameter 00250 if (pend1 == string::npos && pend2 == string::npos && pend3 == string::npos) 00251 { 00252 Globals::sdebug << "\nParamEndProblem"; 00253 throw ParamEndProblem(); 00254 } 00255 00256 //take lowest one 00257 pend = (pend1 <= pend2) ? pend1 : pend2; 00258 pend = (pend <= pend3) ? pend : pend3; 00259 00260 string tmp = strbuf.substr(pstart + strlen(pname), pend - (pstart + strlen(pname))); 00261 Globals::sdebug << "...= " << pname << tmp; 00262 00263 //remove parameter from string (if should remove) 00264 if (removeIt) 00265 { 00266 //move this param from all positions in the string 00267 //(this is necessary, if there are more than one messages on one received-buffer - 00268 //e.g. SET_PARAMETER and PLAY) 00269 char *tmpparam = semifunc::newStrCpy((char*)strbuf.c_str() + pstart, pend-pstart); 00270 //strbuf.erase(pstart, pend - pstart); 00271 semifunc::replaceStr(strbuf, tmpparam, "", true); 00272 } 00273 00274 return (char*)tmp.c_str(); 00275 } 00276 else 00277 { 00278 //Param NOT FOUND 00279 Globals::sdebug << " NOT FOUND!"; 00280 return NULL; 00281 } 00282 } 00283 00284 /***********************************************************************/ 00285 int SemiProxySession::extractParameter(string &strbuf, char *pname, bool removeIt) 00286 { 00287 //Globals::sdebug << "\n::extractParameter(): looking for " << pname; 00288 00289 char *par = extractStrParameter(strbuf, pname, removeIt); 00290 if (par != NULL) 00291 { 00292 return atoi(par); 00293 } 00294 else 00295 { 00296 return -1; 00297 } 00298 } 00299 00300 /***********************************************************************/ 00301 ulong SemiProxySession::extractDuration(string &strbuf) 00302 { 00303 ulong dur = 0; 00304 00305 uint rpos1, rpos2; 00306 if ((rpos1 = strbuf.find("a=range:npt=")) != string::npos) 00307 { 00308 if ((rpos2 = strbuf.find("-", rpos1)) != string::npos) 00309 { 00310 double ddur = atof(strbuf.c_str() + rpos2 + 1); 00311 ddur *= 1000; 00312 dur = (ulong)ddur; 00313 Globals::sdebug << "\n::extractDuration(): =" << dur; 00314 } 00315 } 00316 00317 return dur; 00318 } 00319 00320 /***********************************************************************/ 00321 ulong SemiProxySession::extractRangeFrom(string &strbuf) 00322 { 00323 ulong from = 0; 00324 00325 uint rpos1; 00326 if ((rpos1 = strbuf.find("Range: npt=")) != string::npos) 00327 { 00328 double dfrom = atof(strbuf.c_str() + rpos1 + 11); 00329 dfrom *= 1000; 00330 from = (ulong)dfrom; 00331 Globals::sdebug << "\n::extractRangeFrom(): =" << from; 00332 } 00333 00334 return from; 00335 } 00336 00337 /***********************************************************************/ 00338 ulong SemiProxySession::extractPrebuffer(string &strbuf) 00339 { 00340 ulong prebuffer = 0; 00341 00342 uint bpos1; 00343 if ((bpos1 = strbuf.find("x-prebuffer: maxtime=")) != string::npos) 00344 { 00345 double dprebuffer = atof(strbuf.c_str() + bpos1 + 21); 00346 dprebuffer *= 1000; 00347 prebuffer = (ulong)dprebuffer; 00348 Globals::sdebug << "\n::extractPrebuffer(): =" << prebuffer; 00349 } 00350 00351 return prebuffer; 00352 } 00353 00354 /**********************************************************************/ 00355 uint SemiProxySession::getContentLength(string strbuf) 00356 { 00357 uint clpos1, clpos2; 00358 if ((clpos1 = strbuf.find("Content-Length: ")) != string::npos) 00359 { 00360 clpos2 = strbuf.find("\r\n", clpos1); 00361 return atoi(strbuf.c_str() + clpos1 + 16); 00362 } 00363 else 00364 { 00365 cerr << "\n::getContentLength(): Content-Length not found (cannot change)!" << endl; 00366 return 0; 00367 } 00368 } 00369 00370 /**********************************************************************/ 00371 void SemiProxySession::changeContentLength(string &strbuf, uint newVal) 00372 { 00373 uint clpos1, clpos2; 00374 if ((clpos1 = strbuf.find("Content-Length: ")) != string::npos) 00375 { 00376 clpos2 = strbuf.find("\r\n", clpos1); 00377 strbuf.replace(strbuf.begin() + clpos1 + 16, strbuf.begin() + clpos2, semifunc::toString(newVal)); 00378 } 00379 else 00380 { 00381 cerr << "\n::changeContentLength(): Content-Length not found (cannot change)!" << endl; 00382 } 00383 } 00384 00385 /**********************************************************************/ 00386 void SemiProxySession::extractCSeq(char *tmpbuffer, uint &lastCSeq) 00387 { 00388 assert(tmpbuffer); 00389 char *cseqpos; 00390 Globals::sdebug << "\n::extractCSeq(): "; 00391 00392 if ((cseqpos = strstr(tmpbuffer, "CSeq: ")) != NULL) 00393 { 00394 uint i = 0; 00395 while (cseqpos[i+5] != 0 && cseqpos[i+5] != '\r' && cseqpos[i+5] != '\n') i++; 00396 Globals::sdebug << "\n::extractCSeq(): i = " << i; 00397 char *strCSeq = new char[i+1]; 00398 strncpy(strCSeq, cseqpos+5, i); 00399 strCSeq[i] = 0; 00400 lastCSeq = atoi(strCSeq); 00401 delete strCSeq; 00402 Globals::sdebug << "\n::extractCSeq(): lastCSeq = " << lastCSeq; 00403 } 00404 } 00405 00406 /**********************************************************************/ 00407 void SemiProxySession::replaceCSeq(string &str, uint newCSeq) 00408 { 00409 //replace CSeq 00410 uint pos1 = str.find("CSeq: "); 00411 if (pos1 != string::npos) 00412 { 00413 pos1 += 6; 00414 uint pos2 = str.find("\r\n", pos1); 00415 if (pos2 != string::npos) 00416 { 00417 str.replace(pos1, pos2-pos1, semifunc::toString(newCSeq)); 00418 Globals::sdebug << "\n::replaceCSeq(): replaced CSeq to " << newCSeq; 00419 } 00420 else 00421 { 00422 cerr << "\n::replaceCSeq(): found CSeq without end!" << endl; 00423 } 00424 } 00425 } 00426 00427 /***********************************************************************/ 00428 void SemiProxySession::replaceRange(string &strbuf, char *newVal, bool replaceOnlyFromValue) 00429 { 00430 //search content of msg for range in npt-format 00431 unsigned int rastart, raend; 00432 if ((rastart = strbuf.find("Range: npt=")) != string::npos) 00433 { 00434 assert(newVal); 00435 if (replaceOnlyFromValue) 00436 raend = strbuf.find("-", rastart + 11); 00437 else 00438 raend = strbuf.find("\r\n", rastart + 11); 00439 strbuf.replace(rastart + 11, raend-(rastart + 11), newVal); 00440 } 00441 else 00442 { 00443 cerr << "\n::replaceRange(): Range-Parameter not found! (cannot change)" << endl; 00444 } 00445 00446 } 00447 00448 00449 /***********************************************************************/ 00450 string SemiProxySession::createTerminalCapabilities(Profile *profile) 00451 { 00452 00453 if (profile == NULL) 00454 { 00455 return ""; 00456 } 00457 else 00458 { 00459 00460 TerminalCapabilities *tc = new TerminalCapabilities(); 00461 tc->setColorDisplay(profile->isColorCapable()); 00462 tc->setDisplayWidth(profile->getWidth()); 00463 tc->setDisplayHeight(profile->getHeight()); 00464 tc->setDisplayBitsPerPixel(profile->getBitsPerPixel()); 00465 tc->setDisplayRefreshRate(profile->getRefreshRate()); 00466 tc->setNetworkCapacityInByte(profile->getNetkBps() * 1000); 00467 //audio properties 00468 tc->setAudioSamplingFrequency(44000); 00469 tc->setAudioBitsPerSample(16); 00470 tc->setNumAudioChannels(2); 00471 00472 00473 string termCaps (MP21::generateTerminalCapabilityDescription(tc)); 00474 00475 delete tc; 00476 00477 return termCaps; 00478 } 00479 } 00480 00481 /***********************************************************************/ 00482 void SemiProxySession::forwardToServer(char *buffer, int len) 00483 { 00484 Globals::sdebug << "\n::forwardToServer(): starts"; 00485 if (buffer == NULL) 00486 { 00487 Globals::sdebug << "\n::forwardToServer(): detected buffer=NULL!!!"; 00488 } 00489 else 00490 { 00491 00492 //first store CSeq received from client 00493 Globals::sdebug << "\n::forwardToServer(): before call to extractCSeq()"; 00494 extractCSeq(buffer, lastClientCSeq); 00495 Globals::sdebug << "\n::forwardToServer(): lastClientCSeq=" << lastClientCSeq; 00496 00497 //additional newlines, needed for pvplayer 00498 string strbuf(buffer); 00499 if (params.getPort() > 0 || params.getHost() != NULL) { 00500 strbuf.append("\r\n\r\n"); 00501 } 00502 00503 00504 //add/change terminal capabilites? 00505 if (sp->shouldAddTermCaps()) 00506 { 00507 00508 //check terminal capabilities (only if profile specified) 00509 if (lastClientRtspMsg == RTSP_DESCRIBE && params.getProfileId() != 0) 00510 { 00511 uint tcBegin, tcEnd; 00512 00513 Profile *profile = sp->getProfileList()->getProfileById(params.getProfileId()); 00514 assert(profile != NULL); 00515 string termCaps = createTerminalCapabilities(profile); 00516 00517 tcBegin = strbuf.find("<xml "); 00518 tcEnd = strbuf.find("</DIDL>"); 00519 00520 if (strbuf.find("Content-Type: application/mpeg21_dia") != string::npos && 00521 tcBegin != string::npos && tcEnd != string::npos) 00522 { 00523 00524 //termcaps found 00525 strbuf.replace(tcBegin, (tcEnd + 7) - tcBegin, termCaps); 00526 uint currentLength = getContentLength(strbuf); 00527 currentLength -= (tcEnd + 7) - tcBegin; 00528 currentLength += termCaps.length(); 00529 //correct content-length 00530 changeContentLength(strbuf, currentLength); 00531 00532 } 00533 else 00534 { 00535 00536 noTermCapsFound = true; 00537 00538 Globals::sdebug << "\n::forwardToServer(): No Terminalcapabilities found... will add!"; 00539 00540 if (strbuf.find("Content-Length:") != string::npos) 00541 { 00542 cerr << "\n::forwardToServer(): cannot handle existing DESCRIBE content!" << endl; 00543 } 00544 else 00545 { 00546 00547 ostringstream ctinfo, content; 00548 ctinfo << "Content-Type: application/mpeg21_dia\r\n" 00549 << "Content-Length: " << termCaps.length() << "\r\n\r\n" 00550 << termCaps << "\r\n"; 00551 00552 uint uapos1, uapos2; 00553 if ((uapos1 = strbuf.find("User-Agent:")) != string::npos) 00554 { 00555 uapos2 = strbuf.find("\r\n", uapos1); 00556 strbuf.insert(uapos2+2, ctinfo.str()); 00557 00558 //append tercaps to end 00559 strbuf.insert(strbuf.rend()-strbuf.rbegin(), content.str()); 00560 } 00561 else 00562 { 00563 cerr << "\n::forwardToServer(): User-Agent not found in message!" << endl; 00564 } 00565 00566 } 00567 00568 } 00569 } 00570 } 00571 00572 00573 //replace CSeq value 00574 replaceCSeq(strbuf, lastServerCSeq+1); 00575 00576 //debugging output 00577 string tmpstr(strbuf); 00578 Globals::sdebug << "\n::forwardToServer(): Forwarding to SERVER:\n" << tmpstr; 00579 00580 send(this->fdserver, strbuf.c_str(), strbuf.length(), 0); 00581 00582 Globals::sdebug << "\n::forwardToServer(): success"; 00583 } 00584 } 00585 00586 00587 /***********************************************************************/ 00588 void SemiProxySession::forwardToClient(char *buffer, int len) 00589 { 00590 Globals::sdebug << "\n::forwardToClient(): starts"; 00591 if (buffer == NULL) 00592 { 00593 Globals::sdebug << "\n::forwardToClient(): detected buffer=NULL!!!"; 00594 } 00595 else 00596 { 00597 00598 //first store CSeq received from server 00599 extractCSeq(buffer, lastServerCSeq); 00600 Globals::sdebug << "\n::forwardToClient(): lastServerCSeq=" << lastServerCSeq; 00601 00602 string str(buffer); 00603 00604 //if the client uses a host/port parameter (because it doesn't support a proxy), 00605 //replace the content-base to the original filename 00606 if ((params.getFullURI() != NULL) && ((params.getHost() != NULL) || (params.getPort() != 0)) ) 00607 { 00608 uint cbstart = str.find("Content-Base: "); 00609 if (cbstart != string::npos) 00610 { 00611 uint cbend = str.find("\r\n", cbstart); 00612 std::ostringstream newContentBase; 00613 newContentBase << "Content-Base: " << params.getFullURI(); 00614 semifunc::replaceStr(str, cbstart, cbend, (char*)newContentBase.str().c_str()); 00615 } 00616 } 00617 00618 //replace CSeq value 00619 replaceCSeq(str, lastClientCSeq); 00620 00621 //debugging output 00622 string tmpstr (str); 00623 semifunc::replaceStr(tmpstr, "\r\n", "\r \n\t", true); 00624 Globals::sdebug << "\n::forwardToClient(): Forwarding to CLIENT:\n" << tmpstr; 00625 00626 send(this->fdclient, str.c_str(), str.length(), 0); 00627 00628 //if last client message was PLAY, start time-measurement for current time-section 00629 //but only for the _first_ PLAY message (session-state not ACTIVE) 00630 if (this->lastClientRtspMsg == RTSP_PLAY && state != SESSION_ACTIVE) 00631 { 00632 Globals::sdebug << "\n::forwardToClient(): _time measurement started_"; 00633 timemsmnt->startTS(); 00634 } else { 00635 Globals::sdebug << "\n::forwardToClient(): _time measureemnt not started (" << lastClientRtspMsg << "==" << RTSP_PLAY << ", " << state << "!=" << SESSION_ACTIVE << ")"; 00636 } 00637 00638 //request current media time from media player 00639 //but only for the _first_ PAUSE message (if there are more than on track) 00640 if (this->lastClientRtspMsg == RTSP_PAUSE && sp->shouldRequestClientPosition() && state != SESSION_PAUSED 00641 && SemiProxySession::getSemiProxySession(this->RtspSessionID)->getPlayerInfo() != PI_PVPLAYER) 00642 { 00643 ulong cplayout = SemiProxySession::getSemiProxySession(this->RtspSessionID)->getClientTimeMs(); 00644 timemsmnt->setTSclientPlayout(cplayout); 00645 } 00646 00647 Globals::sdebug << "\n::forwardToClient(): success"; 00648 } 00649 } 00650 00651 00652 /***********************************************************************/ 00653 void SemiProxySession::handleOptionsResponse(string &buf, int &bytesRead) 00654 { 00655 forwardToClient((char*)buf.c_str(), bytesRead); 00656 } 00657 00658 /***********************************************************************/ 00659 void SemiProxySession::handleDescribeResponse(string &buf, int &bytesRead) 00660 { 00661 if (timemsmnt != NULL) 00662 { 00663 delete timemsmnt; 00664 timemsmnt = NULL; 00665 } 00666 00667 //create TimeMeasurement object and store duration 00668 timemsmnt = new TimeMeasurement(); 00669 timemsmnt->setDuration(extractDuration(buf)); 00670 00671 forwardToClient((char*)buf.c_str(), bytesRead); 00672 } 00673 00674 /***********************************************************************/ 00675 void SemiProxySession::handleSetupResponse(string &buf, int &bytesRead) 00676 { 00677 Globals::sdebug << "\n::handleSetupResponse(): session initialized"; 00678 00679 //store sessionid in track 00680 uint sStart, sEnd1, sEnd2; 00681 sStart = buf.find("Session: "); 00682 if (sStart != string::npos) { 00683 sStart += strlen("Session: "); 00684 sEnd1 = buf.find("\n"); 00685 sEnd2 = buf.find("\r"); 00686 if (sEnd2 != string::npos && sEnd1 != string::npos && sEnd2 < sEnd1) sEnd1 = sEnd2; 00687 tracks.setIdOfLastTrack(atoi(buf.substr(sStart, sEnd1-sStart).c_str())); 00688 } 00689 00690 forwardToClient((char*)buf.c_str(), bytesRead); 00691 00692 state = SESSION_INITIALIZED; 00693 } 00694 00695 /***********************************************************************/ 00696 void SemiProxySession::handlePlayResponse(string &buf, int &bytesRead) 00697 { 00698 Globals::sdebug << "\n::handlePlayResponse(): last msg was play"; 00699 00700 //if this is the _first_ PLAY message 00701 if (tracks.getTCHavingState(TRACK_ACTIVE) == 0) { 00702 //and a SETUP was executed before... 00703 if (state == SESSION_INITIALIZED) 00704 { 00705 00706 Globals::sdebug << "\n::handlePlayResponse(): looking for URI"; 00707 if (RtspSessionID != 0 && SessionURI != NULL && params.getMovieId() != NULL) 00708 { 00709 00710 //store elapsed time of origin session 00711 timemsmnt->setPreviousSessionDuration(prevDuration); 00712 00713 assert(sp->getProfileList()->getProfileById(params.getProfileId()) != NULL); 00714 00715 asess = new SSession(RtspSessionID, SessionURI, inet_ntoa(clientsock->sin_addr), params.getMovieId(), params.getSessionId(), prevDuration, sp->getProfileList()->getProfileById(params.getProfileId())->getBufferingDelayms(), timemsmnt); 00716 00717 //set session state to active 00718 asess->setActive(); 00719 00720 //add this session to our global list 00721 sp->addSession(asess); 00722 00723 00724 //if this session was a take-over and user has chosen to migrate, REMOVE OTHER SESSION 00725 if (params.getSessionId() != 0 && params.getAction() == ACTION_MIGRATE) 00726 { 00727 Globals::sdebug << "\n::handlePlayResponse(): closing OTHER SESSION"; 00728 if (sp->getSession(params.getSessionId())->getState() != STATE_CLOSED) 00729 { 00730 assert(SemiProxySession::getSemiProxySession(params.getSessionId()) != NULL); 00731 SemiProxySession::getSemiProxySession(params.getSessionId())->sendTeardownToClient(); 00732 SemiProxySession::getSemiProxySession(params.getSessionId())->closeSession(); 00733 } 00734 else 00735 { 00736 Globals::sdebug << "\n::handlePlayResponse(): session is already closed!"; 00737 } 00738 } 00739 00740 } 00741 else 00742 { 00743 Globals::sdebug << "\n::handlePlayResponse(): No semi-parameters where found. Session was not added to the semi-system!"; 00744 } 00745 00746 } 00747 else if (state == SESSION_PAUSED) 00748 { 00749 Globals::sdebug << "\n::handlePlayResponse(): session resumed"; 00750 asess->setActive(); 00751 } 00752 } 00753 00754 forwardToClient((char*)buf.c_str(), bytesRead); 00755 00756 //set state of track 00757 tracks.getTrackById(lastCRtspSessionID)->setState(TRACK_ACTIVE); 00758 00759 state = SESSION_ACTIVE; 00760 00761 } 00762 00763 /***********************************************************************/ 00764 void SemiProxySession::handlePauseResponse(string &buf, int &bytesRead) 00765 { 00766 Globals::sdebug << "\n::handlePauseResponse(): session paused"; 00767 00768 forwardToClient((char*)buf.c_str(), bytesRead); 00769 00770 //set state of track 00771 tracks.getTrackById(lastCRtspSessionID)->setState(TRACK_PAUSED); 00772 00773 state = SESSION_PAUSED; 00774 sessionHasBeenPaused = true; 00775 } 00776 00777 /***********************************************************************/ 00778 void SemiProxySession::handleTeardownResponse(string &buf, int &bytesRead) 00779 { 00780 Globals::sdebug << "\n::handleTeardownResponse(): session CLOSED"; 00781 00782 forwardToClient((char*)buf.c_str(), bytesRead); 00783 00784 //set state of track 00785 tracks.getTrackById(lastCRtspSessionID)->setState(TRACK_CLOSED); 00786 00787 //close session 00788 closeSession(); 00789 Globals::sdebug << "\n::handleTeardownResponse(): session removed"; 00790 } 00791 00792 00793 /***********************************************************************/ 00794 void SemiProxySession::parseResponseFromServer(char *buffer, int len) 00795 { 00796 assert(buffer); 00797 00798 Globals::sdebug << "\n::parseResponseFromServer(): starts"; 00799 00800 //only parse response, if all necessary params were passed from the client 00801 if (params.getMovieId() != NULL && params.getUserId() != 0 && params.getProfileId() != 0) 00802 { 00803 00804 string buf (buffer); 00805 00806 //look if RTSP 200 OK is contained in the message 00807 if (buf.find("200") != string::npos && buf.find("OK") != string::npos) 00808 { 00809 switch(lastClientRtspMsg) 00810 { 00811 case RTSP_OPTIONS: 00812 handleOptionsResponse(buf, len); 00813 break; 00814 case RTSP_DESCRIBE: 00815 handleDescribeResponse(buf, len); 00816 break; 00817 case RTSP_SETUP: 00818 handleSetupResponse(buf, len); 00819 break; 00820 case RTSP_PLAY: 00821 handlePlayResponse(buf, len); 00822 break; 00823 case RTSP_PAUSE: 00824 handlePauseResponse(buf, len); 00825 break; 00826 case RTSP_TEARDOWN: 00827 handleTeardownResponse(buf, len); 00828 break; 00829 } 00830 } 00831 00832 } 00833 else 00834 { 00835 Globals::sdebug << "\n::parseResponseFromServer(): IGNORED (not all params found!)"; 00836 cerr << "\n::parseResponseFromServer(): IGNORED (not all params found!)"; 00837 } 00838 00839 Globals::sdebug << "\n::parseResponseFromServer(): ends"; 00840 } 00841 00842 00843 00844 /***********************************************************************/ 00845 bool SemiProxySession::options(const Url* fileName, const char* remaining) 00846 { 00847 assert(remaining); assert(fileName); 00848 Globals::sdebug << "\n::options(): starts"; 00849 lastClientRtspMsg = RTSP_OPTIONS; 00850 00851 if (checkServerConnection(fileName)) 00852 { 00853 00854 forwardToServer(buffer, bytesRead); 00855 return true; 00856 00857 } 00858 else 00859 { 00860 return false; 00861 } 00862 } 00863 00864 /***********************************************************************/ 00865 bool SemiProxySession::connect(const Url* fileName, const char* remaining) 00866 { 00867 assert(remaining); assert(fileName); 00868 Globals::sdebug << "\n::connect(): starts"; 00869 00870 resetFlags(); 00871 00872 lastClientRtspMsg = RTSP_DESCRIBE; 00873 00874 if (checkServerConnection(fileName)) 00875 { 00876 //look if known players 00877 if (strstr(remaining, "User-Agent: PVPlayer") != NULL) playerInfo = PI_PVPLAYER; 00878 else if (strstr(remaining, "User-Agent: QTS") != NULL) playerInfo = PI_QUICKTIME; 00879 else if (strstr(remaining, "User-Agent: ItecMP4Player") != NULL) playerInfo = PI_MUVIPLAYER; 00880 00881 00882 forwardToServer(buffer, bytesRead); 00883 return true; 00884 00885 } 00886 else 00887 { 00888 return false; 00889 } 00890 } 00891 00892 /***********************************************************************/ 00893 uint SemiProxySession::getSmallerStringPos(string str, char *needle1, char *needle2, uint from) 00894 { 00895 uint pos1 = str.find(needle1, from); 00896 uint pos2 = str.find(needle2, from); 00897 return (pos1 < pos2) ? pos1 : pos2; 00898 } 00899 00900 /***********************************************************************/ 00901 bool SemiProxySession::setup(const Url* fileName, const char* remaining) 00902 { 00903 assert(remaining); assert(fileName); 00904 Globals::sdebug << "\n::setup(): starts"; 00905 lastClientRtspMsg = RTSP_SETUP; 00906 00907 string strbuf (buffer); 00908 00909 //change destination-parameter? 00910 if (sp->shouldAddDestination()) 00911 { 00912 uint cpstart; 00913 if ((cpstart = strbuf.find("unicast")) != string::npos) 00914 { 00915 ostringstream strdest; 00916 strdest << "destination=" << inet_ntoa(clientsock->sin_addr) << ";"; 00917 00918 //if there is already an existing destination, replace it, 00919 //otherwise insert it into the string 00920 uint deststart, destend; 00921 if ((deststart = strbuf.find("destination=")) != string::npos) 00922 { 00923 destend = getSmallerStringPos(strbuf, "\r\n", ";"); 00924 strbuf.replace(strbuf.begin()+deststart, strbuf.begin()+destend-1, strdest.str()); 00925 } 00926 else 00927 { 00928 strbuf.insert(cpstart, strdest.str()); 00929 } 00930 } 00931 } 00932 00933 //store track in tracklist 00934 uint tstart = strbuf.find("/trackID="); 00935 if (tstart != string::npos) { 00936 uint tend = strbuf.find(" ", tstart); 00937 uint trackNr = atoi(strbuf.substr(tstart+strlen("/trackID="), tend-(tstart+strlen("/trackID="))).c_str()); 00938 tracks.addTrack(trackNr); 00939 } 00940 00941 forwardToServer((char*)strbuf.c_str(), strbuf.length()); 00942 00943 return true; 00944 00945 } 00946 00947 /***********************************************************************/ 00948 bool SemiProxySession::play(const Url* fileName, const char* remaining) 00949 { 00950 assert(remaining); assert(fileName); 00951 Globals::sdebug << "\n::play(): starts"; 00952 lastClientRtspMsg = RTSP_PLAY; 00953 00954 string strbuf (buffer); 00955 00956 //extract prebuffer-time 00957 ulong prebuffer = extractPrebuffer(strbuf); 00958 if (prebuffer == 0 && params.getProfileId() != 0) 00959 { 00960 //if no prebuffer-time was specified by the player, use prebuffer-time from profile 00961 prebuffer = sp->getProfileList()->getProfileById(params.getProfileId())->getBufferingDelayms(); 00962 } 00963 Globals::sdebug << "\n::play(): prebuffer=" << prebuffer; 00964 00965 //on known protocol: extract RTSP-SessionId and URI 00966 if (prot) 00967 { 00968 lastCRtspSessionID = prot->extractSessionKeyFromCMD(remaining); 00969 //extract sessionID and URI only for first track 00970 if (tracks.getTCHavingState(TRACK_ACTIVE) == 0) { 00971 RtspSessionID = lastCRtspSessionID;//prot->extractSessionKeyFromCMD(remaining); 00972 SessionURI = new char[strlen(fileName->toString())+1]; 00973 strcpy(SessionURI, fileName->toString()); 00974 } 00975 } else { 00976 Globals::sdebug << "\n::play(): prot IS NULL !!!"; 00977 } 00978 00979 //for consecutive play (resumes): 00980 //if this session is a migrated session, add elapsed time of 00981 //original session to the range, requested by the client 00982 //(otherwise, some players would start from wrong position) 00983 if (sessionHasBeenPaused) 00984 { 00985 Globals::sdebug << "\n::play(): consecutive play"; 00986 00987 if (asess->getPreviousSessionDuration() != 0) 00988 { 00989 Globals::sdebug << "\n::play(): prevDur= " << asess->getPreviousSessionDuration(); 00990 replaceRange(strbuf, semifunc::convertToString(extractRangeFrom(strbuf)+asess->getPreviousSessionDuration())); 00991 } 00992 00993 } 00994 00995 00996 if (state != SESSION_ACTIVE && tracks.getTCHavingState(TRACK_ACTIVE) == 0) 00997 { 00998 assert(timemsmnt != NULL); 00999 //create new time-section on time-measurement 01000 timemsmnt->addNewSection(prebuffer); 01001 //set requested range-from value 01002 timemsmnt->setTSrequestedFrom(extractRangeFrom(strbuf)); 01003 } 01004 01005 01006 //check if this session should take over another session 01007 //do not migrate a migrated session again (which is only paused and resumed!) 01008 //moreover, do not migrate a session if the PLAY message for the second track was received 01009 if (params.getSessionId() != 0 && tracks.getTCHavingState(TRACK_ACTIVE) == 0) 01010 { 01011 01012 //session, which should be taken over must exist! 01013 SSession *otherSession = sp->getSession(params.getSessionId()); 01014 if (otherSession != NULL) 01015 { 01016 01017 //get time OF OTHER SESSION 01018 01019 ulong splayout = 0, cplayout = 0; 01020 01021 01022 //should request mediatime from client? 01023 Globals::sdebug << "\n::play() requesting position from client?"; 01024 if (sp->shouldRequestClientPosition() && otherSession && otherSession->getState() != STATE_CLOSED 01025 && SemiProxySession::getSemiProxySession(params.getSessionId())->getPlayerInfo() != PI_PVPLAYER) 01026 { 01027 //get client-playout time for other session from server 01028 assert(SemiProxySession::getSemiProxySession(params.getSessionId()) != NULL); 01029 cplayout = SemiProxySession::getSemiProxySession(params.getSessionId())->requestClientTimeMs(); 01030 01031 Globals::sdebug << "\n::play(" << RtspSessionID << "): client playout of other session= " << cplayout; 01032 } 01033 01034 if (cplayout == 0) 01035 { 01036 Globals::sdebug << "\n::play(): NO REQUEST FROM CLIENT, try server"; 01037 01038 //should request mediatime from server? 01039 if (sp->shouldRequestServerPosition() 01040 && otherSession && otherSession->getState() != STATE_CLOSED) 01041 { 01042 //get client-playout time for other session from server 01043 assert(SemiProxySession::getSemiProxySession(params.getSessionId()) != NULL); 01044 splayout = SemiProxySession::getSemiProxySession(params.getSessionId())->requestServerTimeMs(); 01045 01046 Globals::sdebug << "\n::play(" << RtspSessionID << "): server playout of other session= " << splayout; 01047 } 01048 else 01049 { 01050 Globals::sdebug << "\n::play(): NO REQUEST FROM SERVER, use own estimation"; 01051 } 01052 } 01053 01054 01055 //calculate position (correct value from server or user own estimation) 01056 ulong msecsOS = sp->getSession(params.getSessionId())->getElapsedMsecs(params.getSection(), splayout, cplayout); 01057 Globals::sdebug << "\n::play(): msecs of other session= " << msecsOS; 01058 01059 //add/subtract the specified number of seconds (if specified) 01060 if (msecsOS >= (uint)(params.getSubtractSecs() * 1000)) 01061 { 01062 msecsOS -= params.getSubtractSecs() * 1000; 01063 } 01064 01065 Globals::sdebug << "\n::play(): msecs of other session after subtract(" 01066 << params.getSubtractSecs() * 1000 << ") = " << msecsOS; 01067 01068 //store time of old session in variable 'elapsedTimeMsecs' for NEW SESSION 01069 //(will be used on creation of SSession) 01070 //prevDuration = msecsOS + startupOffset; 01071 prevDuration = msecsOS; 01072 01073 //convert time to string and replace range-parameter in the message 01074 replaceRange(strbuf, semifunc::convertToString(msecsOS)); 01075 01076 } 01077 } else if (params.getSessionId() != 0) { 01078 replaceRange(strbuf, semifunc::convertToString(prevDuration)); 01079 } 01080 01081 01082 //check if an explicit range parameter were specified 01083 if (params.getRange() != NULL) 01084 { 01085 //if so, replace the previous range 01086 replaceRange(strbuf, params.getRange(), false); 01087 } 01088 01089 forwardToServer((char*)strbuf.c_str(), strbuf.length()); 01090 01091 return true; 01092 } 01093 01094 /***********************************************************************/ 01095 bool SemiProxySession::pause(const Url* fileName, const char* remaining) 01096 { 01097 assert(remaining); assert(fileName); 01098 Globals::sdebug << "\n::pause(): starts (state=" << state << ")"; 01099 01100 if (tracks.getTCHavingState(TRACK_PAUSED) == 0) 01101 { 01102 if (state == SESSION_ACTIVE && asess != NULL) 01103 { 01104 Globals::sdebug << "\n::pause(): set inactive"; 01105 01106 //set state of active session to PAUSED 01107 asess->setInactive(); 01108 } 01109 01110 //should request mediatime from server? 01111 if (sp->shouldRequestServerPosition()) 01112 { 01113 //get client-playout time from server 01114 ulong playout = SemiProxySession::getSemiProxySession(this->RtspSessionID)->getServerTimeMs(); 01115 //set client playout-time on current time-section 01116 timemsmnt->setTSserverPlayout(playout); 01117 } 01118 } 01119 01120 if (prot) { 01121 lastCRtspSessionID = prot->extractSessionKeyFromCMD(remaining); 01122 } 01123 01124 lastClientRtspMsg = RTSP_PAUSE; 01125 01126 forwardToServer(buffer, bytesRead); 01127 01128 return true; 01129 } 01130 01131 /***********************************************************************/ 01132 bool SemiProxySession::tearDown(int sessionKey, bool immediate, 01133 const Url* fileName, const char* remaining) 01134 { 01135 assert(remaining); assert(fileName); 01136 Globals::sdebug << "\n::teardown(): starts"; 01137 lastClientRtspMsg = RTSP_TEARDOWN; 01138 01139 if (prot) { 01140 lastCRtspSessionID = prot->extractSessionKeyFromCMD(remaining); 01141 } 01142 01143 forwardToServer(buffer, bytesRead); 01144 01145 return true; 01146 } 01147 01148 /***********************************************************************/ 01149 bool SemiProxySession::getOptions(const Url* fileName, const char* remaining) 01150 { 01151 assert(remaining); assert(fileName); 01152 Globals::sdebug << "\n::getOptions(): starts"; 01153 lastClientRtspMsg = RTSP_GETPARAMETER; 01154 01155 forwardToServer(buffer, bytesRead); 01156 01157 return true; 01158 } 01159 01160 /***********************************************************************/ 01161 bool SemiProxySession::setOptions(const Url* fileName, const char* remaining) 01162 { 01163 assert(remaining); assert(fileName); 01164 Globals::sdebug << "\n::setOptions(): starts"; 01165 lastClientRtspMsg = RTSP_SETPARAMETER; 01166 01167 forwardToServer(buffer, bytesRead); 01168 01169 return true; 01170 } 01171 01172 /***********************************************************************/ 01173 void SemiProxySession::setUrl(const Url *uri, bool makeExactMatch) 01174 {} 01175 01176 /***********************************************************************/ 01177 void SemiProxySession::setStateClosed() 01178 { 01179 this->state = SESSION_CLOSED; 01180 } 01181 01182 /***********************************************************************/ 01183 bool SemiProxySession::containsSessionID(uint SessionID) 01184 { 01185 Globals::sdebug << "\n::containsSessionID(): searching for " << SessionID << "\n"; 01186 if (asess != NULL) 01187 { 01188 vector<Track*>::const_iterator iter = tracks.begin(); 01189 for (; iter != tracks.end(); iter++) 01190 { 01191 Track *trk = *iter; 01192 if (trk->getId() == SessionID) return true; 01193 } 01194 } 01195 01196 return false; 01197 } 01198 01199 01200 /***********************************************************************/ 01201 uint SemiProxySession::requestServerTimeMs() 01202 { 01203 Globals::sdebug << "\nSPS::requestServerTimeMs(" << RtspSessionID << "): **starts**"; 01204 receivedServerPosition = 0; 01205 sendServerPositionRequest = true; 01206 01207 ulong sleepms = PROXY_SLEEP_USECS; 01208 int maxSecsToWait = Globals::getPRMaxWaitSecs(); 01209 struct timeval timeStart, timeCurr; 01210 gettimeofday(&timeStart, NULL); 01211 gettimeofday(&timeCurr, NULL); 01212 01213 while (receivedServerPosition == 0) 01214 { 01215 if ((timeCurr.tv_sec - timeStart.tv_sec) >= maxSecsToWait) 01216 { 01217 Globals::sdebug << "\nSPS::requestServerTimeMs(): ABORT waiting for position from Server!"; 01218 Globals::sdebug << "\nSPS::requestServerTimeMs(): s=" << timeStart.tv_sec << " c=" << timeCurr.tv_sec; 01219 Globals::sdebug << "(\nmaxSecsToWait=" << maxSecsToWait << " reached!) "; 01220 break; 01221 } 01222 gettimeofday(&timeCurr, NULL); 01223 #ifndef WIN32 01224 usleep(sleepms); 01225 #else 01226 Sleep(sleepms); 01227 #endif 01228 } 01229 01230 Globals::sdebug << "\nSPS::requestServerTimeMs(" << RtspSessionID << "): **ends**"; 01231 return receivedServerPosition; 01232 } 01233 01234 /***********************************************************************/ 01235 uint SemiProxySession::requestClientTimeMs() 01236 { 01237 Globals::sdebug << "\nSPS::requestClientTimeMs(" << RtspSessionID << "): **starts**"; 01238 receivedClientPosition = 0; 01239 sendClientPositionRequest = true; 01240 01241 ulong sleepms = PROXY_SLEEP_USECS; 01242 int maxSecsToWait = Globals::getPRMaxWaitSecs(); 01243 struct timeval timeStart, timeCurr; 01244 gettimeofday(&timeStart, NULL); 01245 gettimeofday(&timeCurr, NULL); 01246 01247 while (receivedClientPosition == 0) 01248 { 01249 if ((timeCurr.tv_sec - timeStart.tv_sec) >= maxSecsToWait) 01250 { 01251 Globals::sdebug << "\nSPS::requestClientTimeMs(): ABORT waiting for position from Client!"; 01252 Globals::sdebug << "\nSPS::requestClientTimeMs(): s=" << timeStart.tv_sec << " c=" << timeCurr.tv_sec; 01253 Globals::sdebug << "\n(maxSecsToWait=" << maxSecsToWait << " reached!) "; 01254 break; 01255 } 01256 gettimeofday(&timeCurr, NULL); 01257 #ifndef WIN32 01258 usleep(sleepms); 01259 #else 01260 Sleep(sleepms); 01261 #endif 01262 } 01263 01264 Globals::sdebug << "\nSPS::requestClientTimeMs(" << RtspSessionID << "): **ends**"; 01265 return receivedClientPosition; 01266 } 01267 01268 01269 /***********************************************************************/ 01270 uint SemiProxySession::getServerTimeMs() 01271 { 01272 uint msecs = 0; 01273 if (fdserver != 0) 01274 { 01275 char tmpbuffer[MSG_BUFFER_SIZE + 1]; 01276 memset(tmpbuffer, 0, MSG_BUFFER_SIZE + 1); 01277 01278 string str("GET_PARAMETER "); 01279 str.append(SessionURI); 01280 str.append(" RTSP/1.0\r\n"); 01281 str.append("CSeq: " + semifunc::toString(lastClientCSeq+1) + "\r\n"); 01282 str.append("Content-Type: text/parameters\r\n"); 01283 str.append("Session: " + semifunc::toString(RtspSessionID) + "\r\n"); 01284 str.append("Content-Length: 12\r\n\r\n"); 01285 str.append("position\r\n\r\n"); 01286 01287 Globals::sdebug << "\nSPS::getServerTimeMs(): sending buffer: \r\n" << str << "\r\n"; 01288 01289 //send message and wait for response 01290 send(this->fdserver, str.c_str(), str.length(), 0); 01291 //sendMessage(this->fdserver, str.c_str(), str.length()); 01292 uint bytesRead = recv(fdserver, tmpbuffer, MSG_BUFFER_SIZE, 0); 01293 01294 extractCSeq(tmpbuffer, lastServerCSeq); 01295 01296 Globals::sdebug << "\nSPS::getServerTimeMs(): received buffer: \r\n" << tmpbuffer<< "(" << bytesRead << ")\r\n"; 01297 01298 if (bytesRead > 0) 01299 { 01300 char *positionPos; 01301 if ((positionPos = strstr(tmpbuffer, "position: ")) != NULL) 01302 { 01303 msecs = atoi(positionPos + 10); 01304 Globals::sdebug << "\nSPS::getServerTimeMs(): received msecs= " << msecs; 01305 } 01306 } 01307 else 01308 { 01309 cerr << "\nSPS::getServerTimeMs(): Error reading get_parameter response" << endl; cerr.flush(); 01310 //could not retrieve client-time from server: set time to 0 01311 msecs = 0; 01312 } 01313 } 01314 else 01315 { 01316 Globals::sdebug << "\nSemiProxySession::getServerTimeMs(): server socket closed!"; 01317 } 01318 01319 return msecs; 01320 01321 } 01322 01323 /***********************************************************************/ 01324 void SemiProxySession::testClientOptions() 01325 { 01326 char tmpbuffer[MSG_BUFFER_SIZE + 1]; 01327 memset(tmpbuffer, 0, MSG_BUFFER_SIZE+1); 01328 01329 string str("OPTIONS * RTSP/1.0\r\n"); 01330 str.append("CSeq: " + semifunc::toString(lastServerCSeq+1) + "\r\n\r\n"); 01331 01332 Globals::sdebug << "\nSPS::testClientOptions(): sending buffer: \r\n" << str << "\r\n"; 01333 01334 //send message and wait for response 01335 send(fdclient, str.c_str(), str.length(), 0); 01336 //sendMessage(this->fdclient, str.c_str(), str.length()); 01337 uint bytesRead = recv(fdclient, tmpbuffer, MSG_BUFFER_SIZE, 0); 01338 01339 extractCSeq(tmpbuffer, lastClientCSeq); 01340 01341 Globals::sdebug << "\nSPS::testClientOptions(): received buffer: \r\n" << tmpbuffer << "(" << bytesRead << ")\r\n"; 01342 01343 } 01344 01345 /***********************************************************************/ 01346 uint SemiProxySession::getClientTimeMs() 01347 { 01348 uint msecs = 0; 01349 if (fdclient != 0) 01350 { 01351 char tmpbuffer[MSG_BUFFER_SIZE + 1]; 01352 memset(tmpbuffer, 0, MSG_BUFFER_SIZE+1); 01353 01354 string str("GET_PARAMETER "); 01355 str.append(params.getFullURI()); 01356 str.append(" RTSP/1.0\r\n"); 01357 str.append("CSeq: " + semifunc::toString(lastServerCSeq+1) + "\r\n"); 01358 str.append("Content-Type: text/parameters\r\n"); 01359 str.append("Session: " + semifunc::toString(RtspSessionID) + "\r\n"); 01360 str.append("Content-Length: 12\r\n\r\n"); 01361 str.append("position\r\n\r\n"); 01362 01363 Globals::sdebug << "\nSPS::getClientTimeMs(): sending buffer: \r\n" << str << "\r\n"; 01364 01365 //send message and wait for response 01366 send(fdclient, str.c_str(), str.length(), 0); 01367 01368 //wait for a response 01369 uint bytesRead = recv(fdclient, tmpbuffer, MSG_BUFFER_SIZE, 0); 01370 01371 //extractCSeq(tmpbuffer, lastClientCSeq); 01372 01373 Globals::sdebug << "\nSPS::getClientTimeMs(): received buffer: \r\n" << tmpbuffer << "(" << bytesRead << ")\r\n"; 01374 01375 if (bytesRead > 0) 01376 { 01377 char *positionPos; 01378 if ((positionPos = strstr(tmpbuffer, "position: ")) != NULL) 01379 { 01380 msecs = atoi(positionPos + 10); 01381 Globals::sdebug << "\nSPS::getClientTimeMs(): received msecs= " << msecs; 01382 } 01383 } 01384 else 01385 { 01386 cerr << "\nSPS::getClientTimeMs(): Error reading get_parameter response" << endl; cerr.flush(); 01387 //could not retrieve client-time from player: set time to 0 01388 msecs = 0; 01389 } 01390 } 01391 else 01392 { 01393 Globals::sdebug << "\nSemiProxySession::getClientTimeMs(): client socket closed!"; 01394 } 01395 01396 return msecs; 01397 01398 } 01399 01400 /***********************************************************************/ 01401 void SemiProxySession::sendTeardownToClient() 01402 { 01403 char tmpbuffer[MSG_BUFFER_SIZE + 1]; 01404 memset(tmpbuffer, 0, MSG_BUFFER_SIZE+1); 01405 01406 string str("TEARDOWN "); 01407 str.append(params.getFullURI()); 01408 str.append(" RTSP/1.0\r\n"); 01409 str.append("CSeq: " + semifunc::toString(lastServerCSeq+1) + "\r\n"); 01410 str.append("Session: " + semifunc::toString(RtspSessionID) + "\r\n\r\n"); 01411 01412 Globals::sdebug << "\nSPS::sendTeardownToClient(): sending buffer: \r\n" << str << "\r\n"; 01413 01414 //send message and wait for response 01415 send(fdclient, str.c_str(), str.length(), 0); 01416 01417 // uint bytesRead = recv(fdclient, tmpbuffer, MSG_BUFFER_SIZE, 0); 01418 // Globals::sdebug << "\nSPS::sendTeardownToClient(): received buffer: \r\n" << tmpbuffer << "(" << bytesRead << ")\r\n"; 01419 Globals::sdebug << "\nSPS::sendTeardownToClient(): done!\r\n"; 01420 } 01421 01422 01423 /***********************************************************************/ 01424 bool SemiProxySession::checkForValidAddress(char *recbuffer, int &bytesRead) 01425 { 01426 Globals::sdebug << "\n::checkForValidAddress(): starts"; 01427 01428 if (bytesRead <= 0 || recbuffer == NULL) 01429 { 01430 Globals::sdebug << "\n::checkForValidAddress(): returning false (bytesRead=" << bytesRead << ")"; 01431 return false; 01432 } 01433 01434 //NOTE(!): bei SETUP wird der filename nicht mitgeschickt! (hacky) 01435 if (strstr(recbuffer, "SETUP rtsp://") != NULL || strstr(recbuffer, "PLAY rtsp://") != NULL) return true; 01436 01437 01438 //this functions assumes the following URI-Syntax: 01439 //rtsp://<servername>[:<serverport>]/<filename>[/port=<paramport>][/host=<paramhost>] 01440 char *servername; 01441 uint serverport; 01442 char *snstart, *snend, *ptstart, *fnbegin; 01443 if ((snstart = strstr(recbuffer, "rtsp://")) != NULL) 01444 { 01445 01446 01447 Globals::sdebug << "\n::checkForValidAddress(): found rtsp protocol"; 01448 01449 snstart += 7; 01450 if (snstart == NULL) return false; 01451 01452 //find end of hostname (this is either ':' or '/') 01453 ptstart = snend =strstr(snstart, ":"); 01454 fnbegin = strstr(snstart, "/"); 01455 if (fnbegin != NULL && (snend == NULL || snend > fnbegin)) 01456 { 01457 Globals::sdebug << "\n::checkForValidAddress(): / found before :"; 01458 snend = fnbegin; 01459 ptstart = NULL; 01460 } 01461 01462 if (snstart != NULL && snend != NULL) 01463 { 01464 01465 Globals::sdebug << "\n::checkForValidAddress(): now copying name (" << (snend-snstart) << "b)..."; 01466 01467 //copy address 01468 servername = semifunc::newStrCpy(snstart, snend-snstart); 01469 //store reference to requested hostname for checkServerConnection(); 01470 requestedHostname = servername; 01471 Globals::sdebug << "success! (" << servername << ")"; 01472 01473 //copy port 01474 serverport = DEFAULT_RTSPSERVER_PORT; 01475 if (ptstart != NULL) 01476 { 01477 Globals::sdebug << "\n::checkForValidAddress(): copying port..."; 01478 serverport = atoi(ptstart+1); 01479 Globals::sdebug << "success! << (" << serverport << ")"; 01480 } 01481 01482 //find out IP-Address of this host 01483 char name[255]; 01484 char *address, *localaddress, *serveraddress; 01485 if (gethostname(name, sizeof(name)) == 0) 01486 { 01487 //find out localhost address 01488 hostent *host = gethostbyname(name); 01489 if (host != NULL) { 01490 address = inet_ntoa(*(reinterpret_cast<in_addr*>(host->h_addr))); 01491 localaddress = semifunc::newStrCpy(address); 01492 01493 Globals::sdebug << "\n::checkForValidAddress(): localaddress copied (" << localaddress << ")"; 01494 } else { 01495 Globals::sdebug << "\n::checkForValidAddress(): FATAL cannot resolve hostname " << name; 01496 } 01497 01498 //find out address of specified server 01499 hostent *host2 = gethostbyname(servername); 01500 address = inet_ntoa(*(reinterpret_cast<in_addr*>(host2->h_addr))); 01501 serveraddress = semifunc::newStrCpy(address); 01502 01503 Globals::sdebug << "\n::checkForValidAddress(): serveraddress copied (" << serveraddress <<")"; 01504 } 01505 else 01506 { 01507 throw GetHostNameProblem(); 01508 } 01509 01510 Globals::sdebug << "\nservername=" << servername 01511 << "\nserverport=" << serverport << "\nName=" << name << "\nAddress=" << localaddress << "\nServer=" << serveraddress; 01512 01513 if ((strcmp(serveraddress, localaddress) == 0 || strcmp(serveraddress, "127.0.0.1") == 0) 01514 && serverport == sp->getProxyPort()) 01515 { 01516 01517 //CLIENT HAS SPECIFIED THE ADDRESS+PORT OF SEMIPROXY AS SERVER -> NOT ALLOWED !!! 01518 01519 //check if this is only an OPTIONS message 01520 //(which have no file specified) 01521 if (strncmp(recbuffer, "OPTIONS ", 8) == 0) 01522 { 01523 Globals::sdebug << "\n::checkForValidAddress(): OPTIONS MESSAGE"; 01524 //to OPTIONS MESSAGE only reply like a server do 01525 //WARNING: this may be not the real supported options by the server 01526 //TODO: 01527 ostringstream reply; 01528 reply << "RTSP/1.0 200 OK\r\nCSeq: " << (lastClientCSeq+1) << "\r\nPublic: " 01529 << "OPTIONS, DESCRIBE, SETUP, PLAY, PAUSE, TEARDOWN, GET_PARAMETER, SET_PARAMETER, REDIRECT" << "\r\n\r\n"; 01530 send(fdclient, reply.str().c_str(), reply.str().length(), 0); 01531 //sendMessage(this->fdclient, reply.str().c_str(), reply.str().length()); 01532 return false; 01533 } 01534 01535 01536 //if there is no other address specified in the params, close session! 01537 if (params.getHost() == NULL && params.getPort() == 0) 01538 { 01539 cerr << "\nERROR: Proxy cannot be used as server!"; 01540 removeSession(); 01541 } 01542 else 01543 { 01544 //***************create new buffer starts*************** 01545 //create new buffer with correct rtsp-uri 01546 //(if port or host specified, the original values of the URI 01547 //will be replaced by the values from the params; the params 01548 //will be erased!) 01549 char *strbuf = new char[MSG_BUFFER_SIZE]; 01550 01551 uint count = snstart-recbuffer; 01552 //copy start of message 01553 strncpy(strbuf, recbuffer, count); 01554 //if a param 'host=' was specified, copy this host 01555 if (params.getHost() != NULL) 01556 { 01557 //store reference to requested hostname for checkServerConnection() 01558 requestedHostname = params.getHost(); 01559 01560 //copy new hostname 01561 strncpy(&strbuf[count], params.getHost(), strlen(params.getHost())); 01562 count += strlen(params.getHost()); 01563 Globals::sdebug << "\n::checkForValidAddress: hostname copied: " << requestedHostname; 01564 } 01565 else 01566 { 01567 Globals::sdebug << "\n::checkForValidAddress: no explicit specified host!"; 01568 //copy hostname 01569 strncpy(&strbuf[count], snstart, snend-snstart); 01570 count += snend-snstart; 01571 } 01572 if (ptstart != NULL || params.getPort() != 0) strbuf[count++] = ':'; 01573 if (params.getPort() != 0) 01574 { 01575 //copy new port 01576 string port = semifunc::toString(params.getPort()); 01577 strncpy(&strbuf[count], port.c_str(), port.length()); 01578 count += port.length(); 01579 Globals::sdebug << "\n::checkForValidAddress: port copied: " << port; 01580 } 01581 else if (ptstart != NULL) 01582 { 01583 Globals::sdebug << "\n::checkForValidAddress: no explicit specified port!"; 01584 //copy port 01585 strncpy(&strbuf[count], ptstart+1, fnbegin-(ptstart+1)); 01586 count += fnbegin - (ptstart+1); 01587 } 01588 01589 //copy the remaining of the message 01590 uint i=0; 01591 //while (ctstart[i] != 0) strbuf[count++] = ctstart[i++]; 01592 while (fnbegin[i] != 0) strbuf[count++] = fnbegin[i++]; 01593 strbuf[++count] = 0; 01594 //***************create new buffer ends*************** 01595 01596 //Globals::sdebug << "\n::checkHostnameAndPort(): newBUFFER=" << strbuf; 01597 memcpy(recbuffer, strbuf, count); 01598 bytesRead = count; 01599 01600 delete strbuf; 01601 strbuf = NULL; 01602 01603 return true; 01604 } 01605 } 01606 01607 } 01608 } 01609 01610 return true; 01611 } 01612 01613 /***********************************************************************/ 01614 void SemiProxySession::readFromSocket(int fdsock, struct timeval &tv, fd_set &rfds) 01615 { 01616 bytesRead = 0; 01617 uint n = 0; 01618 bool abortReading = false; 01619 while (abortReading == false) 01620 { 01621 n = recv(fdsock, buffer + bytesRead, MSG_BUFFER_SIZE, 0); 01622 bytesRead += n; 01623 FD_ZERO(&rfds); 01624 select(fdsock+1, &rfds, NULL, NULL, &tv); 01625 if (!FD_ISSET(fdsock, &rfds)) abortReading = true; 01626 } 01627 } 01628 01629 /***********************************************************************/ 01630 void SemiProxySession::run() 01631 { 01632 01633 fd_set rfds; 01634 struct timeval tv; 01635 01636 //number of microseconds to sleep after a loop run 01637 ulong sleep_msecs = PROXY_SLEEP_USECS; 01638 01639 int bytesReadClient = -1; 01640 int bytesReadServer = -1; 01641 01642 //pointer to begin of buffer 01643 char *bufferstart = buffer; 01644 01645 01646 while (state!=SESSION_ERR && state!=SESSION_CLOSED) 01647 { 01648 //first check if socket has closed 01649 if (fdclient == 0) 01650 { 01651 Globals::sdebug << "\n::run(): Client has closed socket -> CLOSE SESSION"; 01652 closeSession(); 01653 break; 01654 } 01655 01656 //(maximum) waiting time for select 01657 tv.tv_sec = 0; 01658 tv.tv_usec = 10000; 01659 01660 FD_ZERO(&rfds); 01661 //only add client or server to FD_LIST, when socket has not been closed! 01662 if (bytesReadClient != 0) FD_SET(fdclient, &rfds); 01663 if (bytesReadServer != 0) FD_SET(fdserver, &rfds); 01664 //use bigger socket-number for select 01665 int maxsocknr = (fdclient < fdserver) ? fdserver : fdclient; 01666 int retval = select(maxsocknr+1, &rfds, NULL, NULL, &tv); 01667 if (retval == -1) 01668 { 01669 cerr << "\nSPS::run(): ERROR select()" << endl; 01670 } 01671 else if (retval) 01672 { 01673 01674 //************************READ FROM CLIENT start************************* 01675 if (FD_ISSET(fdclient, &rfds)) 01676 { 01677 sessionControlChannel = fdclient; 01678 01679 buffer = bufferstart; 01680 memset(buffer, 0, MSG_BUFFER_SIZE); 01681 01682 01683 //read until no more bytes (because of MTU) 01684 readFromSocket(fdclient, tv, rfds); 01685 bytesReadClient = bytesRead; 01686 01687 01688 //**readRequest does not correctly work with other players !!!** 01689 //bytesReadClient = bytesRead = readRequest(buffer, MSG_BUFFER_SIZE); 01690 01691 //on PAUSE stop time-measurement 01692 if (strstr(buffer, "PAUSE rtsp://") != NULL) 01693 { 01694 //stop time-measurement for current time-section 01695 timemsmnt->stopTS(); 01696 } 01697 01698 01699 //special handling for quicktime embedded 01700 //(TODO:CAN BE REMOVED, IF applyReqToSession can handle several callbacks) 01701 char *teardownstart; 01702 if (strncmp(buffer, "PAUSE", 5) == 0 01703 && (teardownstart = strstr(buffer, "TEARDOWN")) != NULL) 01704 { 01705 //remove pause 01706 bytesReadClient = bytesRead = bytesRead - (teardownstart - buffer); 01707 buffer = teardownstart; 01708 } 01709 char *playstart; 01710 if (strncmp(buffer, "SET_PARAMETER", 13) == 0 01711 && (playstart = strstr(buffer, "PLAY")) != NULL) 01712 { 01713 bytesReadClient = bytesRead = bytesRead - (playstart - buffer); 01714 //remove set_parameter 01715 buffer = playstart; 01716 } 01717 01718 01719 //on DESCRIBE extract full-URI 01720 if (strstr(buffer, "DESCRIBE rtsp://") != NULL) 01721 { 01722 char *fnstart, *fnend; 01723 if ((fnstart = strstr(buffer, "rtsp://")) != NULL) 01724 { 01725 if ((fnend = strstr(fnstart, " ")) != NULL) 01726 { 01727 params.setFullURI(semifunc::newStrCpy(fnstart, fnend-fnstart)); 01728 } 01729 } 01730 } 01731 01732 01733 //new extract (and remove!) ALL PARAMETERS 01734 string bufferstr (buffer); 01735 01736 01737 //look for parameters in the URI and remove them 01738 char *pmovieid, *prange, *phost; 01739 int puid, pproid, psid, pport, action, section, subtractMsecs; 01740 if ((puid = extractParameter(bufferstr, "/uid=", true)) >= 0) 01741 { 01742 params.setUserId(puid); 01743 } 01744 if ((pproid = extractParameter(bufferstr, "/proid=", true)) >= 0) 01745 { 01746 params.setProfileId(pproid); 01747 } 01748 if ((pmovieid = extractStrParameter(bufferstr, "/id=", true)) != NULL) 01749 { 01750 params.setMovieId(pmovieid); 01751 } 01752 if ((psid = extractParameter(bufferstr, "/sid=", true)) >= 0) 01753 { 01754 params.setSessionId(psid); 01755 } 01756 if ((action = extractParameter(bufferstr, "/action=", true)) >= 0) 01757 { 01758 params.setAction(action); 01759 } 01760 if ((section = extractParameter(bufferstr, "/section=", true)) >= 0) 01761 { 01762 params.setSection(section); 01763 } 01764 if ((subtractMsecs = extractParameter(bufferstr, "/sub=", true)) >= 0) 01765 { 01766 params.setSubtractSecs(subtractMsecs); 01767 } 01768 //special parameters (if no RTSP proxy can be used!) 01769 if ((prange =extractStrParameter(bufferstr, "/range=",true)) != NULL) 01770 { 01771 params.setRange(prange); 01772 } 01773 if ((phost = extractStrParameter(bufferstr, "/host=", true)) != NULL) 01774 { 01775 params.setHost(phost); 01776 } 01777 if ((pport = extractParameter(bufferstr, "/port=", true)) >= 0) 01778 { 01779 params.setPort(pport); 01780 } 01781 01782 //store back buffer (with params removed) 01783 buffer = bufferstart; 01784 memset(buffer, 0, MSG_BUFFER_SIZE); 01785 bufferstr.copy(buffer, bufferstr.length()); 01786 //buffer = (char*)bufferstr.c_str(); 01787 01788 01789 //check hostname and port (cannot be same as host+port of proxy) 01790 //NOTE: this function may change the address and port of the request!!! 01791 if (checkForValidAddress(buffer, bytesRead)) 01792 { 01793 01794 Globals::sdebug << "\nSP::run(): received from client:\n\t" << buffer; 01795 01796 if (bytesRead == 0) 01797 { 01798 //no data read; the other side has closed the connection, teardown all 01799 tearDown(TEARDOWN_ALL,true, NULL, NULL); 01800 } 01801 else if (bytesRead < 0) 01802 { 01803 cerr << "\nSPS::run(): Error reading from controlchannel" << endl; 01804 tearDown(TEARDOWN_ALL,true,NULL, NULL); 01805 state = SESSION_ERR; 01806 } 01807 else 01808 { 01809 // we have got a command from the client; parse it and forward it to the server 01810 01811 //determine protocol of message 01812 if (!prot) 01813 { 01814 if ( determineProtocol(buffer) == PROTO_UNKNOWN) 01815 { 01816 cerr << "\nSPS::run(): Unknown protocol: " << buffer << endl; 01817 } 01818 } 01819 01820 //if known protocol was recognized, perform further parsing... 01821 if (prot) 01822 { 01823 switch (prot->getID()) 01824 { 01825 case PROTO_RTSP: 01826 if (bytesRead >= MSG_BUFFER_SIZE) 01827 { 01828 cerr << "\nSPS::run(): received command exceeded buffer size" << endl; 01829 break; 01830 } 01831 break; 01832 default: 01833 cerr << "\nSPS::run(): Unknown protocol: " << buffer << endl; 01834 break; 01835 } 01836 01837 //call control-function in RTSP-object; this function will callback other functions 01838 //in this object like connect(), setup(), play(), pause(), teardown()... 01839 if (!prot->applyReqToSession(buffer, bytesRead, this, true)) 01840 { 01841 cerr << "\nSPS::run(): Failed to parse request '" << buffer << "'" << endl; 01842 } 01843 01844 } 01845 } 01846 } 01847 } 01848 //************************read from client end************************* 01849 01850 01851 01852 //************************READ FROM SERVER start************************* 01853 if (FD_ISSET(fdserver, &rfds)) 01854 { 01855 sessionControlChannel = fdserver; 01856 01857 buffer = bufferstart; 01858 memset(buffer, 0, MSG_BUFFER_SIZE); 01859 //use common receive-function, because readRequest do not 01860 //work sometimes (doesn't call options/setup/... functions) 01861 01862 //read until no more bytes (because of MTU) 01863 readFromSocket(fdserver, tv, rfds); 01864 bytesReadServer = bytesRead; 01865 //bytesReadServer = bytesRead = recv(fdserver, buffer, MSG_BUFFER_SIZE, 0); 01866 //bytesReadServer = bytesRead = readRequest(buffer, MSG_BUFFER_SIZE); 01867 01868 Globals::sdebug << "\nSP::run(): received from server (" << bytesReadServer << " bytes):\n\t" << buffer; 01869 01870 if (bytesRead == 0) 01871 { 01872 //no data read; the server has closed the connection, teardown all 01873 cerr << "\nSPS::run(): No data read from ControlChannel (SERVER has closed connection)" << endl; 01874 removeSession(); 01875 } 01876 else if (bytesRead < 0) 01877 { 01878 cerr << "\nSPS::run(): Error reading from controlchannel" << endl; 01879 removeSession(); 01880 } 01881 else 01882 { 01883 // we got a command from the server; parse it and forward to client 01884 parseResponseFromServer(buffer, bytesRead); 01885 } 01886 } 01887 //************************read from server end************************* 01888 } 01889 01890 //look if there is an outstanding position-request 01891 //this special handling is necessary, because otherwise (if another session directly 01892 //calls 'getServerTimeMs()') the response could be received in this main-loop as 01893 //instead of the recv() of 'getServerTimeMs()'. This is because concurrent execution 01894 if (state != SESSION_ERR && state != SESSION_CLOSED) { 01895 if (sendServerPositionRequest == true) 01896 { 01897 Globals::sdebug << "\n::run(" << RtspSessionID << "): outstanding server position request - STARTS"; 01898 receivedServerPosition = this->getServerTimeMs(); 01899 sendServerPositionRequest = false; 01900 Globals::sdebug << "\n::run(" << RtspSessionID << "): outstanding server position request - DONE"; 01901 } 01902 01903 else if (sendClientPositionRequest == true) 01904 { 01905 Globals::sdebug << "\n::run(" << RtspSessionID << "): outstanding client position request - STARTS"; 01906 receivedClientPosition = this->getClientTimeMs(); 01907 sendClientPositionRequest = false; 01908 Globals::sdebug << "\n::run(" << RtspSessionID << "): outstanding client position request - DONE"; 01909 } 01910 01911 #ifndef WIN32 01912 usleep(sleep_msecs); 01913 #else 01914 Sleep(sleep_msecs); 01915 #endif 01916 } 01917 01918 } 01919 01920 Globals::sdebug << "::run() closing SemiProxySession " << RtspSessionID << "\n"; 01921 } 01922 01923