00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "StreamManager.h"
00039 #include "SystemManager.h"
00040 #include "SessionManager.h"
00041 #include "NetMgr.h"
00042 #include "timer.h"
00043 #include "Event.h"
00044
00045 #include "RioError.h"
00046
00047 #include <string.h>
00048 #include <errno.h>
00049
00050 #include <values.h>
00051 #include <stdio.h>
00052 #include <stdlib.h>
00053 #include "ObjMapMgr.h"
00054 #include <math.h>
00055 #include <sys/socket.h>
00056 #include <netinet/in.h>
00057 #include <arpa/inet.h>
00058 #include "iostream"
00059
00060 double getInterval( struct timeval initial, struct timeval final );
00061 extern void printMemUsed();
00062
00063 const char* const LOGSCACT = "SampleCACTime.log";
00064 const char* const LOGSAPT = "SampleAdmissionProcessTime.log";
00065 const char* const LOGSGENREQLT = "SampleGenReqListTime.log";
00066 const char* const LOGSSORTLT = "SampleSortListTime.log";
00067 const char* const LOGSBUFFERT = "SampleBufferTime.log";
00068 const char* const LOGSSIMULT = "SampleSimulationTime.log";
00069 const char* const LOGSWAITINGT = "SampleWaitingTime.log";
00070 const char* const LOGEBUFFERT = "AverageBufferTime.log";
00071 const char* const LOGECACT = "AverageCACTime.log";
00072 const char* const LOGEAPT = "AverageAdmissionProcessTime.log";
00073 const char* const LOGEGENREQLT = "AverageGenReqListTime.log";
00074 const char* const LOGESORTLT = "AverageSortListTime.log";
00075 const char* const LOGESIMULT = "AverageSimulationTime.log";
00076 const char* const LOGEWAITINGT = "AverageWaitingTime.log";
00077
00078
00079 const char* const LOGFILE = "RIOStreamManager.log";
00080
00081 const u64 STREAM_MANAGER_TIMER_INTERVAL = 100000;
00082
00083 extern CTimer Timer;
00084 extern CEventManager EventManager;
00085
00086
00087
00088 const int RELEASE_TIME_TOLERANCE= 100000;
00089
00090
00091
00092
00093
00094 void CRequestQueue::Put( EventDataRequest* Request )
00095 {
00096 m_Queue.Put( (LocalQueueEle*) Request);
00097 }
00098
00099
00100
00101 EventDataRequest* CRequestQueue::Get()
00102 {
00103 return(EventDataRequest*) m_Queue.Get();
00104 }
00105
00106
00107
00108
00109
00110 RioStream* CStreamList::First()
00111 {
00112 return( RioStream* ) m_Queue.First();
00113 }
00114
00115
00116
00117 void CStreamList::Put( RioStream* Stream )
00118 {
00119 m_Queue.Put(( LocalQueueEle* ) Stream );
00120 }
00121
00122
00123
00124 void CStreamList::Remove( RioStream* Stream )
00125 {
00126 m_Queue.Remove(( LocalQueueEle* ) Stream );
00127 }
00128
00129
00130
00131
00132 CStreamManager::CStreamManager()
00133 {
00134 m_BlockSize = 0;
00135 m_MaxStreams = 0;
00136 m_used = 0;
00137 m_UsedRate = 0.0;
00138 m_TotalRate = 0.0;
00139 m_NRTReservedRate = 0.0;
00140 m_MaxActiveRequests = 0;
00141 m_MaxRTRequests = 0;
00142 m_MaxNRTRequests = 0;
00143 m_ExtraStreamRequests = 0;
00144 m_HoldingNonRealTime = false;
00145 m_ActiveRequests = 0;
00146 m_nDisks = 0;
00147 m_TimerId = 0;
00148
00149
00150
00151
00152 m_VectorStream = NULL;
00153 m_NextNRTStream = NULL;
00154 m_MutexTable = new CMutex;
00155 m_ObjectManager = NULL;
00156 m_SystemManager = NULL;
00157 m_Router = NULL;
00158 m_NetMgr = NULL;
00159 m_initialized = false;
00160
00161
00162 m_FileRoot = NULL;
00163 m_UseServerSideBuffers = false;
00164 m_UseNewCAC = false;
00165 m_CollectMeasures = false;
00166 m_MaxPendingRequests = 0;
00167 m_MaxNumberOfDisks = 0;
00168 m_NumberOfBuffersForEachClient = 0;
00169 m_BurstSizeOfEachClient = 0;
00170 m_MaxIntervalEmpty = 0.0;
00171 m_NumberOfEmptyTimes = 0;
00172 pthread_mutex_init( &MutexModifyClientsList , NULL );
00173 pthread_mutex_init( &MutexRunCAC , NULL );
00174 pthread_mutex_init( &MutexUpdateMeasures , NULL );
00175 pthread_mutex_init( &MutexUpdateNumberOfWaitingClients, NULL );
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191 m_lastID = 0;
00192
00193 memset( m_EstimatedDiskServiceTime, 0,
00194 sizeof( m_EstimatedDiskServiceTime ) );
00195 memset( m_EstimatedDiskServiceTimeOfDisk, 0,
00196 sizeof( m_EstimatedDiskServiceTimeOfDisk ) );
00197 memset( m_EstimatedDiskResponseTime, 0,
00198 sizeof( m_EstimatedDiskResponseTime ) );
00199 memset( m_EstimatedDiskResponseTimeOfDisk, 0,
00200 sizeof( m_EstimatedDiskResponseTimeOfDisk ) );
00201 memset( m_DeviationDiskResponseTime, 0,
00202 sizeof( m_DeviationDiskResponseTime ) );
00203 memset( m_DeviationDiskResponseTimeOfDisk, 0,
00204 sizeof( m_DeviationDiskResponseTimeOfDisk ) );
00205 memset( m_EstimatedDiskQueueTime, 0,
00206 sizeof( m_EstimatedDiskQueueTime ) );
00207 memset( m_EstimatedDiskQueueTimeOfDisk, 0,
00208 sizeof( m_EstimatedDiskQueueTimeOfDisk ) );
00209
00210 m_AverageCACTime = 0.0;
00211 m_VarianceCACTime = 0.0;
00212 m_NumberOfSamples = 0;
00213 m_AverageAdmissionProcessTime = 0.0;
00214 m_VarianceAdmissionProcessTime = 0.0;
00215 m_AverageSimulationTime = 0.0;
00216 m_VarianceSimulationTime = 0.0;
00217 m_AverageBufferTime = 0.0;
00218 m_VarianceBufferTime = 0.0;
00219 m_AverageGenReqListTime = 0.0;
00220 m_VarianceGenReqListTime = 0.0;
00221 m_AverageSortListTime = 0.0;
00222 m_VarianceSortListTime = 0.0;
00223 m_AverageWaitingTime = 0.0;
00224 m_VarianceWaitingTime = 0.0;
00225
00226 memset( m_AverageCACTimeAccClients, 0,
00227 sizeof( m_AverageCACTimeAccClients ));
00228 memset( m_VarianceCACTimeAccClients, 0,
00229 sizeof( m_VarianceCACTimeAccClients ));
00230 memset( m_SamplesCACTimeAccClients, 0,
00231 sizeof( m_SamplesCACTimeAccClients ));
00232
00233 memset( m_AverageAdmissionProcessTimeAccClients, 0,
00234 sizeof( m_AverageAdmissionProcessTimeAccClients ));
00235 memset( m_VarianceAdmissionProcessTimeAccClients, 0,
00236 sizeof( m_VarianceAdmissionProcessTimeAccClients ));
00237 memset( m_SamplesAdmissionProcessTimeAccClients, 0,
00238 sizeof( m_SamplesAdmissionProcessTimeAccClients ));
00239
00240 memset( m_AverageSimulationTimeAccClients, 0,
00241 sizeof( m_AverageSimulationTimeAccClients ));
00242 memset( m_VarianceSimulationTimeAccClients, 0,
00243 sizeof( m_VarianceSimulationTimeAccClients ));
00244 memset( m_SamplesSimulationTimeAccClients, 0,
00245 sizeof( m_SamplesSimulationTimeAccClients ));
00246
00247 memset( m_AverageSortListTimeAccClients, 0,
00248 sizeof( m_AverageSortListTimeAccClients ));
00249 memset( m_VarianceSortListTimeAccClients, 0,
00250 sizeof( m_VarianceSortListTimeAccClients ));
00251 memset( m_SamplesSortListTimeAccClients, 0,
00252 sizeof( m_SamplesSortListTimeAccClients ));
00253
00254 memset( m_AverageWaitingTimeAccClients, 0,
00255 sizeof( m_AverageWaitingTimeAccClients ));
00256 memset( m_VarianceWaitingTimeAccClients, 0,
00257 sizeof( m_VarianceWaitingTimeAccClients ));
00258 memset( m_SamplesWaitingTimeAccClients, 0,
00259 sizeof( m_SamplesWaitingTimeAccClients ));
00260
00261 memset( m_AverageWaitingTimeAccQueue, 0,
00262 sizeof( m_AverageWaitingTimeAccQueue ));
00263 memset( m_VarianceWaitingTimeAccQueue, 0,
00264 sizeof( m_VarianceWaitingTimeAccQueue ));
00265 memset( m_SamplesWaitingTimeAccQueue, 0,
00266 sizeof( m_SamplesWaitingTimeAccQueue ));
00267
00268 memset( m_AverageBufferTimeAccClients, 0,
00269 sizeof( m_AverageBufferTimeAccClients ));
00270 memset( m_VarianceBufferTimeAccClients, 0,
00271 sizeof( m_VarianceBufferTimeAccClients ));
00272 memset( m_SamplesBufferTimeAccClients, 0,
00273 sizeof( m_SamplesBufferTimeAccClients ));
00274
00275 m_EstimatedTimeParameter = 0.0;
00276 m_NetworkRate = 0.0;
00277 m_AdmittedClientsList = NULL;
00278 m_NumberOfAdmittedClients = 0;
00279 m_NumberOfWaitingClients = 0;
00280
00281 memset( ClientBuffer, 0, sizeof( ClientBuffer ) );
00282 memset( PlayBlock, 0, sizeof( PlayBlock ) );
00283 memset( TotalBlocks, 0, sizeof( TotalBlocks ) );
00284 memset( NextBlockToSend, 0, sizeof( NextBlockToSend ) );
00285 memset( ServerBuffer, 0, sizeof( ServerBuffer ) );
00286 memset( NReqWhenEmptyBuffer, 0, sizeof( NReqWhenEmptyBuffer ) );
00287 memset( NHiccups, 0, sizeof( NHiccups ) );
00288 memset( Initial_time, 0, sizeof( Initial_time ) );
00289 memset( ClientRTT, 0, sizeof( ClientRTT ) );
00290 memset( TimeBufferEmpty, 0, sizeof( TimeBufferEmpty ) );
00291 memset( MaxIntervalBufferEmpty, 0, sizeof( MaxIntervalBufferEmpty ) );
00292
00293
00294
00295 m_LogsDirectory = NULL;
00296
00297
00298
00299 m_NetInterface = NULL;
00300 }
00301
00302
00303 CStreamManager::~CStreamManager()
00304 {
00305 m_ObjectManager = 0;
00306 m_Router = 0;
00307
00308
00309 Timer.FreeTimer( m_TimerId );
00310
00311 Timer.Stop();
00312
00313 delete m_MutexTable;
00314
00315 if( m_NetMgr )
00316 {
00317 delete m_NetMgr;
00318 m_NetMgr = 0;
00319 }
00320
00321
00322 if( m_NetInterface != NULL )
00323 {
00324 delete m_NetInterface;
00325 m_NetInterface = NULL;
00326 }
00327
00328 if( m_VectorStream != 0 )
00329 {
00330 delete[] m_VectorStream;
00331 m_VectorStream = 0;
00332 }
00333 delete[] m_FileRoot;
00334 m_FileRoot = 0;
00335 if( m_log.is_open() ) m_log.close();
00336 }
00337
00338
00339
00340
00341
00342 int CStreamManager::Initialize( StreamManagerConfig * Config )
00343 {
00344
00345 char LogFileName[ MaxPathSize ];
00346
00347 m_LogsDirectory = Config->LogsDirectory;
00348
00349 #ifdef RIO_DEBUG1
00350 RioErr << "### [CStreamManager - Initialize] Start" << endl;
00351 #endif
00352
00353 if( Config->GenerateLogs )
00354 {
00355
00356 strcpy( LogFileName, m_LogsDirectory );
00357 strcat( LogFileName, LOGFILE );
00358 m_log.open( LogFileName );
00359
00360 if( !m_log.is_open() )
00361 {
00362 return ERROR_STREAMMANAGER + ERROR_LOGFILE;
00363 }
00364 }
00365
00366
00367 if( m_initialized )
00368 {
00369 RioErr << "Initialize(): Tried to initialize component already"
00370 " initialized" << endl;
00371 return ERROR_STREAMMANAGER + ERROR_INITIALIZED;
00372 }
00373
00374
00375 if( !m_MutexTable->IsOpen() )
00376 {
00377 RioErr << "Initialize(): Failed to create mutex" << endl;
00378 RioErr << " SYSTEM ERROR: " << strerror(errno) << endl;
00379 return ERROR_STREAMMANAGER + ERROR_CREATE_MUTEX;
00380 }
00381
00382 #ifdef RIO_DEBUG2
00383 RioErr << "### [CStreamManager - Initialize] Antes de new m_VectorStream"
00384 << endl;
00385 #endif
00386
00387
00388 m_VectorStream = new RioStream [Config->MaxStreams];
00389 if( m_VectorStream == 0 )
00390 {
00391 RioErr << "StreamManager.Start: Failed to alocate memory for Streams"
00392 << endl;
00393 return ERROR_STREAMMANAGER + ERROR_MEMORY;
00394
00395 }
00396
00397
00398 for( int i = 0; i < Config->MaxStreams; i++ )
00399 {
00400 m_VectorStream[i].Initialize( this );
00401 m_FreeList.Put( &m_VectorStream[i] );
00402 }
00403
00404 #ifdef RIO_DEBUG2
00405 RioErr << "### [CStreamManager - Initialize] Depois de new m_VectorStream"
00406 << endl;
00407 #endif
00408
00409
00410
00411
00412
00413 m_NetMgr = new NetMgr( true );
00414 if( !m_NetMgr )
00415 {
00416 delete[] m_VectorStream;
00417 m_VectorStream = 0;
00418 RioErr << "StreamManager.Start: new NetMgr() failed" << endl;
00419 return ERROR_STREAMMANAGER + ERROR_MEMORY;
00420 }
00421
00422
00423
00424 try
00425 {
00426 m_NetInterface = new CNetInterface;
00427 }
00428 catch( bad_alloc& ba )
00429 {
00430 delete[] m_NetMgr;
00431 m_NetMgr = NULL;
00432 RioErr << "StreamManager.Start: new CNetInterface failed with error "
00433 << ba.what() << endl;
00434 return ERROR_STREAMMANAGER + ERROR_MEMORY;
00435 }
00436
00437
00438
00439 #ifdef RIO_DEBUG_FILE
00440 char tmpName[ MaxPathSize ];
00441 char tmpDomain[ MaxPathSize ];
00442
00443
00444 strcpy( LogFileName, m_LogsDirectory );
00445
00446 gethostname( tmpName, MaxPathSize - 1 );
00447
00448 getdomainname( tmpDomain, MaxPathSize - 1 );
00449
00450
00451 strcat( LogFileName, "RIOServerEmul_" );
00452 strcat( LogFileName, tmpName );
00453
00454
00455 if( strstr( tmpName, tmpDomain ) == NULL )
00456 {
00457 strcat( LogFileName, "." );
00458 strcat( LogFileName, tmpDomain );
00459 }
00460 strcat( LogFileName, ".log" );
00461
00462 int rc = m_NetMgr->Start( htons( RIOSERVERUDPPORT ), Config->BlockSize,
00463 FRAGMENTSIZE, LogFileName );
00464
00465
00466 int rc2 = m_NetInterface->Start( htons( RIOSERVERUDPPORT ),
00467 Config->ClientsTimeOut,
00468 NETTCPTIMEOUTSECONDS,
00469 NETTCPTIMEOUTSECONDS, NULL, false,
00470 LogFileName );
00471
00472 #else
00473
00474 int rc = m_NetMgr->Start( htons( RIOSERVERUDPPORT ), Config->BlockSize,
00475 FRAGMENTSIZE );
00476
00477
00478 int rc2 = m_NetInterface->Start( htons( RIOSERVERUDPPORT ),
00479 Config->ClientsTimeOut,
00480 NETTCPTIMEOUTSECONDS,
00481 NETTCPTIMEOUTSECONDS );
00482
00483 #endif
00484
00485 if( rc )
00486 {
00487 RioErr << "StreamManager.Start: NetMgr.Start failed rc "
00488 << (int *) rc << endl;
00489 return rc;
00490 }
00491
00492 if( rc2 )
00493 {
00494 RioErr << "StreamManager.Start: NetInterface.Start failed rc2 "
00495 << (int *) rc2 << endl;
00496 return rc2;
00497 }
00498
00499 m_TimerId = Timer.GetTimer( Timeout );
00500 if( m_TimerId < 0 )
00501 {
00502 RioErr << "Initialize(): Failed to Get timer" << endl;
00503 return m_TimerId;
00504 }
00505
00506
00507 m_ObjectManager = Config->ObjectManager;
00508 m_SystemManager = Config->SystemManager;
00509
00510
00511 m_Router = Config->Router;
00512 m_used = 0;
00513 m_BlockSize = Config->BlockSize;
00514 m_MaxStreams = Config->MaxStreams;
00515 m_TotalRate = Config->TotalRate;
00516 m_NRTReservedRate = Config->NRTReservedRate;
00517
00518 m_FileRoot = new char[ strlen( Config->FileRoot ) + 1 ];
00519 strcpy( m_FileRoot, Config->FileRoot );
00520 m_MaxPendingRequests = Config->MaxPending;
00521 m_NumberOfBuffersForEachClient = Config->NumberOfBuffersForEachClient;
00522 m_BurstSizeOfEachClient = Config->BurstSizeOfEachClient;
00523 m_EstimatedTimeParameter = Config->EstimatedTimeParameter;
00524 m_NetworkRate = Config->NetworkRate;
00525 m_UseServerSideBuffers = Config->UseServerSideBuffers;
00526 m_UseNewCAC = Config->UseNewCAC;
00527 m_CollectMeasures = Config->CollectMeasures;
00528 m_NumberOfEmptyTimes = Config->NumberOfEmptyTimes;
00529 m_MaxIntervalEmpty = Config->MaxIntervalEmpty;
00530 m_MaxNumberOfDisks = m_Router->GetMaxNumberOfDisks();
00531
00532 if( m_CollectMeasures )
00533 {
00534
00535
00536 strcpy( LogFileName, m_LogsDirectory );
00537 strcat( LogFileName, LOGSCACT );
00538 m_logSCACT.open( LogFileName );
00539 strcpy( LogFileName, m_LogsDirectory );
00540 strcat( LogFileName, LOGECACT );
00541 m_logECACT.open( LogFileName );
00542 strcpy( LogFileName, m_LogsDirectory );
00543 strcat( LogFileName, LOGSAPT );
00544 m_logSAPT.open( LogFileName );
00545 strcpy( LogFileName, m_LogsDirectory );
00546 strcat( LogFileName, LOGEAPT );
00547 m_logEAPT.open( LogFileName );
00548 strcpy( LogFileName, m_LogsDirectory );
00549 strcat( LogFileName, LOGSSIMULT );
00550 m_logSSIMULT.open( LogFileName );
00551 strcpy( LogFileName, m_LogsDirectory );
00552 strcat( LogFileName, LOGESIMULT );
00553 m_logESIMULT.open( LogFileName );
00554 strcpy( LogFileName, m_LogsDirectory );
00555 strcat( LogFileName, LOGSGENREQLT );
00556 m_logSGENREQLT.open( LogFileName );
00557 strcpy( LogFileName, m_LogsDirectory );
00558 strcat( LogFileName, LOGEGENREQLT );
00559 m_logEGENREQLT.open( LogFileName );
00560 strcpy( LogFileName, m_LogsDirectory );
00561 strcat( LogFileName, LOGSBUFFERT );
00562 m_logSBUFFERT.open( LogFileName );
00563 strcpy( LogFileName, m_LogsDirectory );
00564 strcat( LogFileName, LOGEBUFFERT );
00565 m_logEBUFFERT.open( LogFileName );
00566 strcpy( LogFileName, m_LogsDirectory );
00567 strcat( LogFileName, LOGSSORTLT );
00568 m_logSSORTLT.open( LogFileName );
00569 strcpy( LogFileName, m_LogsDirectory );
00570 strcat( LogFileName, LOGESORTLT );
00571 m_logESORTLT.open( LogFileName );
00572 strcpy( LogFileName, m_LogsDirectory );
00573 strcat( LogFileName, LOGSWAITINGT );
00574 m_logSWAITINGT.open( LogFileName );
00575 strcpy( LogFileName, m_LogsDirectory );
00576 strcat( LogFileName, LOGEWAITINGT );
00577 m_logEWAITINGT.open( LogFileName );
00578
00579 m_logSCACT << "# Measures in msec." << endl;
00580 m_logECACT << "# Measures in msec." << endl;
00581 m_logSSIMULT << "# Measures in msec." << endl;
00582 m_logESIMULT << "# Measures in msec." << endl;
00583 m_logSGENREQLT << "# Measures in msec." << endl;
00584 m_logEGENREQLT << "# Measures in msec." << endl;
00585 m_logSBUFFERT << "# Measures in msec." << endl;
00586 m_logEBUFFERT << "# Measures in msec." << endl;
00587 m_logSSORTLT << "# Measures in msec." << endl;
00588 m_logESORTLT << "# Measures in msec." << endl;
00589 m_logSWAITINGT << "# Measures in msec." << endl;
00590 m_logEWAITINGT << "# Measures in msec." << endl;
00591 }
00592
00593
00594 m_UsedRate = 0.0;
00595 m_HoldingNonRealTime = false;
00596 m_ActiveRequests = 0;
00597 m_NextNRTStream = 0;
00598 m_nDisks = Config->nDisks;
00599
00600 m_ExtraStreamRequests = Config->ExtraStreamRequests;
00601
00602
00603
00604
00605 m_MaxActiveRequests = (Config->MaxDiskQueueSize + 2);
00606
00607
00608 m_MaxActiveRequests *= Config->nDisks;
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620 m_MaxRTRequests += m_MaxActiveRequests * 3 +
00621 ( m_ExtraStreamRequests * 2 * m_MaxStreams );
00622
00623 m_MaxNRTRequests += m_MaxActiveRequests * 3 +
00624 ( m_ExtraStreamRequests * 2 * m_MaxStreams );
00625
00626 #ifdef RIO_DEBUG2
00627 if( m_log.is_open() )
00628 {
00629 m_log <<"m_MaxRTRequests = m_MaxNRTRequests = " << m_MaxRTRequests
00630 << endl <<"m_MaxActiveRequests = " << m_MaxActiveRequests << endl
00631 << "m_NumberOfEmptyTimes = " << m_NumberOfEmptyTimes * 1.0 << endl
00632 <<"m_MaxIntervalEmpty = " << m_MaxIntervalEmpty << endl;
00633 }
00634 #endif
00635
00636
00637 #ifdef RIO_DEBUG2
00638 RioErr << "### [CStreamManager - Initialize] Antes de criar os eventos "
00639 << "EventTypeRTDataRequest - numero de eventos " << m_MaxRTRequests
00640 << endl;
00641 #endif
00642
00643 EventManager.Initialize( EventTypeRTDataRequest, m_MaxRTRequests );
00644
00645 #ifdef RIO_DEBUG2
00646 RioErr << "### [CStreamManager - Initialize] Antes de criar os eventos "
00647 << "EventTypeNRTDataRequest - numero de eventos " << m_MaxNRTRequests
00648 << endl;
00649 #endif
00650
00651 EventManager.Initialize( EventTypeNRTDataRequest, m_MaxNRTRequests );
00652
00653 #ifdef RIO_DEBUG2
00654 RioErr << "### [CStreamManager - Initialize] Depois de criar os eventos"
00655 << endl;
00656 #endif
00657
00658 m_initialized = true;
00659
00660
00661 struct timeval interval;
00662 interval.tv_sec = 0;
00663 interval.tv_usec = STREAM_MANAGER_TIMER_INTERVAL;
00664
00665
00666 Timer.StartTimer( m_TimerId, interval, TimerTypePeriodic, this );
00667
00668 #ifdef RIO_DEBUG1
00669 RioErr << "### [CStreamManager - Initialize] End" << endl;
00670 #endif
00671
00672 return S_OK;
00673 }
00674
00675
00676 int CStreamManager::Open( RioStreamTraffic *Traffic, RioStream **Stream,
00677 SOCKADDR_IN RemoteAddress )
00678 {
00679
00680 if( !m_initialized )
00681 {
00682 RioErr << "Open(): Component not initialized" << endl;
00683 return( ERROR_STREAMMANAGER + ERROR_NOT_INITIALIZED);
00684 }
00685
00686 RioTrafficType type = Traffic->Type;
00687 RioStreamDirection direction = Traffic->Direction;
00688
00689
00690 if(( direction != RioStreamDirectionRead ) && ( type != RIO_TRAFFIC_NRT ))
00691 {
00692 return ERROR_STREAMMANAGER + ERROR_INVALID_DIRECTION;
00693 }
00694
00695 m_MutexTable->Wait();
00696
00697
00698 if( m_used >= m_MaxStreams )
00699 {
00700 m_MutexTable->Release();
00701 RioErr << "StreamManager: max number of streams !!!" << endl;
00702 return ERROR_STREAMMANAGER + ERROR_MAX_STREAMS;
00703 }
00704
00705 double rate = 0.0;
00706
00707
00708 if( type == RIO_TRAFFIC_CBR )
00709 {
00710 rate = Traffic->TrafficCBR.Rate;
00711
00712 if( m_log.is_open() )
00713 {
00714 char *info = myInfo();
00715 m_log << endl
00716 << "-------------" << info << "---------------"
00717 << endl
00718 << "StreamManager: active streams " << m_used
00719 << ". New traffic CBR. Used rate " << m_UsedRate
00720 << " request rate " << rate << endl;
00721 free( info );
00722 }
00723
00724
00725
00726 if( ( m_UsedRate + rate ) > ( m_TotalRate - m_NRTReservedRate ) )
00727 {
00728 m_MutexTable->Release();
00729
00730 if( m_log.is_open() )
00731 m_log << "StreamManager: Could not admit new user." << endl;
00732
00733 return ERROR_STREAMMANAGER + ERROR_STREAM_REFUSED;
00734 }
00735 }
00736 else
00737 {
00738 if( ( type != RIO_TRAFFIC_NRT ) && ( type != RIO_TRAFFIC_VBR ) )
00739 {
00740 m_MutexTable->Release();
00741 return ERROR_STREAMMANAGER + ERROR_INVALID_TRAFFIC;
00742 }
00743 if( type == RIO_TRAFFIC_VBR )
00744 {
00745 rate = Traffic->TrafficVBR.Rate;
00746 }
00747
00748 if( m_log.is_open() )
00749 {
00750 char *info = myInfo();
00751 m_log << endl
00752 << "-------------" << info << "---------------"
00753 << endl
00754 << "StreamManager: active streams " << m_used
00755 << ". New traffic NRT or VBR." << endl;
00756 free( info );
00757 }
00758 }
00759
00760 if( Traffic->LogicalBlockSize != m_BlockSize )
00761 {
00762 if( m_log.is_open() )
00763 m_log << "[StreamManager] Error: Invalid BlockSize." << endl;
00764
00765 m_MutexTable->Release();
00766 return ERROR_STREAMMANAGER + ERROR_INVALID_BLOCKSIZE;
00767 }
00768
00769
00770 RioStream *sp = m_FreeList.First();
00771 m_FreeList.Remove( sp );
00772
00773
00774 m_used++;
00775
00776
00777 int MaxRequests;
00778 int UserMaxRequests;
00779
00780
00781
00782
00783
00784
00785 if( type == RIO_TRAFFIC_NRT )
00786 {
00787 MaxRequests = ( int ) ( m_MaxActiveRequests * 1.1 );
00788 UserMaxRequests = m_MaxActiveRequests;
00789 }
00790
00791
00792
00793
00794
00795 else
00796 {
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807
00808
00809 MaxRequests = 50;
00810 UserMaxRequests = 50;
00811
00812 }
00813
00814
00815 if(type == RIO_TRAFFIC_CBR )
00816 {
00817
00818 m_UsedRate += rate;
00819 }
00820
00821
00822
00823 sp->s_Rate = rate;
00824 sp->s_Direction = Traffic->Direction;
00825
00826 sp->m_RemoteAddress = RemoteAddress;
00827 sp->m_Status = RioStream::StreamStatusOpened;
00828 sp->m_Type = type;
00829 sp->m_nRequests = 0;
00830 sp->m_nQueue = 0;
00831 sp->m_MaxRequests = MaxRequests;
00832 sp->m_UserMaxRequests = UserMaxRequests;
00833
00834 sp->m_AverageRTTtoClient_msec = 0;
00835 sp->m_ClientBufferSize = 0;
00836 sp->m_ServerBufferStatus = 0;
00837 sp->m_RequestList = NULL;
00838 sp->m_PosAtClientsList = NULL;
00839 sp->m_Buffer = NULL;
00840 sp->m_TokensToSend = 0;
00841 sp->m_PrefetchedAllBlocks = false;
00842 sp->m_StartingToSend = true;
00843 sp->m_TokensToDisk = 0;
00844 sp->m_WaitingQueueSize = 0;
00845 sp->m_TotalBlocks = 0;
00846 sp->m_SampleGenReqListTime = 0;
00847 sp->m_SampleSortListTime = 0;
00848 sp->m_SampleSimulationTime = 0;
00849 sp->m_SampleCACTime = 0;
00850 sp->m_SampleAdmissionProcessTime = 0;
00851 sp->m_SampleWaitingTime = 0;
00852 sp->m_WaitingQueueSize = 0;
00853
00854 sp->m_Id = m_lastID = (m_lastID + 1 ) % 65000;
00855
00856
00857 sp->s_streamid = (int) m_used;
00858 sp->m_RequestCount = 0;
00859 sp->m_QueuedCount = 0;
00860 sp->m_NextRelease.tv_sec = 0;
00861 sp->m_NextRelease.tv_usec = 0;
00862
00863 long double arrival_interval = (( m_BlockSize / rate ) * 1000000.0 );
00864 sp->m_ArrivalInterval.tv_sec = (u64) ( arrival_interval / 1000000 );
00865 sp->m_ArrivalInterval.tv_usec = (u64) ( (u64) arrival_interval % 1000000 );
00866
00867 #ifdef RIO_DEBUG2
00868 char Logfile[ MaxPathSize ];
00869 sprintf( Logfile, "%sStream.%d.log", m_LogsDirectory, sp->m_Id );
00870 sp->m_log.open( Logfile );
00871 if( sp->m_log.is_open() )
00872 {
00873 sp->m_log << "Arrival Interval " << arrival_interval/1000.0 << " msec"
00874 << endl << "MaxRequests: " << sp->m_MaxRequests
00875 << " UserMaxRequests: " << sp->m_UserMaxRequests << endl;
00876 }
00877 #endif
00878
00879 #ifdef RIO_DEBUG2
00880 char LogRfile[ MaxPathSize ];
00881 sprintf( LogRfile, "%sStream.%d.requests", m_LogsDirectory, sp->m_Id);
00882 sp->m_log_requests.open( LogRfile );
00883 if( sp->m_log_requests.is_open() )
00884 sp->m_log_requests << "Block\tEstimatedTime\tTime"<<endl;
00885 #endif
00886
00887 *Stream = sp;
00888
00889 m_MutexTable->Release();
00890
00891 return S_OK;
00892 }
00893
00894
00895 void CStreamManager::PostITEvent( MonitorEvent *event )
00896 {
00897 m_SystemManager->PostITEvent( event );
00898 }
00899
00900 void CStreamManager::Timeout( void* Param )
00901 {
00902 CStreamManager* Mgr = ( CStreamManager* ) Param;
00903
00904
00905 Mgr->m_MutexTable->Wait();
00906
00907
00908
00909 RioStream* next;
00910 RioStream* stream = Mgr->m_RTHoldList.First();
00911
00912 while( stream != 0 )
00913 {
00914 next = stream->Next;
00915 stream->ProcessRTQueue();
00916 stream = next;
00917 }
00918
00919
00920
00921 stream = Mgr->m_NextNRTStream;
00922 if( stream == 0 )
00923 {
00924 stream = Mgr->m_NRTHoldList.First();
00925 }
00926
00927 while((stream != 0) && (Mgr->m_ActiveRequests < Mgr->m_MaxActiveRequests))
00928 {
00929 next = stream->Next;
00930 stream->ProcessNRTQueue();
00931 stream = next;
00932
00933 if( stream == 0 )
00934 {
00935 stream = Mgr->m_NRTHoldList.First();
00936 }
00937 }
00938
00939
00940 Mgr->m_NextNRTStream = stream;
00941
00942
00943 Mgr->m_MutexTable->Release();
00944 }
00945
00946
00947
00948
00949
00950
00951
00952
00953 void CStreamManager::SetNumberOfDisks( unsigned int NumberOfDisks )
00954 {
00955 m_nDisks = NumberOfDisks;
00956 }
00957
00958
00959
00960
00961 RioStream::RioStream( )
00962 {
00963 s_mgr = NULL;
00964 s_streamid = 0;
00965 s_Direction = RioStreamDirectionRead;
00966 s_Rate = 0.0;
00967 m_Status = StreamStatusClosed;
00968 m_Type = RIO_TRAFFIC_INVALID;
00969 m_nRequests = 0;
00970 m_MaxRequests = 0;
00971 m_UserMaxRequests = 0;
00972 m_nQueue = 0;
00973 memset( (void *)&m_NextRelease, 0, sizeof( m_NextRelease ) );
00974 memset( (void *)&m_ArrivalInterval, 0, sizeof( m_ArrivalInterval ) );
00975 m_RequestCount = 0;
00976 m_QueuedCount = 0;
00977
00978 memset( (void *)&m_LastArrivalTime , 0, sizeof( m_LastArrivalTime ) );
00979 memset( (void *)&m_TokensToDiskClock, 0, sizeof( m_TokensToDiskClock ) );
00980 memset( (void *)&m_TimeToBuffer , 0, sizeof( m_TimeToBuffer ) );
00981
00982 m_FirstArrivalTime = 0.0;
00983 m_SampleGenReqListTime = 0.0;
00984 m_SampleSortListTime = 0.0;
00985 m_SampleSimulationTime = 0.0;
00986 m_SampleCACTime = 0.0;
00987 m_SampleAdmissionProcessTime = 0.0;
00988 m_SampleWaitingTime = 0.0;
00989 m_WaitingQueueSize = 0;
00990 m_TotalBlocks = 0;
00991 m_RequestList = NULL;
00992 m_Id = 0;
00993 m_PosAtClientsList = NULL;
00994 m_Buffer = NULL;
00995 m_AverageRTTtoClient_msec = 0.0;
00996 m_ClientBufferSize = 0;
00997 m_ServerBufferStatus = 0;
00998 m_TokensToSend = 0;
00999 m_PrefetchedAllBlocks = false;
01000 m_FinishedMovie = false;
01001 m_TokensToDisk = 0;
01002 m_StartingToSend = true;
01003 m_MinimumBlockToSend = 0;
01004 pthread_mutex_init( &MutexTokensToSend , NULL );
01005 pthread_mutex_init( &MutexTokensToDisk , NULL );
01006 pthread_mutex_init( &MutexNextBlockToBeFetched, NULL );
01007 pthread_mutex_init( &MutexMinBlock , NULL );
01008 pthread_mutex_init( &MutexBlock , NULL );
01009 pthread_mutex_init( &MutexUpdateEvent , NULL );
01010 pthread_cond_init( &ConditionBlockArrival , NULL );
01011
01012 Next = NULL;
01013 Previous = NULL;
01014
01015
01016 }
01017
01018
01019 RioStream::~RioStream()
01020 {
01021 Close();
01022 }
01023
01024
01025 void RioStream::Initialize( CStreamManager *mgr )
01026 {
01027 s_mgr = mgr;
01028 }
01029
01030
01031 int RioStream::Close()
01032 {
01033 int i = 0;
01034
01035
01036
01037
01038
01039 if( m_Status == StreamStatusClosed )
01040 return S_OK;
01041
01042 s_mgr->m_MutexTable->Wait();
01043
01044 m_Status = StreamStatusClosed;
01045
01046 if( s_mgr->m_log.is_open() )
01047 s_mgr->m_log << "Closing Stream ( " << m_Id << " )"
01048 << " Requests " << m_nRequests
01049 << " Queue " << m_nQueue
01050 << " Request Count " << m_RequestCount
01051 << " Queued Count " << m_QueuedCount << endl;
01052
01053 EventDataRequest* request;
01054
01055
01056 while( m_nQueue > 0 )
01057 {
01058 request = m_Queue.Get();
01059 m_nQueue--;
01060 m_nRequests--;
01061 EventManager.Free( ( Event* )request );
01062 }
01063
01064 if( s_mgr->m_RTHoldList.First() == this )
01065 s_mgr->m_RTHoldList.Remove( s_mgr->m_RTHoldList.First() );
01066
01067 if( ( m_PosAtClientsList != NULL ) && ( s_mgr->m_UseServerSideBuffers ) )
01068 {
01069
01070
01071 for( i = 0 ; i < s_mgr->GetNumberOfBuffersForEachClient(); i++ )
01072 {
01073 cancelBlockAtBuffer ( i );
01074 }
01075 }
01076
01077
01078
01079
01080 if( m_nRequests > 0 )
01081 {
01082 if( s_mgr->m_log.is_open() )
01083 s_mgr->m_log << "Should wait for request completion before closing: "
01084 << m_nRequests << " requests pending" << endl;
01085 s_mgr->m_CloseList.Put( this );
01086 }
01087 else
01088 {
01089
01090 s_mgr->m_used--;
01091
01092 if( m_Type == RIO_TRAFFIC_CBR )
01093 {
01094 s_mgr->m_UsedRate -= s_Rate;
01095 }
01096
01097 if( ( m_PosAtClientsList != NULL ) &&
01098 ( s_mgr->m_UseServerSideBuffers )
01099 )
01100 {
01101 s_mgr->RemoveClient( m_PosAtClientsList );
01102 if( m_Buffer )
01103 {
01104 delete m_Buffer;
01105 m_Buffer = NULL;
01106 }
01107 }
01108 #ifdef RIO_DEBUG2
01109 RioErr << "Closing mlog." << endl;
01110 m_log.close();
01111 #endif
01112 #ifdef RIO_DEBUG2
01113 RioErr << "Closing mlogreq." << endl;
01114 m_log_requests.close();
01115 #endif
01116
01117
01118 s_mgr->m_FreeList.Put( this );
01119 if( s_mgr->m_log.is_open() )
01120 s_mgr->m_log << "Close: Stream CLOSED. Requests: " << m_RequestCount
01121 << ". Queued requests: " << m_QueuedCount
01122 << ". Active streams " << s_mgr->m_used<< endl;
01123 }
01124
01125 s_mgr->m_MutexTable->Release();
01126
01127 struct in_addr clientip;
01128 clientip.s_addr = m_RemoteAddress.sin_addr.s_addr;
01129 UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip),
01130 htons( m_RemoteAddress.sin_port ), m_nRequests );
01131 s_mgr->PostITEvent( (MonitorEvent *)it_event );
01132
01133 return S_OK;
01134 }
01135
01136 int RioStream::getDirection()
01137 {
01138 return s_Direction;
01139 }
01140
01141
01142 int RioStream::OpenObject( char* ObjectName, RioAccess Access,
01143 RioStreamObj **StreamObj )
01144 {
01145 int rc;
01146
01147
01148 if( ( s_Direction == RioStreamDirectionRead ) &&
01149 ( Access & RIO_WRITE_ACCESS ))
01150 {
01151 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
01152 }
01153 if( ( s_Direction == RioStreamDirectionWrite ) &&
01154 ( Access & RIO_READ_ACCESS ))
01155 {
01156 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
01157 }
01158
01159 if( m_Status != StreamStatusOpened )
01160 {
01161 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
01162 }
01163
01164 RioStreamObj *op = new RioStreamObj( s_mgr, this );
01165 if( op == 0 )
01166 {
01167 return ERROR_STREAMMANAGER + ERROR_MEMORY;
01168 }
01169
01170 op->o_object = 0;
01171
01172 rc = s_mgr->m_ObjectManager->Open( ObjectName, Access, &op->o_object );
01173
01174 if( rc == 0 )
01175 {
01176 *StreamObj = op;
01177 return S_OK;
01178 }
01179 delete op;
01180 return rc;
01181 }
01182
01183
01184 int RioStream::MaxRequests()
01185 {
01186 if( m_Status != StreamStatusOpened )
01187 {
01188 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
01189 }
01190
01191 return m_UserMaxRequests;
01192 }
01193
01194
01195
01196 RioTrafficType RioStream::GetType()
01197 {
01198 return m_Type;
01199 }
01200
01201
01202 uint RioStream::GetId()
01203 {
01204 return m_Id;
01205 }
01206
01207
01208 int RioStream::Get_streamid()
01209 {
01210 return s_streamid;
01211 }
01212
01213
01214
01215 int RioStream::DataReq( u32 reqid, u16 operation,
01216 u32 ipaddr, u16 port,
01217 u32 block, u32 repbits,
01218 RioStreamObj* StreamObject )
01219 {
01220
01221 struct timeval ArrivalTime = Timer.CurrentTime();
01222
01223 #ifdef RIO_DEBUG2
01224 if( m_log.is_open() )
01225 {
01226 struct in_addr clientip;
01227 double interval = getInterval( m_LastArrivalTime, ArrivalTime );
01228 clientip.s_addr = ipaddr;
01229 char *info = myInfo( false );
01230 m_log << endl << info
01231 << "Stream:DataReq => Req ip " << inet_ntoa(clientip)
01232 << " port " << port << " reqid " << reqid
01233 << " block " << block << " TokensToSend " << m_TokensToSend
01234 << " (m_nRequests " << m_nRequests + 1 << ")"
01235 << " arrival " << ArrivalTime.tv_sec << " sec "
01236 << ArrivalTime.tv_usec << " usec "
01237 << "interval " << interval << " msec"
01238 << " m_MaxRequests " << m_MaxRequests << endl;
01239 free( info );
01240 }
01241 #endif
01242
01243
01244 if( m_Status == StreamStatusClosed )
01245 {
01246 #ifdef RIO_DEBUG2
01247 if( m_log.is_open() )
01248 m_log << "Stream:DataReq:ERROR_STREAM_NOT_OPENED!!!" << endl;
01249 #endif
01250
01251 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
01252 }
01253
01254
01255 if( m_nRequests >= m_MaxRequests )
01256 {
01257 #ifdef RIO_DEBUG2
01258 if( m_log.is_open() )
01259 m_log << "Stream:DataReq:Max Number of Active Requests!!! "
01260 << m_nRequests << ">=" << m_MaxRequests << endl;
01261 #endif
01262
01263 RioErr << "Stream:DataReq:Max Number of Active Requests!!! "
01264 << m_nRequests << ">=" << m_MaxRequests << endl;
01265
01266 return ERROR_STREAMMANAGER + ERROR_MAX_STREAM_REQUESTS;
01267 }
01268
01269 m_nRequests++;
01270 m_RequestCount++;
01271
01272
01273 EventDataRequest* event;
01274
01275 if( m_Type == RIO_TRAFFIC_NRT )
01276 {
01277 event = (EventDataRequest*)EventManager.New( EventTypeNRTDataRequest );
01278 }
01279 else
01280 {
01281 event = (EventDataRequest*)EventManager.New( EventTypeRTDataRequest );
01282 }
01283
01284 if( operation == READ )
01285 {
01286 if( m_Type == RIO_TRAFFIC_NRT )
01287 {
01288 event->Request.Operation = NonRealTimeRead;
01289 }
01290 else
01291 {
01292 event->Request.Operation = RealTimeRead;
01293 }
01294 }
01295 else
01296 {
01297 event->Request.Operation = NonRealTimeWrite;
01298 }
01299
01300 event->Request.streamobj = StreamObject;
01301 event->Request.rioobject = StreamObject->o_object;
01302 event->Request.Reqid = reqid;
01303 event->Request.Target.IPaddress = ipaddr;
01304 event->Request.Target.Port = port;
01305 event->Request.Block = block;
01306 event->Request.RepBits = repbits;
01307 event->Request.Status = 0;
01308 event->Request.BufferId = -2;
01309
01310
01311 if( m_Type == RIO_TRAFFIC_NRT )
01312 {
01313
01314
01315 if( s_mgr->m_HoldingNonRealTime ||
01316 ( s_mgr->m_ActiveRequests >= s_mgr->m_MaxActiveRequests ))
01317 {
01318
01319
01320 NRTHold( event );
01321 }
01322
01323 else
01324 {
01325
01326 s_mgr->m_ActiveRequests++;
01327
01328
01329 #ifdef RIO_DEBUG2
01330 if( m_log.is_open() )
01331 m_log << "Stream:DataReq:Enviando pedido de NRT ao router"
01332 << endl;
01333 #endif
01334 s_mgr->m_Router->Put( (Event*)event );
01335 }
01336 }
01337
01338 else
01339 {
01340
01341
01342 if( ( m_PosAtClientsList != NULL ) &&
01343 ( s_mgr->m_UseServerSideBuffers )
01344 )
01345 {
01346 m_LastArrivalTime = ArrivalTime;
01347
01348 bool changeSequence = false;
01349 bool blockWasAtBuffer = false;
01350 bool usedEvent = false;
01351
01352 incTokensToSend();
01353 updateTokensToDisk( ArrivalTime );
01354
01355 #ifdef RIO_DEBUG2
01356 if( m_log.is_open() )
01357 m_log << " Looking for block at buffer ... ( tokenstodisk "
01358 << getTokensToDisk() << ")"<< endl;
01359 #endif
01360
01361
01362
01363 for( int i = 0; i < s_mgr->GetNumberOfBuffersForEachClient(); i++ )
01364 {
01365 if( ( m_Buffer[i].event != NULL ) &&
01366 ( (u32)m_Buffer[i].event->Request.Block == block ))
01367 {
01368 #ifdef RIO_DEBUG2
01369 if( m_log.is_open() )
01370 m_log << " Found it with BufferId "
01371 << m_Buffer[i].event->Request.BufferId
01372 << " updates ip, port and reqid... " << endl;
01373 #endif
01374
01375 blockWasAtBuffer = true;
01376
01377
01378 getMutexUpdateEvent();
01379
01380
01381 m_Buffer[i].event->Request.Reqid = reqid;
01382 m_Buffer[i].event->Request.Target.IPaddress = ipaddr;
01383 m_Buffer[i].event->Request.Target.Port = port;
01384
01385 incMinimumBlockToSend();
01386
01387 if( m_Buffer[i].event->Request.BufferId == -1 )
01388 {
01389 #ifdef RIO_DEBUG2
01390
01391 if( m_log.is_open() )
01392 m_log << " prefetching and waiting "<< endl;
01393 #endif
01394 releaseMutexUpdateEvent();
01395 }
01396 else if( m_Buffer[i].event->Request.Status != 0 )
01397 {
01398
01399 m_Buffer[i].event->Request.Operation =
01400 RealTimeCancelBlock;
01401 releaseMutexUpdateEvent();
01402 s_mgr->m_Router->Put( (Event*)m_Buffer[i].event );
01403 decTokensToSend();
01404 decServerBufferStatus();
01405
01406 #ifdef RIO_DEBUG2
01407 if( m_log.is_open() )
01408 m_log << " sent to Router -> error at prefetch, "
01409 << " cancel block" << endl;
01410 #endif
01411 }
01412 else
01413 {
01414
01415 m_Buffer[i].event->Request.Operation=RealTimeSendBlock;
01416 releaseMutexUpdateEvent();
01417 s_mgr->m_Router->Put( (Event*)m_Buffer[i].event );
01418 #ifdef RIO_DEBUG2
01419 if( m_log.is_open() )
01420 m_log << " sent to Router "<< endl;
01421 #endif
01422 }
01423
01424 m_Buffer[i].event = NULL;
01425 }
01426 }
01427
01428
01429 if( block != m_PosAtClientsList->nextBlockToBeRequested->block )
01430 {
01431
01432 changeSequence = true;
01433
01434 #ifdef RIO_DEBUG2
01435 if( m_log.is_open() )
01436 m_log << " sequence changed from "
01437 << m_PosAtClientsList->nextBlockToBeRequested->block
01438 << " to " << block << endl;
01439 #endif
01440
01441 while( ( block >
01442 m_PosAtClientsList->nextBlockToBeRequested->block ) &&
01443 ( m_PosAtClientsList->nextBlockToBeRequested->next !=
01444 NULL )
01445 )
01446 m_PosAtClientsList->nextBlockToBeRequested =
01447 m_PosAtClientsList->nextBlockToBeRequested->next;
01448
01449 while( ( block <
01450 m_PosAtClientsList->nextBlockToBeRequested->block ) &&
01451 ( m_PosAtClientsList->nextBlockToBeRequested->prev !=
01452 NULL )
01453 )
01454 m_PosAtClientsList->nextBlockToBeRequested =
01455 m_PosAtClientsList->nextBlockToBeRequested->prev;
01456
01457 #ifdef RIO_DEBUG2
01458 if( m_log.is_open() )
01459 m_log << " nbtoberequested "
01460 << m_PosAtClientsList->nextBlockToBeRequested->block
01461 << endl;
01462 #endif
01463
01464 if( blockWasAtBuffer == false )
01465 {
01466 #ifdef RIO_DEBUG2
01467 if( m_log.is_open() )
01468 m_log << " block was not at buffer. "<< endl;
01469 #endif
01470
01471
01472 usedEvent = true;
01473 decTokensToSend();
01474 if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) )
01475 {
01476 #ifdef RIO_DEBUG2
01477 if( m_log.is_open() )
01478 m_log << " (to Router) send block "
01479 << event->Request.Block
01480 << " bufferid "
01481 << event->Request.BufferId
01482 << " m_ActiveRequests "
01483 << s_mgr->m_ActiveRequests + 1
01484 << endl;
01485 #endif
01486
01487 s_mgr->m_Router->Put( (Event*)event );
01488 s_mgr->m_ActiveRequests++;
01489 decTokensToDisk();
01490 }
01491 else
01492 {
01493 #ifdef RIO_DEBUG2
01494 if( m_log.is_open() )
01495 m_log << " (to queue) send block "
01496 << event->Request.Block
01497 << " bufferid "
01498 << event->Request.BufferId << endl;
01499 #endif
01500
01501 RTHold( event );
01502 }
01503 }
01504 }
01505
01506 if( m_PosAtClientsList->nextBlockToBeRequested->next != NULL )
01507 {
01508 m_PosAtClientsList->nextBlockToBeRequested =
01509 m_PosAtClientsList->nextBlockToBeRequested->next;
01510 }
01511 else
01512 {
01513
01514 #ifdef RIO_DEBUG2
01515 if( m_log.is_open() )
01516 m_log << " last block - prefetch all true "<< endl;
01517 #endif
01518
01519 m_FinishedMovie = true;
01520 m_PrefetchedAllBlocks = true;
01521
01522 if( s_mgr->GetNumberOfBuffersForEachClient() != 0 )
01523 {
01524 EventManager.Free( (Event *) event );
01525 }
01526 }
01527
01528 #ifdef RIO_DEBUG2
01529 if( m_log.is_open() ) m_log << " updated nbtoberequested "
01530 << m_PosAtClientsList->nextBlockToBeRequested->block
01531 << endl;
01532 #endif
01533
01534 if( m_PrefetchedAllBlocks == true )
01535 {
01536 m_nRequests--;
01537 }
01538
01539
01540 if( s_mgr->GetNumberOfBuffersForEachClient() == 0 )
01541 {
01542 updateNextBlockToBeFetched(
01543 m_PosAtClientsList->nextBlockToBeRequested);
01544 }
01545 else
01546 {
01547 if( changeSequence && ( m_FinishedMovie == false ) )
01548 {
01549 #ifdef RIO_DEBUG2
01550 if( m_log.is_open() )
01551 m_log << " updating nbtobefetched ... ";
01552 #endif
01553
01554 if( ( block >=
01555 m_PosAtClientsList->nextBlockToBeFetched->block ) ||
01556 ( ( block + s_mgr->GetNumberOfBuffersForEachClient()) <
01557 ( m_PosAtClientsList->nextBlockToBeFetched->block -
01558 s_mgr->GetNumberOfBuffersForEachClient()
01559 )
01560 )
01561 )
01562 {
01563
01564 for( int i=0;
01565 i < s_mgr->GetNumberOfBuffersForEachClient();
01566 i++
01567 )
01568 {
01569 cancelBlockAtBuffer ( i );
01570 m_nRequests++;
01571 }
01572 updateNextBlockToBeFetched(
01573 m_PosAtClientsList->nextBlockToBeRequested );
01574 }
01575 else
01576 if( ( block + s_mgr->GetNumberOfBuffersForEachClient() ) <
01577 m_PosAtClientsList->nextBlockToBeFetched->block )
01578 {
01579 updateNextBlockToBeFetched(
01580 m_PosAtClientsList->nextBlockToBeRequested );
01581 }
01582 setMinimumBlockToSend(
01583 m_PosAtClientsList->nextBlockToBeRequested->block );
01584 if( m_PosAtClientsList->nextBlockToBeFetched->next == NULL )
01585 {
01586 m_PrefetchedAllBlocks = true;
01587 }
01588 else
01589 {
01590 m_PrefetchedAllBlocks = false;
01591 }
01592 }
01593 }
01594
01595
01596
01597 if( ( s_mgr->GetNumberOfBuffersForEachClient() == 0 ) &&
01598 ( changeSequence == false ))
01599 {
01600 decTokensToSend();
01601 if( m_StartingToSend == true )
01602 {
01603 #ifdef RIO_DEBUG2
01604 if( m_log.is_open() )
01605 m_log << "Stream:DataReq:Sending prefetching blocks to"
01606 << " router"<< endl;
01607 #endif
01608
01609 s_mgr->m_Router->Put( (Event*)event );
01610 s_mgr->m_ActiveRequests++;
01611 }
01612 else
01613 {
01614 if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) )
01615 {
01616 #ifdef RIO_DEBUG2
01617 if( m_log.is_open() )
01618 m_log << "Stream:DataReq:Sending prefetching blocks "
01619 << "to router 2"<< endl;
01620 #endif
01621
01622 s_mgr->m_Router->Put( (Event*)event );
01623 s_mgr->m_ActiveRequests++;
01624 decTokensToDisk();
01625 }
01626 else
01627 {
01628 RTHold( event );
01629 }
01630 }
01631 }
01632 int i = 0;
01633 int pos = -1;
01634 bool prefetch;
01635
01636 while(( i < s_mgr->GetNumberOfBuffersForEachClient()) &&
01637 ( m_PrefetchedAllBlocks == false ))
01638 {
01639 prefetch = false;
01640 if( m_Buffer[i].event == NULL )
01641 {
01642 prefetch = true;
01643 }
01644 else
01645 if( (( m_Buffer[i].blockInfo->block < getMinimumBlockToSend() )
01646 ||( m_Buffer[i].blockInfo->block >=
01647 ( m_PosAtClientsList->nextBlockToBeRequested->block
01648 + s_mgr->GetNumberOfBuffersForEachClient())))
01649 && changeSequence )
01650 {
01651 cancelBlockAtBuffer( i );
01652 m_nRequests++;
01653 prefetch = true;
01654 }
01655 else if( ( m_Buffer[i].blockInfo->block >=
01656 m_PosAtClientsList->nextBlockToBeFetched->block )
01657 && ( changeSequence ))
01658 {
01659 if( pos == -1 )
01660 {
01661 pos = i;
01662 }
01663 else
01664 {
01665 if( m_Buffer[pos].blockInfo->block <
01666 m_Buffer[i].blockInfo->block )
01667 pos = i;
01668 }
01669 }
01670 if( prefetch == true )
01671 {
01672 if( usedEvent )
01673 {
01674 event = (EventDataRequest*)
01675 EventManager.New( EventTypeRTDataRequest );
01676 }
01677 else
01678 usedEvent = true;
01679
01680 prefetchBlock( i, event, StreamObject );
01681 updateNextBlockToBeFetched( NULL );
01682 }
01683 i++;
01684 }
01685
01686 if( ( pos != -1 ) &&
01687 ( m_Buffer[pos].blockInfo->block >=
01688 m_PosAtClientsList->nextBlockToBeFetched->block ))
01689 {
01690 updateNextBlockToBeFetched( m_Buffer[pos].blockInfo->next );
01691 }
01692
01693 if( usedEvent == false )
01694 {
01695
01696 EventManager.Free( (Event *) event );
01697 }
01698
01699 }
01700 else
01701 {
01702
01703
01704
01705
01706
01707 struct timeval tolerance_time;
01708 tolerance_time.tv_sec =ArrivalTime.tv_sec;
01709 tolerance_time.tv_usec=ArrivalTime.tv_usec+RELEASE_TIME_TOLERANCE;
01710 if( tolerance_time.tv_usec > 1000000 )
01711 {
01712 tolerance_time.tv_sec += 1;
01713 tolerance_time.tv_usec -= 1000000;
01714 }
01715
01716 #ifdef RIO_DEBUG2
01717 if( m_log.is_open() ) m_log << "Stream:DataReq "
01718 << " m_nQueue " << m_nQueue << endl
01719 << " tolerance_time.tv_sec "
01720 << ( unsigned ) tolerance_time.tv_sec
01721 << " m_NextRelease.tv_sec "
01722 << ( unsigned ) m_NextRelease.tv_sec
01723 << endl
01724 << " tolerance_time.tv_usec "
01725 << ( unsigned ) tolerance_time.tv_usec
01726 << " m_NextRelease.tv_usec "
01727 << ( unsigned ) m_NextRelease.tv_usec
01728 << endl;
01729 #endif
01730
01731
01732
01733 if( ( m_nQueue == 0 ) &&
01734 (( tolerance_time.tv_sec > m_NextRelease.tv_sec ) ||
01735 (( tolerance_time.tv_sec == m_NextRelease.tv_sec ) &&
01736 ( tolerance_time.tv_usec >= m_NextRelease.tv_usec ))))
01737 {
01738
01739
01740
01741 if( ( ArrivalTime.tv_sec > m_NextRelease.tv_sec ) ||
01742 (( ArrivalTime.tv_sec == m_NextRelease.tv_sec) &&
01743 (ArrivalTime.tv_usec > m_NextRelease.tv_usec)) )
01744 {
01745 m_NextRelease.tv_sec = ArrivalTime.tv_sec +
01746 m_ArrivalInterval.tv_sec;
01747 m_NextRelease.tv_usec = ArrivalTime.tv_usec +
01748 m_ArrivalInterval.tv_usec;
01749 }
01750 else
01751 {
01752 m_NextRelease.tv_sec += m_ArrivalInterval.tv_sec;
01753 m_NextRelease.tv_usec += m_ArrivalInterval.tv_usec;
01754 }
01755 if( m_NextRelease.tv_usec > 1000000 )
01756 {
01757 m_NextRelease.tv_sec += 1;
01758 m_NextRelease.tv_usec -= 1000000;
01759 }
01760
01761
01762
01763 s_mgr->m_ActiveRequests++;
01764
01765
01766 #ifdef RIO_DEBUG2
01767 if( m_log.is_open() )
01768 m_log << "Stream:DataReq:Enviando pedido de bloco ao router"
01769 << endl;
01770 #endif
01771 s_mgr->m_Router->Put( (Event*)event );
01772 }
01773
01774 else
01775 {
01776
01777
01778 #ifdef RIO_DEBUG2
01779 if( m_log.is_open() )
01780 m_log << "Stream:DataReq:Putting request on hold" << endl;
01781 #endif
01782
01783 RTHold( event );
01784 }
01785
01786 m_LastArrivalTime = ArrivalTime;
01787 }
01788 }
01789
01790 struct in_addr clientip;
01791 clientip.s_addr = m_RemoteAddress.sin_addr.s_addr;
01792 UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip),
01793 htons( m_RemoteAddress.sin_port ), m_nRequests );
01794 s_mgr->PostITEvent( (MonitorEvent *)it_event );
01795
01796 return S_OK;
01797 }
01798
01799
01800 void RioStream::RTHold( EventDataRequest* event )
01801 {
01802
01803
01804 if( m_nQueue == 0 )
01805 {
01806 #ifdef RIO_DEBUG2
01807 if( m_log.is_open() )
01808 m_log << "Stream:DataReq:colocando first na fila " << endl;
01809 #endif
01810 s_mgr->m_RTHoldList.Put( this );
01811 }
01812
01813 m_Queue.Put( event );
01814 m_nQueue++;
01815 m_QueuedCount++;
01816 }
01817
01818
01819 void RioStream::NRTHold( EventDataRequest* event )
01820 {
01821
01822
01823 if( m_nQueue == 0 )
01824 {
01825 s_mgr->m_NRTHoldList.Put( this );
01826 }
01827
01828 m_Queue.Put( event );
01829 m_nQueue++;
01830 m_QueuedCount++;
01831 }
01832
01833
01834 void RioStream::ProcessRTQueue()
01835 {
01836
01837 EventDataRequest* request;
01838
01839 struct timeval t = Timer.CurrentTime();
01840 t.tv_usec += RELEASE_TIME_TOLERANCE;
01841 if( t.tv_usec > 1000000 )
01842 {
01843 t.tv_sec += 1;
01844 t.tv_usec -= 1000000;
01845 }
01846
01847 if( m_PosAtClientsList != NULL )
01848 {
01849 struct timeval clocknow = Timer.CurrentTime();
01850
01851
01852 updateTokensToDisk( clocknow );
01853
01854
01855 while(( m_nQueue > 0 ) && ( getTokensToDisk() > 0 ))
01856 {
01857 #ifdef RIO_DEBUG2
01858 if( m_log.is_open() )
01859 m_log << "Stream:DataReq 1 - ProcessRTQueue "
01860 << " Recuperando da fila."
01861 << endl;
01862 #endif
01863
01864 request = m_Queue.Get();
01865 m_nQueue--;
01866
01867
01868 s_mgr->m_ActiveRequests++;
01869
01870
01871 #ifdef RIO_DEBUG2
01872 if( m_log.is_open() ) m_log << "Stream:DataReq - ProcessRTQueue "
01873 << " Enviando pedidos recuperados da fila para o router111 "
01874 << endl;
01875 #endif
01876 s_mgr->m_Router->Put( (Event*)request );
01877 decTokensToDisk();
01878 }
01879 }
01880 else
01881 {
01882
01883 while( ( m_nQueue > 0 ) &&
01884 ( ( t.tv_sec > m_NextRelease.tv_sec) ||
01885 ( ( t.tv_sec == m_NextRelease.tv_sec ) &&
01886 ( t.tv_usec >= m_NextRelease.tv_usec)
01887 )
01888 )
01889 )
01890 {
01891
01892 #ifdef RIO_DEBUG2
01893 if( m_log.is_open() )
01894 m_log << "Stream:DataReq 2 - ProcessRTQueue "
01895 << " Recuperando da fila."
01896 << endl;
01897 #endif
01898 request = m_Queue.Get();
01899 m_nQueue--;
01900
01901
01902 m_NextRelease.tv_sec += m_ArrivalInterval.tv_sec;
01903 m_NextRelease.tv_usec += m_ArrivalInterval.tv_usec;
01904 if( m_NextRelease.tv_usec > 1000000 )
01905 {
01906 m_NextRelease.tv_sec += 1;
01907 m_NextRelease.tv_usec -= 1000000;
01908 }
01909
01910
01911 s_mgr->m_ActiveRequests++;
01912
01913
01914 #ifdef RIO_DEBUG2
01915 if( m_log.is_open() )
01916 m_log << "Stream:DataReq - ProcessRTQueue "
01917 << " Enviando pedidos recuperados da fila para o router "
01918 << endl;
01919 #endif
01920 s_mgr->m_Router->Put( (Event*)request );
01921 }
01922 }
01923
01924
01925 if( m_nQueue == 0 )
01926 s_mgr->m_RTHoldList.Remove( this );
01927 }
01928
01929
01930 void RioStream::ProcessNRTQueue()
01931 {
01932 EventDataRequest* request;
01933
01934
01935
01936 if( m_nQueue > 0 )
01937 {
01938
01939 request = m_Queue.Get();
01940 m_nQueue--;
01941
01942
01943 s_mgr->m_ActiveRequests++;
01944
01945
01946 s_mgr->m_Router->Put( (Event*)request );
01947 }
01948
01949
01950 if( m_nQueue == 0 )
01951 s_mgr->m_NRTHoldList.Remove( this );
01952 }
01953
01954
01955 void RioStream::RequestCompleted( Event* Request )
01956 {
01957 DataRequest* Req = & ((( EventDataRequest* )Request )->Request );
01958
01959
01960 DataRequestOperation Operation = Req->Operation;
01961 CommunicationAddress Target = Req->Target;
01962 int Reqid = Req->Reqid;
01963 RioResult Status = Req->Status;
01964 NATData result;
01965
01966 unsigned int StorageNumber;
01967
01968
01969
01970 s_mgr->m_Router->GetNumberOfStorageNodes( &StorageNumber );
01971
01972
01973 EventManager.Free( Request );
01974
01975 m_nRequests--;
01976 s_mgr->m_ActiveRequests--;
01977
01978 #ifdef RIO_DEBUG2
01979 struct in_addr client_address;
01980 client_address.s_addr = Target.IPaddress;
01981 RioErr << "[RioStream] RequestCompleted Target: "
01982 << "IPAddr = " << inet_ntoa(client_address)
01983 << ", Port = " << Target.Port << endl;
01984 #endif
01985
01986
01987 if( m_Status == StreamStatusClosed )
01988 {
01989 if( m_nRequests == 0 )
01990 {
01991 s_mgr->m_CloseList.Remove( this );
01992 s_mgr->m_used--;
01993
01994 if( m_Type == RIO_TRAFFIC_CBR )
01995 {
01996 s_mgr->m_UsedRate -= s_Rate;
01997 }
01998 if( (m_PosAtClientsList != NULL) &&
01999 (s_mgr->m_UseServerSideBuffers)
02000 )
02001 {
02002 s_mgr->RemoveClient( m_PosAtClientsList );
02003 if( m_Buffer )
02004 {
02005 delete m_Buffer;
02006 m_Buffer = NULL;
02007 }
02008 }
02009 #ifdef RIO_DEBUG2
02010 RioErr << "Closing mlog" << endl;
02011 m_log.close();
02012 #endif
02013 #ifdef RIO_DEBUG2
02014 RioErr << "Closing mlogreq" << endl;
02015 m_log_requests.close();
02016 #endif
02017 s_mgr->m_FreeList.Put( this );
02018 if( s_mgr->m_log.is_open() )
02019 s_mgr->m_log << "RequestCompleted: Stream CLOSED. "
02020 << " Requests: " << m_RequestCount
02021 << ". Queued requests: " << m_QueuedCount << endl;
02022 }
02023 }
02024 else
02025 {
02026
02027 #ifdef RIO_DEBUG2
02028 RioErr << "RioStream::RequestCompleted Status = " << Status
02029 << ", Operation = " << Operation << ", ReqID = " << Reqid
02030 << endl;
02031 #endif
02032
02033
02034 if( ( Operation == NonRealTimeRead ) ||
02035 ( Operation == RealTimeRead ) ||
02036 ( Operation == RealTimeCancelBlock )
02037 )
02038 {
02039
02040 if( Status != 0 )
02041 {
02042 #ifdef RIO_DEBUG2
02043 if( m_log.is_open() )
02044 m_log << "RequestCompleted SendResult Error " << endl;
02045 #endif
02046 RioErr << "RequestCompleted SendResult Error " << endl;
02047
02048 NATData input( Target.IPaddress, Target.Port );
02049
02050
02051
02052
02053
02054 if( s_mgr->m_Router->m_NAT_map.findElement( input,
02055 StorageNumber + 1 ) )
02056 result = s_mgr->m_Router->m_NAT_map.getElement( input,
02057 StorageNumber + 1 );
02058 else
02059 result = s_mgr->m_Router->m_NAT_map.getElement( input, 0 );
02060 if( result.nat_addr != 0 )
02061 {
02062
02063
02064
02065 if( s_mgr->m_NetInterface->FindIPAndPort( result.nat_addr,
02066 result.nat_port )
02067 )
02068 s_mgr->m_NetInterface->SendResult( result.nat_addr,
02069 result.nat_port,
02070 Reqid, Status );
02071 else
02072 s_mgr->m_NetMgr->SendResult( result.nat_addr,
02073 result.nat_port,
02074 Reqid, Status );
02075 }
02076 else
02077 {
02078
02079
02080
02081 if( s_mgr->m_NetInterface->FindIPAndPort( Target.IPaddress,
02082 Target.Port ) )
02083 s_mgr->m_NetInterface->SendResult( Target.IPaddress,
02084 Target.Port,
02085 Reqid, Status );
02086 else
02087 s_mgr->m_NetMgr->SendResult( Target.IPaddress,
02088 Target.Port,
02089 Reqid, Status );
02090 }
02091 }
02092 }
02093
02094 else if( Operation == NonRealTimeWrite )
02095 {
02096 NATData input( Target.IPaddress, Target.Port );
02097
02098
02099
02100
02101 if( s_mgr->m_Router->m_NAT_map.findElement( input,
02102 StorageNumber + 1 ) )
02103 result = s_mgr->m_Router->m_NAT_map.getElement( input,
02104 StorageNumber + 1 );
02105 else
02106 result = s_mgr->m_Router->m_NAT_map.getElement( input, 0 );
02107 if( result.nat_addr != 0 )
02108 {
02109 Target.IPaddress = result.nat_addr;
02110 Target.Port = result.nat_port;
02111 }
02112 if( result.nat_addr != 0 )
02113 {
02114
02115
02116
02117 if( s_mgr->m_NetInterface->FindIPAndPort( result.nat_addr,
02118 result.nat_port ) )
02119 s_mgr->m_NetInterface->SendResult( result.nat_addr,
02120 result.nat_port,
02121 Reqid, Status );
02122 else
02123 s_mgr->m_NetMgr->SendResult( result.nat_addr,
02124 result.nat_port,
02125 Reqid, Status );
02126 }
02127 else
02128 {
02129 if( s_mgr->m_NetInterface->FindIPAndPort( Target.IPaddress,
02130 Target.Port ) )
02131 s_mgr->m_NetInterface->SendResult( Target.IPaddress,
02132 Target.Port, Reqid,
02133 Status );
02134 else
02135 s_mgr->m_NetMgr->SendResult( Target.IPaddress, Target.Port,
02136 Reqid, Status );
02137 }
02138 }
02139 }
02140
02141 struct in_addr clientip;
02142 clientip.s_addr = m_RemoteAddress.sin_addr.s_addr;
02143 UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip),
02144 htons( m_RemoteAddress.sin_port ), m_nRequests );
02145 s_mgr->PostITEvent( (MonitorEvent *)it_event );
02146 }
02147
02148
02149
02150
02151
02152
02153
02154
02155
02156
02157 int RioStream::OpenObject( char* ObjectName,
02158 RioAccess Access,
02159 struct timeval RTT,
02160 int BufferSize,
02161 RioStreamObj **StreamObj )
02162 {
02163 int rc;
02164
02165
02166 if( (s_Direction == RioStreamDirectionRead )&&( Access & RIO_WRITE_ACCESS) )
02167 {
02168 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
02169 }
02170 if( (s_Direction == RioStreamDirectionWrite )&&( Access & RIO_READ_ACCESS) )
02171 {
02172 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
02173 }
02174
02175 if( m_Status != StreamStatusOpened )
02176 {
02177 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
02178 }
02179
02180 RioStreamObj *op = new RioStreamObj( s_mgr, this );
02181 if( op == 0 )
02182 {
02183 return ERROR_STREAMMANAGER + ERROR_MEMORY;
02184 }
02185
02186 op->o_object = 0;
02187
02188 rc = s_mgr->m_ObjectManager->Open( ObjectName, Access, &op->o_object );
02189
02190 if( rc == 0 )
02191 {
02192 *StreamObj = op;
02193
02194
02195 if( (GetType() == RIO_TRAFFIC_VBR )&&( s_mgr->m_UseServerSideBuffers ) )
02196 {
02197 struct timeval initial_time, final_time;
02198 double time;
02199
02200 initial_time = Timer.CurrentTime();
02201
02202 m_ClientBufferSize = BufferSize;
02203 m_AverageRTTtoClient_msec = RTT.tv_sec * 1000.0 +
02204 RTT.tv_usec / 1000.0;
02205
02206
02207 char infoFileName[255]="", mmobjectname[255]="";
02208 strcpy( mmobjectname, ObjectName );
02209 char *pos = strrchr( mmobjectname, '.' );
02210 strcpy( pos+1, "cactimes" );
02211 strcpy( infoFileName, s_mgr->m_FileRoot );
02212 strcat( infoFileName, "/" ) ;
02213 strcat( infoFileName, mmobjectname ) ;
02214
02215 m_FirstArrivalTime = initial_time.tv_sec * 1000.0 +
02216 initial_time.tv_usec / 1000.0;
02217
02218 #ifdef RIO_DEBUG2
02219 if( m_log.is_open() )
02220 m_log << "OpenObject: Video : " << ObjectName
02221 << endl << "OpenObject: CAC info: " << infoFileName
02222 << endl << "OpenObject: FirstArrivalTime "
02223 << m_FirstArrivalTime << endl;
02224 #endif
02225
02226 m_WaitingQueueSize = s_mgr->UpdateNumberOfWaitingClients( 1 );
02227
02228 pthread_mutex_lock( &s_mgr->MutexRunCAC );
02229
02230 if( ( m_RequestList = getInfoAboutRequestList( infoFileName,
02231 op->o_object )) != NULL )
02232 {
02233 if( s_mgr->m_UseNewCAC )
02234 {
02235
02236 m_PosAtClientsList = s_mgr->CanAdmit( m_RequestList,
02237 this,
02238 m_AverageRTTtoClient_msec,
02239 m_ClientBufferSize );
02240 final_time = Timer.CurrentTime();
02241 time = getInterval( initial_time, final_time );
02242 m_SampleCACTime = time;
02243 int index;
02244
02245 if( m_PosAtClientsList != NULL )
02246 {
02247 index = 0;
02248 }
02249 else
02250 {
02251 index = 1;
02252 }
02253
02254 m_SampleAdmissionProcessTime = m_SampleGenReqListTime+
02255 m_SampleSortListTime+
02256 m_SampleSimulationTime;
02257
02258 m_SampleWaitingTime = m_SampleCACTime -
02259 m_SampleAdmissionProcessTime;
02260
02261 s_mgr->SetAverageWaitingTime( m_SampleWaitingTime,
02262 this, index,
02263 m_WaitingQueueSize );
02264 s_mgr->SetAverageAdmissionProcessTime(
02265 m_SampleAdmissionProcessTime,
02266 this, index );
02267
02268 s_mgr->SetAverageCACTime( time, this, index );
02269
02270 if( s_mgr->m_log.is_open() )
02271 {
02272 s_mgr->m_log << "Time to generate list: "
02273 << m_SampleGenReqListTime << " msec."
02274 << endl;
02275 s_mgr->m_log << "Time to sort list: "
02276 << m_SampleSortListTime << " msec."
02277 << endl;
02278 s_mgr->m_log << "Time to simulate list: "
02279 << m_SampleSimulationTime << " msec."
02280 << endl;
02281 s_mgr->m_log << "Time to admission: "
02282 << m_SampleCACTime << " msec."
02283 << endl;
02284 s_mgr->m_log << "Waiting time: "
02285 << m_SampleWaitingTime << " msec."
02286 << endl;
02287 s_mgr->m_log << "Waiting queue size: "
02288 << m_WaitingQueueSize << endl;
02289 }
02290
02291 #ifdef RIO_DEBUG2
02292 if( m_log.is_open() )
02293 m_log << "Time to admission: " << time << " msec."
02294 << endl;
02295 #endif
02296 if( m_PosAtClientsList != NULL )
02297 {
02298 if( s_mgr->m_log.is_open() )
02299 s_mgr->m_log << "OpenObject: New client admitted. "
02300 << "(RTT " << m_AverageRTTtoClient_msec
02301 << " msec)" << endl;
02302
02303 #ifdef RIO_DEBUG2
02304 if( m_log.is_open() )
02305 m_log << "OpenObject: New client admitted." << endl;
02306 #endif
02307 }
02308 else
02309 {
02310 if( s_mgr->m_log.is_open() )
02311 s_mgr->m_log << "OpenObject: Could not admit new "
02312 << "client" << endl;
02313
02314 #ifdef RIO_DEBUG2
02315 if( m_log.is_open() )
02316 m_log << "OpenObject: Could not admit new client "
02317 << endl;
02318 #endif
02319 if( s_mgr->m_log.is_open() )
02320 s_mgr->m_log << "Deleting StreamObj " << op << endl;
02321
02322 delete op;
02323 return -1;
02324 }
02325 }
02326 else
02327 {
02328 m_PosAtClientsList = s_mgr->InsertNewClient( m_RequestList,
02329 this );
02330 s_mgr->UpdateNumberOfWaitingClients(-1);
02331 pthread_mutex_unlock( &s_mgr->MutexRunCAC );
02332 }
02333
02334
02335 m_StartingToSend = true;
02336
02337 setMinimumBlockToSend(
02338 m_PosAtClientsList->nextBlockToBeRequested->block );
02339 m_FinishedMovie = false;
02340
02341
02342
02343 if( s_mgr->GetNumberOfBuffersForEachClient() > 0 )
02344 {
02345 m_Buffer = new ClientBuffer[
02346 s_mgr->GetNumberOfBuffersForEachClient()];
02347 if( m_Buffer == 0 )
02348 {
02349 RioErr << "Could not allocate buffer."<< endl;
02350 return -1;
02351 }
02352 else
02353 {
02354 for( int i = 0;
02355 i < s_mgr->GetNumberOfBuffersForEachClient();
02356 i++
02357 )
02358 {
02359 prefetchBlock( i,
02360 (EventDataRequest*)EventManager.New(
02361 EventTypeRTDataRequest ),
02362 op);
02363 m_nRequests++;
02364 updateNextBlockToBeFetched( NULL );
02365 }
02366 }
02367 }
02368 m_TimeToBuffer = Timer.CurrentTime();
02369
02370 }
02371 else
02372 {
02373 RioErr << "Could not find " << infoFileName
02374 << ", allocate memory or read from file."
02375 << "Not using new control admission and buffers."
02376 << endl;
02377
02378 pthread_mutex_unlock( &s_mgr->MutexRunCAC );
02379 return S_OK;
02380 }
02381 }
02382 return S_OK;
02383 }
02384
02385 if( s_mgr->m_log.is_open() )
02386 s_mgr->m_log << "Deleting StreamObj " << op << endl;
02387
02388 delete op;
02389 return rc;
02390 }
02391
02392
02393
02394
02395 void RioStream::getMutexUpdateEvent()
02396 {
02397 pthread_mutex_lock( &MutexUpdateEvent );
02398 }
02399
02400 void RioStream::releaseMutexUpdateEvent()
02401 {
02402 pthread_mutex_unlock( &MutexUpdateEvent );
02403 }
02404
02405
02406
02407
02408 void RioStream::prefetchBlock( int posAtBuffer,
02409 EventDataRequest* event,
02410 RioStreamObj *StreamObject )
02411 {
02412
02413 m_Buffer[posAtBuffer].blockInfo = m_PosAtClientsList->nextBlockToBeFetched;
02414
02415 #ifdef RIO_DEBUG2
02416 struct timeval time = Timer.CurrentTime();
02417 long double time_msec = time.tv_sec * 1000.0 + time.tv_usec/1000.0 ;
02418 if( m_log_requests.is_open() )
02419 {
02420 m_log_requests << m_Buffer[posAtBuffer].blockInfo->block << "\t\t"
02421 << m_Buffer[posAtBuffer].blockInfo->csystime << "\t\t"
02422 << time_msec - m_FirstArrivalTime << "\t\t"
02423 << ( time_msec - m_FirstArrivalTime ) -
02424 m_Buffer[posAtBuffer].blockInfo->csystime
02425 << endl;
02426 }
02427 #endif
02428
02429 m_Buffer[posAtBuffer].event = event;
02430 m_Buffer[posAtBuffer].event->Request.BufferId = -1;
02431
02432
02433 m_Buffer[posAtBuffer].event->Request.RequestCounter = -1;
02434 m_Buffer[posAtBuffer].event->Request.Reqid = 0;
02435 m_Buffer[posAtBuffer].event->Request.Target.IPaddress = 0;
02436 m_Buffer[posAtBuffer].event->Request.Target.Port = 0;
02437 m_Buffer[posAtBuffer].event->Request.Block =
02438 m_Buffer[posAtBuffer].blockInfo->block;
02439 m_Buffer[posAtBuffer].event->Request.Reps[0].block =
02440 m_Buffer[posAtBuffer].blockInfo->physicalblock;
02441 m_Buffer[posAtBuffer].event->Request.Reps[0].disk =
02442 m_Buffer[posAtBuffer].blockInfo->disk;
02443 m_Buffer[posAtBuffer].event->Request.RepBits = 0;
02444 m_Buffer[posAtBuffer].event->Request.Status = 0;
02445 m_Buffer[posAtBuffer].event->Request.Operation = RealTimePrefetchBlock;
02446 m_Buffer[posAtBuffer].event->Request.streamobj = StreamObject;
02447 m_Buffer[posAtBuffer].event->Request.rioobject = StreamObject->o_object;
02448
02449 StreamObject->m_PendingRequests++;
02450
02451 if( m_StartingToSend == true )
02452 {
02453 #ifdef RIO_DEBUG2
02454 if( m_log.is_open() )
02455 m_log << "prefetchBlock:starting prefetching block "
02456 << m_Buffer[posAtBuffer].event->Request.Block
02457 << " (m_nRequests " << m_nRequests << ")"
02458 << " m_ActiveRequests "
02459 << s_mgr->m_ActiveRequests + 1
02460 << endl;
02461 #endif
02462 s_mgr->m_Router->Put( (Event*)m_Buffer[posAtBuffer].event );
02463 s_mgr->m_ActiveRequests++;
02464 }
02465 else
02466 {
02467 if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) )
02468 {
02469 #ifdef RIO_DEBUG2
02470 if( m_log.is_open() )
02471 m_log << "prefetchBlock:prefetching block (Router) "
02472 << m_Buffer[posAtBuffer].event->Request.Block
02473 << " (m_nRequests " << m_nRequests << ")"
02474 << " m_ActiveRequests "
02475 << s_mgr->m_ActiveRequests + 1
02476 << endl;
02477 #endif
02478 s_mgr->m_Router->Put( (Event*)m_Buffer[posAtBuffer].event );
02479 s_mgr->m_ActiveRequests++;
02480 decTokensToDisk();
02481 }
02482 else
02483 {
02484 #ifdef RIO_DEBUG2
02485 if( m_log.is_open() )
02486 m_log << "prefetchBlock:prefetching block (RTQueue) "
02487 << m_Buffer[posAtBuffer].event->Request.Block
02488 << " (m_nRequests " << m_nRequests << ")"
02489 << endl;
02490 #endif
02491 RTHold( m_Buffer[posAtBuffer].event );
02492 }
02493 }
02494 }
02495
02496 void RioStream::cancelBlockAtBuffer( int positionAtBuffer )
02497 {
02498 getMutexUpdateEvent();
02499
02500 if( m_Buffer[positionAtBuffer].event != NULL )
02501 {
02502 m_Buffer[positionAtBuffer].event->Request.Operation=RealTimeCancelBlock;
02503 if( m_Buffer[positionAtBuffer].event->Request.BufferId != -1 )
02504 {
02505 releaseMutexUpdateEvent();
02506 decServerBufferStatus();
02507
02508 s_mgr->m_Router->Put( (Event*)m_Buffer[positionAtBuffer].event );
02509 #ifdef RIO_DEBUG2
02510 if( m_log.is_open() ) m_log << "cancelBlockAtBuffer: block "
02511 << m_Buffer[positionAtBuffer].blockInfo->block
02512 <<" canceled!"<< endl;
02513 #endif
02514 }
02515 else
02516 {
02517 releaseMutexUpdateEvent();
02518 #ifdef RIO_DEBUG2
02519 if( m_log.is_open() )
02520 m_log << "cancelBlockAtBuffer: setting cancel flag of block "
02521 << m_Buffer[positionAtBuffer].blockInfo->block << endl;
02522 #endif
02523 }
02524 m_Buffer[positionAtBuffer].event = NULL;
02525 }
02526 else
02527 {
02528 releaseMutexUpdateEvent();
02529 }
02530 }
02531
02532
02533
02534
02535 void RioStream::updateNextBlockToBeFetched( PSEQBLOCKLIST Next )
02536 {
02537 if( Next == NULL )
02538 {
02539 pthread_mutex_lock( &MutexNextBlockToBeFetched );
02540 if( m_PosAtClientsList->nextBlockToBeFetched->next == NULL )
02541 {
02542 #ifdef RIO_DEBUG2
02543 if( m_log.is_open() ) m_log << "updateNextBlockToBeFetched: prefetchAll true"<< endl;
02544 #endif
02545 m_PrefetchedAllBlocks = true;
02546 }
02547 else
02548 {
02549 m_PosAtClientsList->nextBlockToBeFetched =
02550 m_PosAtClientsList->nextBlockToBeFetched->next;
02551 }
02552 pthread_mutex_unlock( &MutexNextBlockToBeFetched );
02553 }
02554 else
02555 {
02556 pthread_mutex_lock( &MutexNextBlockToBeFetched );
02557 m_PosAtClientsList->nextBlockToBeFetched = Next;
02558 pthread_mutex_unlock( &MutexNextBlockToBeFetched );
02559 }
02560 #ifdef RIO_DEBUG2
02561 if( m_log.is_open() ) m_log << "updateNextBlockToBeFetched: "
02562 << m_PosAtClientsList->nextBlockToBeFetched->block<< endl;
02563 #endif
02564 }
02565
02566
02567
02568
02569 int RioStream::ClientCanStart()
02570 {
02571 struct timeval final_time;
02572 double time;
02573
02574
02575 #ifdef RIO_DEBUG2
02576 if( m_log.is_open() ) m_log << " Client asked 'can I start'? " << endl;
02577 #endif
02578 if( m_PosAtClientsList != NULL )
02579 {
02580 while(getServerBufferStatus()<s_mgr->GetNumberOfBuffersForEachClient())
02581 {
02582 pthread_mutex_lock( &MutexBlock );
02583 pthread_cond_wait( &ConditionBlockArrival, &MutexBlock );
02584 pthread_mutex_unlock( &MutexBlock );
02585 }
02586 final_time = Timer.CurrentTime();
02587 time = getInterval( m_TimeToBuffer, final_time );
02588 s_mgr->SetAverageBufferTime( time, this );
02589 m_TimeToBuffer.tv_sec =(u64)(( time * 1000.0 ) / 1000000 );
02590 m_TimeToBuffer.tv_usec =(u64)((u64) ( time * 1000.0 ) % 1000000 );
02591 #ifdef RIO_DEBUG2
02592 if( m_log.is_open() ) m_log << "Time to buffer " << time << endl;
02593 #endif
02594
02595
02596 m_StartingToSend = false;
02597
02598 m_TokensToDisk = s_mgr->GetBurstSizeOfEachClient();
02599
02600
02601 struct timeval clocknow = Timer.CurrentTime();
02602 m_TokensToDiskClock.tv_sec = clocknow.tv_sec;
02603 m_TokensToDiskClock.tv_usec = clocknow.tv_usec;
02604 }
02605 #ifdef RIO_DEBUG2
02606 if( m_log.is_open() ) m_log << " Yes. You can start now. m_TokensToDisk : "
02607 << m_TokensToDisk << endl;
02608 #endif
02609 return S_OK;
02610 }
02611
02612
02613
02614
02615 void RioStream::updateTokensToDisk( struct timeval time )
02616 {
02617 pthread_mutex_lock( &MutexTokensToDisk );
02618 while( ( (m_TokensToDiskClock.tv_sec < time.tv_sec ) ||
02619 ( ( m_TokensToDiskClock.tv_sec == time.tv_sec) &&
02620 (m_TokensToDiskClock.tv_usec < time.tv_usec )
02621 )
02622 )
02623 && (m_StartingToSend == false ))
02624 {
02625 m_TokensToDiskClock.tv_sec += m_ArrivalInterval.tv_sec;
02626 m_TokensToDiskClock.tv_usec += m_ArrivalInterval.tv_usec;
02627 if( m_TokensToDiskClock.tv_usec > 1000000 )
02628 {
02629 m_TokensToDiskClock.tv_sec += 1;
02630 m_TokensToDiskClock.tv_usec -= 1000000;
02631 }
02632
02633 if( m_TokensToDisk < s_mgr->GetBurstSizeOfEachClient() )
02634 {
02635 m_TokensToDisk++;
02636 }
02637 }
02638 pthread_mutex_unlock( &MutexTokensToDisk );
02639 #ifdef RIO_DEBUG2
02640 if( m_log.is_open() ) m_log << "TokensToDisk updated : " << m_TokensToDisk << endl;
02641 #endif
02642 }
02643
02644 void RioStream::decTokensToDisk()
02645 {
02646 pthread_mutex_lock( &MutexTokensToDisk );
02647 if( m_TokensToDisk > 0 )
02648 {
02649 m_TokensToDisk--;
02650 }
02651 pthread_mutex_unlock( &MutexTokensToDisk );
02652 #ifdef RIO_DEBUG2
02653 if( m_log.is_open() ) m_log << "TokensToDisk decremented : " << m_TokensToDisk << endl;
02654 #endif
02655 }
02656
02657
02658 int RioStream::getTokensToDisk()
02659 {
02660 return m_TokensToDisk;
02661 }
02662
02663
02664 struct timeval RioStream::getLastArrivalTime()
02665 {
02666 return m_LastArrivalTime;
02667 }
02668
02669 int RioStream::getClientBufferStatus()
02670 {
02671 if( m_StartingToSend )
02672 {
02673 return 0;
02674 }
02675 else
02676 {
02677 return m_ClientBufferSize - m_TokensToSend;
02678 }
02679 }
02680
02681 int RioStream::getPlayBlock()
02682 {
02683 if( m_StartingToSend )
02684 {
02685 return 0;
02686 }
02687 else
02688 {
02689 return(m_PosAtClientsList->nextBlockToBeRequested->block -
02690 getClientBufferSize());
02691 }
02692 }
02693
02694 int RioStream::getNumberOfBlocks()
02695 {
02696 return m_TotalBlocks;
02697 }
02698
02699 int RioStream::getClientBufferSize()
02700 {
02701 return m_ClientBufferSize;
02702 }
02703
02704
02705 int RioStream::getServerBufferStatus()
02706 {
02707 #ifdef RIO_DEBUG2
02708 if( m_log.is_open() ) m_log << " m_ServerBufferStatus " << m_ServerBufferStatus
02709 << " m_TokensToSend " << m_TokensToSend
02710 << endl;
02711 #endif
02712
02713 if( m_TokensToSend > 0 )
02714 return ( m_ServerBufferStatus - m_TokensToSend );
02715 else
02716 return m_ServerBufferStatus;
02717 }
02718
02719
02720 void RioStream::decServerBufferStatus()
02721 {
02722 pthread_mutex_lock( &MutexBlock );
02723 m_ServerBufferStatus--;
02724 pthread_mutex_unlock( &MutexBlock );
02725 #ifdef RIO_DEBUG2
02726 if( m_log.is_open() ) m_log << "BufferStatus decremented : " << m_ServerBufferStatus << endl;
02727 #endif
02728 }
02729
02730
02731 int RioStream::getNumberOfBuffersForEachClient()
02732 {
02733 return s_mgr->GetNumberOfBuffersForEachClient();
02734 }
02735
02736
02737 double RioStream::getAverageCACTime()
02738 {
02739 return s_mgr->GetAverageCACTime();
02740 }
02741
02742
02743 double RioStream::getEstimatedDiskServiceTime( int disk )
02744 {
02745 return s_mgr->GetEstimatedDiskServiceTime( disk );
02746 }
02747
02748 double RioStream::getEstimatedDiskResponseTime( int disk )
02749 {
02750 return s_mgr->GetEstimatedDiskResponseTime( disk );
02751 }
02752
02753 double RioStream::getEstimatedDiskQueueTime( int disk )
02754 {
02755 return s_mgr->GetEstimatedDiskQueueTime( disk );
02756 }
02757
02758 unsigned int RioStream::getBlockSize()
02759 {
02760 return s_mgr->GetBlockSize();
02761 }
02762
02763
02764 long double RioStream::getNetworkRate()
02765 {
02766 return s_mgr->GetNetworkRate();
02767 }
02768
02769
02770 int RioStream::getQueueSize()
02771 {
02772 return m_nQueue;
02773 }
02774
02775
02776 double RioStream::getRTTtoClient()
02777 {
02778 return m_AverageRTTtoClient_msec;
02779 }
02780
02781
02782
02783
02784 int RioStream::startingStream()
02785 {
02786 return m_StartingToSend;
02787 }
02788
02789
02790
02791 void RioStream::NewBlockArrival()
02792 {
02793 pthread_mutex_lock( &MutexBlock );
02794 m_ServerBufferStatus++;
02795 pthread_cond_broadcast(&ConditionBlockArrival);
02796 pthread_mutex_unlock( &MutexBlock );
02797 #ifdef RIO_DEBUG2
02798 if( m_log.is_open() ) m_log << "BufferStatus incremented : " << m_ServerBufferStatus << endl;
02799 #endif
02800 }
02801
02802
02803
02804 int RioStream::getTokensToSend()
02805 {
02806 return m_TokensToSend;
02807 }
02808
02809
02810 void RioStream::incTokensToSend()
02811 {
02812 pthread_mutex_lock( &MutexTokensToSend );
02813 m_TokensToSend++;
02814 pthread_mutex_unlock( &MutexTokensToSend );
02815 #ifdef RIO_DEBUG2
02816 if( m_log.is_open() ) m_log << "TokensToSend incremented : " << m_TokensToSend << endl;
02817 #endif
02818 }
02819
02820
02821 void RioStream::decTokensToSend()
02822 {
02823 pthread_mutex_lock( &MutexTokensToSend );
02824 m_TokensToSend--;
02825 pthread_mutex_unlock( &MutexTokensToSend );
02826 #ifdef RIO_DEBUG2
02827 if( m_log.is_open() ) m_log << "TokensToSend decremented : " << m_TokensToSend << endl;
02828 #endif
02829 }
02830
02831 void RioStream::setTokensToSend( int value )
02832 {
02833 pthread_mutex_lock( &MutexTokensToSend );
02834 m_TokensToSend = value;
02835 pthread_mutex_unlock( &MutexTokensToSend );
02836 #ifdef RIO_DEBUG2
02837 if( m_log.is_open() ) m_log << "TokensToSend set : " << m_TokensToSend << endl;
02838 #endif
02839 }
02840
02841
02842
02843
02844 u32 RioStream::getMaximumBlockToSend()
02845 {
02846 if( m_FinishedMovie == true )
02847 {
02848 return m_PosAtClientsList->nextBlockToBeRequested->block + 1;
02849 }
02850 else
02851 {
02852 return m_PosAtClientsList->nextBlockToBeRequested->block;
02853 }
02854 }
02855
02856 u32 RioStream::getMinimumBlockToSend()
02857 {
02858 return m_MinimumBlockToSend;
02859 }
02860
02861 void RioStream::incMinimumBlockToSend()
02862 {
02863 pthread_mutex_lock( &MutexMinBlock );
02864 m_MinimumBlockToSend++;
02865 pthread_mutex_unlock( &MutexMinBlock );
02866 #ifdef RIO_DEBUG2
02867 if( m_log.is_open() ) m_log << "MinimumBlockToSend incremented : " << m_MinimumBlockToSend
02868 << endl;
02869 #endif
02870 }
02871 void RioStream::setMinimumBlockToSend( u32 Block )
02872 {
02873 pthread_mutex_lock( &MutexMinBlock );
02874 m_MinimumBlockToSend = Block;
02875 pthread_mutex_unlock( &MutexMinBlock );
02876 #ifdef RIO_DEBUG2
02877 if( m_log.is_open() ) m_log << "MinimumBlockToSend set : " << m_MinimumBlockToSend << endl;
02878 #endif
02879 }
02880
02881
02882 bool RioStream::getPrefetchedAllBlocks()
02883 {
02884 return m_PrefetchedAllBlocks;
02885 }
02886
02887
02888
02889 PSEQBLOCKLIST RioStream::getInfoAboutRequestList( char *filename,
02890 RioObject* object )
02891 {
02892 FILE *file;
02893 PSEQBLOCKLIST curblock, prevblock, init_stream;
02894 struct timeval initial_time, final_time;
02895
02896 float duration = 0;
02897 uint block = 0;
02898 int rc = 0;
02899 int numreps;
02900 long double time = 0;
02901
02902 RioDiskBlock Reps[MaxReplications];
02903
02904 initial_time = Timer.CurrentTime();
02905
02906 m_FirstArrivalTime = initial_time.tv_sec * 1000.0 +
02907 initial_time.tv_usec / 1000.0;
02908
02909 uint SBufferSize = getNumberOfBuffersForEachClient();
02910 uint CBufferSize = m_ClientBufferSize;
02911
02912 uint buffersize = CBufferSize + SBufferSize;
02913
02914 #ifdef RIO_DEBUG2
02915 if( m_log.is_open() ) m_log <<"---------------------------------------------------------------- "<<endl;
02916 if( m_log.is_open() ) m_log <<" getInfoAboutRequestList "<<endl;
02917 if( m_log.is_open() ) m_log <<"---------------------------------------------------------------- "<<endl;
02918 if( m_log.is_open() ) m_log << "Getting disk service time info ... ";
02919 #endif
02920
02921 pthread_mutex_lock( &s_mgr->MutexUpdateMeasures );
02922 s_mgr->GetDiskServiceTime();
02923 s_mgr->UpdateDiskQueueAndServiceTime();
02924 pthread_mutex_unlock( &s_mgr->MutexUpdateMeasures );
02925
02926 #ifdef RIO_DEBUG2
02927 if( m_log.is_open() ) m_log << "OK." << endl;
02928 #endif
02929
02930 double msecRunCAC = getAverageCACTime();
02931 double NetworkRate = getNetworkRate();
02932
02933 unsigned int BlockSize = getBlockSize();
02934
02935 double timeToTransmit_msec = ( BlockSize/NetworkRate ) * 1000.0;
02936
02937 #ifdef RIO_DEBUG2
02938 if( m_log.is_open() ) m_log << "Client ID " << m_Id << endl;
02939 if( m_log.is_open() ) m_log << "SBufferSize " << SBufferSize << endl;
02940 if( m_log.is_open() ) m_log << "CBufferSize " << CBufferSize << endl;
02941 if( m_log.is_open() ) m_log << "RTT (msec) " << m_AverageRTTtoClient_msec << endl;
02942 if( m_log.is_open() ) m_log << "msecRunCAC " << msecRunCAC << endl;
02943 if( m_log.is_open() ) m_log << "NetworkRate " << NetworkRate << "B/sec" << endl;
02944 if( m_log.is_open() ) m_log << "BlockSize " << BlockSize << "Bytes" << endl;
02945 if( m_log.is_open() ) m_log << "timeToTransmit_msec " << timeToTransmit_msec << " msec" << endl;
02946 #endif
02947
02948
02949 if( (file = fopen( filename, "r" )) == NULL )
02950 {
02951 if( m_log.is_open() )
02952 m_log<<"getInfoAboutRequestList: "<<filename<<": File not found."<<endl;
02953 return NULL;
02954 }
02955
02956
02957 init_stream = prevblock = NULL;
02958
02959 float *req_time = (float *) malloc( buffersize * sizeof(float) );
02960 float *disk_block = (float *) malloc( buffersize * sizeof(float) );
02961
02962 int disks = s_mgr->m_MaxNumberOfDisks;
02963
02964 long double *disk_time = (long double*) malloc( disks * sizeof(long double) );
02965 for( int i = 0 ; i < disks ; i++ )
02966 disk_time[ i ] = msecRunCAC;
02967
02968 memset( req_time, 0, buffersize*sizeof(float) );
02969 memset( disk_block, 0, buffersize*sizeof(float) );
02970 memset( disk_time, 0, disks*sizeof(long double) );
02971
02972 #ifdef RIO_DEBUG2
02973 if( m_log.is_open() ) m_log << "getInfoAboutRequestList: Starting reading ... " << endl;
02974 #endif
02975
02976
02977 while( !feof( file ) )
02978 {
02979 if( fscanf( file, "%f\n", &duration ) == 1 )
02980 {
02981 duration = duration * 1000.0;
02982
02983 curblock = ( PSEQBLOCKLIST )malloc( sizeof( SEQBLOCKLIST ) );
02984 if( curblock != NULL )
02985 {
02986 curblock->id = m_Id;
02987 curblock->block = block;
02988 curblock->duration = duration;
02989
02990 rc = object->MapBlock( (RioBlock) block, 0, &numreps, Reps);
02991 if( rc )
02992 {
02993 RioErr << "getInfoAboutRequestList: Error at MapBlock"
02994 << endl;
02995 }
02996 else
02997 {
02998 curblock->disk = Reps[0].disk;
02999 curblock->physicalblock = Reps[0].block;
03000 }
03001
03002 curblock->next = NULL;
03003 curblock->prev = NULL;
03004
03005 if( block >= buffersize )
03006 {
03007 curblock->csystime = req_time[block % buffersize] + time;
03008 time += req_time[ block % buffersize ];
03009 }
03010 else
03011 {
03012 if( block < SBufferSize )
03013 {
03014
03015
03016 curblock->csystime = msecRunCAC;
03017 }
03018 else
03019 {
03020
03021
03022 curblock->csystime = msecRunCAC +
03023 m_AverageRTTtoClient_msec;
03024 }
03025
03026 if( disk_time[ curblock->disk ] < curblock->csystime )
03027 disk_time[ curblock->disk ] = curblock->csystime;
03028
03029 disk_time[ curblock->disk ] +=
03030 getEstimatedDiskServiceTime( curblock->disk );
03031
03032 disk_block[ block ] = disk_time[ curblock->disk ];
03033
03034 #ifdef RIO_DEBUG2
03035 if( m_log.is_open() ) m_log << " id " << curblock->id
03036 << " block " << curblock->block
03037 << " disk " << curblock->disk
03038 << " servicetime "
03039 << getEstimatedDiskServiceTime( curblock->disk )
03040 << " disk_time "
03041 << disk_time[ curblock->disk] << endl;
03042 #endif
03043
03044 if( (block + 1) == buffersize )
03045 {
03046 double maxdisktime = disk_time[ 0 ];
03047 for( int i = 1; i < disks; i++ )
03048 {
03049 if( disk_time [ i ] > maxdisktime )
03050 maxdisktime = disk_time[ i ];
03051 }
03052 double timetotx = msecRunCAC +
03053 m_AverageRTTtoClient_msec;
03054 for( uint i = 0; i < CBufferSize; i++ )
03055 {
03056 if( disk_block [ i ] > timetotx )
03057 timetotx = disk_block[ i ];
03058 timetotx += timeToTransmit_msec;
03059 }
03060
03061
03062
03063
03064 double maxtime;
03065 if( timetotx > maxdisktime )
03066 maxtime = timetotx;
03067 else
03068 maxtime = maxdisktime;
03069 time = maxtime + m_AverageRTTtoClient_msec;
03070
03071 #ifdef RIO_DEBUG2
03072 if( m_log.is_open() )
03073 {
03074 m_log << "maxdisktime " << maxdisktime << endl;
03075 m_log << "timetotx " << timetotx << endl;
03076 m_log << "max + RTT = time => " << time << endl;
03077 }
03078 #endif
03079 }
03080 }
03081 req_time[ block % buffersize ] = duration;
03082 }
03083 else
03084 {
03085 RioErr << "getInfoAboutRequestList: Could not allocate memory. "
03086 << endl;
03087
03088 free( req_time );
03089 free( disk_block );
03090 free( disk_time );
03091
03092 return NULL;
03093 }
03094 if( prevblock ) {
03095 curblock->prev = prevblock;
03096 prevblock = prevblock->next = curblock;
03097 }
03098 else
03099 prevblock = init_stream = curblock;
03100 duration = 0;
03101 }
03102 else
03103 {
03104 if( !feof( file ) )
03105 {
03106 RioErr << "getInfoAboutRequestList: Could not read from file."
03107 << endl;
03108
03109 free( req_time );
03110 free( disk_block );
03111 free( disk_time );
03112
03113 return NULL;
03114 }
03115 }
03116 block++;
03117 }
03118
03119 m_TotalBlocks = block;
03120
03121 fclose( file );
03122
03123 #ifdef RIO_DEBUG2
03124 if( m_log.is_open() ) m_log << "getInfoAboutRequestList: Closing file." << endl;
03125 #endif
03126
03127 final_time = Timer.CurrentTime();
03128 time = getInterval( initial_time, final_time );
03129 m_SampleGenReqListTime = time;
03130 s_mgr->SetAverageGenReqListTime( time , this );
03131
03132 #ifdef RIO_DEBUG2
03133 if( m_log.is_open() )
03134 {
03135 m_log<<"---------------------------------------------------------------- "<<endl;
03136 m_log<<"getInfoAboutRequestList Time = " << time << " msec " <<endl;
03137 m_log<<"---------------------------------------------------------------- "<<endl;
03138 }
03139 #endif
03140
03141 free( req_time );
03142 free( disk_block );
03143 free( disk_time );
03144
03145 return init_stream;
03146 }
03147
03148
03149 int CStreamManager::insertevent( PSEQEVENTLIST eventblock,
03150 EventTypeSimulation eventtype, int block,
03151 long double time_when )
03152 {
03153
03154 PSEQEVENTLIST auxblock = eventblock;
03155 PSEQEVENTLIST newevent;
03156
03157 while(( auxblock->csystime <= time_when ) && ( auxblock->next != NULL ))
03158 {
03159 auxblock = auxblock->next;
03160 }
03161
03162 newevent = ( PSEQEVENTLIST )malloc( sizeof( SEQEVENTLIST ) );
03163 if( newevent != NULL )
03164 {
03165 newevent->event = eventtype;
03166 newevent->id = eventblock->id;
03167 newevent->block = block;
03168 newevent->disk = eventblock->disk;
03169 newevent->csystime = time_when;
03170
03171 if( auxblock->csystime < time_when )
03172 {
03173 auxblock->next = newevent;
03174 newevent->prev = auxblock;
03175 newevent->next = NULL;
03176 }
03177 else
03178 {
03179 auxblock->prev->next = newevent;
03180 newevent->prev = auxblock->prev;
03181 auxblock->prev = newevent;
03182 newevent->next = auxblock;
03183 }
03184 }
03185 else
03186 {
03187 RioErr << "insertevent: Could not allocate memory" << endl;
03188 return -1;
03189 }
03190 return 1;
03191 }
03192
03193
03194 int CStreamManager::UpdateNumberOfWaitingClients(int client)
03195 {
03196 int queue;
03197 pthread_mutex_lock( &MutexUpdateNumberOfWaitingClients );
03198 queue = m_NumberOfWaitingClients;
03199 m_NumberOfWaitingClients += client;
03200 pthread_mutex_unlock( &MutexUpdateNumberOfWaitingClients );
03201 return queue;
03202 }
03203
03204
03205 PSTREAMLIST CStreamManager::CanAdmit( PSEQBLOCKLIST NewClient,
03206 RioStream* stream,
03207 double RTT, int clientbuffer )
03208 {
03209 struct timeval initial_time, final_time;
03210 long double timetosimul, when;
03211
03212
03213
03214 #ifdef RIO_DEBUG2
03215 if( m_log.is_open() )
03216 {
03217 m_log<<"---------------------------------------------------------------- "<<endl;
03218 m_log<<" CanAdmit Client " << stream->m_Id <<endl;
03219 m_log<<"---------------------------------------------------------------- "<<endl;
03220 }
03221 #endif
03222
03223 initial_time = Timer.CurrentTime();
03224
03225 PSTREAMLIST PositionAtClientsList = NULL;
03226
03227 #ifdef RIO_DEBUG2
03228 if( m_log.is_open() )
03229 {
03230 m_log<<"OpenObject: VBR Traffic: Running Control Admission Algorithm ..."<<endl;
03231 m_log<< "CanAdmit: Can Admit ? " << endl;
03232 }
03233 #endif
03234
03235 if( m_NumberOfAdmittedClients > 0 )
03236 {
03237
03238 int disks = m_MaxNumberOfDisks;
03239 PSEQEVENTLIST EventList;
03240
03241 unsigned int *disk_queue = (unsigned int*) malloc( disks * sizeof(unsigned int) );
03242 unsigned int *max_disk_queue = (unsigned int*) malloc( disks * sizeof(unsigned int) );
03243
03244 long double *disk_time = (long double*) malloc( disks * sizeof(long double) );
03245 double timeToTransmit_msec=( m_BlockSize/m_NetworkRate )*1000.0;
03246 long double last_tx = 0;
03247 double interval;
03248 int NumberOfEvents = 0;
03249 bool CanNotAdmit = false;
03250 int eventToDisk, eventToClient, blockToClient, blockToDisk;
03251
03252
03253 memset( ClientBuffer, -1, sizeof( ClientBuffer ));
03254 memset( ServerBuffer, 0, sizeof( ServerBuffer ));
03255 memset( PlayBlock, 0, sizeof( PlayBlock ));
03256 memset( TotalBlocks, 0, sizeof( TotalBlocks ));
03257 memset( NextBlockToSend, 0, sizeof( NextBlockToSend ));
03258 memset( NHiccups, 0, sizeof( NHiccups ));
03259 memset( NReqWhenEmptyBuffer, 0, sizeof( NReqWhenEmptyBuffer ));
03260 memset( Initial_time, 0, sizeof( Initial_time ));
03261 memset( TimeBufferEmpty, 0, sizeof( TimeBufferEmpty ));
03262 memset( MaxIntervalBufferEmpty, 0, sizeof( MaxIntervalBufferEmpty ));
03263 memset( disk_queue, 0, disks*sizeof(unsigned int) );
03264 memset( max_disk_queue, 0, disks*sizeof(unsigned int) );
03265 memset( disk_time, 0, disks*sizeof(long double) );
03266
03267 PositionAtClientsList = InsertNewClient( NewClient, stream );
03268
03269 #ifdef RIO_DEBUG2
03270 if( m_log.is_open() ) m_log << "CanAdmit: sorting all streams ... ";
03271 #endif
03272
03273 pthread_mutex_lock( &MutexModifyClientsList );
03274 EventList = sortAllStreamsGeneratingOneStream( m_AdmittedClientsList,
03275 stream->m_Id );
03276 pthread_mutex_unlock( &MutexModifyClientsList );
03277
03278 initial_time = Timer.CurrentTime();
03279 blockToClient = 0;
03280 blockToDisk = 0;
03281
03282 #ifdef RIO_DEBUG2
03283 if( m_log.is_open() ) m_log << "CanAdmit: starting simulation. " << endl;
03284 #endif
03285
03286
03287 while(( EventList != NULL ) && ( CanNotAdmit == false ))
03288 {
03289 eventToDisk = eventToClient = -1;
03290 NumberOfEvents++;
03291
03292 if( NumberOfEvents % 50000 == 0 )
03293 if( m_log.is_open() ) m_log << "CanAdmit: NumberOfEvents -- "<< NumberOfEvents
03294 << " triggers"<< endl;
03295 switch ( EventList->event )
03296 {
03297 case RequestArrival:
03298 case RequestFullBuffer:
03299 case RequestFullClientBuffer:
03300 if( ( EventList->event == RequestArrival ) ||
03301 ( EventList->event == RequestFullClientBuffer ))
03302 {
03303 if( ServerBuffer [ EventList->id ] > 0 )
03304 {
03305
03306 eventToClient = BlockArrival;
03307 #ifdef RIO_DEBUG2
03308 blockToClient = NextBlockToSend[ EventList->id ] ;
03309 NextBlockToSend[ EventList->id ]++;
03310 #endif
03311 }
03312
03313 ServerBuffer [ EventList->id ]--;
03314 }
03315
03316 if( EventList->event == RequestArrival )
03317 {
03318 #ifdef RIO_DEBUG2
03319
03320 PlayBlock[ EventList->id ]++;
03321 #endif
03322
03323
03324 ClientBuffer[ EventList->id ]--;
03325 if( ClientBuffer[ EventList->id ] < 0 )
03326 {
03327 NReqWhenEmptyBuffer[ EventList->id ]++;
03328 }
03329 if( ClientBuffer[ EventList->id ] == -1 )
03330 {
03331 NHiccups[ EventList->id ]++;
03332 Initial_time[ EventList->id ] = EventList->csystime;
03333
03334 if( ( NHiccups[ EventList->id ] * 100.0 / TotalBlocks[EventList->id] ) >
03335 ( m_NumberOfEmptyTimes * 1.0 ))
03336 {
03337 CanNotAdmit = true;
03338 break;
03339 }
03340 }
03341 }
03342
03343 eventToDisk = DiskService;
03344 blockToDisk = EventList->block;
03345 break;
03346
03347 case BlockArrival:
03348
03349 ClientBuffer [ EventList->id ]++;
03350
03351 if( ClientBuffer [ EventList->id ] == 0 )
03352 {
03353 interval = EventList->csystime - Initial_time[ EventList->id ];
03354
03355 TimeBufferEmpty[ EventList->id ] += interval;
03356
03357 if( MaxIntervalBufferEmpty[ EventList->id ] < interval )
03358 {
03359 MaxIntervalBufferEmpty[ EventList->id ] = interval;
03360 }
03361
03362 if( MaxIntervalBufferEmpty[ EventList->id ] > m_MaxIntervalEmpty )
03363 {
03364 CanNotAdmit = true;
03365 break;
03366 }
03367 }
03368 break;
03369
03370 case DiskService:
03371
03372 if( ServerBuffer [ EventList->id ] < 0 )
03373 {
03374 eventToClient = BlockArrival;
03375 blockToClient = NextBlockToSend[ EventList->id ] ;
03376 NextBlockToSend[ EventList->id ]++;
03377 }
03378 ServerBuffer [ EventList->id ]++;
03379 disk_queue[ EventList->disk ]--;
03380 break;
03381
03382 default:
03383 RioErr << "Invalid event at simulation. "<< endl;
03384 }
03385 if( CanNotAdmit == true )
03386 break;
03387
03388 if( eventToClient != -1 )
03389 {
03390 if( EventList->csystime < last_tx )
03391 last_tx += timeToTransmit_msec;
03392 else
03393 last_tx = EventList->csystime + timeToTransmit_msec;
03394 when = last_tx + ClientRTT[ EventList->id ];
03395
03396 if( insertevent( EventList, (EventTypeSimulation) eventToClient,
03397 blockToClient, when ) == -1 )
03398 {
03399 RioErr <<"processevent: Could not insert event." << endl;
03400
03401 free( disk_queue );
03402 free( max_disk_queue );
03403 free( disk_time );
03404
03405 return NULL;
03406 }
03407 }
03408 if( eventToDisk != -1 )
03409 {
03410 if( disk_time[ EventList->disk ] > EventList->csystime )
03411 when = disk_time[ EventList->disk ];
03412 else
03413 when = EventList->csystime;
03414 when += GetDiskServiceTime( EventList->disk,
03415 disk_queue[ EventList->disk ] );
03416 disk_queue[ EventList->disk ]++;
03417
03418 disk_time[ EventList->disk ] = when;
03419 if( disk_queue[ EventList->disk ] >
03420 max_disk_queue[ EventList->disk ]
03421 )
03422 {
03423 max_disk_queue[ EventList->disk ] =
03424 disk_queue[ EventList->disk ];
03425 }
03426 if( insertevent( EventList, (EventTypeSimulation) eventToDisk,
03427 blockToDisk, when ) == -1 )
03428 {
03429 RioErr <<"processevent: Could not insert event." << endl;
03430
03431 free( disk_queue );
03432 free( max_disk_queue );
03433 free( disk_time );
03434
03435 return NULL;
03436 }
03437 }
03438
03439 if( EventList->next != NULL )
03440 {
03441 EventList = EventList->next;
03442 if( EventList->prev != NULL )
03443 {
03444 free( EventList->prev );
03445 EventList->prev = NULL;
03446 }
03447 }
03448 else
03449 {
03450 free( EventList );
03451 EventList = NULL;
03452 }
03453 }
03454 if( m_log.is_open() ) m_log << "CanAdmit: Simulation finished. Number of events "
03455 << NumberOfEvents << endl;
03456
03457 final_time = Timer.CurrentTime();
03458 timetosimul = getInterval( initial_time, final_time );
03459 stream->m_SampleSimulationTime = timetosimul;
03460 SetAverageSimulationTime( timetosimul, stream );
03461 if( m_log.is_open() ) m_log << "CanAdmit: Time (only cac) = " << timetosimul << " msec" << endl;
03462
03463 if( CanNotAdmit == true )
03464 {
03465 if( m_log.is_open() ) m_log << "Could not admit new client: Client " << EventList->id
03466 << " TotalBlocks " << TotalBlocks[EventList->id]
03467 << " Hiccups " << NHiccups[ EventList->id ]
03468 << "(" << NHiccups[ EventList->id ]*100.0/TotalBlocks[EventList->id]
03469 <<" %)"
03470 << " QoS parameter " << m_NumberOfEmptyTimes * 1.0
03471 << " % TimeBufferEmpty " << TimeBufferEmpty[ EventList->id ]
03472 << " MaxIntervalBufferEmpty " << MaxIntervalBufferEmpty[ EventList->id ]
03473 << " NReqWhenEmptyBuffer " << NReqWhenEmptyBuffer[ EventList->id ]
03474 << endl;
03475
03476 #ifdef RIO_DEBUG2
03477 if( m_log.is_open() ) m_log << "DiskQueue: " ;
03478 for(int i = 1; i < disks; i++ )
03479 {
03480 if( max_disk_queue[ i ] != 0 )
03481 {
03482 if( m_log.is_open() ) m_log << " D_" << i << "=" << max_disk_queue[ i ];
03483 }
03484 }
03485 if( m_log.is_open() ) m_log << endl;
03486 #endif
03487
03488 while( EventList != NULL )
03489 {
03490 if( EventList->next != NULL )
03491 {
03492 EventList = EventList->next;
03493 if( EventList->prev != NULL )
03494 {
03495 free( EventList->prev );
03496 EventList->prev = NULL;
03497 }
03498 }
03499 else
03500 {
03501 free( EventList );
03502 EventList = NULL;
03503 }
03504 }
03505 RemoveClient( PositionAtClientsList );
03506
03507
03508 free( disk_queue );
03509 free( max_disk_queue );
03510 free( disk_time );
03511
03512 return NULL;
03513 }
03514
03515 #ifdef RIO_DEBUG2
03516 if( m_log.is_open() ) m_log << "DiskQueue: " ;
03517 for(int i = 1; i < disks; i++ )
03518 {
03519 if( max_disk_queue[ i ] != 0 )
03520 {
03521 if( m_log.is_open() ) m_log << " D_" << i << "=" << max_disk_queue[ i ];
03522 }
03523 }
03524 if( m_log.is_open() ) m_log << endl;
03525
03526
03527 for( uint i = 0; i < 1001; i++ )
03528 {
03529 if( ClientBuffer[ i ] != -1 )
03530 {
03531 if( m_log.is_open() ) m_log << " Client " << i
03532 << " TotalBlocks " << TotalBlocks[i]
03533 << " Hiccups " << NHiccups[ i ]
03534 << "(" << NHiccups[ i ]*100.0/TotalBlocks[ i ] <<" %)"
03535 << " QoS parameter % " << m_NumberOfEmptyTimes * 1.0
03536 << " TimeBufferEmpty " << TimeBufferEmpty[ i ]
03537 << " MaxIntervalBufferEmpty " << MaxIntervalBufferEmpty[ i ]
03538 << " NReqWhenEmptyBuffer " << NReqWhenEmptyBuffer[ i ]
03539 << endl;
03540 }
03541 }
03542 #endif
03543 }
03544 else
03545 {
03546 PositionAtClientsList = InsertNewClient( NewClient, stream );
03547 }
03548
03549 if( PositionAtClientsList != NULL ) {
03550 PositionAtClientsList->nextBlockToBeFetched = NewClient;
03551 }
03552 if( m_log.is_open() ) m_log << "CanAdmit: Number of admitted clients " << m_NumberOfAdmittedClients << endl ;
03553
03554
03555 return PositionAtClientsList;
03556 }
03557
03558
03559 PSTREAMLIST CStreamManager::InsertNewClient( PSEQBLOCKLIST NewClient,
03560 RioStream* stream )
03561 {
03562 pthread_mutex_lock( &MutexModifyClientsList );
03563
03564 PSEQBLOCKLIST reqlistaux;
03565 PSTREAMLIST curlist, prevlist;
03566
03567 prevlist = m_AdmittedClientsList;
03568 while(( prevlist != NULL ) && ( prevlist->next != NULL ))
03569 {
03570 prevlist = prevlist->next;
03571 }
03572
03573 reqlistaux = NewClient;
03574 curlist = ( PSTREAMLIST )malloc( sizeof( STREAMLIST ) );
03575 if( curlist )
03576 {
03577 curlist->stream = stream;
03578 curlist->reqlist = reqlistaux;
03579 curlist->next = NULL;
03580 curlist->nextBlockToBeRequested = reqlistaux;
03581 curlist->nextBlockToBeFetched = reqlistaux;
03582
03583 curlist->aux = reqlistaux;
03584 curlist->auxnext = NULL;
03585
03586 if( prevlist )
03587 {
03588 curlist->prev = prevlist;
03589 prevlist->next = curlist;
03590
03591 curlist->auxprev = prevlist;
03592 prevlist->auxnext = curlist;
03593 }
03594 else
03595 {
03596 m_AdmittedClientsList = curlist;
03597 curlist->prev = NULL;
03598 curlist->auxprev = NULL;
03599 }
03600 m_NumberOfAdmittedClients++;
03601 }
03602 else
03603 {
03604 #ifdef RIO_DEBUG2
03605 if( m_log.is_open() ) m_log << "InsertNewClient: Could not insert new client!" << endl;
03606 #endif
03607 return NULL;
03608 }
03609
03610 #ifdef RIO_DEBUG2
03611 if( m_log.is_open() ) m_log << "InsertNewClient: OK!" << endl;
03612 #endif
03613 pthread_mutex_unlock( &MutexModifyClientsList );
03614
03615 return curlist;
03616 }
03617
03618 void CStreamManager::RemoveClient( PSTREAMLIST ClientPosition )
03619 {
03620 #ifdef RIO_DEBUG2
03621 if( m_log.is_open() ) m_log << "Removing Client: " << ClientPosition->stream->GetId();
03622 #endif
03623
03624 pthread_mutex_lock( &MutexModifyClientsList );
03625
03626 m_NumberOfAdmittedClients--;
03627
03628
03629 if( ClientPosition == m_AdmittedClientsList )
03630 {
03631 m_AdmittedClientsList = ClientPosition->next;
03632
03633
03634 if( m_AdmittedClientsList != NULL )
03635 {
03636 m_AdmittedClientsList->prev = NULL;
03637 m_AdmittedClientsList->auxprev = NULL;
03638
03639 }
03640 }
03641 else
03642 {
03643 if( ClientPosition->next != NULL )
03644 {
03645 ClientPosition->next->prev = ClientPosition->prev;
03646 ClientPosition->next->auxprev = ClientPosition->prev;
03647 }
03648 ClientPosition->prev->next = ClientPosition->next;
03649 ClientPosition->prev->auxnext = ClientPosition->next;
03650 }
03651
03652 PSEQBLOCKLIST current = ClientPosition->reqlist;
03653 while( current != NULL )
03654 {
03655 if( current->next != NULL )
03656 {
03657 current = current->next;
03658 if( current->prev != NULL )
03659 {
03660 free( current->prev );
03661 current->prev = NULL;
03662
03663 }
03664 }
03665 else
03666 {
03667 free( current );
03668 current = NULL;
03669 }
03670 }
03671
03672 free( ClientPosition );
03673 ClientPosition = NULL;
03674
03675 pthread_mutex_unlock( &MutexModifyClientsList );
03676
03677 #ifdef RIO_DEBUG2
03678 if( m_log.is_open() ) m_log << " OK!" << endl;
03679 #endif
03680 }
03681
03682
03683 PSEQEVENTLIST CStreamManager::sortAllStreamsGeneratingOneStream(
03684 PSTREAMLIST stream_reqlist,
03685 uint newclientid )
03686 {
03687 PSEQEVENTLIST l_list = NULL;
03688 PSEQEVENTLIST curblock = NULL;
03689 PSEQEVENTLIST prevblock = NULL;
03690 PSTREAMLIST auxStreamList;
03691 PSTREAMLIST auxStream;
03692 PSTREAMLIST original_stream_reqlist;
03693 RioStream *stream = NULL;
03694
03695 struct timeval serverclock = Timer.CurrentTime();
03696 long double servertime = serverclock.tv_sec * 1000.0 + serverclock.tv_usec/1000.0;
03697 #ifdef RIO_DEBUG2
03698 if( m_log.is_open() ) m_log << "sortAllStreamsGeneratingOneStream: servertime now " << servertime << endl;
03699 #endif
03700 struct timeval streamlastarrivaltime;
03701 long double streamlastarrival, streamtime;
03702
03703 struct timeval initial_time, final_time;
03704 long double time;
03705
03706 initial_time = serverclock;
03707
03708
03709
03710
03711
03712 long double *now = (long double*) malloc(1001*sizeof(long double));
03713 memset( now, 0 , 1001*sizeof(long double));
03714
03715 original_stream_reqlist = auxStreamList = stream_reqlist;
03716
03717 while( auxStreamList )
03718 {
03719 auxStreamList->aux = auxStreamList->nextBlockToBeFetched;
03720
03721 #ifdef RIO_DEBUG2
03722 if( m_log.is_open() ) m_log << "sortAll: id " << auxStreamList->stream->GetId();
03723 #endif
03724
03725 if( auxStreamList->stream->getPrefetchedAllBlocks() )
03726 {
03727 #ifdef RIO_DEBUG2
03728 if( m_log.is_open() ) m_log << " finished!" << endl;
03729 #endif
03730
03731
03732 if( auxStreamList != stream_reqlist )
03733 {
03734 auxStreamList->auxprev->auxnext = auxStreamList->auxnext;
03735
03736
03737 if( auxStreamList->auxnext != NULL )
03738 auxStreamList->auxnext->auxprev = auxStreamList->auxprev;
03739 }
03740 else
03741 {
03742
03743
03744 stream_reqlist = auxStreamList->auxnext;
03745 }
03746 }
03747 else
03748 {
03749 #ifdef RIO_DEBUG2
03750 if( m_log.is_open() )
03751 m_log << " nbtbf " << auxStreamList->aux->block;
03752 if( auxStreamList->aux->prev != NULL )
03753 {
03754 if( m_log.is_open() )
03755 m_log << " prev time " << auxStreamList->aux->prev->csystime;
03756 }
03757 #endif
03758
03759
03760 if( auxStreamList->aux->id != newclientid )
03761 {
03762 streamlastarrivaltime = auxStreamList->stream->getLastArrivalTime();
03763 streamlastarrival = streamlastarrivaltime.tv_sec * 1000.0 + streamlastarrivaltime.tv_usec / 1000.0;
03764 if( auxStreamList->aux->prev != NULL )
03765 {
03766 streamtime = auxStreamList->aux->prev->csystime +
03767 ( servertime - streamlastarrival );
03768 if( auxStreamList->aux->csystime < streamtime )
03769 {
03770
03771 streamtime = auxStreamList->aux->csystime;
03772 #ifdef RIO_DEBUG2
03773 if( m_log.is_open() ) m_log << " client paused.";
03774 #endif
03775 }
03776 }
03777 else
03778 streamtime = 0;
03779
03780 #ifdef RIO_DEBUG2
03781 if( m_log.is_open() )
03782 {
03783 m_log << " ( lastarrival " << streamlastarrival<< " ). "
03784 << "streamtime " << streamtime << endl;
03785 m_log << " nbtbf in "
03786 << auxStreamList->aux->csystime - streamtime << endl;
03787 }
03788 #endif
03789 now[ auxStreamList->aux->id ] = streamtime;
03790 PlayBlock[ auxStreamList->aux->id ] =
03791 auxStreamList->stream->getPlayBlock();
03792 }
03793 else
03794 {
03795 stream = auxStreamList->stream;
03796 now[ newclientid ] = 0;
03797 PlayBlock[ auxStreamList->aux->id ] = 0;
03798 #ifdef RIO_DEBUG2
03799 if( m_log.is_open() ) m_log << endl;
03800 #endif
03801 }
03802
03803
03804 TotalBlocks[ auxStreamList->aux->id ] =
03805 auxStreamList->stream->getNumberOfBlocks();
03806 ClientBuffer[ auxStreamList->aux->id ] =
03807 auxStreamList->stream->getClientBufferSize();
03808 ServerBuffer[ auxStreamList->aux->id ] =
03809 m_NumberOfBuffersForEachClient;
03810 ClientRTT[ auxStreamList->aux->id] =
03811 auxStreamList->stream->getRTTtoClient();
03812 NextBlockToSend[ auxStreamList->aux->id ] =
03813 auxStreamList->nextBlockToBeRequested->block;
03814
03815 #ifdef RIO_DEBUG2
03816 if( m_log.is_open() )
03817 m_log << "sortAll: Client " << auxStreamList->aux->id
03818 << " RTT " << ClientRTT[ auxStreamList->aux->id ]
03819 << " CB Size " << auxStreamList->stream->getClientBufferSize()
03820 << " SB Status " << ServerBuffer[ auxStreamList->aux->id ]
03821 << " CB Status " << ClientBuffer[ auxStreamList->aux->id ]
03822 << " PlayBlock " << PlayBlock[ auxStreamList->aux->id ]
03823 << " NextBlockToSend " << NextBlockToSend[ auxStreamList->aux->id ]
03824 << " now " << now[ auxStreamList->aux->id ]
03825 << endl;
03826 #endif
03827 }
03828
03829 auxStreamList = auxStreamList->next;
03830 }
03831
03832
03833 auxStreamList = stream_reqlist;
03834 l_list = prevblock = NULL;
03835
03836 int NumberOfEvents = 0;
03837
03838
03839
03840
03841 while( auxStreamList )
03842 {
03843 auxStream = firstRequestOfAllStreams( auxStreamList, now );
03844 if( auxStream == NULL )
03845 {
03846 RioErr << "sortAllStreamsGeneratingOneStream: Could not get "
03847 << "the minimum value." << endl;
03848
03849 free( now );
03850
03851 return l_list;
03852 }
03853 curblock = ( PSEQEVENTLIST )malloc( sizeof( SEQEVENTLIST ) );
03854 if( curblock != NULL )
03855 {
03856 if( auxStream->aux->block < (u32) m_NumberOfBuffersForEachClient )
03857 {
03858 curblock->event = RequestFullBuffer;
03859 ServerBuffer[ auxStream->aux->id ]--;
03860 }
03861 else if( auxStream->aux->block < ((u32 )
03862 (m_NumberOfBuffersForEachClient+auxStream->stream->getClientBufferSize())))
03863 {
03864 curblock->event = RequestFullClientBuffer;
03865 ClientBuffer[ auxStream->aux->id ]--;
03866 }
03867 else
03868 {
03869 curblock->event = RequestArrival;
03870 }
03871
03872 curblock->id = auxStream->aux->id;
03873 curblock->block = auxStream->aux->block;
03874 curblock->disk = auxStream->aux->disk;
03875 curblock->csystime = auxStream->aux->csystime -
03876 now[auxStream->stream->GetId()];
03877 curblock->next = NULL;
03878 curblock->prev = NULL;
03879 NumberOfEvents++;
03880 auxStream->aux = auxStream->aux->next;
03881
03882
03883
03884 if( ( auxStream->aux == NULL ) && ( auxStream != auxStreamList ) )
03885 {
03886 auxStream->auxprev->auxnext = auxStream->auxnext;
03887
03888
03889 if( auxStream->auxnext != NULL )
03890 auxStream->auxnext->auxprev = auxStream->auxprev;
03891 }
03892 else
03893 {
03894
03895
03896 if( ( auxStream->aux == NULL ) && ( auxStream == auxStreamList ) )
03897 auxStreamList = auxStream->auxnext;
03898 }
03899 }
03900 else {
03901 RioErr << "sortAllStreamsGeneratingOneStream: Could not "
03902 << "allocate memory " << endl;
03903
03904 free( now );
03905
03906 return NULL;
03907 }
03908 if( prevblock )
03909 {
03910 curblock->prev = prevblock;
03911 prevblock = prevblock->next = curblock;
03912 }
03913 else
03914 {
03915 l_list = prevblock = curblock;
03916 }
03917 }
03918
03919
03920
03921 auxStreamList = original_stream_reqlist;
03922 while( auxStreamList ) {
03923 auxStreamList->aux = auxStreamList->reqlist;
03924 auxStreamList->auxnext = auxStreamList->next;
03925 auxStreamList->auxprev = auxStreamList->prev;
03926 auxStreamList = auxStreamList->next;
03927 }
03928
03929 final_time = Timer.CurrentTime();
03930 time = getInterval( initial_time, final_time );
03931 stream->m_SampleSortListTime = time;
03932 SetAverageSortListTime( time, stream );
03933 #ifdef RIO_DEBUG2
03934 if( m_log.is_open() )
03935 m_log << "sortAll: Finished" << endl;
03936 #endif
03937 if( m_log.is_open() )
03938 m_log << "(NumberOfEvents = "<< NumberOfEvents <<")" << endl;
03939
03940 free( now );
03941
03942 return l_list;
03943 }
03944
03945
03946
03947
03948 PSTREAMLIST CStreamManager::firstRequestOfAllStreams(
03949 PSTREAMLIST stream_reqlist,
03950 long double *now )
03951 {
03952 PSTREAMLIST auxStream, minStream;
03953 double min = MAXDOUBLE;
03954
03955 minStream = NULL;
03956 auxStream = stream_reqlist;
03957 if( auxStream )
03958 {
03959
03960 min = auxStream->aux->csystime - now[ auxStream->aux->id ];
03961 minStream = auxStream;
03962 auxStream = auxStream->auxnext;
03963
03964 while( auxStream )
03965 {
03966
03967 if( min >= ( auxStream->aux->csystime - now[ auxStream->aux->id] ) )
03968 {
03969
03970 min = auxStream->aux->csystime - now[ auxStream->aux->id] ;
03971 minStream = auxStream;
03972 }
03973 auxStream = auxStream->auxnext;
03974 }
03975 }
03976 return minStream;
03977 }
03978
03979
03980
03981 PSTREAMLIST CStreamManager::GetAdmittedClientsList()
03982 {
03983 return m_AdmittedClientsList;
03984 }
03985
03986
03987 int CStreamManager::GetNumberOfBuffersForEachClient()
03988 {
03989 return m_NumberOfBuffersForEachClient;
03990 }
03991
03992
03993
03994
03995 int CStreamManager::GetBurstSizeOfEachClient()
03996 {
03997 return m_BurstSizeOfEachClient;
03998 }
03999
04000 double CStreamManager::GetAverageCACTime()
04001 {
04002 return m_AverageCACTime;
04003 }
04004
04005
04006 void CStreamManager::SetAverageGenReqListTime( double sample, RioStream *stream )
04007 {
04008 double average;
04009
04010 average = m_AverageGenReqListTime;
04011
04012 m_AverageGenReqListTime = m_AverageGenReqListTime +
04013 (( sample - m_AverageGenReqListTime )/( m_NumberOfSamples + 1 ));
04014
04015 if( m_NumberOfSamples >= 1 )
04016 {
04017 m_VarianceGenReqListTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceGenReqListTime
04018 + (m_NumberOfSamples + 1) * pow((m_AverageGenReqListTime - average), 2);
04019 }
04020
04021 if( m_CollectMeasures )
04022 {
04023 if( m_logSGENREQLT.is_open() )
04024 m_logSGENREQLT << stream->m_Id << "\t" << sample << endl;
04025 if( m_logEGENREQLT.is_open() )
04026 m_logEGENREQLT << stream->m_Id << "\t" << m_AverageGenReqListTime
04027 << "\t# " << m_VarianceGenReqListTime<< endl;
04028 }
04029 }
04030
04031
04032 void CStreamManager::SetAverageSortListTime( double sample, RioStream *stream )
04033 {
04034 double average;
04035
04036 average = m_AverageSortListTime;
04037
04038 m_AverageSortListTime = m_AverageSortListTime +
04039 (( sample - m_AverageSortListTime )/( m_NumberOfSamples + 1 ));
04040
04041 if( m_NumberOfSamples >= 1 )
04042 {
04043 m_VarianceSortListTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceSortListTime
04044 + (m_NumberOfSamples + 1) * pow((m_AverageSortListTime - average), 2);
04045 }
04046
04047 average = m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ];
04048
04049 m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] = m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] +
04050 (( sample - m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] )/( m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] + 1 ));
04051
04052 if( m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] >= 1 )
04053 {
04054 m_VarianceSortListTimeAccClients[ m_NumberOfAdmittedClients ] = (1 - (float) 1/m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] ) * m_VarianceSortListTimeAccClients[ m_NumberOfAdmittedClients ]
04055 + (m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] + 1) * pow((m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] - average), 2);
04056 }
04057 m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ]++;
04058
04059 if( m_CollectMeasures )
04060 {
04061 if( m_logSSORTLT.is_open() )
04062 m_logSSORTLT << stream->m_Id << "\t" << sample << endl;
04063 if( m_logESORTLT.is_open() )
04064 m_logESORTLT << stream->m_Id << "\t" << m_AverageSortListTime
04065 << "\t# "<< m_VarianceSortListTime << endl;
04066 }
04067 }
04068
04069
04070 void CStreamManager::SetAverageBufferTime( double sample, RioStream *stream )
04071 {
04072 double average;
04073
04074 average = m_AverageBufferTime;
04075
04076 m_AverageBufferTime = m_AverageBufferTime +
04077 (( sample - m_AverageBufferTime )/( m_NumberOfSamples + 1 ));
04078
04079 if( m_NumberOfSamples >= 1 )
04080 {
04081 m_VarianceBufferTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceBufferTime
04082 + (m_NumberOfSamples + 1) * pow((m_AverageBufferTime - average), 2);
04083 }
04084
04085 average = m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ];
04086
04087 m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] = m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] +
04088 (( sample - m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] )/( m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] + 1 ));
04089
04090 if( m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] >= 1 )
04091 {
04092 m_VarianceBufferTimeAccClients[ m_NumberOfAdmittedClients ] = (1 - (float) 1/m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] ) * m_VarianceBufferTimeAccClients[ m_NumberOfAdmittedClients ]
04093 + (m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] + 1) * pow((m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] - average), 2);
04094 }
04095
04096 m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ]++;
04097
04098 if( m_CollectMeasures )
04099 {
04100 if( m_logSBUFFERT.is_open() )
04101 m_logSBUFFERT << stream->m_Id << "\t" << sample << endl;
04102 if( m_logEBUFFERT.is_open() )
04103 m_logEBUFFERT << stream->m_Id << "\t" << m_AverageBufferTime
04104 <<"\t# "<< m_VarianceBufferTime<< endl;
04105 }
04106 }
04107
04108
04109 void CStreamManager::SetAverageCACTime( double sample, RioStream *stream, int index )
04110 {
04111 double average;
04112
04113 average = m_AverageCACTime;
04114
04115 m_AverageCACTime = m_AverageCACTime + (( sample - m_AverageCACTime )/( m_NumberOfSamples + 1 ));
04116
04117 if( m_NumberOfSamples >= 1 )
04118 {
04119 m_VarianceCACTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceCACTime
04120 + (m_NumberOfSamples + 1) * pow((m_AverageCACTime - average), 2);
04121 }
04122
04123 average = m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ];
04124
04125 m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] = m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] +
04126 (( sample - m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] )/( m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1 ));
04127
04128 if( m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] >= 1 )
04129 {
04130 m_VarianceCACTimeAccClients[ m_NumberOfAdmittedClients+index ] = (1 - (float) 1/m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] ) * m_VarianceCACTimeAccClients[ m_NumberOfAdmittedClients+index ]
04131 + (m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1) * pow((m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] - average), 2);
04132 }
04133
04134 m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ]++;
04135 m_NumberOfSamples++;
04136
04137 UpdateNumberOfWaitingClients(-1);
04138 pthread_mutex_unlock( &MutexRunCAC );
04139
04140 if( m_CollectMeasures )
04141 {
04142 if( m_logSCACT.is_open() )
04143 m_logSCACT << stream->m_Id << "\t" << sample << endl;
04144 if( m_logECACT.is_open() )
04145 m_logECACT << stream->m_Id << "\t" << m_AverageCACTime
04146 <<"\t# " << m_VarianceCACTime<< endl;
04147 }
04148 }
04149
04150
04151 void CStreamManager::SetAverageAdmissionProcessTime( double sample, RioStream *stream, int index )
04152 {
04153 double average;
04154
04155 average = m_AverageAdmissionProcessTime;
04156
04157 m_AverageAdmissionProcessTime = m_AverageAdmissionProcessTime +
04158 (( sample - m_AverageAdmissionProcessTime )/( m_NumberOfSamples + 1 ));
04159
04160 if( m_NumberOfSamples >= 1 )
04161 {
04162 m_VarianceAdmissionProcessTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceAdmissionProcessTime
04163 + (m_NumberOfSamples + 1) * pow((m_AverageAdmissionProcessTime - average), 2);
04164 }
04165
04166 average = m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ];
04167
04168 m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] = m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]+
04169 (( sample - m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] )/( m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1 ));
04170
04171 if( m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] >= 1 )
04172 {
04173 m_VarianceAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] = (1 - (float) 1/m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] ) * m_VarianceAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]
04174 + (m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1) * pow((m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]- average), 2);
04175 }
04176
04177 m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]++;
04178 if( m_CollectMeasures )
04179 {
04180 if( m_logSAPT.is_open() )
04181 m_logSAPT << stream->m_Id << "\t"<< sample << endl;
04182 if( m_logEAPT.is_open() )
04183 m_logEAPT << stream->m_Id << "\t"<< m_AverageAdmissionProcessTime
04184 << "\t# "<< m_VarianceAdmissionProcessTime << endl;
04185 }
04186 }
04187
04188
04189 void CStreamManager::SetAverageSimulationTime( double sample, RioStream *stream )
04190 {
04191 double average;
04192
04193 average = m_AverageSimulationTime;
04194
04195 m_AverageSimulationTime = m_AverageSimulationTime + (( sample - m_AverageSimulationTime )/( m_NumberOfSamples + 1 ));
04196
04197 if( m_NumberOfSamples >= 1 )
04198 {
04199 m_VarianceSimulationTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceSimulationTime
04200 + (m_NumberOfSamples + 1) * pow((m_AverageSimulationTime - average), 2);
04201 }
04202
04203 average = m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ];
04204
04205 m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ] = m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ]+
04206 (( sample - m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ] )/( m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] + 1 ));
04207
04208 if( m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] >= 1 )
04209 {
04210 m_VarianceSimulationTimeAccClients[ m_NumberOfAdmittedClients ] = (1 - (float) 1/m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] ) * m_VarianceSimulationTimeAccClients[ m_NumberOfAdmittedClients ]
04211 + (m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] + 1) * pow((m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ]- average), 2);
04212 }
04213
04214 m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ]++;
04215
04216 if( m_CollectMeasures )
04217 {
04218 if( m_logSSIMULT.is_open() )
04219 m_logSSIMULT << stream->m_Id << "\t" << sample << endl;
04220 if( m_logESIMULT.is_open() )
04221 m_logESIMULT << stream->m_Id << "\t" << m_AverageSimulationTime
04222 << "\t# " << m_VarianceSimulationTime << endl;
04223 }
04224 }
04225
04226
04227 void CStreamManager::SetAverageWaitingTime( double sample, RioStream *stream, int index, int queue )
04228 {
04229 double average;
04230
04231 average = m_AverageWaitingTime;
04232
04233 m_AverageWaitingTime = m_AverageWaitingTime + (( sample - m_AverageWaitingTime )/( m_NumberOfSamples + 1 ));
04234
04235 if( m_NumberOfSamples >= 1 )
04236 {
04237 m_VarianceWaitingTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceWaitingTime
04238 + (m_NumberOfSamples + 1) * pow((m_AverageWaitingTime - average), 2);
04239 }
04240
04241 average = m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ];
04242
04243 m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]= m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]+
04244 (( sample - m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] )/( m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1 ));
04245
04246 if( m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] >= 1 )
04247 {
04248 m_VarianceWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] = (1 - (float) 1/m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] ) * m_VarianceWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]
04249 + (m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1) * pow((m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]- average), 2);
04250 }
04251
04252 m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]++;
04253
04254 average = m_AverageWaitingTimeAccQueue[ queue ];
04255
04256 m_AverageWaitingTimeAccQueue[ queue ]= m_AverageWaitingTimeAccQueue[ queue ]+
04257 (( sample - m_AverageWaitingTimeAccQueue[ queue ] )/( m_SamplesWaitingTimeAccQueue[ queue ] + 1 ));
04258
04259 if( m_SamplesWaitingTimeAccQueue[ queue ] >= 1 )
04260 {
04261 m_VarianceWaitingTimeAccQueue[ queue ] = (1 - (float) 1/m_SamplesWaitingTimeAccQueue[ queue ] ) * m_VarianceWaitingTimeAccQueue[ queue ]
04262 + (m_SamplesWaitingTimeAccQueue[ queue ] + 1) * pow((m_AverageWaitingTimeAccQueue[ queue ] - average), 2);
04263 }
04264
04265 m_SamplesWaitingTimeAccQueue[ queue ]++;
04266
04267 if( m_CollectMeasures )
04268 {
04269 if( m_logSWAITINGT.is_open() )
04270 m_logSWAITINGT << stream->m_Id << "\t" << sample << "\t# " << queue << endl;
04271 if( m_logEWAITINGT.is_open() )
04272 m_logEWAITINGT << stream->m_Id << "\t" << m_AverageWaitingTime
04273 << "\t# " << m_VarianceWaitingTime << endl;
04274 }
04275 }
04276
04277
04278 double CStreamManager::GetEstimatedDiskServiceTime( int disk )
04279 {
04280 return m_EstimatedDiskServiceTimeOfDisk[ disk ];
04281 }
04282
04283 double CStreamManager::GetEstimatedDiskResponseTime( int disk )
04284 {
04285 return m_EstimatedDiskResponseTimeOfDisk[ disk ];
04286 }
04287
04288 double CStreamManager::GetEstimatedDiskQueueTime( int disk )
04289 {
04290 return m_EstimatedDiskQueueTimeOfDisk[disk];
04291 }
04292
04293 unsigned int CStreamManager::GetBlockSize()
04294 {
04295 return m_BlockSize;
04296 }
04297
04298
04299 long double CStreamManager::GetNetworkRate()
04300 {
04301 return m_NetworkRate;
04302 }
04303
04304
04305
04306 int CStreamManager::GetDiskServiceTime()
04307 {
04308 int num = m_Router->GetMaxNumberOfDisks();
04309
04310 EventStorageRequest* Event;
04311
04312
04313
04314 for( int i = 1; i <= num; i++ )
04315 {
04316 Event = ( EventStorageRequest* ) EventManager.New(EventTypeStorageRequest);
04317 Event->StorageRequest.DiskServiceTimeInfo.Type = MSG_RSS_DISKSERVICETIMEINFO_REQ;
04318 Event->StorageRequest.DiskServiceTimeInfo.Size = SizeMsgRSSdiskServiceTimeInfoReq;
04319 Event->StorageRequest.DiskServiceTimeInfo.Token = RSS_TOKEN_ROUTER;
04320 Event->StorageRequest.DiskServiceTimeInfo.DiskId = i;
04321
04322 m_Router->GetDiskServiceTime( Event, i );
04323 }
04324 return 0;
04325 }
04326
04327
04328 void CStreamManager::UpdateDiskQueueAndServiceTime()
04329 {
04330 m_Router->UpdateDiskServiceTime( m_EstimatedDiskServiceTimeOfDisk,
04331 m_EstimatedDiskServiceTime );
04332 m_Router->UpdateDiskQueueTime( m_EstimatedDiskQueueTimeOfDisk,
04333 m_EstimatedDiskQueueTime );
04334 m_Router->UpdateDiskResponseTime( m_EstimatedDiskResponseTimeOfDisk,
04335 m_EstimatedDiskResponseTime,
04336 m_DeviationDiskResponseTimeOfDisk,
04337 m_DeviationDiskResponseTime );
04338 return;
04339 }
04340
04341
04342 double CStreamManager::GetDiskQueueTime( int disk, int queuesize )
04343 {
04344 int index;
04345 if( queuesize < (m_MaxPendingRequests + 1) )
04346 index = 0;
04347 else if( queuesize > (m_MaxPendingRequests + 300) )
04348 index = 300;
04349 else
04350 index = queuesize - m_MaxPendingRequests;
04351
04352 while(( m_EstimatedDiskQueueTime[ disk ][ index ] == 0 ) && ( index > 0))
04353 {
04354 index--;
04355 }
04356 return m_EstimatedDiskQueueTime[ disk ][ index ];
04357 }
04358
04359 double CStreamManager::GetDiskServiceTime( int disk, int index )
04360 {
04361 if( index > m_MaxPendingRequests )
04362 index = m_MaxPendingRequests;
04363
04364 while(( m_EstimatedDiskServiceTime[ disk ][ index ] == 0 ) && ( index > 0))
04365 {
04366 index--;
04367 }
04368 return m_EstimatedDiskServiceTime[ disk ][ index ];
04369 }
04370
04371 double CStreamManager::GetDiskResponseTime( int disk, int index )
04372 {
04373 if( index > m_MaxPendingRequests )
04374 index = m_MaxPendingRequests;
04375
04376 while(( m_EstimatedDiskResponseTime[ disk ][ index ] == 0 ) && ( index > 0))
04377 {
04378 index--;
04379 }
04380 return m_EstimatedDiskResponseTime[ disk ][ index ];
04381 }
04382
04383
04384
04385 int CStreamManager::SaveMeasures()
04386 {
04387 FILE *log, *logDSRT, *logDQT;
04388 char logname[100] = "";
04389
04390 if( m_CollectMeasures )
04391 {
04392 pthread_mutex_lock( &MutexUpdateMeasures );
04393
04394 GetDiskServiceTime();
04395 #ifdef RIO_DEBUG2
04396 if( m_log.is_open() ) m_log << "Updating disk service time and queue time info ... ";
04397 #endif
04398
04399 UpdateDiskQueueAndServiceTime();
04400
04401 #ifdef RIO_DEBUG2
04402 if( m_log.is_open() ) m_log << "OK." << endl;
04403 #endif
04404
04405
04406 if( (log = fopen( "AverageCACTimeaccClients.log", "w" )) == NULL )
04407 {
04408 pthread_mutex_unlock( &MutexUpdateMeasures );
04409 return -1;
04410 }
04411 fprintf( log, "# AverageCACTime %f msec (Variance %f)\n", m_AverageCACTime, m_VarianceCACTime );
04412 fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04413 fprintf( log, "# Measures in msec\n" );
04414 for( int j = 1; ( j < 1001 ) && ( m_AverageCACTimeAccClients[ j ] != 0); j++ )
04415 {
04416 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageCACTimeAccClients[ j ], m_VarianceCACTimeAccClients[ j ], m_SamplesCACTimeAccClients[ j ] );
04417 }
04418 fclose( log );
04419
04420
04421 if( (log = fopen( "AverageAdmissionProcessTimeaccClients.log", "w" )) == NULL )
04422 {
04423 pthread_mutex_unlock( &MutexUpdateMeasures );
04424 return -1;
04425 }
04426 fprintf( log, "# AverageAdmissionProcessTime %f msec (Variance %f)\n", m_AverageAdmissionProcessTime, m_VarianceAdmissionProcessTime );
04427 fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04428 fprintf( log, "# Measures in msec\n" );
04429 for( int j = 1; ( j < 1001 ) && ( m_AverageAdmissionProcessTimeAccClients[ j ] != 0); j++ )
04430 {
04431 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageAdmissionProcessTimeAccClients[ j ], m_VarianceAdmissionProcessTimeAccClients[ j ], m_SamplesAdmissionProcessTimeAccClients[ j ] );
04432 }
04433 fclose( log );
04434
04435
04436 if( (log = fopen( "AverageSortTimeaccClients.log", "w" )) == NULL )
04437 {
04438 pthread_mutex_unlock( &MutexUpdateMeasures );
04439 return -1;
04440 }
04441 fprintf( log, "# AverageSortTime %f msec (Variance %f)\n", m_AverageSortListTime, m_VarianceSortListTime);
04442 fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04443 fprintf( log, "# Measures in msec\n" );
04444 for( int j = 1; j < 1001; j++ )
04445 {
04446 if( m_AverageSortListTimeAccClients[ j ] != 0 )
04447 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageSortListTimeAccClients[ j ], m_VarianceSortListTimeAccClients[ j ], m_SamplesSortListTimeAccClients[ j ] );
04448 }
04449 fclose( log );
04450
04451
04452 if( (log = fopen( "AverageSimulationTimeaccClients.log", "w" )) == NULL )
04453 {
04454 pthread_mutex_unlock( &MutexUpdateMeasures );
04455 return -1;
04456 }
04457 fprintf( log, "# AverageSimulationTime %f msec (Variance %f)\n", m_AverageSimulationTime, m_VarianceSimulationTime );
04458 fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04459 fprintf( log, "# Measures in msec\n" );
04460 for( int j = 1; j < 1001; j++ )
04461 {
04462 if( m_AverageSimulationTimeAccClients[ j ] != 0 )
04463 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageSimulationTimeAccClients[ j ], m_VarianceSimulationTimeAccClients[ j ], m_SamplesSimulationTimeAccClients[ j ] );
04464 }
04465 fclose( log );
04466
04467
04468 if( (log = fopen( "AverageWaitingTimeaccClients.log", "w" )) == NULL )
04469 {
04470 pthread_mutex_unlock( &MutexUpdateMeasures );
04471 return -1;
04472 }
04473 fprintf( log, "# AverageWaitingTime %f msec (Variance %f)\n", m_AverageWaitingTime, m_VarianceWaitingTime );
04474 fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04475 fprintf( log, "# Measures in msec\n" );
04476 for( int j = 1; j < 1001; j++ )
04477 {
04478 if( m_AverageWaitingTimeAccClients[ j ] != 0 )
04479 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageWaitingTimeAccClients[ j ], m_VarianceWaitingTimeAccClients[ j ], m_SamplesWaitingTimeAccClients[ j ] );
04480 }
04481 fclose( log );
04482
04483
04484 if( (log = fopen( "AverageWaitingTimeaccQueue.log", "w" )) == NULL )
04485 {
04486 pthread_mutex_unlock( &MutexUpdateMeasures );
04487 return -1;
04488 }
04489 fprintf( log, "# Measures in msec\n" );
04490 for( int j = 0; j < 1001; j++ )
04491 {
04492 if( m_AverageWaitingTimeAccQueue[ j ] != 0 )
04493 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageWaitingTimeAccQueue[ j ], m_VarianceWaitingTimeAccQueue[ j ], m_SamplesWaitingTimeAccQueue[ j ] );
04494 }
04495 fclose( log );
04496
04497 if( m_log.is_open() ) m_log << "CStreamManager::SaveMeasures: number of disks "<< m_MaxNumberOfDisks << endl;
04498
04499 for( int i = 1; i < m_MaxNumberOfDisks; i++ )
04500 {
04501 if( m_EstimatedDiskServiceTimeOfDisk[ i ] != -1 )
04502 {
04503 sprintf( logname, "EstimatedDisk_%d_ServiceAndResponseTime.log", i );
04504 if( (logDSRT = fopen( logname, "w" )) == NULL )
04505 {
04506 pthread_mutex_unlock( &MutexUpdateMeasures );
04507 return -1;
04508 }
04509 sprintf( logname, "EstimatedDisk_%d_QueueTime.log", i );
04510 if( (logDQT = fopen( logname, "w" )) == NULL )
04511 {
04512 pthread_mutex_unlock( &MutexUpdateMeasures );
04513 return -1;
04514 }
04515 fprintf( logDSRT, "# EstimatedDiskServiceTime %f msec\n",
04516 m_EstimatedDiskServiceTimeOfDisk[ i ]);
04517
04518 fprintf( logDQT, "# EstimatedDiskQueueTime %f msec\n",
04519 m_EstimatedDiskQueueTimeOfDisk[ i ]);
04520
04521 fprintf( logDSRT, "# Measures in msec \n#AccThreads\tServiceTime\t\t\tResponseAccThreads\n");
04522
04523 fprintf( logDQT, "# Measures in msec \n#AccQueueSize\tTime\n");
04524 fprintf( logDQT, "%d\t\t%f\n", m_MaxPendingRequests, m_EstimatedDiskQueueTime[ i ][ 0 ] );
04525 for( int j = 1; j < 301; j++ )
04526 {
04527
04528 fprintf( logDSRT, "%d\t\t%f\t\t%f\t\t%f\n", j, m_EstimatedDiskServiceTime[ i ][ j ], m_EstimatedDiskResponseTime[ i ][ j ], m_DeviationDiskResponseTime[ i ][ j ]);
04529 fprintf( logDQT, "%d\t\t%f\n", j + m_MaxPendingRequests, m_EstimatedDiskQueueTime[ i ][ j ]);
04530 }
04531 fclose(logDSRT);
04532 fclose(logDQT);
04533 }
04534 }
04535
04536 pthread_mutex_unlock( &MutexUpdateMeasures );
04537 return S_OK;
04538 }
04539 else
04540 {
04541 return -1;
04542 }
04543 }
04544
04545
04546
04547
04548
04549
04550
04551
04552
04553 int CStreamManager::GetNetMgrIPAddr()
04554 {
04555 return( m_NetMgr->getipaddr() );
04556 }
04557
04558
04559
04560
04561 int CStreamManager::GetNetInterfaceIPAddr()
04562 {
04563 if( m_NetInterface != NULL )
04564 return( m_NetInterface->getipaddr() );
04565 else return( GetNetMgrIPAddr() );
04566 }
04567
04568
04569
04570
04571
04572
04573
04574
04575 NetMgr *CStreamManager::getNetMgr( void )
04576 {
04577 return m_NetMgr;
04578 }
04579
04580
04581
04582 CNetInterface *CStreamManager::getNetInterface( void )
04583 {
04584 return m_NetInterface;
04585 }
04586
04587
04588
04589
04590
04591
04592
04593
04594
04595 RioStreamObj::RioStreamObj( CStreamManager *mgr, RioStream *stream )
04596 {
04597 o_mgr = mgr;
04598 o_stream = stream;
04599
04600
04601 o_object = NULL;
04602 m_Status = StreamObjStatusOpened;
04603 m_PendingRequests = 0;
04604 }
04605
04606
04607 RioStreamObj::~RioStreamObj()
04608 {
04609 CStreamManager* mgr = o_mgr;
04610 mgr->m_MutexTable->Wait();
04611 CompleteClose();
04612 mgr->m_MutexTable->Release();
04613 }
04614
04615
04616 int RioStreamObj::GetSize( RioObjectSize *ObjectSize )
04617 {
04618 return o_object->GetSize( ObjectSize );
04619 }
04620
04621 int RioStreamObj::SetSize( RioObjectSize ObjectSize, char *md5sum,
04622 unsigned long long int ExcludeStorages )
04623 {
04624 return o_object->SetSize( ObjectSize, md5sum, ExcludeStorages );
04625 }
04626
04627 int RioStreamObj::GetType( short *Type )
04628 {
04629 return o_object->GetType( Type );
04630 }
04631
04632 int RioStreamObj::GetnBlocks( RioBlock *nBlocks )
04633 {
04634 return o_object->GetnBlocks( nBlocks );
04635 }
04636
04637 int RioStreamObj::DataRequest( u32 reqid, u16 operation, u32 ipaddr, u16 port,
04638 u32 block, u32 repbits )
04639 {
04640 int rc;
04641
04642 #ifdef RIO_DEBUG2
04643
04644 o_stream->m_log << "SERVER: StreamManager - DataRequest antes do mutex "
04645 << endl;
04646 #endif
04647 o_mgr->m_MutexTable->Wait();
04648 #ifdef RIO_DEBUG2
04649
04650 o_stream->m_log << "SERVER: StreamManager - DataRequest depois do mutex "
04651 << endl;
04652 #endif
04653 rc = o_stream->DataReq( reqid, operation, ipaddr, port, block,
04654 repbits, this );
04655
04656
04657 if( rc == S_OK )
04658 m_PendingRequests++;
04659 else
04660 {
04661 o_stream->m_log << "[StreamManager] Error: Failed executing DataReq." << endl;
04662 }
04663
04664 o_mgr->m_MutexTable->Release();
04665 #ifdef RIO_DEBUG2
04666
04667 o_stream->m_log << "SERVER: StreamManager - DataRequest liberando mutex "
04668 << endl;
04669 #endif
04670 return rc;
04671 }
04672
04673
04674
04675 void RioStreamObj::RequestCompleted( Event* Request, bool *RemoveStreamObj )
04676 {
04677
04678
04679
04680 *RemoveStreamObj = false;
04681
04682 CStreamManager* mgr = o_mgr;
04683
04684 mgr->m_MutexTable->Wait();
04685 m_PendingRequests--;
04686 o_stream->RequestCompleted( Request );
04687
04688
04689 if( m_Status==StreamObjStatusClosed )
04690 {
04691 if( m_PendingRequests == 0 )
04692 {
04693 CompleteClose();
04694
04695
04696
04697
04698 *RemoveStreamObj = true;
04699 }
04700 }
04701 mgr->m_MutexTable->Release();
04702 }
04703
04704
04705 int RioStreamObj::Close( bool *RemoveStreamObj )
04706 {
04707 CStreamManager* mgr = o_mgr;
04708
04709 mgr->m_MutexTable->Wait();
04710 m_Status = StreamObjStatusClosed;
04711
04712
04713
04714 *RemoveStreamObj = false;
04715
04716
04717 if( m_PendingRequests == 0 )
04718 {
04719 CompleteClose();
04720
04721
04722
04723 *RemoveStreamObj = true;
04724 }
04725
04726 mgr->m_MutexTable->Release();
04727
04728 return 0;
04729 }
04730
04731
04732 void RioStreamObj::CompleteClose()
04733 {
04734 if( o_object != 0 )
04735 {
04736 o_object->Close();
04737 delete o_object;
04738 o_object = 0;
04739 }
04740
04741
04742
04743 }
04744
04745
04746 RioStream* RioStreamObj::Stream()
04747 {
04748 return o_stream;
04749 }
04750
04751
04752
04753 int RioStreamObj::GetObjectName(char *ObjectName)
04754 {
04755
04756
04757
04758 return o_object->GetObjectName( ObjectName );
04759 }
04760
04761 int RioStreamObj::GetQueueSize()
04762 {
04763 return o_stream->getQueueSize();
04764 }
04765
04766
04767
04768
04769
04770
04771
04772
04773 int RioStreamObj::GetVideoRate( unsigned int *VideoRate )
04774 {
04775 return o_object->GetVideoRate( VideoRate );
04776 }
04777
04778
04779
04780
04781 int RioStreamObj::SetVideoRate( unsigned int VideoRate )
04782 {
04783 return o_object->SetVideoRate( VideoRate );
04784 }
04785
04786
04787
04788
04789
04790
04791
04792 int RioStreamObj::ReallocBlocks( unsigned int Block,
04793 unsigned long long int ExcludeStorages )
04794 {
04795 return o_object->ReallocBlocks( Block, ExcludeStorages );
04796 }
04797
04798
04799
04800 double getInterval( struct timeval initial, struct timeval final )
04801 {
04802 double msec;
04803 struct timeval interval;
04804 interval.tv_sec = final.tv_sec - initial.tv_sec;
04805 interval.tv_usec = final.tv_usec - initial.tv_usec;
04806 if( interval.tv_usec < 0 )
04807 {
04808 interval.tv_sec -= 1;
04809 interval.tv_usec += 1000000;
04810 }
04811 msec = interval.tv_sec * 1000.0 + interval.tv_usec / 1000.0;
04812 return msec;
04813 }
04814
04815 */