DataChannel Class Reference

A DataChannel object reads data from an input.

#include <DataChannel.hpp>

Inheritance diagram for DataChannel:

VThread

Public Member Functions

 DataChannel (IO *input, ESInfo *es)
 creates a DataChannel; only a dummy adaptor is set
 DataChannel (IO *input, ESInfo *es, Adaptor *adapt)
 creates a DataChannel object that reads data from an input.
void setAdaptor (Adaptor *adapt)
 changes the adaptor to a new one, works even while the thread is working.
void setESInfo (ESInfo *new_es)
 changes the connected ESInfo
void setFinalFrameNumber (long frameNumber)
 sets the frame number at which the DC-thread should stop streaming.
void initiateThreadStop (bool forceTearDown)
 waits until the DC-thread is stopped
void setInput (IO *new_io)
bool play (u32 clientId, double prefetch)
 Moves a previously inserted client from the paused list to the active list; returns false when the DataChannel has already ended.
bool mute (u32 clientId)
 mutes a client.
bool unmute (u32 clientId)
 unmutes a MUTED client.
bool pause (u32 clientId)
 pauses a client.
bool insert (DataSink *c)
 inserts the client into the paused queue.
char * enableCaching (bool omitHeader, const char *outDir, bool proxyMode=true, const Url *url=NULL)
 stores the result of the global Adaptor stream to the disk.
void visualizeCaching ()
 renders the result of the global Adaptor stream to the screen.
bool teardown (u32 clientId, bool immediate=false)
 returns false when the client was not found in the active or paused queue.
bool tearDownFlagSet () const
int getNumberOfActiveDataSinks ()
 returns the number of active clients
IO * getInput ()
Adaptor * getAdaptor () const
void run ()
 starts the thread.
void setSessionId (int i)
uint getSessionId () const
const ESInfo * getESInfo () const
DataSink * getDataSink (u32 clientId)
 returns the DataSink with the given id
const DataSink * getActiveDataSink (u32 clientId)
list< u32 > getListOfActiveClientIds ()
list< u32 > getListOfPausedClientIds ()
void setOutputStatistics (Statistics *stats)
 this is BIG HACK...
Statistics * getOutputStatistics ()
 this is BIG HACK...

Protected Attributes

int sessionId
IO * input
list< DataSink * > active
 list of receivers
list< DataSink * > paused
 list of paused receivers
DataSink * localViewer
 local viewer
VMutex lockDataSink
 Semaphore used to sync access to the active clients.
VMutex lockInput
 Semaphore used to sync access to the Input IO.
VMutex lockAdaptor
 a semaphore to lock the adaptor
Adaptor * adaptor
 each DataChannel can have an Adaptor; by default a Forwarder is set here
list< Frame * > sendQueue
bool channelOpen
 specifies if new DataSinks can be added
bool tearDown
bool immediateClose
ESInfo * es
Statistics * outstats
long currentFrameNumber
long finalFrameNumber

Detailed Description

A DataChannel object reads data from an input.

Note that you must add at least one DataSink via play before the DataChannel can actually send data. Each DataChannel has an Adaptor; after the data has been sent through the Adaptor, the result of the adaptation is distributed to each DataSink. Each DataSink must have an IO output attached.

Author:
Michael Kropfberger and Peter Schojer
Version:
Id: DataChannel.hpp,v 1.23 2006/01/26 09:50:45 mkropfbe Exp

Definition at line 74 of file DataChannel.hpp.
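
The client lifecycle described here (insert a client into the paused list, then play it to move it to the active list) can be illustrated with a small stand-alone sketch. This is not the DataChannel implementation: `SinkRegistry` is a hypothetical class, clients are reduced to integer ids, and `std::mutex` stands in for VMutex. It also uses `std::list::splice` to move an entry, where the listings in this page use push_front followed by remove.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <mutex>

// Hypothetical stand-in for the paused/active bookkeeping of a DataChannel.
class SinkRegistry {
public:
    // New clients always start in the paused list.
    void insert(std::uint32_t id) {
        std::lock_guard<std::mutex> guard(lock_);
        paused_.push_front(id);
    }

    // Moves a client from paused to active; false if it was never inserted.
    bool play(std::uint32_t id) { return move(paused_, active_, id); }

    // Moves a client from active back to paused; false if it is not playing.
    bool pause(std::uint32_t id) { return move(active_, paused_, id); }

    std::size_t activeCount() {
        std::lock_guard<std::mutex> guard(lock_);
        return active_.size();
    }

private:
    bool move(std::list<std::uint32_t>& from, std::list<std::uint32_t>& to,
              std::uint32_t id) {
        std::lock_guard<std::mutex> guard(lock_);
        std::list<std::uint32_t>::iterator it =
            std::find(from.begin(), from.end(), id);
        if (it == from.end())
            return false;
        // splice relinks the node into the other list without copying it.
        to.splice(to.begin(), from, it);
        return true;
    }

    std::mutex lock_;
    std::list<std::uint32_t> paused_;
    std::list<std::uint32_t> active_;
};
```

In this sketch a client that was never inserted cannot be played, which mirrors the rule that only previously inserted clients can be played.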


Constructor & Destructor Documentation

DataChannel::DataChannel (IO * input, ESInfo * es, Adaptor * adapt)

creates a DataChannel object that reads data from an input.

Note that you must add at least one DataSink via play before the DataChannel can actually send data. Each DataChannel has an Adaptor; after the data has been sent through the Adaptor, the result of the adaptation is distributed to each DataSink.

Definition at line 76 of file DataChannel.cpp.

00077 {
00078   init(in, es, adapt);
00079 };


Member Function Documentation

char * DataChannel::enableCaching (bool omitHeader, const char * outDir, bool proxyMode = true, const Url * url = NULL)

stores the result of the global Adaptor stream to the disk.

Parameters:
proxyMode  determines how the file name is generated. If set to true, the server name is included in the local file name, which makes it impossible to reuse demuxed files when the server name changes.
Returns:
a pointer to the new output file name. Internally this creates a DataSink with ID 1. Use this id to control the DataSink, e.g. pause(1), teardown(1). (Note that the listing below passes 0 as the first argument of the DataSink constructor; if that argument is the id, the id to use would be 0.)
Definition at line 587 of file DataChannel.cpp.

References active, channelOpen, ESInfo::getContainerInfo(), ContainerInfo::getLocalFile(), ESInfo::getStreamId(), ESInfo::isAudioStream(), ESInfo::isODStream(), ESInfo::isSceneDescriptionStream(), ESInfo::isVisualStream(), VMutex::lock(), lockDataSink, and VMutex::release().

Referenced by ContainerDemux::demultiplexAndUpdateIO().

00588 {
00589 
00590   char *out = new char[1024];
00591   IO* output=NULL;
00592   out[0] = 0;
00593   lockDataSink.lock();
00594   if (channelOpen && es) {
00595 
00596     char intnum[32];
00597     out[0] = 0;
00598     intnum[0] = 0;
00599     sprintf(intnum, "%llu", es->getStreamId());
00600     if(!dir) {
00601       output=new DevNull();
00602     }
00603     else {
00604 
00605       strcpy(out,dir);
00606 
00607       if(!(out[strlen(out)-1]=='/' || out[strlen(out)-1]=='\\' )) {
00608         int xy=strlen(out);
00609         out[xy]=PATHSEPARATORCHAR;
00610         out[xy+1]='\0';
00611       }
00612 
00613 
00614       // problem: recursive servers -> eliminate path just get filename
00615       const char* tmp=es->getContainerInfo()->getLocalFile();
00616       const char* t2=NULL;
00617       char* dummy=NULL;
00618       if(tmp!=NULL) {
00619         if((t2=strrchr(tmp,PATHSEPARATORCHAR))!=NULL) {
00620           t2++;
00621         }
00622         else
00623           t2=tmp;
00624       }
00625       if(url) {
00626         if(proxyMode) {
00627           dummy=Url::transformUrlToLocalFile(url);
00628           strcat(out,dummy);
00629           delete[] dummy;
00630         }
00631         else {
00632           dummy=Url::transformPathToLocalFile(url);
00633           strcat(out,dummy);
00634           delete[] dummy;
00635         }
00636       }
00637       else {
00638 
00639         strcat(out,"localfile");
00640         if(t2) {
00641           strcat(out,"_");
00642           strcat(out, t2);
00643         }
00644       }
00645 
00646       if (es->isVisualStream())
00647         strcat(out, ".video.");
00648       else if (es->isAudioStream())
00649         strcat(out, ".audio.");
00650       else if(es->isSceneDescriptionStream())
00651         strcat(out, ".bifs.");
00652       else if(es->isODStream())
00653         strcat(out, ".ODstream.");
00654       else
00655         strcat(out, ".generic.");
00656 
00657       strcat(out, intnum);
00658       // check if file exists!!!!
00659       FILE* fp=NULL;
00660       while((fp=fopen(out,"r")) != 0) {
00661         // merde! file exists
00662         sprintf(out,"%s%u",out,(unsigned int)(rand()%10));
00663         fclose(fp);fp=NULL;
00664       }
00665       if(fp)
00666         fclose(fp);
00667       // create a writer, out is deep-copied, also in QFile!
00668       output = new MPGStreamIO(es, out, true, omitHeader);
00669       if (!output->open()) {
00670         dprintf_err("DataChannel::enableCaching: Failed to open output channel\n");
00671       }
00672     }
00673 
00674     char tmp2[12];
00675     strcpy(tmp2, "127.0.0.1");
00676     DataSink *c = new DataSink(0,output, tmp2, 0, 0);
00677     active.push_front(c);
00678   }
00679   lockDataSink.release();
00680 
00681   return out;
00682 };
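
The way the output file name is assembled when no Url is given can be sketched with `std::string` instead of a fixed 1024-byte buffer. This is only an illustration under assumptions: `StreamKind`, `baseName` and `buildCacheName` are hypothetical names, the path separator is assumed to be '/', and the Url branch and the "append a random digit while the file exists" loop of the listing are left out.

```cpp
#include <cassert>
#include <string>

// Hypothetical enum standing in for the ESInfo::is...Stream() queries.
enum class StreamKind { Visual, Audio, SceneDescription, ObjectDescriptor, Generic };

// Returns the part of 'path' after the last separator, or the whole path.
std::string baseName(const std::string& path, char sep) {
    const std::string::size_type pos = path.rfind(sep);
    return pos == std::string::npos ? path : path.substr(pos + 1);
}

// Assembles "<dir><sep>localfile[_<name>].<kind>.<streamId>".
std::string buildCacheName(std::string dir, const std::string& localFile,
                           StreamKind kind, unsigned long long streamId,
                           char sep = '/') {
    // Append a separator unless the directory already ends with one.
    if (!dir.empty() && dir.back() != '/' && dir.back() != '\\')
        dir += sep;

    std::string out = dir + "localfile";
    if (!localFile.empty())
        out += "_" + baseName(localFile, sep);  // strip any leading path

    switch (kind) {
        case StreamKind::Visual:           out += ".video.";    break;
        case StreamKind::Audio:            out += ".audio.";    break;
        case StreamKind::SceneDescription: out += ".bifs.";     break;
        case StreamKind::ObjectDescriptor: out += ".ODstream."; break;
        case StreamKind::Generic:          out += ".generic.";  break;
    }
    return out + std::to_string(streamId);
}
```

Unlike the listing, the sketch leaves an empty directory string unchanged instead of indexing before the start of the buffer.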

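DataSink * DataChannel::getDataSink (u32 clientId)

returns the DataSink with the given id

Parameters:
clientId  id of the DataSink to return. Only the paused list is searched; you are not allowed to change clients that are currently playing.
Returns:
the matching DataSink, or NULL if no paused DataSink has this id.
Definition at line 806 of file DataChannel.cpp.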

References VMutex::lock(), lockDataSink, paused, and VMutex::release().

00807 {
00808   DataSink* result=NULL;
00809   lockDataSink.lock();
00810   if(!paused.empty()) {
00811     list < DataSink * >::iterator li = paused.begin();
00812     while(li != paused.end()) {
00813       if((*li)->getDataSinkId()==clientId) {
00814         result=(*li);
00815         li=paused.end();
00816 
00817       }
00818       else
00819         ++li;
00820     }
00821   }
00822   lockDataSink.release();
00823   return result;
00824 };

Statistics * DataChannel::getOutputStatistics ()

this is BIG HACK...

This returns only one set of output statistics, that of the first DataSink output (e.g. RTP). Since the code is not able to cope with more than one networked DataSink anyway, it should work.

Definition at line 974 of file DataChannel.cpp.

00974 {
00975   return outstats;
00976 }

void DataChannel::initiateThreadStop (bool forceTearDown)

waits until the DC-thread is stopped

Parameters:
forceTearDown  true stops the DC-thread immediately; false blocks until the DC is done with its input or has reached finalFrameNumber.
Definition at line 477 of file DataChannel.cpp.

00477 {
00478   if (forceTearDown)
00479     tearDown=true;
00480   //if not forceTearDown, thread will stop when finalFrameNumber is reached
00481   while ( this->running() || !this->finished()) {
00482     msleep(40);
00483     dprintf_full("DataChannel::initiateThreadStop waiting on DataChannel::run finished... sendQueue (%i elems)\n",sendQueue.size());
00484   }
00485   if (forceTearDown)
00486     tearDown=false;
00487 }

bool DataChannel::insert (DataSink * c)

inserts the client into the paused queue.

This is the first mandatory step if you want to start playing a client.

Definition at line 794 of file DataChannel.cpp.

References DataSink::getDataSinkId(), VMutex::lock(), lockDataSink, paused, and VMutex::release().

Referenced by ContainerDemux::createDataChannel().

00795 {
00796   dprintf_full("DataChannel::insert(DataSink* c=%p,c->clientId=%u\r\n)\r\n",c, (c?c->getDataSinkId():0));
00797   lockDataSink.lock();
00798   paused.push_front(c);
00799   lockDataSink.release();
00800   return true;
00801 };

bool DataChannel::mute (u32 clientId)

mutes a client.

The client stays on the active list, and received frames are still passed through the global DC adaptors (e.g. statistics), but they bypass the client adaptors (e.g. decoding). The frames are still handed over to the writeFrame of the I/O output classes for sync. To return to PLAY state, call DataChannel::unmute(clientId).

Definition at line 718 of file DataChannel.cpp.

References active, VMutex::lock(), lockDataSink, and VMutex::release().

00719 {
00720   dprintf_full("DataChannel::mute(u32 clientId=%u)\r\n",clientId);
00721   if(active.empty())
00722     return false;
00723 
00724   lockDataSink.lock();
00725 
00726   list<DataSink*>::iterator cLi=active.begin();
00727   while(cLi!=active.end()) {
00728     if( (*cLi)->getDataSinkId()==clientId) {
00729       (*cLi)->muteOutput();
00730       lockDataSink.release();
00731       return true;
00732     }
00733     cLi++;
00734   }
00735   lockDataSink.release();
00736   return false;
00737 };

bool DataChannel::pause (u32 clientId)

pauses a client.

The client is searched for in the active list. If it is found there, it is moved to the paused queue; otherwise false is returned. Note that when you pause the last client, the thread terminates and has to be restarted, after which it continues from the last frame.

Definition at line 762 of file DataChannel.cpp.

References active, VMutex::lock(), lockDataSink, IO::pause(), paused, and VMutex::release().

00763 {
00764   if(active.empty()) {
00765     dprintf_full("DataChannel::pause(u32 clientId=%u): NO ACTIVE CLIENTS!\r\n",clientId);
00766     return false;
00767   }
00768 
00769   //pause the input stream
00770   input->pause();
00771 
00772 
00773   lockDataSink.lock();
00774   // remove the client from the active list
00775   list<DataSink*>::iterator cLi=active.begin();
00776   while(cLi!=active.end()) {
00777     dprintf_full("DataChannel::pause found client %p Id=%u, looking for Id %u)\n",
00778                  (*cLi), (*cLi)->getDataSinkId(),clientId);
00779     if( (*cLi)->getDataSinkId()==clientId) {
00780       paused.push_front((*cLi));
00781       (*cLi)->pauseOutput();
00782       active.remove(*cLi);
00783       lockDataSink.release();
00784       return true;
00785     }
00786     cLi++;
00787   }
00788 
00789   lockDataSink.release();
00790   return false;
00791 };

bool DataChannel::play (u32 clientId, double prefetch)

Moves a previously inserted client from the paused list to the active list; returns false when the DataChannel has already ended.

In this case the client is not added. Note that play must be called for at least one client PRIOR to starting the DataChannel; otherwise the thread will terminate immediately. You can only play clients which were inserted previously.

Definition at line 527 of file DataChannel.cpp.

References active, channelOpen, VMutex::lock(), lockDataSink, paused, IO::play(), and VMutex::release().

Referenced by ContainerDemux::createDataChannel().

00528 {
00529   dprintf_full("DataChannel::play(u32 clientId=%u prefetch %f)\r\n",clientId,prefetch);
00530   if(paused.empty()) {
00531     dprintf_err("DataChannel::play: (Empty) Failed to play non existing client %u\r\n",clientId);
00532     return false;
00533   }
00534   if(!input) {
00535     dprintf_err("bool DataChannel::play: No input is set!\r\n");
00536     return false;
00537   }
00538 
00539   //FIXME: unpausing will only work for one DC with ONE client
00540   input->play(prefetch); //open the io stream (if not yet open)
00541 
00542   lockDataSink.lock();
00543 
00544   // if (channelOpen && !tearDown) {
00545 
00546 
00547   // localDataSinkAdaptors are not allowed to modify the ES directly
00548   // FIXME: one DC -> one CLient
00549   // Problem here is the global Adaptor and
00550   // the local Adaptor, we could only clone an ES, when
00551   // the global adaptor is initialized!
00552   // the next three lines also break TC support
00553   // client receives an invalid (old) ESheader
00554   // through Rtp when the server adapts the stream
00555   // for the client
00556   // ESInfo *es2 = es->clone(es->getContainerInfo());
00557   //c->getAdaptor()->setESInfo(es2);
00558   //c->getOutput()->setESInfo(es2);
00559 
00560 
00561   // remove the client from the paused list,
00562 
00563   list<DataSink*>::iterator cLi=paused.begin();
00564   while(cLi!=paused.end()) {
00565     if( (*cLi)->getDataSinkId()==clientId) {
00566       active.push_front((*cLi));
00567       (*cLi)->playOutput(prefetch);
00568       paused.remove(*cLi);
00569       lockDataSink.release();
00570       dprintf_full("DataChannel::play: Started client %u\r\n",clientId);
00571       return true;
00572     }
00573     cLi++;
00574   }
00575   lockDataSink.release();
00576   dprintf_err("DataChannel::play: Failed to play non existing paused client %u\r\n",clientId);
00577   return false;
00578 
00579   // }
00580   dprintf_err("DataChannel::play: Failed to play client %u. channelOpen=%i, tearDown=%i\r\n",
00581               clientId,channelOpen,tearDown);
00582   lockDataSink.release();
00583   return false;
00584 };

void DataChannel::run () [virtual]

starts the thread.

If play was not called for any client beforehand, the thread terminates immediately. If clients were set, the thread runs until the input reaches end-of-stream. When this state is reached, all buffered data is sent, then the active queue is emptied. The clients are not destroyed.

Implements VThread.

Definition at line 187 of file DataChannel.cpp.

References active, Adaptor::adapt(), adaptor, channelOpen, Adaptor::close(), IO::close(), Frame::getAU(), IO::getFrame(), Adaptor::getName(), IO::getState(), IO::getURL(), localViewer, VMutex::lock(), lockAdaptor, lockDataSink, lockInput, IO::open(), VMutex::release(), DataSink::send(), and ESInfo::setCompleteState().

00188 {
00189   bool resetSendHeader;
00190 
00191   dprintf_full("DataChannel::run: DC.Thread started: %s\n", input->getURL());
00192   resetSendHeader=false;
00193 
00194   if (!active.empty() || localViewer) {
00195     if (input->getState()!=IO::OPEN && !input->open()) {
00196       dprintf_err("DataChannel::run: failed to open input\r\n");
00197       return;
00198     }
00199     // read data from input
00200     dprintf_full("DataChannel::run: Input is open\n");
00201 
00202     Frame *frm = NULL;
00203     //only continue if OPEN, PAUSED, MUTED, PREBUFFERING or STREAMEOF
00204     //STREAMEOF is not a problem, as long as eg. the Rtp Class still has buffered data!
00205     IO::State mystate = input->getState();
00206 
00207     while (! ( (mystate == IO::CLOSED) || (mystate == IO::CLOSING) ||
00208                (mystate == IO::STREAMERR) ) //|| (input->getState() == IO::STREAMEOF) )
00209            && !tearDown && ((finalFrameNumber == -1) || (currentFrameNumber < finalFrameNumber)) ) {
00210       lockInput.lock();
00211       lockDataSink.lock();
00212       if(mystate == IO::OPENING)
00213 
00214 #ifndef WINCE
00215         msleep(10);
00216 #else
00217         Sleep(5);
00218 #endif
00219       if(active.empty() && !localViewer) {
00220         lockInput.release();
00221         lockDataSink.release();
00222         return;
00223       }
00224       lockDataSink.release();
00225 
00226       //read frame
00227       frm = input->getFrame();
00228 
00229       if( frm == NULL ) {
00230         dprintf_full("DataChannel::run no frm got via getFrame\n");
00231         if(mystate != IO::OPEN) {
00232           if(frm) {
00233             delete frm;
00234             frm=NULL;
00235           }
00236         }
00237         lockInput.release();
00238       } else {
00239         dprintf_full("DataChannel::run passing frame to adaptor\n");
00240         // we got data
00241         // adapt it with the global adaptor (chain)
00242         lockAdaptor.lock();
00243         list < Frame * >result;
00244 
00245         //adapt (i.e. decode) frame
00246         result = adaptor->adapt(frm);
00247 
00248         lockAdaptor.release();
00249 
00250         if (!result.empty() ) {
00251           list < Frame * >::iterator rLi;
00252           // now append the received frame(s) to the sendqueue
00253           if(result.size()==1) {
00254             rLi=result.begin();
00255             if ( frm && (*rLi) != frm) {
00256               // caching adaptors have to deep-copy the frame
00257               dprintf_full("DataChannel::run Frame was adapted and orig frame will be deleted now...\n");
00258               delete frm; frm=NULL;
00259             }
00260             else {
00261               // frame is in result
00262               dprintf_full("DataChannel::run frame is in result\n");
00263               frm=NULL;
00264             }
00265           }
00266           //append results to sendQueue
00267           while (!result.empty()) {
00268             rLi = result.begin();
00269 
00270             if (!(*rLi)->isMarkedForDelete()) //may be dropped in ESSynchronizer
00271               sendQueue.push_back((*rLi));
00272             else {
00273               dprintf_full("DataChannel::run adaptor result was marked for delete... dropping!\n");
00274               delete (*rLi); (*rLi)=NULL;
00275             }
00276             result.pop_front();
00277           }
00278         } else {
00279           dprintf_full("DataChannel::run adaptor result was empty!!! dropped, so delete frm CTS %i...\n",frm->getAU()->cts);
00280           //ATTENTION: DC only deletes frames coming from inputIO, not eg. adaptor-decoded and then dropped ones!
00281           //so make sure, to markForDelete()
00282           delete frm; frm=NULL;
00283         }
00284         lockInput.release();
00285 
00286       } // end frame != NULL
00287 
00288       // result now empty
00289       // check if we have data in the sendqueue
00290       list < Frame * >::iterator rLi;
00291 
00292       if (sendQueue.empty() && (input->getState() == IO::STREAMEOF) )
00293         break; //give up, no more data coming, nothing left to forward
00294 
00295       // must be an if not while -> avoids bursts
00296       if (!sendQueue.empty() && !tearDown) {
00297         // distribute it to all active clients
00298         lockDataSink.lock();
00299         rLi = sendQueue.begin();
00300 
00301         //set this frame to be deleted after all output-DataSinks, this may be changed by eg. buffering output-IOs
00302         (*rLi)->markForDelete();
00303 
00304         if(!active.empty()) {
00305 
00306           dprintf_full("DataChannel::run: READ FRAME %p (currentFrameNumber %li)\n",(*rLi),currentFrameNumber);
00307           currentFrameNumber++;
00308 
00309           list < DataSink * >::iterator li = active.begin();
00310 
00311           while (li != active.end()) {
00312             if(*rLi) {
00313               dprintf_full("Sending one frame %p (TS %i size %i) to one client\n",(*rLi),
00314                            (*rLi)->getAU()->cts,(*rLi)->getAU()->size);
00315 
00316               if (!(*li)->send((*rLi))) { //finally, writeFrame of output class
00317                 // sth went wrong
00318                 dprintf_err("DataChannel::run: Sending of frame failed\n");
00319                 tearDown = true;
00320               }
00321             }
00322             ++li;
00323           }
00324           dprintf_full("DataChannel::run: SENT one FRAME %p to all clients\n",(*rLi));
00325         }
00326         // when we have no client and not even a localViewer -> exit
00327         else if(!localViewer) {
00328           lockDataSink.release();
00329           if((*rLi)) {
00330             delete (*rLi);
00331             (*rLi)=NULL;
00332           }
00333           return;
00334         }
00335         if(localViewer)
00336           localViewer->send( (*rLi) );
00337         lockDataSink.release();
00338 
00339         if ((*rLi)->isMarkedForDelete()) {
00340           dprintf_full("DataChannel::run cleanup sendQueue with frame %p\n",(*rLi));
00341           if((*rLi))
00342             delete (*rLi);
00343         } //else: ignore and let the buffering output-IO do the delete on it's own!
00344 
00345         *rLi=NULL;
00346         sendQueue.pop_front();
00347 
00348       } else {
00349         dprintf_full("DataChannel::run sendQueue was empty!!! teardown %i\n",tearDown);
00350       }
00351 
00352       if(frm) {
00353         dprintf_full("DataChannel::run cleanup incoming frame %p\n",frm);
00354         delete frm;
00355         frm=NULL;
00356       }
00357 
00358       mystate = input->getState();
00359     } // while() end reading
00360 
00361     //lockDataSink.lock();
00362     channelOpen = false;
00363     //lockDataSink.release();
00364     dprintf_full("DataChannel::run: Finished Reading (state %i) currentFrameNumber %li\n",
00365                  input->getState(),currentFrameNumber);
00366     if(input->getState()==IO::STREAMEOF)
00367       resetSendHeader=true;
00368 
00369     input->close(true);
00370 
00371     if (adaptor && (strcmp(adaptor->getName(),"Forwarder") != 0)) { //maybe there is some cached data?
00372       list < Frame * >result;
00373       dprintf_full("DataChannel::run: Closing Adaptor %s\n",adaptor->getName());
00374       result = adaptor->close();
00375 
00376       list < Frame * >::iterator rLi;
00377       if (result.size() > 0) {
00378         dprintf_full("DataChannel::run: adaptor->close() returned %i frames\n",result.size());
00379         // now append the received frame(s) to the sendqueue
00380         while (!result.empty()) {
00381           rLi = result.begin();
00382           sendQueue.push_back((*rLi));
00383           result.pop_front();
00384         }
00385         // now we have read all data, we have to ensure that everything is sent
00386         dprintf_full("DataChannel::run: Sending %i cached frames\n",sendQueue.size());
00387         while (!sendQueue.empty() && !tearDown) {
00388           // distribute it to all active clients
00389           rLi = sendQueue.begin();
00390           //set this frame to be deleted after all output-DataSinks, this may be changed by eg. buffering output-IOs
00391           (*rLi)->markForDelete();
00392 
00393           dprintf_full("DataChannel::run: READ cached FRAME\n");
00394           lockDataSink.lock();
00395           if(!active.empty()) {
00396             list < DataSink * >::iterator li = active.begin();
00397             while (li != active.end() && !tearDown) {
00398               dprintf_full("DataChannel::run: Sending one frame to one client\n");
00399               if (!(*li)->send((*rLi))) {
00400                 // sth went wrong
00401                 dprintf_err("DataChannel::run: Sending of frame failed\n");
00402               }
00403               ++li;
00404             }
00405           }
00406           if(localViewer)
00407             localViewer->send( (*rLi) );
00408 
00409           dprintf_full("DataChannel::run: SEND cached FRAME\n");
00410           // finished with sending one frame
00411           lockDataSink.release();
00412 
00413           if ((*rLi)->isMarkedForDelete()) {
00414             dprintf_full("DataChannel::run cleanup sendQueue with frame %p\n",(*rLi));
00415             if((*rLi))
00416               delete (*rLi);
00417           } //else: ignore and let the buffering output-IO do the delete on it's own!
00418 
00419           (*rLi)=NULL;
00420           sendQueue.pop_front();
00421         } // end sending all frames
00422       }
00423       // if a teardown was received while sending, empty sendqueue
00424       while (!sendQueue.empty()) {
00425         rLi = sendQueue.begin();
00426         dprintf_full("DataChannel::run, emptying sendqueue because of teardown: frame %p\n",(*rLi));
00427         delete(*rLi);
00428 
00429         (*rLi)=NULL;
00430         sendQueue.pop_front();
00431       }
00432     }// if(adaptor)
00433 
00434     // not necessary with BitField, but we better do it
00435     es->setCompleteState(true);
00436   }
00437   else {
00438     dprintf_small("DataChannel::run: Thread Started with no clients\n");
00439     input->close(true);
00440   }
00441   channelOpen=true;
00442 
00443   closeAllClients(false);
00444 
00445 
00446   dprintf_small("DataChannel::run: EXIT!\n");
00447   this->exit();
00448 } //DataChannel::run
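
The core of the run() loop (read one frame, pass it through the adaptor, append the results to the send queue, then forward at most one queued frame per pass to every sink) can be reduced to the following sketch. It is a simplification under stated assumptions, not the real loop: frames are plain ints, `pump` and `AdaptFn` are hypothetical names, sinks are vectors that record what they received, and locking, IO states, teardown and frame ownership are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <list>
#include <vector>

typedef int Frame;  // stand-in for Frame *
typedef std::function<std::list<Frame>(Frame)> AdaptFn;

// Reads every frame from 'input', adapts it, and forwards queued results
// to all sinks, one frame per loop pass.
void pump(const std::vector<Frame>& input, const AdaptFn& adapt,
          std::vector<std::vector<Frame> >& sinks) {
    std::deque<Frame> sendQueue;
    std::size_t next = 0;

    // Keep going until the input is exhausted and the queue is drained.
    while (next < input.size() || !sendQueue.empty()) {
        if (next < input.size()) {
            // An adaptor may return zero, one or several frames.
            std::list<Frame> result = adapt(input[next++]);
            sendQueue.insert(sendQueue.end(), result.begin(), result.end());
        }
        // 'if', not 'while': sending one frame per pass avoids bursts.
        if (!sendQueue.empty()) {
            const Frame f = sendQueue.front();
            sendQueue.pop_front();
            for (std::size_t i = 0; i < sinks.size(); ++i)
                sinks[i].push_back(f);
        }
    }
}
```

With an adaptor that emits two frames per input frame, the queue grows while input is still available and is drained one frame at a time afterwards, so every sink still receives all frames in order.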

void DataChannel::setAdaptor (Adaptor * adapt)

changes the adaptor to a new one, works even while the thread is working.

If NULL is passed as the parameter, a Forwarder is set.

Definition at line 455 of file DataChannel.cpp.

References adaptor, VMutex::lock(), lockAdaptor, and VMutex::release().

Referenced by ContainerDemux::createDataChannel().

00456 {
00457   lockAdaptor.lock();
00458   delete adaptor;
00459   adaptor = adapt;
00460   if (!adaptor) {
00461     adaptor = new Forwarder();
00462   }
00463   lockAdaptor.release();
00464 };
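
The same "replace under a lock, fall back to a Forwarder on NULL" idea can be written with owning smart pointers. The sketch below is an assumption-laden illustration, not the project's code: `AdaptorSlot` and `Upcase` are hypothetical, the `Adaptor` and `Forwarder` types are minimal stand-ins that only report a name, and `std::mutex` with `std::lock_guard` replaces the explicit VMutex lock()/release() pair.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Minimal stand-ins for the real adaptor hierarchy.
struct Adaptor {
    virtual ~Adaptor() {}
    virtual std::string name() const = 0;
};
struct Forwarder : Adaptor {
    std::string name() const override { return "Forwarder"; }
};
struct Upcase : Adaptor {  // hypothetical non-default adaptor
    std::string name() const override { return "Upcase"; }
};

class AdaptorSlot {
public:
    AdaptorSlot() : adaptor_(new Forwarder()) {}

    // Takes ownership of 'next'; a null pointer installs a Forwarder.
    void set(std::unique_ptr<Adaptor> next) {
        if (!next)
            next.reset(new Forwarder());
        std::lock_guard<std::mutex> guard(lock_);
        adaptor_ = std::move(next);  // the previous adaptor is destroyed here
    }

    std::string currentName() {
        std::lock_guard<std::mutex> guard(lock_);
        return adaptor_->name();
    }

private:
    std::mutex lock_;
    std::unique_ptr<Adaptor> adaptor_;
};
```

The guard releases the lock on every exit path, so no release() call can be forgotten.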

void DataChannel::setFinalFrameNumber (long frameNumber)

sets the frame number at which the DC-thread should stop streaming.

This is needed for reaching the switchPoint for stream switching.

Definition at line 473 of file DataChannel.cpp.

00473 {
00474   finalFrameNumber = frameNumber;
00475 }
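
How finalFrameNumber bounds streaming follows from the loop condition in the run() listing (lines 00207 to 00209), where -1 means "no limit". The predicate below restates that condition as a stand-alone function. `keepStreaming` and the `IoState` enum are hypothetical names for this sketch; only the states CLOSED, CLOSING and STREAMERR matter to the condition.

```cpp
#include <cassert>

// Illustrative subset of the IO states named in the run() listing.
enum class IoState { Open, Opening, StreamEof, Closing, Closed, StreamErr };

// True while the streaming loop should perform another pass.
bool keepStreaming(IoState state, bool tearDown,
                   long currentFrame, long finalFrame) {
    const bool inputUsable = state != IoState::Closed &&
                             state != IoState::Closing &&
                             state != IoState::StreamErr;
    // -1 disables the frame limit.
    const bool belowLimit = (finalFrame == -1) || (currentFrame < finalFrame);
    return inputUsable && !tearDown && belowLimit;
}
```

Note that end-of-stream alone does not stop the loop in this condition; the listing handles it separately once the send queue is empty.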

void DataChannel::setOutputStatistics (Statistics * stats)

this is BIG HACK...

With this, we set the stats of the FIRST DataSink output (e.g. RTP), since the code is not able to cope with more than one networked DataSink anyway :(

Definition at line 969 of file DataChannel.cpp.

00969 {
00970   //just overwrite old stats, if exist (stats will be deleted elsewhere)
00971   outstats = stats;
00972 }

bool DataChannel::teardown (u32 clientId, bool immediate = false)

returns false when the client was not found in the active or paused queue.

If the client was found, it will be deleted!

Definition at line 904 of file DataChannel.cpp.

References active, channelOpen, IO::close(), VMutex::lock(), lockDataSink, paused, and VMutex::release().

00905 {
00906   dprintf_full("DataChannel::teardown(u32 clientId=%u,bool immediate=%i)\r\n",clientId,immediate);
00907   lockDataSink.lock();
00908   immediateClose=immediate;
00909   if(active.empty()) {
00910     dprintf_full("DataChannel::teardown: Closing input\r\n");
00911     if (input)
00912       input->close(immediate);
00913     tearDown=true;
00914     channelOpen=false;
00915   }
00916   // remove the client from the active list,
00917   list < DataSink * >::iterator li = active.begin();
00918   while (li != active.end()) {
00919     if (clientId == (*li)->getDataSinkId()) {
00920       dprintf_full("DataChannel::teardown: Deleted client from active list\r\n");
00921       delete *li;
00922       *li=NULL;
00923       active.erase(li);
00924       if(active.empty()) {
00925         dprintf_full("DataChannel::teardown: Closing input from active list \r\n");
00926         if (input)
00927           input->close(immediate);
00928 
00929         tearDown=true;
00930         channelOpen=false;
00931       }
00932       lockDataSink.release();
00933       return true;
00934     }
00935     ++li;
00936   } //while
00937   // if not found on active, remove the client from the pause list,
00938   li = paused.begin();
00939   while (li != paused.end()) {
00940     if (clientId == (*li)->getDataSinkId()) {
00941       dprintf_full("DataChannel::teardown: Deleted client from paused list\r\n");
00942       delete *li;
00943       *li=NULL;
00944       paused.erase(li);
00945       if(paused.empty()) {
00946         dprintf_full("DataChannel::teardown: Closing input from paused list \r\n");
00947         if (input)
00948           input->close(immediate);
00949 
00950         tearDown=true;
00951         channelOpen=false;
00952       }
00953       lockDataSink.release();
00954       return true;
00955     }
00956     ++li;
00957   } //while
00958 
00959   // client really not found
00960   dprintf_err("DataChannel::teardown DataSink not found\r\n");
00961   fflush(stdout);
00962   lockDataSink.release();
00963 
00964   return false;
00965 
00966 };
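
The search order of teardown (active list first, then paused list, deleting the client where it is found) can be sketched with lists that own their elements. This is a hypothetical reduction: `Sink`, `SinkList`, `eraseById` and the free function `teardown` are names invented for the example, `std::unique_ptr` replaces the manual delete, and closing the input and the locking are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <list>
#include <memory>

// Minimal stand-in for a client; only the id matters here.
struct Sink {
    std::uint32_t id;
    explicit Sink(std::uint32_t i) : id(i) {}
};
typedef std::list<std::unique_ptr<Sink> > SinkList;

// Removes and destroys the sink with the given id; false if it is absent.
bool eraseById(SinkList& sinks, std::uint32_t id) {
    for (SinkList::iterator it = sinks.begin(); it != sinks.end(); ++it) {
        if ((*it)->id == id) {
            sinks.erase(it);  // the unique_ptr deletes the Sink
            return true;
        }
    }
    return false;
}

// Looks in the active list first and only then in the paused list.
bool teardown(SinkList& active, SinkList& paused, std::uint32_t id) {
    return eraseById(active, id) || eraseById(paused, id);
}
```

Because the list owns the sink, erasing the entry and destroying the client are a single step, and a second teardown of the same id simply returns false.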

bool DataChannel::unmute (u32 clientId)

unmutes a MUTED client.

This returns the client to the PLAY state.

Definition at line 740 of file DataChannel.cpp.

References active, VMutex::lock(), lockDataSink, and VMutex::release().

00741 {
00742   dprintf_full("DataChannel::unmute(u32 clientId=%u)\r\n",clientId);
00743   if(active.empty())
00744     return false;
00745 
00746   lockDataSink.lock();
00747 
00748   list<DataSink*>::iterator cLi=active.begin();
00749   while(cLi!=active.end()) {
00750     if( (*cLi)->getDataSinkId()==clientId) {
00751       (*cLi)->playOutput(0.0);
00752       lockDataSink.release();
00753       return true;
00754     }
00755     cLi++;
00756   }
00757   lockDataSink.release();
00758   return false;
00759 };

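void DataChannel::visualizeCaching ()

renders the result of the global Adaptor stream to the screen.

Internally creates a DataSink with ID 0. Use this id to control the visualizing DataSink. (Note that the listing below passes 1 as the first argument of the DataSink constructor; if that argument is the id, the id to use would be 1.)

Definition at line 685 of file DataChannel.cpp.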

References AdaptorChain::addAdaptor(), channelOpen, UncompressedVideoFrame::ColorSpace, ESInfo::isVisualStream(), localViewer, VMutex::lock(), lockDataSink, VMutex::release(), and Adaptor::setESInfo().

Referenced by ContainerDemux::createDataChannel().

00686 {
00687   lockDataSink.lock();
00688   if (channelOpen && es->isVisualStream()) {
00689     if(!this->localViewer) {
00690       char *tmp2 = new char[12];
00691 
00692       strcpy(tmp2, "127.0.0.1");
00693 
00694       AdaptorChain *chain = new VideoAdaptorChain();
00695       chain->setESInfo((ESInfo*)es);
00696 #if defined (BUILD_ARCH_INTEL) || defined (WIN32) //assuming display with 24 or 32 bit (4byte) depth
00697       // YV12 is super-fast with Visualizer::SDL!
00698       UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceYV12;
00699       // UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceRGB32;
00700       // UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceGRAY8;
00701       // UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceRGB565;
00702 #elif BUILD_ARCH_ARM //iPAQ only has 16 bit display!
00703       UncompressedVideoFrame::ColorSpace cs = UncompressedVideoFrame::ColorSpaceRGB565;
00704 #endif
00705       chain->addAdaptor(new MP4Decoder((VideoESInfo*)es, false, cs, MP4Decoder::FFMPEG));
00706       //chain->addAdaptor(new FrameRotate((VideoESInfo*)es, 90)); //add after decoder
00707 
00708       //chain->addAdaptor(new Visualizer((VideoESInfo*)es, Visualizer::X11, cs));
00709       chain->addAdaptor(new Visualizer((VideoESInfo*)es, Visualizer::SDL, cs));
00710       this->localViewer = new DataSink(1,new DevNull(), tmp2, 0, chain);
00711       delete tmp2;
00712     }
00713   }
00714   lockDataSink.release();
00715 };


The documentation for this class was generated from the following files: