00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <stdio.h>
00019 #include <stdlib.h>
00020 #include <string.h>
00021 #include <iostream>
00022 using namespace std;
00023
00024 #include <time.h>
00025
00026 #include "RioInterfaceTypes.h"
00027 #include "RioInterface.h"
00028 #include "RioMMObject.h"
00029 #include "RioError.h"
00030
00031 #include "BufferStream.h"
00032
#ifdef RIO_EMUL
// Emulation-only table recording which blocks have already been requested.
// The table has a fixed capacity. Indices outside [0, RIO_EMUL_MAX_BLOCKS)
// are never dereferenced: tstblk() reports them as "not requested" and
// setblk()/clrblk() silently ignore them.
#define RIO_EMUL_MAX_BLOCKS 15000

// True when blk is a valid index into requested[]. Converting to an unsigned
// type first makes negative values compare as out of range.
#define RIO_EMUL_BLK_OK(blk) \
    ( (unsigned long long)(blk) < (unsigned long long) RIO_EMUL_MAX_BLOCKS )

#define tstblk(blk) ( RIO_EMUL_BLK_OK(blk) && requested[blk] )
#define setblk(blk) ( RIO_EMUL_BLK_OK(blk) && ( requested[blk] = true ) )
#define clrblk(blk) ( RIO_EMUL_BLK_OK(blk) && ( requested[blk] = false ) )

bool requested[RIO_EMUL_MAX_BLOCKS];
#endif
00039
00040
00041 CRioMMObject::CRioMMObject( char *mmObjectName, unsigned int blocksize,
00042 RioBlock numBuffers, struct timeval average_RTT )
00043 {
00044 fullBuffers = 0;
00045 CurrentBlock = 0;
00046 TotalBlocks = 0;
00047 nMedias = 0;
00048 CurMedia = 0;
00049 BlockSize = blocksize;
00050 group_block = -1;
00051 Media = NULL;
00052 ObjSize = 0;
00053 bufferStream = NULL;
00054 RTT_average = 0;
00055 RTT_window = 0;
00056 gettimeofday( &sum_RTT, 0 );
00057
00058 nBuffers = numBuffers;
00059 Request = new RioRequest[ nBuffers ];
00060 BufferStatus = new int [nBuffers];
00061
00062 memcpy( &(this->average_RTT), &average_RTT, sizeof( struct timeval ) );
00063
00064 for( unsigned int i = 0; i < nBuffers; i++ )
00065 {
00066 Request[i].Size = BlockSize;
00067 Request[i].Block = 0;
00068 Request[i].threadid = 0;
00069 Request[i].session = 0;
00070 Request[i].fragsnumber = 0;
00071 Request[i].reqid = 0;
00072 Request[i].Status = RIO_REQUEST_FREE;
00073 Request[i].Buffer = new char[BlockSize];
00074 Request[i].User = (void*) i;
00075 Request[i].Who = this;
00076 BufferStatus[i] = BUFFEREMPTY;
00077 Request[i].CallBackFunction = CRioMMObject::CallBack;
00078 }
00079
00080 for( unsigned int i = 0; i < 30; i++ )
00081 {
00082 calculete_RTT[ i ].block = 0;
00083 }
00084
00085 strcpy( MMObjectName, mmObjectName );
00086
00087 #ifdef RIO_EMUL
00088 duration = NULL;
00089 finalize_emul = false;
00090 for(int i = 0; i < 15000; i++) clrblk( i );
00091 #endif
00092
00093 pthread_mutex_init( &PlayThreadMutex, NULL );
00094 pthread_cond_init( &PlayThreadCondition, NULL );
00095
00096 srand( time( NULL ) );
00097 }
00098
00099
00100 CRioMMObject::~CRioMMObject()
00101 {
00102 if( Request )
00103 {
00104 for( RioBlock i = 0; i < nBuffers; i++ )
00105 {
00106
00107
00108
00109
00110
00111 }
00112 delete[] Request;
00113 }
00114 delete[] BufferStatus;
00115
00116 if( nMedias )
00117 {
00118 for( unsigned int i = 0; i < nMedias; i++ )
00119 delete Media[i];
00120 delete[] Media;
00121 }
00122 }
00123
00124 void CRioMMObject::Getmyaddr( int *my_address, int *my_port )
00125 {
00126 Object.Getmyaddr( my_address, my_port );
00127 }
00128
00129 #ifdef RIO_EMUL
00130
00131 void CRioMMObject::SetBlocksDuration( int *duration_blocks )
00132 {
00133 duration = duration_blocks;
00134 }
00135
00136
00137 int *CRioMMObject::GetBlocksDuration()
00138 {
00139 return( duration );
00140 }
00141
00142
00143 bool CRioMMObject::GetFinalizeEmul()
00144 {
00145 return( finalize_emul );
00146 }
00147 #endif
00148
00149
00150 int CRioMMObject::GetNBuffers()
00151 {
00152 return nBuffers;
00153 }
00154
00155
00156 char *CRioMMObject::GetName()
00157 {
00158 return( MMObjectName );
00159 }
00160
00161
00162 CRioObject *CRioMMObject::GetObject()
00163 {
00164 return( &Object );
00165 }
00166
00167
00168 int CRioMMObject::Open( RioAccess access, CRioStream *Stream )
00169 {
00170 int rc;
00171
00172 Access = access;
00173
00174 ofstream m_log;
00175
00176 rc = Object.Open( MMObjectName, Access, average_RTT, (int) nBuffers,
00177 Stream, m_log);
00178
00179 if( rc >= 0 )
00180 {
00181
00182 rc = Object.GetSize( &ObjSize );
00183 TotalBlocks = (int)(( (ObjSize - 1)/BlockSize ) + 1);
00184 }
00185 #ifdef RIO_DEBUG2
00186 else
00187 {
00188 RioErr << "[RioMMObject] Open: N�o foi poss�vel abrir objeto "
00189 << MMObjectName << ":" << GetErrorDescription( rc ) << endl;
00190 }
00191 #endif
00192
00193 #ifndef RIO_EMUL
00194 for( unsigned int i = 0; i < nMedias; i++ )
00195 Media[i]->Open( Access, Stream );
00196 #endif
00197
00198 return( rc );
00199 }
00200
00201
00202 int CRioMMObject::Close()
00203 {
00204 int closed;
00205
00206 #ifndef RIO_EMUL
00207 for( unsigned int i = 0; i < nMedias; i++ )
00208 Media[i]->Close();
00209 #endif
00210
00211 closed = Object.Close();
00212
00213 #ifdef RIO_DEBUG2
00214 RioErr << "[CRioMMObject] Object Close [" << closed << "]" << endl;
00215 #endif
00216
00217 return( closed );
00218 }
00219
00220
00221
00222
00223
00224 bool CRioMMObject::MulticastRead( RioBlock block, RioStreamType traffic )
00225 {
00226 bool status = false;
00227 int sendack = 0;
00228
00229 #ifdef RIO_DEBUG2
00230 RioErr << "[RioMMObject MulticastRead] Pedido realizado para bloco "
00231 << block << " traffic: " << traffic << endl;
00232 #endif
00233
00234 RioResult hResult = Object.MulticastStreamRead( block, sendack, traffic );
00235
00236 if( FAILED( hResult ) )
00237 {
00238 RioErr << "[CRioMMObject] \tUnexpected MMObject error "
00239 << "while reading object." << endl;
00240 }
00241 else
00242 {
00243 status = true;
00244 }
00245
00246 #ifdef RIO_EMUL
00247 if( tstblk( block ) )
00248 {
00249 RioErr << "[RioMMObject MulticastRead] Bloco " << block << " requisitado "
00250 << "mais de uma vez" << endl;
00251 }
00252 else
00253 {
00254 setblk( block );
00255 }
00256 #endif
00257
00258 return( status );
00259 }
00260
00261
00262 bool CRioMMObject::Read( int buffer, RioBlock block, RioStreamType traffic, int ip,
00263 int port )
00264 {
00265 bool status = false;
00266 int sendack = 0;
00267
00268 if( block < TotalBlocks )
00269 {
00270 Request[buffer].Block = block;
00271 Request[buffer].Result = S_OK;
00272 Request[buffer].Status = RIO_REQUEST_FREE;
00273 Request[buffer].Who = this;
00274 Request[buffer].reqid = -1;
00275 Request[buffer].Size = BlockSize;
00276
00277 #ifdef RIO_DEBUG2
00278 RioErr << "[RioMMObject Read] Pedido realizado para bloco " << block
00279 << " traffic: ";
00280 if( traffic == UNICASTTRAFFIC )
00281 RioErr << "UNICAST";
00282 else
00283 RioErr << "MULTICAST";
00284
00285 RioErr << ", port " << port << endl;
00286 #endif
00287
00288
00289 RioResult hResult = Object.StreamRead( &Request[buffer], sendack,
00290 traffic, ip, port );
00291
00292 if( FAILED( hResult ) )
00293 {
00294 #ifdef RIO_DEBUG2
00295 RioErr << "[CRioMMObject] \tUnexpected MMObject error "
00296 << "while reading object." << endl;
00297 #endif
00298
00299 status = false;
00300
00301 }
00302 else
00303 {
00304 status = true;
00305 }
00306
00307 if( ( bufferStream != NULL ) && ( traffic == UNICASTTRAFFIC ) )
00308
00309 {
00310 bufferStream->setUnicast( block );
00311 }
00312
00313 #ifdef RIO_EMUL
00314 if( tstblk( block ) )
00315 {
00316 RioErr << "[RioMMObject Read] Bloco " << block << " requisitado "
00317 << "mais de uma vez" << endl;
00318 }
00319 else
00320 {
00321 setblk( block );
00322 }
00323 #endif
00324 }
00325
00326 return( status );
00327 }
00328
00329
00330 void CRioMMObject::Prefetch()
00331 {
00332 RioBlock Block = 0;
00333 bool status = true;
00334
00335 while( Block < nBuffers )
00336 {
00337 status = status && Read( (int)Block, Block );
00338 Block++;
00339 }
00340
00341 for( unsigned int i = 0; i < nMedias; i++ )
00342 Media[i]->Prefetch();
00343 }
00344
00345
00346 bool CRioMMObject::WaitPrefetch()
00347 {
00348 bool WaitingPrefetch;
00349 unsigned int numPrefetchBlocks;
00350
00351 PlayThreadMutexLock( "ROwtpr1" );
00352
00353 numPrefetchBlocks = (nBuffers < TotalBlocks) ? nBuffers : TotalBlocks;
00354
00355 do
00356 {
00357 WaitingPrefetch = false;
00358 for( unsigned int i = 0; i < numPrefetchBlocks; i++ )
00359 {
00360 if( BufferStatus[i] == BUFFEREMPTY )
00361 {
00362 #ifdef RIO_DEBUG2
00363 RioErr << "WaitPrefetch - Buffer Empty. Posicao: " << i << endl;
00364 #endif
00365
00366 WaitingPrefetch = true;
00367 }
00368 }
00369
00370 if( WaitingPrefetch )
00371 PlayThreadWaitCondition( "ROwtpr1" );
00372 }
00373 while( WaitingPrefetch );
00374
00375 PlayThreadMutexUnlock( "ROwtpr1" );
00376
00377 #ifdef RIO_DEBUG2
00378 RioErr << "Saindo da waitprefetch " << endl;
00379 #endif
00380
00381 #ifndef RIO_EMUL
00382 for( unsigned int i = 0; i < nMedias; i++ )
00383 Media[i]->WaitPrefetch();
00384 #endif
00385
00386 return( true );
00387 }
00388
00389 int CRioMMObject::CanStart( void )
00390 {
00391 int status = Stream.CanStart();
00392
00393 return status;
00394 }
00395
00396 void CRioMMObject::AllocMedia( int qtdMedias )
00397 {
00398 if( nMedias )
00399 delete[] Media;
00400
00401 CurMedia = 0;
00402 nMedias = qtdMedias;
00403 if( nMedias )
00404 Media = new CRioMMObject *[ nMedias ];
00405 }
00406
00407
00408 bool CRioMMObject::AddMedia( CRioMMObject *media )
00409 {
00410 bool status;
00411
00412 status = false;
00413 if( CurMedia < nMedias )
00414 {
00415 Media[ CurMedia++ ] = media;
00416 status = true;
00417 }
00418
00419 return( status );
00420 }
00421
00422
00423 void CRioMMObject::SyncMedia( void *param )
00424 {
00425 for( unsigned int i = 0; i < nMedias; i++ )
00426 Media[i]->SyncMedia( param );
00427 }
00428
00429
00430 void CRioMMObject::Stop()
00431 {
00432 unsigned int i;
00433
00434 for( i = 0; i < nBuffers; i++ )
00435 {
00436 Request[i].User = (void*) i;
00437 BufferStatus[i] = BUFFEREMPTY;
00438 }
00439
00440 #ifndef RIO_EMUL
00441 for( i = 0; i < nMedias; i++ )
00442 Media[i]->Stop();
00443 #endif
00444 }
00445
00446 #ifdef RIO_EMUL
00447 void CRioMMObject::Quit()
00448 {}
00449 #endif
00450
00451
00452 int CRioMMObject::ProcessSignal( int sig, void *data )
00453 {
00454 return( 0 );
00455 }
00456
00457 void CRioMMObject::SetBufferStatus( RioBlock block, int status )
00458 {
00459 PlayThreadMutexLock( "sebufst1" );
00460
00461 BufferStatus[ block % nBuffers ] = status;
00462 if( status == BUFFERFULL )
00463 {
00464 fullBuffers++;
00465 PlayThreadReleaseCondition( "sebufst1" );
00466 }
00467 else
00468 fullBuffers--;
00469
00470 PlayThreadMutexUnlock( "sebufst1" );
00471 }
00472
00473
00474 int CRioMMObject::GetBufferStatus( RioBlock block )
00475 {
00476 return( BufferStatus[ block % nBuffers ] );
00477 }
00478
00479 int CRioMMObject::GetFullBuffers()
00480 {
00481 return fullBuffers;
00482 }
00483
00484
00485 void CRioMMObject::CallBack( struct RioRequest *request )
00486 {
00487 CRioMMObject *ptr;
00488
00489 #ifdef RIO_DEBUG2
00490 RioErr << "[RioMMObject] CallBack" << endl;
00491 #endif
00492
00493
00494
00495
00496 {
00497 #ifdef RANDOM_LOSS
00498 if( rand()<(RAND_MAX/2) )
00499 {
00500 }
00501 else
00502 #endif
00503 {
00504 ptr = (CRioMMObject *)request->Who;
00505 ptr->SetBufferStatus( (int) (long long int) request->User, BUFFERFULL );
00506
00507 #ifdef RIO_DEBUG2
00508 RioErr << "(Callback) BLOCK: " << request->User << endl;
00509 #endif
00510 }
00511 }
00512 }
00513
00514
00515 int CRioMMObject::GetNMedias()
00516 {
00517 return nMedias;
00518 }
00519
00520 CRioMMObject *CRioMMObject::GetMedia( unsigned n )
00521 {
00522 return Media[ n ];
00523 }
00524
00525
00526 void CRioMMObject::inputLog( int action, unsigned int block, unsigned int size )
00527 {
00528 }
00529
00530 unsigned int CRioMMObject::getCurrentPlayingBlock()
00531 {
00532 return 0;
00533 }
00534
00535 void CRioMMObject::PlayThreadMutexLock( string msg )
00536 {
00537 #ifdef RIO_DEBUG2
00538 if( msg.length() )
00539 RioErr << "[PlayThreadMutexLock] " << msg << "... " << endl;
00540 #endif
00541
00542 pthread_mutex_lock( &PlayThreadMutex );
00543
00544 #ifdef RIO_DEBUG2
00545 if( msg.length() )
00546 RioErr << "..." << msg << " OK!" << endl;
00547 #endif
00548 }
00549
00550 void CRioMMObject::PlayThreadMutexUnlock( string msg )
00551 {
00552 #ifdef RIO_DEBUG2
00553 if( msg.length() )
00554 RioErr << "[PlayThreadMutexUnlock] " << msg << endl;
00555 #endif
00556
00557 pthread_mutex_unlock( &PlayThreadMutex );
00558 }
00559
00560 void CRioMMObject::PlayThreadWaitCondition( string msg )
00561 {
00562 #ifdef RIO_DEBUG2
00563 if( msg.length() )
00564 RioErr << "[PlayThreadWaitCondition] " << msg << "... " << endl;
00565 #endif
00566
00567 pthread_cond_wait( &PlayThreadCondition, &PlayThreadMutex );
00568
00569 #ifdef RIO_DEBUG2
00570 if( msg.length() )
00571 RioErr << "..." << msg << " OK!" << endl;
00572 #endif
00573 }
00574
00575 int CRioMMObject::PlayThreadWaitCondition( struct timespec *t, string msg )
00576 {
00577 #ifdef RIO_DEBUG2
00578 if( msg.length() )
00579 RioErr << "[PlayThreadWaitCondition] " << msg << "... " << endl;
00580 #endif
00581
00582 int rc = pthread_cond_timedwait( &PlayThreadCondition,
00583 &PlayThreadMutex, t );
00584
00585 #ifdef RIO_DEBUG2
00586 if( msg.length() )
00587 RioErr << "..." << msg << " OK!" << endl;
00588 #endif
00589
00590 return rc;
00591 }
00592
00593 void CRioMMObject::PlayThreadReleaseCondition( string msg )
00594 {
00595 pthread_cond_broadcast( &PlayThreadCondition );
00596 }
00597
00598 #ifdef RIO_DEBUG2
00599 void CRioMMObject::printDebug( short debugCode, string message )
00600 {
00601 Object.printDebug( debugCode, message );
00602 }
00603 #endif