DataChannel.cpp

00001 /*********************************************************************** 00002 * * 00003 * ViTooKi * 00004 * * 00005 * title: DataChannel.cpp * 00006 * * 00007 * * 00008 * * 00009 * ITEC institute of the University of Klagenfurt (Austria) * 00010 * http://www.itec.uni-klu.ac.at * 00011 * * 00012 * * 00013 * For more information visit the ViTooKi homepage: * 00014 * http://ViTooKi.sourceforge.net * 00015 * vitooki-user@lists.sourceforge.net * 00016 * vitooki-devel@lists.sourceforge.net * 00017 * * 00018 * This file is part of ViTooKi, a free video toolkit. * 00019 * ViTooKi is free software; you can redistribute it and/or * 00020 * modify it under the terms of the GNU General Public License * 00021 * as published by the Free Software Foundation; either version 2 * 00022 * of the License, or (at your option) any later version. * 00023 * * 00024 * This program is distributed in the hope that it will be useful, * 00025 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00026 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * 00027 * GNU General Public License for more details. * 00028 * * 00029 * You should have received a copy of the GNU General Public License * 00030 * along with this program; if not, write to the Free Software * 00031 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, * 00032 * MA 02111-1307, USA. * 00033 * * 00034 ***********************************************************************/ 00035 00036 /*********************************************************************** 00037 * * 00038 * REVISION HISTORY: * 00039 * * 00040 * * 00041 * * 00042 ***********************************************************************/ 00043 00044 #include "DataChannel.hpp" 00045 00046 #include "DataSink.hpp" 00047 #include "Frame.hpp" 00048 #include "ContainerInfo.hpp" 00049 #include "ReferenceCounter.hpp" 00050 00051 #include "Adaptor.hpp" 00052 #include "AdaptorChain.hpp" 00053 #include "adaptors/Forwarder.hpp" 00054 #include "adaptors/Visualizer.hpp" 00055 #include "adaptors/MP4Decoder.hpp" 00056 #include "VideoAdaptorChain.hpp" 00057 #include "adaptors/FrameRotate.hpp" 00058 00059 #include "IO.hpp" 00060 #include "io/MPGStreamIO.hpp" 00061 #include "io/DevNull.hpp" 00062 00063 00064 00066 DataChannel::DataChannel(IO * in, ESInfo * es) 00067 { 00068 Adaptor *a = NULL; 00069 init(in, es, a); 00070 }; 00071 00076 DataChannel::DataChannel(IO * in, ESInfo * es, Adaptor * adapt) 00077 { 00078 init(in, es, adapt); 00079 }; 00080 00081 void 00082 DataChannel::init(IO * &in, ESInfo * &e, Adaptor * &adapt) 00083 { 00084 assert(e->getMediaTimeScale()!=0); 00085 es = e; 00086 sessionId = 0; 00087 input = in; 00088 channelOpen = true; 00089 immediateClose=false; 00090 tearDown=false; 00091 lockInput.initialize("lockInput"); 00092 lockDataSink.initialize("lockDataSink"); 00093 lockAdaptor.initialize("lockAdaptor"); 00094 localViewer=NULL; 00095 adaptor = adapt; 00096 if (!adaptor) { 00097 adaptor = new Forwarder(); 00098 } 00099 if(es->getUsageCounter()) 00100 es->getUsageCounter()->increase(); 00101 outstats = NULL; 00102 00103 currentFrameNumber = 0; 00104 finalFrameNumber = -1; //so there is no limit 00105 }; 00106 00107 00108 /* is always called unter &lockInput */ 00109 void DataChannel::deleteInput() { 00110 if(input) { 00111 input->close(); 00112 if(input->running()) 00113 input->wait(); 00114 } 00115 // empty send queue 00116 list <Frame*>::iterator si; 00117 while(!sendQueue.empty()) { 00118 si=sendQueue.begin(); 00119 if(*si) { 00120 delete (*si); 00121 (*si)=NULL; 00122 } 00123 sendQueue.pop_front(); 00124 } 00125 } 00126 00127 DataChannel::~DataChannel() 00128 { 00129 closeAllClients(true); 00130 00131 lockDataSink.lock(); 00132 this->tearDown=true; 00133 00134 lockInput.lock(); 00135 deleteInput(); 00136 lockInput.release(); 00137 00138 // delete clients 00139 dprintf_full("DataChannel Destruct: delete active clients\n"); 00140 list < DataSink * >::iterator li; 00141 while (!active.empty()) { 00142 li = active.begin(); 00143 if (*li) { 00144 (*li)->close(true); 00145 delete (*li); 00146 (*li) = NULL; 00147 } 00148 active.pop_front(); 00149 } 00150 dprintf_full("DataChannel Destruct: delete paused clients\n"); 00151 while (!paused.empty()) { 00152 li = paused.begin(); 00153 if (*li) { 00154 delete (*li); 00155 (*li) = NULL; 00156 } 00157 paused.pop_front(); 00158 } 00159 lockDataSink.release(); 00160 00161 lockInput.destroy(); 00162 lockDataSink.destroy(); 00163 lockAdaptor.destroy(); 00164 00165 00166 00167 dprintf_full("DataChannel Destruct: delete viewer\n"); 00168 if(localViewer) 00169 delete localViewer; 00170 dprintf_full("DataChannel Destruct: delete input\n"); 00171 if(input) 00172 delete input; 00173 00174 if(es && es->getUsageCounter()) 00175 es->getUsageCounter()->decrease(); 00176 00177 if(adaptor) { 00178 delete adaptor; 00179 dprintf_full("DataChannel Destruct: delete adaptor\n"); 00180 } 00181 dprintf_full("DataChannel Destruct: done\n"); 00182 }; 00183 00184 // what happens if the last client is paused? 00185 // we will wait till play or teardown arrives 00186 00187 void DataChannel::run() 00188 { 00189 bool resetSendHeader; 00190 00191 dprintf_full("DataChannel::run: DC.Thread started: %s\n", input->getURL()); 00192 resetSendHeader=false; 00193 00194 if (!active.empty() || localViewer) { 00195 if (input->getState()!=IO::OPEN && !input->open()) { 00196 dprintf_err("DataChannel::run: failed to open input\r\n"); 00197 return; 00198 } 00199 // read data from input 00200 dprintf_full("DataChannel::run: Input is open\n"); 00201 00202 Frame *frm = NULL; 00203 //only continue if OPEN, PAUSED, MUTED, PREBUFFERING or STREAMEOF 00204 //STREAMEOF is not a problem, as long as eg. the Rtp Class still has buffered data! 00205 IO::State mystate = input->getState(); 00206 00207 while (! ( (mystate == IO::CLOSED) || (mystate == IO::CLOSING) || 00208 (mystate == IO::STREAMERR) ) //|| (input->getState() == IO::STREAMEOF) ) 00209 && !tearDown && ((finalFrameNumber == -1) || (currentFrameNumber < finalFrameNumber)) ) { 00210 lockInput.lock(); 00211 lockDataSink.lock(); 00212 if(mystate == IO::OPENING) 00213 00214 #ifndef WINCE 00215 msleep(10); 00216 #else 00217 Sleep(5); 00218 #endif 00219 if(active.empty() && !localViewer) { 00220 lockInput.release(); 00221 lockDataSink.release(); 00222 return; 00223 } 00224 lockDataSink.release(); 00225 00226 //read frame 00227 frm = input->getFrame(); 00228 00229 if( frm == NULL ) { 00230 dprintf_full("DataChannel::run no frm got via getFrame\n"); 00231 if(mystate != IO::OPEN) { 00232 if(frm) { 00233 delete frm; 00234 frm=NULL; 00235 } 00236 } 00237 lockInput.release(); 00238 } else { 00239 dprintf_full("DataChannel::run passing frame to adaptor\n"); 00240 // we got data 00241 // adapt it with the global adaptor (chain) 00242 lockAdaptor.lock(); 00243 list < Frame * >result; 00244 00245 //adapt (i.e. decode) frame 00246 result = adaptor->adapt(frm); 00247 00248 lockAdaptor.release(); 00249 00250 if (!result.empty() ) { 00251 list < Frame * >::iterator rLi; 00252 // now append the received frame(s) to the sendqueue 00253 if(result.size()==1) { 00254 rLi=result.begin(); 00255 if ( frm && (*rLi) != frm) { 00256 // caching adaptors have to deep-copy the frame 00257 dprintf_full("DataChannel::run Frame was adapted and orig frame will be deleted now...\n"); 00258 delete frm; frm=NULL; 00259 } 00260 else { 00261 // frame is in result 00262 dprintf_full("DataChannel::run frame is in result\n"); 00263 frm=NULL; 00264 } 00265 } 00266 //append results to sendQueue 00267 while (!result.empty()) { 00268 rLi = result.begin(); 00269 00270 if (!(*rLi)->isMarkedForDelete()) //may be dropped in ESSynchronizer 00271 sendQueue.push_back((*rLi)); 00272 else { 00273 dprintf_full("DataChannel::run adaptor result was marked for delete... dropping!\n"); 00274 delete (*rLi); (*rLi)=NULL; 00275 } 00276 result.pop_front(); 00277 } 00278 } else { 00279 dprintf_full("DataChannel::run adaptor result was empty!!! dropped, so delete frm CTS %i...\n",frm->getAU()->cts); 00280 //ATTENTION: DC only deletes frames coming from inputIO, not eg. adaptor-decoded and then dropped ones! 00281 //so make sure, to markForDelete() 00282 delete frm; frm=NULL; 00283 } 00284 lockInput.release(); 00285 00286 } // end frame != NULL 00287 00288 // result now empty 00289 // check if we have data in the sendqueue 00290 list < Frame * >::iterator rLi; 00291 00292 if (sendQueue.empty() && (input->getState() == IO::STREAMEOF) ) 00293 break; //give up, no more data coming, nothing left to forward 00294 00295 // must be an if not while -> avoids bursts 00296 if (!sendQueue.empty() && !tearDown) { 00297 // distribute it to all active clients 00298 lockDataSink.lock(); 00299 rLi = sendQueue.begin(); 00300 00301 //set this frame to be deleted after all output-DataSinks, this may be changed by eg. buffering output-IOs 00302 (*rLi)->markForDelete(); 00303 00304 if(!active.empty()) { 00305 00306 dprintf_full("DataChannel::run: READ FRAME %p (currentFrameNumber %li)\n",(*rLi),currentFrameNumber); 00307 currentFrameNumber++; 00308 00309 list < DataSink * >::iterator li = active.begin(); 00310 00311 while (li != active.end()) { 00312 if(*rLi) { 00313 dprintf_full("Sending one frame %p (TS %i size %i) to one client\n",(*rLi), 00314 (*rLi)->getAU()->cts,(*rLi)->getAU()->size); 00315 00316 if (!(*li)->send((*rLi))) { //finally, writeFrame of output class 00317 // sth went wrong 00318 dprintf_err("DataChannel::run: Sending of frame failed\n"); 00319 tearDown = true; 00320 } 00321 } 00322 ++li; 00323 } 00324 dprintf_full("DataChannel::run: SENT one FRAME %p to all clients\n",(*rLi)); 00325 } 00326 // when we have no client and not even a localViewer -> exit 00327 else if(!localViewer) { 00328 lockDataSink.release(); 00329 if((*rLi)) { 00330 delete (*rLi); 00331 (*rLi)=NULL; 00332 } 00333 return; 00334 } 00335 if(localViewer) 00336 localViewer->send( (*rLi) ); 00337 lockDataSink.release(); 00338 00339 if ((*rLi)->isMarkedForDelete()) { 00340 dprintf_full("DataChannel::run cleanup sendQueue with frame %p\n",(*rLi)); 00341 if((*rLi)) 00342 delete (*rLi); 00343 } //else: ignore and let the buffering output-IO do the delete on it's own! 00344 00345 *rLi=NULL; 00346 sendQueue.pop_front(); 00347 00348 } else { 00349 dprintf_full("DataChannel::run sendQueue was empty!!! teardown %i\n",tearDown); 00350 } 00351 00352 if(frm) { 00353 dprintf_full("DataChannel::run cleanup incoming frame %p\n",frm); 00354 delete frm; 00355 frm=NULL; 00356 } 00357 00358 mystate = input->getState(); 00359 } // while() end reading 00360 00361 //lockDataSink.lock(); 00362 channelOpen = false; 00363 //lockDataSink.release(); 00364 dprintf_full("DataChannel::run: Finished Reading (state %i) currentFrameNumber %li\n", 00365 input->getState(),currentFrameNumber); 00366 if(input->getState()==IO::STREAMEOF) 00367 resetSendHeader=true; 00368 00369 input->close(true); 00370 00371 if (adaptor && (strcmp(adaptor->getName(),"Forwarder") != 0)) { //maybe there is some cached data? 00372 list < Frame * >result; 00373 dprintf_full("DataChannel::run: Closing Adaptor %s\n",adaptor->getName()); 00374 result = adaptor->close(); 00375 00376 list < Frame * >::iterator rLi; 00377 if (result.size() > 0) { 00378 dprintf_full("DataChannel::run: adaptor->close() returned %i frames\n",result.size()); 00379 // now append the received frame(s) to the sendqueue 00380 while (!result.empty()) { 00381 rLi = result.begin(); 00382 sendQueue.push_back((*rLi)); 00383 result.pop_front(); 00384 } 00385 // now we have read all data, we have to ensure that everything is sent 00386 dprintf_full("DataChannel::run: Sending %i cached frames\n",sendQueue.size()); 00387 while (!sendQueue.empty() && !tearDown) { 00388 // distribute it to all active clients 00389 rLi = sendQueue.begin(); 00390 //set this frame to be deleted after all output-DataSinks, this may be changed by eg. buffering output-IOs 00391 (*rLi)->markForDelete(); 00392 00393 dprintf_full("DataChannel::run: READ cached FRAME\n"); 00394 lockDataSink.lock(); 00395 if(!active.empty()) { 00396 list < DataSink * >::iterator li = active.begin(); 00397 while (li != active.end() && !tearDown) { 00398 dprintf_full("DataChannel::run: Sending one frame to one client\n"); 00399 if (!(*li)->send((*rLi))) { 00400 // sth went wrong 00401 dprintf_err("DataChannel::run: Sending of frame failed\n"); 00402 } 00403 ++li; 00404 } 00405 } 00406 if(localViewer) 00407 localViewer->send( (*rLi) ); 00408 00409 dprintf_full("DataChannel::run: SEND cached FRAME\n"); 00410 // finished with sending one frame 00411 lockDataSink.release(); 00412 00413 if ((*rLi)->isMarkedForDelete()) { 00414 dprintf_full("DataChannel::run cleanup sendQueue with frame %p\n",(*rLi)); 00415 if((*rLi)) 00416 delete (*rLi); 00417 } //else: ignore and let the buffering output-IO do the delete on it's own! 00418 00419 (*rLi)=NULL; 00420 sendQueue.pop_front(); 00421 } // end sending all frames 00422 } 00423 // if a teardown was received while sending, empty sendqueue 00424 while (!sendQueue.empty()) { 00425 rLi = sendQueue.begin(); 00426 dprintf_full("DataChannel::run, emptying sendqueue because of teardown: frame %p\n",(*rLi)); 00427 delete(*rLi); 00428 00429 (*rLi)=NULL; 00430 sendQueue.pop_front(); 00431 } 00432 }// if(adaptor) 00433 00434 // not necessary with BitField, but we better do it 00435 es->setCompleteState(true); 00436 } 00437 else { 00438 dprintf_small("DataChannel::run: Thread Started with no clients\n"); 00439 input->close(true); 00440 } 00441 channelOpen=true; 00442 00443 closeAllClients(false); 00444 00445 00446 dprintf_small("DataChannel::run: EXIT!\n"); 00447 this->exit(); 00448 } //DataChannel::run 00449 00450 00455 void DataChannel::setAdaptor(Adaptor * adapt) 00456 { 00457 lockAdaptor.lock(); 00458 delete adaptor; 00459 adaptor = adapt; 00460 if (!adaptor) { 00461 adaptor = new Forwarder(); 00462 } 00463 lockAdaptor.release(); 00464 }; 00465 00466 void DataChannel::setESInfo(ESInfo *new_es) { 00467 if(es && es->getUsageCounter()) 00468 es->getUsageCounter()->decrease(); 00469 00470 es = new_es; 00471 } 00472 00473 void DataChannel::setFinalFrameNumber(long frameNumber) { 00474 finalFrameNumber = frameNumber; 00475 } 00476 00477 void DataChannel::initiateThreadStop(bool forceTearDown) { 00478 if (forceTearDown) 00479 tearDown=true; 00480 //if not forceTearDown, thread will stop when finalFrameNumber is reached 00481 while ( this->running() || !this->finished()) { 00482 msleep(40); 00483 dprintf_full("DataChannel::initiateThreadStop waiting on DataChannel::run finished... sendQueue (%i elems)\n",sendQueue.size()); 00484 } 00485 if (forceTearDown) 00486 tearDown=false; 00487 } 00488 00489 void DataChannel::setInput(IO *new_io) { 00490 dprintf_full("DataChannel::setInput\n"); 00491 00492 /* 00493 list <int> ids = getListOfActiveClientIds(); 00494 for(list <int>::iterator id = ids.begin(); id != ids.end(); ++id) { 00495 pause((*id)); 00496 getDataSink((*id))->getOutput()->setESInfo(new_es); 00497 } 00498 setESInfo(new_es); 00499 */ 00500 00501 00502 lockInput.lock(); 00503 00504 assert(active.empty()); //no-one is playing 00505 deleteInput(); 00506 input = new_io; 00507 currentFrameNumber=input->getCurrentFrameNumber(); 00508 00509 list < DataSink * >::iterator li = paused.begin(); 00510 li = paused.begin(); 00511 while (li != paused.end()) { 00512 (*li)->getOutput()->setResendFrameHeader(true); 00513 ++li; 00514 } 00515 lockInput.release(); 00516 } 00517 00518 int DataChannel::getNumberOfActiveDataSinks(){ 00519 int result=0; 00520 lockDataSink.lock(); 00521 result=active.size(); 00522 lockDataSink.release(); 00523 return result; 00524 }; 00525 00527 bool DataChannel::play(u32 clientId, double prefetch) 00528 { 00529 dprintf_full("DataChannel::play(u32 clientId=%u prefetch %f)\r\n",clientId,prefetch); 00530 if(paused.empty()) { 00531 dprintf_err("DataChannel::play: (Empty) Failed to play non existing client %u\r\n",clientId); 00532 return false; 00533 } 00534 if(!input) { 00535 dprintf_err("bool DataChannel::play: No input is set!\r\n"); 00536 return false; 00537 } 00538 00539 //FIXME: unpausing will only work for one DC with ONE client 00540 input->play(prefetch); //open the io stream (if not yet open) 00541 00542 lockDataSink.lock(); 00543 00544 // if (channelOpen && !tearDown) { 00545 00546 00547 // localDataSinkAdaptors are not allowed to modify the ES directly 00548 // FIXME: one DC -> one CLient 00549 // Problem here is the global Adaptor and 00550 // the local Adaptor, we could only clone an ES, when 00551 // the global adaptor is initialized! 00552 // the next three lines also break TC support 00553 // client receives an invalid (old) ESheader 00554 // through Rtp when the server adapts the stream 00555 // for the client 00556 // ESInfo *es2 = es->clone(es->getContainerInfo()); 00557 //c->getAdaptor()->setESInfo(es2); 00558 //c->getOutput()->setESInfo(es2); 00559 00560 00561 // remove the client from the paused list, 00562 00563 list<DataSink*>::iterator cLi=paused.begin(); 00564 while(cLi!=paused.end()) { 00565 if( (*cLi)->getDataSinkId()==clientId) { 00566 active.push_front((*cLi)); 00567 (*cLi)->playOutput(prefetch); 00568 paused.remove(*cLi); 00569 lockDataSink.release(); 00570 dprintf_full("DataChannel::play: Started client %u\r\n",clientId); 00571 return true; 00572 } 00573 cLi++; 00574 } 00575 lockDataSink.release(); 00576 dprintf_err("DataChannel::play: Failed to play non existing paused client %u\r\n",clientId); 00577 return false; 00578 00579 // } 00580 dprintf_err("DataChannel::play: Failed to play client %u. channelOpen=%i, tearDown=%i\r\n", 00581 clientId,channelOpen,tearDown); 00582 lockDataSink.release(); 00583 return false; 00584 }; 00585 00587 char *DataChannel::enableCaching(bool omitHeader, const char* dir, bool proxyMode, const Url* url) 00588 { 00589 00590 char *out = new char[1024]; 00591 IO* output=NULL; 00592 out[0] = 0; 00593 lockDataSink.lock(); 00594 if (channelOpen && es) { 00595 00596 char intnum[32]; 00597 out[0] = 0; 00598 intnum[0] = 0; 00599 sprintf(intnum, "%llu", es->getStreamId()); 00600 if(!dir) { 00601 output=new DevNull(); 00602 } 00603 else { 00604 00605 strcpy(out,dir); 00606 00607 if(!(out[strlen(out)-1]=='/' || out[strlen(out)-1]=='\\' )) { 00608 int xy=strlen(out); 00609 out[xy]=PATHSEPARATORCHAR; 00610 out[xy+1]='\0'; 00611 } 00612 00613 00614 // problem: recursive servers -> eliminate path just get filename 00615 const char* tmp=es->getContainerInfo()->getLocalFile(); 00616 const char* t2=NULL; 00617 char* dummy=NULL; 00618 if(tmp!=NULL) { 00619 if((t2=strrchr(tmp,PATHSEPARATORCHAR))!=NULL) { 00620 t2++; 00621 } 00622 else 00623 t2=tmp; 00624 } 00625 if(url) { 00626 if(proxyMode) { 00627 dummy=Url::transformUrlToLocalFile(url); 00628 strcat(out,dummy); 00629 delete[] dummy; 00630 } 00631 else { 00632 dummy=Url::transformPathToLocalFile(url); 00633 strcat(out,dummy); 00634 delete[] dummy; 00635 } 00636 } 00637 else { 00638 00639 strcat(out,"localfile"); 00640 if(t2) { 00641 strcat(out,"_"); 00642 strcat(out, t2); 00643 } 00644 } 00645 00646 if (es->isVisualStream()) 00647 strcat(out, ".video."); 00648 else if (es->isAudioStream()) 00649 strcat(out, ".audio."); 00650 else if(es->isSceneDescriptionStream()) 00651 strcat(out, ".bifs."); 00652 else if(es->isODStream()) 00653 strcat(out, ".ODstream."); 00654 else 00655 strcat(out, ".generic."); 00656 00657 strcat(out, intnum); 00658 // check if file exists!!!! 00659 FILE* fp=NULL; 00660 while((fp=fopen(out,"r")) != 0) { 00661 // merde! file exists 00662 sprintf(out,"%s%u",out,(unsigned int)(rand()%10)); 00663 fclose(fp);fp=NULL; 00664 } 00665 if(fp) 00666 fclose(fp); 00667 // create a writer, out is deep-copied, also in QFile! 00668 output = new MPGStreamIO(es, out, true, omitHeader); 00669 if (!output->open()) { 00670 dprintf_err("DataChannel::enableCaching: Failed to open output channel\n"); 00671 } 00672 } 00673 00674 char tmp2[12]; 00675 strcpy(tmp2, "127.0.0.1"); 00676 DataSink *c = new DataSink(0,output, tmp2, 0, 0); 00677 active.push_front(c); 00678 } 00679 lockDataSink.release(); 00680 00681 return out; 00682 }; 00683 00685 void DataChannel::visualizeCaching() 00686 { 00687 lockDataSink.lock(); 00688 if (channelOpen && es->isVisualStream()) { 00689 if(!this->localViewer) { 00690 char *tmp2 = new char[12]; 00691 00692 strcpy(tmp2, "127.0.0.1"); 00693 00694 AdaptorChain *chain = new VideoAdaptorChain(); 00695 chain->setESInfo((ESInfo*)es); 00696 #if defined (BUILD_ARCH_INTEL) || defined (WIN32) //assuming display with 24 or 32 bit (4byte) depth 00697 // YV12 is super-fast with Visualizer::SDL! 00698 UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceYV12; 00699 // UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceRGB32; 00700 // UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceGRAY8; 00701 // UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceRGB565; 00702 #elif BUILD_ARCH_ARM //iPAQ only has 16 bit display! 00703 UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceRGB565; 00704 #endif 00705 chain->addAdaptor(new MP4Decoder((VideoESInfo*)es, false, cs, MP4Decoder::FFMPEG)); 00706 //chain->addAdaptor(new FrameRotate((VideoESInfo*)es, 90)); //add after decoder 00707 00708 //chain->addAdaptor(new Visualizer((VideoESInfo*)es, Visualizer::X11, cs)); 00709 chain->addAdaptor(new Visualizer((VideoESInfo*)es, Visualizer::SDL, cs)); 00710 this->localViewer = new DataSink(1,new DevNull(), tmp2, 0, chain); 00711 delete tmp2; 00712 } 00713 } 00714 lockDataSink.release(); 00715 }; 00716 00718 bool DataChannel::mute(u32 clientId) 00719 { 00720 dprintf_full("DataChannel::mute(u32 clientId=%u)\r\n",clientId); 00721 if(active.empty()) 00722 return false; 00723 00724 lockDataSink.lock(); 00725 00726 list<DataSink*>::iterator cLi=active.begin(); 00727 while(cLi!=active.end()) { 00728 if( (*cLi)->getDataSinkId()==clientId) { 00729 (*cLi)->muteOutput(); 00730 lockDataSink.release(); 00731 return true; 00732 } 00733 cLi++; 00734 } 00735 lockDataSink.release(); 00736 return false; 00737 }; 00738 00740 bool DataChannel::unmute(u32 clientId) 00741 { 00742 dprintf_full("DataChannel::unmute(u32 clientId=%u)\r\n",clientId); 00743 if(active.empty()) 00744 return false; 00745 00746 lockDataSink.lock(); 00747 00748 list<DataSink*>::iterator cLi=active.begin(); 00749 while(cLi!=active.end()) { 00750 if( (*cLi)->getDataSinkId()==clientId) { 00751 (*cLi)->playOutput(0.0); 00752 lockDataSink.release(); 00753 return true; 00754 } 00755 cLi++; 00756 } 00757 lockDataSink.release(); 00758 return false; 00759 }; 00760 00762 bool DataChannel::pause(u32 clientId) 00763 { 00764 if(active.empty()) { 00765 dprintf_full("DataChannel::pause(u32 clientId=%u): NO ACTIVE CLIENTS!\r\n",clientId); 00766 return false; 00767 } 00768 00769 //pause the input stream 00770 input->pause(); 00771 00772 00773 lockDataSink.lock(); 00774 // remove the client from the active list 00775 list<DataSink*>::iterator cLi=active.begin(); 00776 while(cLi!=active.end()) { 00777 dprintf_full("DataChannel::pause found client %p Id=%u, looking for Id %u)\n", 00778 (*cLi), (*cLi)->getDataSinkId(),clientId); 00779 if( (*cLi)->getDataSinkId()==clientId) { 00780 paused.push_front((*cLi)); 00781 (*cLi)->pauseOutput(); 00782 active.remove(*cLi); 00783 lockDataSink.release(); 00784 return true; 00785 } 00786 cLi++; 00787 } 00788 00789 lockDataSink.release(); 00790 return false; 00791 }; 00792 00794 bool DataChannel::insert(DataSink* c) 00795 { 00796 dprintf_full("DataChannel::insert(DataSink* c=%p,c->clientId=%u\r\n)\r\n",c, (c?c->getDataSinkId():0)); 00797 lockDataSink.lock(); 00798 paused.push_front(c); 00799 lockDataSink.release(); 00800 return true; 00801 }; 00802 00806 DataSink* DataChannel::getDataSink(u32 clientId) 00807 { 00808 DataSink* result=NULL; 00809 lockDataSink.lock(); 00810 if(!paused.empty()) { 00811 list < DataSink * >::iterator li = paused.begin(); 00812 while(li != paused.end()) { 00813 if((*li)->getDataSinkId()==clientId) { 00814 result=(*li); 00815 li=paused.end(); 00816 00817 } 00818 else 00819 ++li; 00820 } 00821 } 00822 lockDataSink.release(); 00823 return result; 00824 }; 00825 const DataSink* DataChannel::getActiveDataSink(u32 clientId) 00826 { 00827 DataSink* result=NULL; 00828 lockDataSink.lock(); 00829 if(!active.empty()) { 00830 list < DataSink * >::iterator li = active.begin(); 00831 while(li != active.end()) { 00832 if((*li)->getDataSinkId()==clientId) { 00833 result=(*li); 00834 li=active.end(); 00835 00836 } 00837 else 00838 ++li; 00839 } 00840 } 00841 lockDataSink.release(); 00842 return result; 00843 }; 00844 00845 list <u32> DataChannel::getListOfActiveClientIds() { 00846 list<u32> idlist; 00847 lockDataSink.lock(); 00848 if(!active.empty()) { 00849 list < DataSink * >::iterator li = active.begin(); 00850 while(li != active.end()) { 00851 idlist.push_back((*li)->getDataSinkId()); 00852 ++li; 00853 } 00854 } 00855 lockDataSink.release(); 00856 return idlist; 00857 } 00858 00859 list <u32> DataChannel::getListOfPausedClientIds() { 00860 list<u32> idlist; 00861 lockDataSink.lock(); 00862 if(!paused.empty()) { 00863 list < DataSink * >::iterator li = paused.begin(); 00864 while(li != paused.end()) { 00865 idlist.push_back((*li)->getDataSinkId()); 00866 ++li; 00867 } 00868 } 00869 lockDataSink.release(); 00870 return idlist; 00871 } 00872 00873 00874 void DataChannel::closeAllClients(bool immediateClose) 00875 { 00876 bool resetSendHeader=false; 00877 // now close the clients (incl. their adaptors) 00878 lockDataSink.lock(); 00879 tearDown=true; 00880 channelOpen=false; 00881 if(input->getState()==IO::STREAMEOF) 00882 resetSendHeader=true; 00883 if(!active.empty() || localViewer) { 00884 list < DataSink * >::iterator li = active.begin(); 00885 while (li!=active.end()) { 00886 00887 // check if we have reached the STREAMEOF at the input 00888 // if yes, we have to enable resendFrameHeader 00889 if(resetSendHeader) { 00890 (*li)->getOutput()->setResendFrameHeader(true); 00891 dprintf_full("DataChannel::closeAllClients resetting resendFrameHeader\r\n"); 00892 } 00893 (*li)->close(immediateClose); // dataSink->close: this may take a while... 00894 ++li; 00895 } 00896 } 00897 dprintf_full("DataChannel::closeAllClients: closed all client outs\n"); 00898 if(localViewer) 00899 localViewer->close(); 00900 lockDataSink.release(); 00901 }; 00902 00903 00904 bool DataChannel::teardown(u32 clientId,bool immediate) 00905 { 00906 dprintf_full("DataChannel::teardown(u32 clientId=%u,bool immediate=%i)\r\n",clientId,immediate); 00907 lockDataSink.lock(); 00908 immediateClose=immediate; 00909 if(active.empty()) { 00910 dprintf_full("DataChannel::teardown: Closing input\r\n"); 00911 if (input) 00912 input->close(immediate); 00913 tearDown=true; 00914 channelOpen=false; 00915 } 00916 // remove the client from the active list, 00917 list < DataSink * >::iterator li = active.begin(); 00918 while (li != active.end()) { 00919 if (clientId == (*li)->getDataSinkId()) { 00920 dprintf_full("DataChannel::teardown: Deleted client from active list\r\n"); 00921 delete *li; 00922 *li=NULL; 00923 active.erase(li); 00924 if(active.empty()) { 00925 dprintf_full("DataChannel::teardown: Closing input from active list \r\n"); 00926 if (input) 00927 input->close(immediate); 00928 00929 tearDown=true; 00930 channelOpen=false; 00931 } 00932 lockDataSink.release(); 00933 return true; 00934 } 00935 ++li; 00936 } //while 00937 // if not found on active, remove the client from the pause list, 00938 li = paused.begin(); 00939 while (li != paused.end()) { 00940 if (clientId == (*li)->getDataSinkId()) { 00941 dprintf_full("DataChannel::teardown: Deleted client from paused list\r\n"); 00942 delete *li; 00943 *li=NULL; 00944 paused.erase(li); 00945 if(paused.empty()) { 00946 dprintf_full("DataChannel::teardown: Closing input from paused list \r\n"); 00947 if (input) 00948 input->close(immediate); 00949 00950 tearDown=true; 00951 channelOpen=false; 00952 } 00953 lockDataSink.release(); 00954 return true; 00955 } 00956 ++li; 00957 } //while 00958 00959 // client really not found 00960 dprintf_err("DataChannel::teardown DataSink not found\r\n"); 00961 fflush(stdout); 00962 lockDataSink.release(); 00963 00964 return false; 00965 00966 }; 00967 00968 00969 void DataChannel::setOutputStatistics(Statistics *stats) { 00970 //just overwrite old stats, if exist (stats will be deleted elsewhere) 00971 outstats = stats; 00972 } 00973 00974 Statistics *DataChannel::getOutputStatistics() { 00975 return outstats; 00976 }