CStreamManager Class Reference

#include <StreamManager.h>

Public Member Functions

 CStreamManager (CRioTCP *TCPconnection)
 ~CStreamManager ()
HRESULT Close (const RioStreamId Stream)
HRESULT OpenObject (const RioStreamId Stream, const char *ObjectName, const RioAccess Access, ObjectHandle *Handle)
HRESULT MaxRequests (RioStreamId Stream)
HRESULT OpenObject (const RioStreamId Stream, const char *ObjectName, const RioAccess Access, struct timeval RTT_average, int BufferSize, ObjectHandle *Handle, RioObjectSize *Size, int *m_FlagRequests, int *m_PID)
HRESULT CanStart (const RioStreamId Stream)
 CStreamManager ()
 ~CStreamManager ()
int Initialize (StreamManagerConfig *Config)
int Open (RioStreamTraffic *Traffic, RioStream **Stream, SOCKADDR_IN RemoteAddress)
void PostITEvent (MonitorEvent *event)
int GetDiskServiceTime ()
double GetEstimatedDiskServiceTime (int disk)
double GetEstimatedDiskResponseTime (int disk)
double GetEstimatedDiskQueueTime (int disk)
double GetDiskServiceTime (int disk, int index)
double GetDiskResponseTime (int disk, int index)
double GetDiskQueueTime (int disk, int queuesize)
void UpdateDiskQueueAndServiceTime ()
int SaveMeasures ()
PSTREAMLIST InsertNewClient (PSEQBLOCKLIST NewClient, RioStream *stream)
void RemoveClient (PSTREAMLIST ClientPosition)
PSTREAMLIST CanAdmit (PSEQBLOCKLIST NewClient, RioStream *stream, double RTT, int clientbuffer)
PSTREAMLIST GetAdmittedClientsList ()
int GetNumberOfBuffersForEachClient ()
int insertevent (PSEQEVENTLIST eventblock, EventTypeSimulation eventtype, int block, long double time_when)
int GetBurstSizeOfEachClient ()
unsigned int GetBlockSize ()
double GetAverageCACTime ()
void SetAverageBufferTime (double sample, RioStream *stream)
void SetAverageCACTime (double sample, RioStream *stream, int index)
void SetAverageAdmissionProcessTime (double sample, RioStream *stream, int index)
void SetAverageGenReqListTime (double sample, RioStream *stream)
void SetAverageSortListTime (double sample, RioStream *stream)
void SetAverageSimulationTime (double sample, RioStream *stream)
void SetAverageWaitingTime (double sample, RioStream *stream, int index, int queue)
long double GetNetworkRate ()
int UpdateNumberOfWaitingClients (int client)
void generateTraceOfAllStreams (double time)
int GetNetMgrIPAddr ()
int GetNetInterfaceIPAddr ()
NetMgr * getNetMgr (void)
 Nova funcao para obter o objeto da classe NetMgr associado a classe CStreamManager.
CNetInterface * getNetInterface (void)
 Nova funcao para obter o objeto da classe NetInterface associado a classe CStreamManager.
void SetNumberOfDisks (unsigned int NumberOfDisks)
 Funcao para alterar o valor do numero de discos do servidor.

Data Fields

char * m_FileRoot
bool m_UseServerSideBuffers
bool m_UseNewCAC
bool m_CollectMeasures

Private Member Functions

PSTREAMLIST firstRequestOfAllStreams (PSTREAMLIST stream_list, long double *now)
PSEQEVENTLIST sortAllStreamsGeneratingOneStream (PSTREAMLIST stream_list, uint newclientid)

Static Private Member Functions

static void Timeout (void *Param)

Private Attributes

CRioTCP * m_TCPconnection
unsigned int m_BlockSize
int m_MaxStreams
int m_used
double m_UsedRate
double m_TotalRate
double m_NRTReservedRate
int m_MaxActiveRequests
int m_MaxRTRequests
int m_MaxNRTRequests
int m_ExtraStreamRequests
int m_HoldingNonRealTime
int m_ActiveRequests
int m_nDisks
int m_TimerId
CStreamList m_RTHoldList
CStreamList m_NRTHoldList
CStreamList m_CloseList
CStreamList m_FreeList
RioStream * m_VectorStream
RioStream * m_NextNRTStream
CMutex * m_MutexTable
CObjectManager * m_ObjectManager
CSystemManager * m_SystemManager
CRouter * m_Router
NetMgr * m_NetMgr
bool m_initialized
ofstream m_log
int m_MaxPendingRequests
int m_MaxNumberOfDisks
int m_NumberOfBuffersForEachClient
int m_BurstSizeOfEachClient
double m_MaxIntervalEmpty
int m_NumberOfEmptyTimes
pthread_mutex_t MutexModifyClientsList
pthread_mutex_t MutexRunCAC
pthread_mutex_t MutexUpdateMeasures
pthread_mutex_t MutexUpdateNumberOfWaitingClients
ofstream m_logSCACT
ofstream m_logECACT
ofstream m_logSAPT
ofstream m_logEAPT
ofstream m_logSSIMULT
ofstream m_logESIMULT
ofstream m_logSWAITINGT
ofstream m_logEWAITINGT
ofstream m_logSGENREQLT
ofstream m_logEGENREQLT
ofstream m_logSBUFFERT
ofstream m_logEBUFFERT
ofstream m_logSSORTLT
ofstream m_logESORTLT
uint m_lastID
double m_EstimatedDiskServiceTime [100][301]
double m_EstimatedDiskServiceTimeOfDisk [100]
double m_EstimatedDiskResponseTime [100][301]
double m_EstimatedDiskResponseTimeOfDisk [100]
double m_DeviationDiskResponseTime [100][301]
double m_DeviationDiskResponseTimeOfDisk [100]
double m_EstimatedDiskQueueTime [100][301]
double m_EstimatedDiskQueueTimeOfDisk [100]
double m_AverageCACTime
double m_VarianceCACTime
long long int m_NumberOfSamples
double m_AverageAdmissionProcessTime
double m_VarianceAdmissionProcessTime
double m_AverageSimulationTime
double m_VarianceSimulationTime
double m_AverageBufferTime
double m_VarianceBufferTime
double m_AverageGenReqListTime
double m_VarianceGenReqListTime
double m_AverageSortListTime
double m_VarianceSortListTime
double m_AverageWaitingTime
double m_VarianceWaitingTime
double m_AverageCACTimeAccClients [1001]
double m_VarianceCACTimeAccClients [1001]
int m_SamplesCACTimeAccClients [1001]
double m_AverageAdmissionProcessTimeAccClients [1001]
double m_VarianceAdmissionProcessTimeAccClients [1001]
int m_SamplesAdmissionProcessTimeAccClients [1001]
double m_AverageSimulationTimeAccClients [1001]
double m_VarianceSimulationTimeAccClients [1001]
int m_SamplesSimulationTimeAccClients [1001]
double m_AverageSortListTimeAccClients [1001]
double m_VarianceSortListTimeAccClients [1001]
int m_SamplesSortListTimeAccClients [1001]
double m_AverageWaitingTimeAccClients [1001]
double m_VarianceWaitingTimeAccClients [1001]
int m_SamplesWaitingTimeAccClients [1001]
double m_AverageWaitingTimeAccQueue [1001]
double m_VarianceWaitingTimeAccQueue [1001]
int m_SamplesWaitingTimeAccQueue [1001]
double m_AverageBufferTimeAccClients [1001]
double m_VarianceBufferTimeAccClients [1001]
int m_SamplesBufferTimeAccClients [1001]
double m_EstimatedTimeParameter
double m_NetworkRate
PSTREAMLIST m_AdmittedClientsList
int m_NumberOfAdmittedClients
int m_NumberOfWaitingClients
int ClientBuffer [1001]
unsigned int PlayBlock [1001]
unsigned int TotalBlocks [1001]
unsigned int NextBlockToSend [1001]
int ServerBuffer [1001]
unsigned int NReqWhenEmptyBuffer [1001]
unsigned int NHiccups [1001]
long double Initial_time [1001]
double ClientRTT [1001]
long double TimeBufferEmpty [1001]
long double MaxIntervalBufferEmpty [1001]
char * m_LogsDirectory
CNetInterface * m_NetInterface

Friends

class RioStream
class RioStreamObj

Detailed Description

Definition at line 31 of file interface/StreamManager.h.


Constructor & Destructor Documentation

CStreamManager::CStreamManager ( CRioTCP * TCPconnection ) 

Definition at line 43 of file interface/StreamManager.cpp.

// Client-side (interface/StreamManager.cpp) constructor body.
// Remembers the TCP connection handed in by the caller; CanStart() later
// issues its remote call through this same pointer (m_TCPconnection->Call).
00044 {
00045   m_TCPconnection = TCPconnection;  // the pointer is stored as-is; nothing is copied or opened here
00046 }

CStreamManager::~CStreamManager (  ) 

Definition at line 48 of file interface/StreamManager.cpp.

// Client-side (interface/StreamManager.cpp) destructor body.
// Empty: in particular it does not release m_TCPconnection.
00049 {
00050 }

CStreamManager::CStreamManager (  ) 

Definition at line 132 of file server/StreamManager.cpp.

// Server-side (server/StreamManager.cpp) default constructor body.
// Puts every member into a known "empty" state: counters and rates become
// zero, flags become false, pointers become NULL, and every statistics /
// simulation array is zero-filled. The only resources created here are the
// CMutex allocated with new and the four pthread mutexes.
00133 {
00134     m_BlockSize           = 0;
00135     m_MaxStreams          = 0;
00136     m_used                = 0;
00137     m_UsedRate            = 0.0;
00138     m_TotalRate           = 0.0;
00139     m_NRTReservedRate     = 0.0;
00140     m_MaxActiveRequests   = 0;
00141     m_MaxRTRequests       = 0;
00142     m_MaxNRTRequests      = 0;
00143     m_ExtraStreamRequests = 0;
00144     m_HoldingNonRealTime  = false;  // member is listed as int; false is stored as 0
00145     m_ActiveRequests      = 0;
00146     m_nDisks              = 0;
00147     m_TimerId             = 0;
00148     // m_RTHoldList  does not need to be initialized
00149     // m_NRTHoldList does not need to be initialized
00150     // m_CloseList   does not need to be initialized
00151     // m_FreeList    does not need to be initialized
00152     m_VectorStream        = NULL;
00153     m_NextNRTStream       = NULL;
00154     m_MutexTable          = new CMutex;  // NOTE(review): the matching delete is not visible in this excerpt - confirm the destructor releases it
00155     m_ObjectManager       = NULL;
00156     m_SystemManager       = NULL;
00157     m_Router              = NULL;
00158     m_NetMgr              = NULL;
00159     m_initialized         = false;
00160     // m_log does not need to be initialized
00161 
00162     m_FileRoot                      = NULL;
00163     m_UseServerSideBuffers          = false;
00164     m_UseNewCAC                     = false;
00165     m_CollectMeasures               = false;
00166     m_MaxPendingRequests            = 0;
00167     m_MaxNumberOfDisks              = 0;
00168     m_NumberOfBuffersForEachClient  = 0;
00169     m_BurstSizeOfEachClient         = 0;
00170     m_MaxIntervalEmpty              = 0.0;
00171     m_NumberOfEmptyTimes            = 0;
      // NULL attributes request the default mutex type; the return codes of
      // pthread_mutex_init are not checked here.
00172     pthread_mutex_init( &MutexModifyClientsList           , NULL );
00173     pthread_mutex_init( &MutexRunCAC                      , NULL );
00174     pthread_mutex_init( &MutexUpdateMeasures              , NULL );
00175     pthread_mutex_init( &MutexUpdateNumberOfWaitingClients, NULL );
00176     // m_logSCACT     does not need to be initialized
00177     // m_logECACT     does not need to be initialized
00178     // m_logSAPT      does not need to be initialized
00179     // m_logEAPT      does not need to be initialized
00180     // m_logSSIMULT   does not need to be initialized
00181     // m_logESIMULT   does not need to be initialized
00182     // m_logSWAITINGT does not need to be initialized
00183     // m_logEWAITINGT does not need to be initialized
00184     // m_logSGENREQLT does not need to be initialized
00185     // m_logEGENREQLT does not need to be initialized
00186     // m_logSBUFFERT  does not need to be initialized
00187     // m_logEBUFFERT  does not need to be initialized
00188     // m_logSSORTLT   does not need to be initialized
00189     // m_logESORTLT   does not need to be initialized
00190 
00191     m_lastID                        = 0;
00192 
      // Per-disk estimate tables: sizeof on the array member covers the whole
      // table, so each memset zero-fills every entry.
00193     memset( m_EstimatedDiskServiceTime, 0,
00194             sizeof( m_EstimatedDiskServiceTime ) );
00195     memset( m_EstimatedDiskServiceTimeOfDisk, 0,
00196             sizeof( m_EstimatedDiskServiceTimeOfDisk ) );
00197     memset( m_EstimatedDiskResponseTime, 0,
00198             sizeof( m_EstimatedDiskResponseTime ) );
00199     memset( m_EstimatedDiskResponseTimeOfDisk, 0,
00200             sizeof( m_EstimatedDiskResponseTimeOfDisk ) );
00201     memset( m_DeviationDiskResponseTime, 0,
00202             sizeof( m_DeviationDiskResponseTime ) );
00203     memset( m_DeviationDiskResponseTimeOfDisk, 0,
00204             sizeof( m_DeviationDiskResponseTimeOfDisk ) );
00205     memset( m_EstimatedDiskQueueTime, 0,
00206             sizeof( m_EstimatedDiskQueueTime ) );
00207     memset( m_EstimatedDiskQueueTimeOfDisk, 0,
00208             sizeof( m_EstimatedDiskQueueTimeOfDisk ) );
00209 
      // Global running averages / variances of the measured times.
00210     m_AverageCACTime                = 0.0;
00211     m_VarianceCACTime               = 0.0;
00212     m_NumberOfSamples               = 0;
00213     m_AverageAdmissionProcessTime   = 0.0;
00214     m_VarianceAdmissionProcessTime  = 0.0;
00215     m_AverageSimulationTime         = 0.0;
00216     m_VarianceSimulationTime        = 0.0;
00217     m_AverageBufferTime             = 0.0;
00218     m_VarianceBufferTime            = 0.0;
00219     m_AverageGenReqListTime         = 0.0;
00220     m_VarianceGenReqListTime        = 0.0;
00221     m_AverageSortListTime           = 0.0;
00222     m_VarianceSortListTime          = 0.0;
00223     m_AverageWaitingTime            = 0.0;
00224     m_VarianceWaitingTime           = 0.0;
00225 
      // The same statistics kept in 1001-entry tables (average, variance and
      // sample count per index); all start at zero.
00226     memset( m_AverageCACTimeAccClients, 0,
00227             sizeof( m_AverageCACTimeAccClients ));
00228     memset( m_VarianceCACTimeAccClients, 0,
00229             sizeof( m_VarianceCACTimeAccClients ));
00230     memset( m_SamplesCACTimeAccClients, 0,
00231             sizeof( m_SamplesCACTimeAccClients ));
00232 
00233     memset( m_AverageAdmissionProcessTimeAccClients, 0,
00234             sizeof( m_AverageAdmissionProcessTimeAccClients ));
00235     memset( m_VarianceAdmissionProcessTimeAccClients, 0,
00236             sizeof( m_VarianceAdmissionProcessTimeAccClients ));
00237     memset( m_SamplesAdmissionProcessTimeAccClients, 0,
00238             sizeof( m_SamplesAdmissionProcessTimeAccClients ));
00239 
00240     memset( m_AverageSimulationTimeAccClients, 0,
00241             sizeof( m_AverageSimulationTimeAccClients ));
00242     memset( m_VarianceSimulationTimeAccClients, 0,
00243             sizeof( m_VarianceSimulationTimeAccClients ));
00244     memset( m_SamplesSimulationTimeAccClients, 0,
00245             sizeof( m_SamplesSimulationTimeAccClients ));
00246 
00247     memset( m_AverageSortListTimeAccClients, 0,
00248             sizeof( m_AverageSortListTimeAccClients ));
00249     memset( m_VarianceSortListTimeAccClients, 0,
00250             sizeof( m_VarianceSortListTimeAccClients ));
00251     memset( m_SamplesSortListTimeAccClients, 0,
00252             sizeof( m_SamplesSortListTimeAccClients ));
00253 
00254     memset( m_AverageWaitingTimeAccClients, 0,
00255             sizeof( m_AverageWaitingTimeAccClients ));
00256     memset( m_VarianceWaitingTimeAccClients, 0,
00257             sizeof( m_VarianceWaitingTimeAccClients ));
00258     memset( m_SamplesWaitingTimeAccClients, 0,
00259             sizeof( m_SamplesWaitingTimeAccClients ));
00260 
00261     memset( m_AverageWaitingTimeAccQueue, 0,
00262             sizeof( m_AverageWaitingTimeAccQueue ));
00263     memset( m_VarianceWaitingTimeAccQueue, 0,
00264             sizeof( m_VarianceWaitingTimeAccQueue ));
00265     memset( m_SamplesWaitingTimeAccQueue, 0,
00266             sizeof( m_SamplesWaitingTimeAccQueue ));
00267 
00268     memset( m_AverageBufferTimeAccClients, 0,
00269             sizeof( m_AverageBufferTimeAccClients ));
00270     memset( m_VarianceBufferTimeAccClients, 0,
00271             sizeof( m_VarianceBufferTimeAccClients ));
00272     memset( m_SamplesBufferTimeAccClients, 0,
00273             sizeof( m_SamplesBufferTimeAccClients ));
00274 
00275     m_EstimatedTimeParameter        = 0.0;
00276     m_NetworkRate                   = 0.0;
00277     m_AdmittedClientsList           = NULL;
00278     m_NumberOfAdmittedClients       = 0;
00279     m_NumberOfWaitingClients        = 0;
00280 
      // Per-client scratch arrays; CanAdmit() resets most of them again
      // before each simulation run.
00281     memset( ClientBuffer,           0, sizeof( ClientBuffer )           );
00282     memset( PlayBlock,              0, sizeof( PlayBlock )              );
00283     memset( TotalBlocks,            0, sizeof( TotalBlocks )            );
00284     memset( NextBlockToSend,        0, sizeof( NextBlockToSend )        );
00285     memset( ServerBuffer,           0, sizeof( ServerBuffer )           );
00286     memset( NReqWhenEmptyBuffer,    0, sizeof( NReqWhenEmptyBuffer )    );
00287     memset( NHiccups,               0, sizeof( NHiccups )               );
00288     memset( Initial_time,           0, sizeof( Initial_time )           );
00289     memset( ClientRTT,              0, sizeof( ClientRTT )              );
00290     memset( TimeBufferEmpty,        0, sizeof( TimeBufferEmpty )        );
00291     memset( MaxIntervalBufferEmpty, 0, sizeof( MaxIntervalBufferEmpty ) );
00292 
00293     // Initialization of the new variable holding the name of the directory
00294     // where the logs will be saved.
00295     m_LogsDirectory = NULL;
00296     
00297     // Initialization of the new variable holding a pointer to the object of
00298     // the new network management class.
00299     m_NetInterface = NULL;
00300 }

CStreamManager::~CStreamManager (  ) 

Member Function Documentation

PSTREAMLIST CStreamManager::CanAdmit ( PSEQBLOCKLIST  NewClient,
RioStream stream,
double  RTT,
int  clientbuffer 
)

Definition at line 3205 of file server/StreamManager.cpp.

03208 {
03209     struct timeval initial_time, final_time;
03210     long double timetosimul, when;
03211 
03212     //pthread_mutex_lock( &MutexRunCAC ); //it is before getInfoAboutReqList
03213 
03214     #ifdef RIO_DEBUG2
03215     if( m_log.is_open() )
03216     {
03217         m_log<<"---------------------------------------------------------------- "<<endl;
03218         m_log<<"                     CanAdmit Client "            << stream->m_Id <<endl;
03219         m_log<<"---------------------------------------------------------------- "<<endl;
03220     }
03221     #endif
03222 
03223     initial_time = Timer.CurrentTime();
03224 
03225     PSTREAMLIST PositionAtClientsList = NULL;
03226 
03227     #ifdef RIO_DEBUG2
03228     if( m_log.is_open() )
03229     {
03230         m_log<<"OpenObject: VBR Traffic: Running Control Admission Algorithm ..."<<endl;
03231         m_log<< "CanAdmit: Can Admit  ? " << endl;
03232     }
03233     #endif
03234 
03235     if( m_NumberOfAdmittedClients > 0 )
03236     {
03237         // get number of disks
03238         int           disks = m_MaxNumberOfDisks;
03239         PSEQEVENTLIST EventList;
03240         //size of disk queue - requests waiting
03241         unsigned int *disk_queue = (unsigned int*) malloc( disks * sizeof(unsigned int) );
03242         unsigned int *max_disk_queue = (unsigned int*) malloc( disks * sizeof(unsigned int) );
03243         //time to service next request
03244         long double   *disk_time = (long double*) malloc( disks * sizeof(long double) );
03245         double        timeToTransmit_msec=( m_BlockSize/m_NetworkRate )*1000.0;
03246         long double   last_tx = 0;
03247         double        interval;
03248         int      NumberOfEvents = 0;
03249         bool          CanNotAdmit = false;
03250         int           eventToDisk, eventToClient, blockToClient, blockToDisk;
03251 
03252         //To each client
03253         memset( ClientBuffer,   -1, sizeof( ClientBuffer ));
03254         memset( ServerBuffer,    0, sizeof( ServerBuffer ));
03255         memset( PlayBlock,       0, sizeof( PlayBlock ));
03256         memset( TotalBlocks,     0, sizeof( TotalBlocks ));
03257         memset( NextBlockToSend, 0, sizeof( NextBlockToSend ));
03258         memset( NHiccups,        0, sizeof( NHiccups ));
03259         memset( NReqWhenEmptyBuffer,    0, sizeof( NReqWhenEmptyBuffer ));
03260         memset( Initial_time,    0, sizeof( Initial_time ));
03261         memset( TimeBufferEmpty, 0, sizeof( TimeBufferEmpty ));
03262         memset( MaxIntervalBufferEmpty, 0, sizeof( MaxIntervalBufferEmpty ));
03263         memset( disk_queue,      0, disks*sizeof(unsigned int) );
03264         memset( max_disk_queue,  0, disks*sizeof(unsigned int) );
03265         memset( disk_time,       0, disks*sizeof(long double) );
03266 
03267         PositionAtClientsList = InsertNewClient( NewClient, stream );
03268 
03269         #ifdef RIO_DEBUG2
03270         if( m_log.is_open() ) m_log << "CanAdmit: sorting all streams ... ";
03271         #endif
03272 
03273         pthread_mutex_lock( &MutexModifyClientsList );
03274         EventList = sortAllStreamsGeneratingOneStream( m_AdmittedClientsList,
03275                                                        stream->m_Id );
03276         pthread_mutex_unlock( &MutexModifyClientsList );
03277 
03278         initial_time = Timer.CurrentTime();
03279         blockToClient = 0;
03280         blockToDisk = 0;
03281 
03282         #ifdef RIO_DEBUG2
03283         if( m_log.is_open() ) m_log << "CanAdmit: starting simulation. " << endl;
03284         #endif
03285 
03286         // process each event
03287         while(( EventList != NULL )  && ( CanNotAdmit == false ))
03288         {
03289             eventToDisk = eventToClient = -1;
03290             NumberOfEvents++;
03291 
03292             if( NumberOfEvents % 50000 == 0 )
03293                 if( m_log.is_open() ) m_log << "CanAdmit: NumberOfEvents -- "<< NumberOfEvents
03294                       << " triggers"<< endl;
03295             switch ( EventList->event )
03296             {
03297                 case RequestArrival:
03298                 case RequestFullBuffer:
03299                 case RequestFullClientBuffer:
03300                         if( ( EventList->event ==  RequestArrival ) ||
03301                            ( EventList->event ==  RequestFullClientBuffer ))
03302                         {
03303                             if( ServerBuffer [ EventList->id ] > 0 )
03304                             {
03305                                 //send block if possible
03306                                 eventToClient = BlockArrival;
03307                                 #ifdef RIO_DEBUG2
03308                                 blockToClient = NextBlockToSend[ EventList->id ] ;
03309                                 NextBlockToSend[ EventList->id ]++;
03310                                 #endif
03311                             }
03312                             //decrement server buffer
03313                             ServerBuffer [ EventList->id ]--;
03314                         }
03315 
03316                         if( EventList->event ==  RequestArrival )
03317                         {
03318                             #ifdef RIO_DEBUG2
03319                             //increment the play block
03320                             PlayBlock[ EventList->id ]++;
03321                             #endif
03322                             //decrement client buffer and
03323                             // verifies if there was a hiccup
03324                             ClientBuffer[ EventList->id ]--;
03325                             if( ClientBuffer[ EventList->id ] < 0 )
03326                             {
03327                                 NReqWhenEmptyBuffer[ EventList->id ]++;
03328                             }
03329                             if( ClientBuffer[ EventList->id ] == -1 )
03330                             {
03331                                 NHiccups[ EventList->id ]++;
03332                                 Initial_time[ EventList->id ] = EventList->csystime;
03333 
03334                                 if( ( NHiccups[ EventList->id ] * 100.0 / TotalBlocks[EventList->id] ) >
03335                                   ( m_NumberOfEmptyTimes * 1.0 ))
03336                                 {
03337                                     CanNotAdmit = true;
03338                                     break;
03339                                 }
03340                             }
03341                         }
03342                         //prefetch next block
03343                         eventToDisk = DiskService;
03344                         blockToDisk = EventList->block;
03345                         break;
03346 
03347                 case BlockArrival:
03348                         //increment client buffer
03349                         ClientBuffer [ EventList->id ]++;
03350                         //verify if buffer was empty
03351                         if( ClientBuffer [ EventList->id ] == 0 )
03352                         {
03353                             interval = EventList->csystime - Initial_time[ EventList->id ];
03354 
03355                             TimeBufferEmpty[ EventList->id ] += interval;
03356                             //get maximum interval the buffer was empty
03357                             if( MaxIntervalBufferEmpty[ EventList->id ] < interval )
03358                             {
03359                                 MaxIntervalBufferEmpty[ EventList->id ] = interval;
03360                             }
03361 
03362                             if( MaxIntervalBufferEmpty[ EventList->id ] > m_MaxIntervalEmpty )
03363                             {
03364                                 CanNotAdmit = true;
03365                                 break;
03366                             }
03367                         }
03368                         break;
03369 
03370                 case DiskService:
03371                         //send block if necessary
03372                         if( ServerBuffer [ EventList->id ] < 0 )
03373                         {
03374                             eventToClient = BlockArrival;
03375                             blockToClient = NextBlockToSend[ EventList->id ] ;
03376                             NextBlockToSend[ EventList->id ]++;
03377                         }
03378                         ServerBuffer [ EventList->id ]++;
03379                         disk_queue[ EventList->disk ]--;
03380                         break;
03381 
03382                 default:
03383                         RioErr << "Invalid event at simulation. "<<  endl;
03384             }
03385             if( CanNotAdmit == true )
03386                 break;
03387 
03388             if( eventToClient != -1 )
03389             {
03390                 if( EventList->csystime < last_tx )
03391                     last_tx += timeToTransmit_msec;
03392                 else
03393                     last_tx = EventList->csystime + timeToTransmit_msec;
03394                 when = last_tx + ClientRTT[ EventList->id ];
03395 
03396                 if( insertevent( EventList, (EventTypeSimulation) eventToClient,
03397                                  blockToClient, when ) == -1 )
03398                 {
03399                     RioErr <<"processevent: Could not insert event." << endl;
03400 
03401                     free( disk_queue );
03402                     free( max_disk_queue );
03403                     free( disk_time );
03404 
03405                     return NULL;
03406                 }
03407             }
03408             if( eventToDisk != -1 )
03409             {
03410                 if( disk_time[ EventList->disk ] > EventList->csystime )
03411                     when = disk_time[ EventList->disk ];
03412                 else
03413                     when = EventList->csystime;
03414                 when += GetDiskServiceTime( EventList->disk,
03415                                             disk_queue[ EventList->disk ] );
03416                 disk_queue[ EventList->disk ]++;
03417 
03418                 disk_time[ EventList->disk ] = when;
03419                 if( disk_queue[ EventList->disk ] >
03420                     max_disk_queue[ EventList->disk ]
03421                   )
03422                 {
03423                     max_disk_queue[ EventList->disk ] =
03424                                             disk_queue[ EventList->disk ];
03425                 }
03426                 if( insertevent( EventList, (EventTypeSimulation) eventToDisk,
03427                                  blockToDisk, when ) == -1 )
03428                 {
03429                     RioErr <<"processevent: Could not insert event." << endl;
03430 
03431                     free( disk_queue );
03432                     free( max_disk_queue );
03433                     free( disk_time );
03434 
03435                     return NULL;
03436                 }
03437             }
03438 
03439             if( EventList->next != NULL )
03440             {
03441                 EventList = EventList->next;
03442                 if( EventList->prev != NULL )
03443                 {
03444                     free( EventList->prev );
03445                     EventList->prev = NULL;
03446                 }
03447             }
03448             else
03449             {
03450                 free( EventList );
03451                 EventList = NULL;
03452             }
03453         }
03454         if( m_log.is_open() ) m_log << "CanAdmit: Simulation finished. Number of events "
03455               << NumberOfEvents << endl;
03456         //Time to admission
03457         final_time = Timer.CurrentTime();
03458         timetosimul = getInterval( initial_time, final_time );
03459         stream->m_SampleSimulationTime = timetosimul;
03460         SetAverageSimulationTime( timetosimul, stream );
03461         if( m_log.is_open() ) m_log << "CanAdmit: Time (only cac) = " << timetosimul << " msec" << endl;
03462 
03463         if( CanNotAdmit == true )
03464         {
03465             if( m_log.is_open() ) m_log << "Could not admit new client: Client " << EventList->id
03466                   << " TotalBlocks " << TotalBlocks[EventList->id]
03467                   << " Hiccups " << NHiccups[ EventList->id ]
03468                   << "(" << NHiccups[ EventList->id ]*100.0/TotalBlocks[EventList->id]
03469                   <<" %)"
03470                   << " QoS parameter " << m_NumberOfEmptyTimes * 1.0
03471                   << " % TimeBufferEmpty " << TimeBufferEmpty[ EventList->id ]
03472                   << " MaxIntervalBufferEmpty " << MaxIntervalBufferEmpty[ EventList->id ]
03473                   << " NReqWhenEmptyBuffer " << NReqWhenEmptyBuffer[ EventList->id ]
03474                   << endl;
03475 
03476             #ifdef RIO_DEBUG2
03477             if( m_log.is_open() ) m_log << "DiskQueue:  " ;
03478             for(int i = 1; i < disks; i++ )
03479             {
03480                 if( max_disk_queue[ i ] != 0 )
03481                 {
03482                     if( m_log.is_open() ) m_log << " D_" << i << "=" << max_disk_queue[ i ];
03483                 }
03484             }
03485             if( m_log.is_open() ) m_log << endl;
03486             #endif
03487 
03488             while( EventList != NULL )
03489             {
03490                 if( EventList->next != NULL )
03491                 {
03492                     EventList = EventList->next;
03493                     if( EventList->prev != NULL )
03494                     {
03495                         free( EventList->prev );
03496                         EventList->prev = NULL;
03497                     }
03498                 }
03499                 else
03500                 {
03501                     free( EventList );
03502                     EventList = NULL;
03503                 }
03504             }
03505             RemoveClient( PositionAtClientsList );
03506             //pthread_mutex_unlock( &MutexRunCAC );
03507 
03508             free( disk_queue );
03509             free( max_disk_queue );
03510             free( disk_time );
03511 
03512             return NULL;
03513         }
03514 
03515         #ifdef RIO_DEBUG2
03516         if( m_log.is_open() ) m_log << "DiskQueue:  " ;
03517         for(int i = 1; i < disks; i++ )
03518         {
03519             if( max_disk_queue[ i ] != 0 )
03520             {
03521                 if( m_log.is_open() ) m_log << " D_" << i << "=" << max_disk_queue[ i ];
03522             }
03523         }
03524         if( m_log.is_open() ) m_log << endl;
03525 
03526         //check if the client buffer is empty at some time
03527         for( uint i = 0; i < 1001; i++ )
03528         {
03529             if( ClientBuffer[ i ] != -1 )
03530             {
03531                 if( m_log.is_open() ) m_log << " Client " << i
03532                       << " TotalBlocks " << TotalBlocks[i]
03533                       << " Hiccups " << NHiccups[ i ]
03534                       << "(" << NHiccups[ i ]*100.0/TotalBlocks[ i ] <<" %)"
03535                       << " QoS parameter % " << m_NumberOfEmptyTimes * 1.0
03536                       << " TimeBufferEmpty " << TimeBufferEmpty[ i ]
03537                       << " MaxIntervalBufferEmpty " << MaxIntervalBufferEmpty[ i ]
03538                       << " NReqWhenEmptyBuffer " << NReqWhenEmptyBuffer[ i ]
03539                       << endl;
03540             }
03541         }
03542         #endif
03543     }
03544     else
03545     {
03546         PositionAtClientsList = InsertNewClient( NewClient, stream );
03547     }
03548 
03549     if( PositionAtClientsList != NULL ) {
03550         PositionAtClientsList->nextBlockToBeFetched = NewClient;
03551     }
03552     if( m_log.is_open() ) m_log << "CanAdmit:  Number of admitted clients " << m_NumberOfAdmittedClients << endl ;
03553 
03554     //pthread_mutex_unlock( &MutexRunCAC );
03555     return PositionAtClientsList;
03556 }

HRESULT CStreamManager::CanStart ( const RioStreamId  Stream  ) 

// Client-side proxy call: packs Stream.Version and Stream.Index into an ASCII
// parameter buffer, invokes the remote RioMethodStreamManagerClientCanStart
// method through m_TCPconnection->Call() and returns the HRESULT decoded from
// the reply.
//
// Returns:
//   ERROR_RIOPROXY + ERROR_MEMORY                if the parameter buffer cannot
//                                                be allocated, or Call() reports
//                                                ERROR_RIOTCP + ERROR_MEMORY;
//   ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW if the parameters cannot fit
//                                                in MaxTCPDataSize bytes;
//   the failing status of Call()                 for any other call failure;
//   ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT   if the reply is missing or
//                                                cannot be decoded;
//   otherwise the status value carried in the reply.
//
// Result and ResultSize are initialised so that the failure path never frees an
// indeterminate pointer when Call() returns without storing a reply buffer.

Definition at line 412 of file interface/StreamManager.cpp.

00413 {
00414     char          *Parameter = (char*) malloc( MaxTCPDataSize*sizeof(char) );
00415     char          *Result = NULL;   // stays NULL unless Call() stores a reply
00416     unsigned int ParameterSize;
00417     unsigned int ResultSize = 0;
00418     HRESULT       status;
00419 
00420     if( Parameter == NULL )
00421     {
00422         RioErr << "malloc error CanStart:" << strerror(errno) << endl;
00424 
00425         return ERROR_RIOPROXY + ERROR_MEMORY;
00426     }
00427 
00428     memset( Parameter, 0, MaxTCPDataSize*sizeof( char ) );
00429 
00430     ParameterSize = 2 * MAX_LONG_STRING_SIZE;
00431 
00432     // Check if Parameter buffer is large enough
00433     if(ParameterSize > (unsigned int) MaxTCPDataSize)
00434     {
00435         free( Parameter );
00436 
00437         return ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW;
00438     }
00439 
00440     int offset = 0;
00441 
00442     // Set parameters on ASCII buffer
00443     // Set Stream Id
00444     SetLong(Parameter,Stream.Version,&offset);
00445     SetLong(Parameter,Stream.Index,&offset);
00446 
00447     // Update Parameter size to actual size
00448     // (previously was maximum possible size)
00449     ParameterSize = offset;
00450 
00451     // Set Maximum result size (Size of buffer)
00452     //ResultSize = MaxTCPDataSize;
00453 
00454     // Call remote method
00455     status = m_TCPconnection->Call(RioClassStreamManager,
00456                                    RioMethodStreamManagerClientCanStart,
00457                                    ParameterSize,
00458                                    Parameter,
00459                                    &ResultSize,
00460                                    &Result);
00461 
00462     // check if call failed
00463     if(FAILED (status))
00464     {
00465         if( ( unsigned int ) status == ( ERROR_RIOTCP + ERROR_MEMORY ) )
00466             status = ERROR_RIOPROXY + ERROR_MEMORY;
00467         free( Parameter );
00468         if( Result != NULL )
00469             free( Result );
00470 
00471         return status;
00472     }
00473 
00474     offset = 0;
00475     // Get results from result buffer
00476     // Get method return value (a missing reply buffer is treated as malformed)
00477     if( Result == NULL || !GetLong(Result,ResultSize,&status,&offset))
00478     {
00479         free( Parameter );
00480         free( Result );
00481 
00482         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00483     }
00484 
00485     free( Parameter );
00486     free( Result );
00487 
00488     return status;
00489 }

HRESULT CStreamManager::Close ( const RioStreamId  Stream  ) 

// Client-side proxy call: packs Stream.Version and Stream.Index into an ASCII
// parameter buffer, invokes the remote RioMethodStreamManagerClose method
// through m_TCPconnection->Call() and returns the HRESULT decoded from the
// reply.
//
// Returns:
//   ERROR_RIOPROXY + ERROR_MEMORY                if the parameter buffer cannot
//                                                be allocated, or Call() reports
//                                                ERROR_RIOTCP + ERROR_MEMORY;
//   ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW if the parameters cannot fit
//                                                in MaxTCPDataSize bytes;
//   the failing status of Call()                 for any other call failure;
//   ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT   if the reply is missing or
//                                                cannot be decoded;
//   otherwise the status value carried in the reply.
//
// Result and ResultSize are initialised so that the failure path never frees an
// indeterminate pointer when Call() returns without storing a reply buffer.

Definition at line 53 of file interface/StreamManager.cpp.

00054 {
00055     char          *Parameter = (char*) malloc( MaxTCPDataSize*sizeof(char) );
00056     char          *Result = NULL;   // stays NULL unless Call() stores a reply
00057     unsigned int ParameterSize;
00058     unsigned int ResultSize = 0;
00059     HRESULT       status;
00060 
00061     if( Parameter == NULL )
00062     {
00063         RioErr << "malloc error Close:" << strerror(errno) << endl;
00065 
00066         return ERROR_RIOPROXY + ERROR_MEMORY;
00067     }
00068 
00069     memset( Parameter, 0, MaxTCPDataSize*sizeof( char ) );
00070 
00071     ParameterSize = 2*MAX_LONG_STRING_SIZE;
00072     // Check if Parameter buffer is large enough
00073     if(ParameterSize > (unsigned int) MaxTCPDataSize)
00074     {
00075         free( Parameter );
00076 
00077         return ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW;
00078     }
00079 
00080     int offset = 0;
00081 
00082     // Set parameters on ASCII buffer
00083     // Set Stream Id
00084     SetLong(Parameter,Stream.Version,&offset);
00085     SetLong(Parameter,Stream.Index,&offset);
00086 
00087     // Update Parameter size to actual size
00088     // (previously was maximum possible size)
00089     ParameterSize = offset;
00090 
00091     // Call remote method
00092     status = m_TCPconnection->Call(RioClassStreamManager,
00093                                    RioMethodStreamManagerClose,
00094                                    ParameterSize,
00095                                    Parameter,
00096                                    &ResultSize,
00097                                    &Result);
00098 
00099     // check if call failed
00100     if( FAILED( status ) )
00101     {
00102         if( ( unsigned int ) status == ( ERROR_RIOTCP + ERROR_MEMORY ) )
00103             status = ERROR_RIOPROXY + ERROR_MEMORY;
00104         free( Parameter );
00105         if( Result != NULL )
00106             free( Result );
00107 
00108         return status;
00109     }
00110 
00111     offset = 0;
00112     // Get results from result buffer
00113     // Get method return value (a missing reply buffer is treated as malformed)
00114     if( Result == NULL || !GetLong(Result,ResultSize,&status,&offset))
00115     {
00116         free( Parameter );
00117         free( Result );
00118 
00119         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00120     }
00121 
00122     free( Parameter );
00123     free( Result );
00124 
00125     return status;
00126 }

PSTREAMLIST CStreamManager::firstRequestOfAllStreams ( PSTREAMLIST  stream_list,
long double *  now 
) [private]

// Walks the list that starts at stream_reqlist (following auxnext links) and
// returns the node with the smallest value of aux->csystime - now[ aux->id ].
// On a tie the later node wins, because the comparison below is ">=".
// Returns NULL when the list is empty.
// NOTE(review): the signature above names the first parameter stream_list, but
// the body reads stream_reqlist - confirm which name the definition in
// server/StreamManager.cpp really uses.
// NOTE(review): now is long double*, yet the running minimum is stored in a
// double - confirm the narrowing is acceptable.

Definition at line 3948 of file server/StreamManager.cpp.

03951 {
03952     PSTREAMLIST auxStream, minStream;
03953     double min = MAXDOUBLE;
03954 
03955     minStream = NULL;
03956     auxStream = stream_reqlist;
03957     if( auxStream )
03958     {
03959         //min = auxStream->aux->csystime - now[ auxStream->stream->GetId() ];
03960         min = auxStream->aux->csystime - now[ auxStream->aux->id ];
03961         minStream = auxStream;
03962         auxStream = auxStream->auxnext;
03963 
03964         while( auxStream )
03965         {
03966             //if( min >= ( auxStream->aux->csystime - now[ auxStream->stream->GetId()]) )
03967             if( min >= ( auxStream->aux->csystime - now[ auxStream->aux->id] ) )
03968             {
03969                 //min = auxStream->aux->csystime - now[ auxStream->stream->GetId()];
03970                 min = auxStream->aux->csystime - now[ auxStream->aux->id] ;
03971                 minStream = auxStream;
03972             }
03973             auxStream = auxStream->auxnext;
03974         }
03975     }
03976     return minStream;
03977 }

void CStreamManager::generateTraceOfAllStreams ( double  time  ) 
PSTREAMLIST CStreamManager::GetAdmittedClientsList (  ) 

// Returns the current value of m_AdmittedClientsList; nothing is copied or
// locked here.

Definition at line 3981 of file server/StreamManager.cpp.

03982 {
03983     return m_AdmittedClientsList;
03984 }

double CStreamManager::GetAverageCACTime (  ) 

// Returns the current value of m_AverageCACTime.

Definition at line 4000 of file server/StreamManager.cpp.

04001 {
04002     return m_AverageCACTime;
04003 }

unsigned int CStreamManager::GetBlockSize (  ) 

// Returns m_BlockSize (Initialize() copies it from Config->BlockSize).

Definition at line 4293 of file server/StreamManager.cpp.

04294 {
04295     return m_BlockSize;
04296 }

int CStreamManager::GetBurstSizeOfEachClient (  ) 

// Returns m_BurstSizeOfEachClient (Initialize() copies it from
// Config->BurstSizeOfEachClient).

Definition at line 3995 of file server/StreamManager.cpp.

03996 {
03997     return m_BurstSizeOfEachClient;
03998 }

double CStreamManager::GetDiskQueueTime ( int  disk,
int  queuesize 
)

// Looks up m_EstimatedDiskQueueTime[ disk ] for a given queue size.
// queuesize is first mapped to an index in [0, 300]:
//   queuesize <= m_MaxPendingRequests        -> 0
//   queuesize >  m_MaxPendingRequests + 300  -> 300
//   otherwise                                -> queuesize - m_MaxPendingRequests
// The index is then lowered while the table entry is 0 (stopping at index 0)
// and the entry found is returned.
// NOTE(review): disk is used as an array index without any range check here.

Definition at line 4342 of file server/StreamManager.cpp.

04343 {
04344     int index;
04345     if( queuesize < (m_MaxPendingRequests + 1) )
04346         index = 0;
04347     else if( queuesize > (m_MaxPendingRequests + 300) )
04348         index = 300;
04349     else
04350         index = queuesize - m_MaxPendingRequests;
04351 
04352     while(( m_EstimatedDiskQueueTime[ disk ][ index ] == 0 ) && ( index > 0))
04353     {
04354         index--;
04355     }
04356     return m_EstimatedDiskQueueTime[ disk ][ index ];
04357 }

double CStreamManager::GetDiskResponseTime ( int  disk,
int  index 
)

// Looks up m_EstimatedDiskResponseTime[ disk ][ index ].
// index is capped at m_MaxPendingRequests, then lowered while the table entry
// is 0 (stopping at index 0); the entry found is returned.
// NOTE(review): neither disk nor a negative index is checked here - the table
// is read before the "index > 0" test.

Definition at line 4371 of file server/StreamManager.cpp.

04372 {
04373     if( index > m_MaxPendingRequests )
04374         index = m_MaxPendingRequests;
04375 
04376     while(( m_EstimatedDiskResponseTime[ disk ][ index ] == 0 ) && ( index > 0))
04377     {
04378         index--;
04379     }
04380     return m_EstimatedDiskResponseTime[ disk ][ index ];
04381 }

double CStreamManager::GetDiskServiceTime ( int  disk,
int  index 
)

// Looks up m_EstimatedDiskServiceTime[ disk ][ index ].
// index is capped at m_MaxPendingRequests, then lowered while the table entry
// is 0 (stopping at index 0); the entry found is returned.
// NOTE(review): neither disk nor a negative index is checked here - the table
// is read before the "index > 0" test.

Definition at line 4359 of file server/StreamManager.cpp.

04360 {
04361     if( index > m_MaxPendingRequests )
04362         index = m_MaxPendingRequests;
04363 
04364     while(( m_EstimatedDiskServiceTime[ disk ][ index ] == 0 ) && ( index > 0))
04365     {
04366         index--;
04367     }
04368     return m_EstimatedDiskServiceTime[ disk ][ index ];
04369 }

int CStreamManager::GetDiskServiceTime (  ) 

// Issues one disk-service-time request per disk: for every disk id from 1 to
// m_Router->GetMaxNumberOfDisks() it obtains an EventTypeStorageRequest event,
// fills in its DiskServiceTimeInfo fields (type, size, token, disk id) and
// hands it to m_Router->GetDiskServiceTime(). Always returns 0.
// NOTE(review): the pointer returned by EventManager.New() is used without a
// NULL check - confirm New() cannot fail.

Definition at line 4306 of file server/StreamManager.cpp.

04307 {
04308     int num = m_Router->GetMaxNumberOfDisks();
04309 
04310     EventStorageRequest* Event;
04311 
04312     // send disk service time info request for each disk
04313     // disk 0 is not used
04314     for( int i = 1; i <= num; i++ )
04315     {
04316         Event = ( EventStorageRequest* ) EventManager.New(EventTypeStorageRequest);
04317         Event->StorageRequest.DiskServiceTimeInfo.Type   = MSG_RSS_DISKSERVICETIMEINFO_REQ;
04318         Event->StorageRequest.DiskServiceTimeInfo.Size   = SizeMsgRSSdiskServiceTimeInfoReq;
04319         Event->StorageRequest.DiskServiceTimeInfo.Token  = RSS_TOKEN_ROUTER;
04320         Event->StorageRequest.DiskServiceTimeInfo.DiskId = i;
04321 
04322         m_Router->GetDiskServiceTime( Event, i );
04323     }
04324     return 0;
04325 }

double CStreamManager::GetEstimatedDiskQueueTime ( int  disk  ) 

// Returns m_EstimatedDiskQueueTimeOfDisk[ disk ]; disk is not range-checked
// here.

Definition at line 4288 of file server/StreamManager.cpp.

04289 {
04290     return m_EstimatedDiskQueueTimeOfDisk[disk];
04291 }

double CStreamManager::GetEstimatedDiskResponseTime ( int  disk  ) 

// Returns m_EstimatedDiskResponseTimeOfDisk[ disk ]; disk is not range-checked
// here.

Definition at line 4283 of file server/StreamManager.cpp.

04284 {
04285     return m_EstimatedDiskResponseTimeOfDisk[ disk ];
04286 }

double CStreamManager::GetEstimatedDiskServiceTime ( int  disk  ) 

// Returns m_EstimatedDiskServiceTimeOfDisk[ disk ]; disk is not range-checked
// here.

Definition at line 4278 of file server/StreamManager.cpp.

04279 {
04280     return m_EstimatedDiskServiceTimeOfDisk[ disk ];
04281 }

CNetInterface * CStreamManager::getNetInterface ( void   ) 

// English rendering of the description below: new function that obtains the
// NetInterface object associated with this CStreamManager; it returns the
// pointer stored in m_NetInterface.

Nova funcao para obter o objeto da classe NetInterface associado a classe CStreamManager.

Returns:
ponteiro para o objeto da classe NetInterface.

Definition at line 4582 of file server/StreamManager.cpp.

04583 {
04584     return m_NetInterface;
04585 }

int CStreamManager::GetNetInterfaceIPAddr (  ) 

// Returns m_NetInterface->getipaddr() when m_NetInterface is set; otherwise
// falls back to GetNetMgrIPAddr().

Definition at line 4561 of file server/StreamManager.cpp.

04562 {
04563     if( m_NetInterface != NULL )
04564         return( m_NetInterface->getipaddr() );
04565     else return( GetNetMgrIPAddr() );    
04566 }

NetMgr * CStreamManager::getNetMgr ( void   ) 

// English rendering of the description below: new function that obtains the
// NetMgr object associated with this CStreamManager; it returns the pointer
// stored in m_NetMgr.

Nova funcao para obter o objeto da classe NetMgr associado a classe CStreamManager.

Returns:
ponteiro para o objeto da classe NetMgr.

Definition at line 4575 of file server/StreamManager.cpp.

04576 {
04577     return m_NetMgr;
04578 }

int CStreamManager::GetNetMgrIPAddr (  ) 

// Returns m_NetMgr->getipaddr().
// NOTE(review): m_NetMgr is dereferenced without a NULL check here.

Definition at line 4553 of file server/StreamManager.cpp.

04554 {
04555     return( m_NetMgr->getipaddr() );
04556 }

long double CStreamManager::GetNetworkRate (  ) 

// Returns m_NetworkRate (Initialize() copies it from Config->NetworkRate,
// which that code annotates as "in Bps").

Definition at line 4299 of file server/StreamManager.cpp.

04300 {
04301     return m_NetworkRate;
04302 }

int CStreamManager::GetNumberOfBuffersForEachClient (  ) 

// Returns m_NumberOfBuffersForEachClient (Initialize() copies it from
// Config->NumberOfBuffersForEachClient).

Definition at line 3987 of file server/StreamManager.cpp.

03988 {
03989     return m_NumberOfBuffersForEachClient;
03990 }

int CStreamManager::Initialize ( StreamManagerConfig *  Config  ) 

// Server-side initialisation of the stream manager from *Config.
//
// Steps, in order:
//   1. remember Config->LogsDirectory and, if Config->GenerateLogs is set,
//      open the main log file (LOGFILE) inside it;
//   2. refuse to run twice (m_initialized) and verify m_MutexTable is open;
//   3. allocate Config->MaxStreams RioStream entries and put each of them on
//      m_FreeList;
//   4. create the NetMgr and CNetInterface objects and start both on
//      RIOSERVERUDPPORT;
//   5. obtain a timer, copy the configuration values into members and, if
//      Config->CollectMeasures is set, open the measure log files;
//   6. size the real-time / non-real-time request pools and initialise the
//      corresponding EventManager pools;
//   7. set m_initialized and start the periodic timer.
//
// Returns S_OK on success. On failure returns ERROR_STREAMMANAGER + one of
// ERROR_LOGFILE, ERROR_INITIALIZED, ERROR_CREATE_MUTEX, ERROR_MEMORY, or the
// non-zero code returned by NetMgr::Start / CNetInterface::Start, or the
// negative value returned by Timer.GetTimer.
//
// m_NetMgr is created with scalar new and is therefore released with scalar
// delete; the bad_alloc path also releases m_VectorStream, as the NetMgr
// failure path already does.
//
// NOTE(review): LogFileName is filled with strcpy/strcat without any length
// check against MaxPathSize - confirm the directory and file names always fit.
// NOTE(review): the failure returns after the network objects are created do
// not release them - confirm the destructor takes care of that.
// NOTE(review): the request pool sizes are accumulated with "+=" - presumably
// the members start at zero; verify against the constructor.

Definition at line 342 of file server/StreamManager.cpp.

00343 {
00344     // Buffer used to build the log file names.
00345     char LogFileName[ MaxPathSize ];
00346     // Remember the directory where the logs must be saved.
00347     m_LogsDirectory = Config->LogsDirectory;
00348 
00349     #ifdef RIO_DEBUG1
00350     RioErr << "### [CStreamManager - Initialize] Start" << endl;
00351     #endif
00352 
00353     if( Config->GenerateLogs )
00354     {
00355         // Build the log file name.
00356         strcpy( LogFileName, m_LogsDirectory );
00357         strcat( LogFileName, LOGFILE );
00358         m_log.open( LogFileName );
00359         // Check if log file was opened successfully
00360         if( !m_log.is_open() )
00361         {
00362             return ERROR_STREAMMANAGER + ERROR_LOGFILE;
00363         }
00364     }
00365 
00366     // Check if not initialized yet
00367     if( m_initialized )
00368     {
00369         RioErr << "Initialize(): Tried to initialize component already"
00370                  " initialized" << endl;
00371         return ERROR_STREAMMANAGER + ERROR_INITIALIZED;
00372     }
00373 
00374     // check if    mutex to control table access was successfully created
00375     if( !m_MutexTable->IsOpen() )
00376     {
00377         RioErr << "Initialize(): Failed to create mutex" << endl;
00378         RioErr << "    SYSTEM ERROR: " << strerror(errno) << endl;
00379         return ERROR_STREAMMANAGER + ERROR_CREATE_MUTEX;
00380     }
00381 
00382     #ifdef RIO_DEBUG2
00383     RioErr << "### [CStreamManager - Initialize] Antes de new m_VectorStream" 
00384            << endl;
00385     #endif
00386 
00387     // create vector with stream entries
00388     m_VectorStream = new RioStream [Config->MaxStreams];
00389     if( m_VectorStream == 0 )
00390     {
00391         RioErr  << "StreamManager.Start: Failed to alocate memory for Streams"
00392                 << endl;
00393         return ERROR_STREAMMANAGER + ERROR_MEMORY;
00394 
00395     }
00396 
00397     // Insert all streams on free list
00398     for( int i = 0; i < Config->MaxStreams; i++ )
00399     {
00400         m_VectorStream[i].Initialize( this );
00401         m_FreeList.Put( &m_VectorStream[i] );
00402     }
00403 
00404     #ifdef RIO_DEBUG2
00405     RioErr << "### [CStreamManager - Initialize] Depois de new m_VectorStream" 
00406            << endl;
00407     #endif
00408 
00409     // The true argument below (serverInstance) only indicates that this
00410     // NetMgr instance was created by the server. NetMgr needs to know this
00411     // so that it can behave differently depending on whether it was
00412     // instantiated by the server or by the client.
00413     m_NetMgr = new NetMgr( true );
00414     if( !m_NetMgr )
00415     {
00416         delete[] m_VectorStream;
00417         m_VectorStream = 0;
00418         RioErr << "StreamManager.Start: new NetMgr() failed" << endl;
00419         return ERROR_STREAMMANAGER + ERROR_MEMORY;
00420     }
00421     
00422     // Try to create a new CNetInterface object.
00423     // Note: 
00424     try
00425     {
00426         m_NetInterface = new CNetInterface;
00427     }
00428     catch( bad_alloc& ba )
00429     {
00430         delete m_NetMgr;   // created with scalar new above, so no delete[]
00431         m_NetMgr = NULL;
              // Release the stream table too, as the NetMgr failure path does.
              delete[] m_VectorStream;
              m_VectorStream = 0;
00432         RioErr << "StreamManager.Start: new CNetInterface failed with error " 
00433                << ba.what() << endl;
00434         return ERROR_STREAMMANAGER + ERROR_MEMORY;
00435     }
00436 
00437     // The port is fixed to RIOSERVERUDPPORT because a fixed port is needed
00438     // by the implementation that supports clients behind NAT.
00439     #ifdef RIO_DEBUG_FILE
00440     char tmpName[ MaxPathSize ];
00441     char tmpDomain[ MaxPathSize ];
00442     // Build the name of the log file generated by RioNeti.
00443     // Start the name with the logs directory.
00444     strcpy( LogFileName, m_LogsDirectory );
00445     // Get the name of the host the dispatch server is running on.
00446     gethostname( tmpName,  MaxPathSize - 1 ); 
00447     // Get the domain of the host the dispatch server is running on.
00448     getdomainname( tmpDomain,  MaxPathSize - 1 );
00449     // Append the name that ties the RioNeti log to the dispatch server
00450     // log.
00451     strcat( LogFileName, "RIOServerEmul_" );
00452     strcat( LogFileName, tmpName );
00453     // Check whether the domain is already part of the host name and append
00454     // it to the log name if it is not.
00455     if( strstr( tmpName, tmpDomain ) == NULL ) 
00456     {
00457         strcat( LogFileName, "." );
00458         strcat( LogFileName, tmpDomain );
00459     }
00460     strcat( LogFileName, ".log" );
00461     
00462     int rc = m_NetMgr->Start( htons( RIOSERVERUDPPORT ), Config->BlockSize, 
00463                               FRAGMENTSIZE, LogFileName );
00464 
00465     // Start the new network management class.
00466     int rc2 = m_NetInterface->Start( htons( RIOSERVERUDPPORT ), 
00467                                      Config->ClientsTimeOut, 
00468                                      NETTCPTIMEOUTSECONDS, 
00469                                      NETTCPTIMEOUTSECONDS, NULL, false, 
00470                                      LogFileName );
00471                                                                
00472     #else
00473     
00474     int rc = m_NetMgr->Start( htons( RIOSERVERUDPPORT ), Config->BlockSize, 
00475                               FRAGMENTSIZE );
00476     
00477     // Start the new network management class.
00478     int rc2 = m_NetInterface->Start( htons( RIOSERVERUDPPORT ), 
00479                                      Config->ClientsTimeOut, 
00480                                      NETTCPTIMEOUTSECONDS, 
00481                                      NETTCPTIMEOUTSECONDS );                          
00482     
00483     #endif
00484                               
00485     if( rc )
00486     {
00487         RioErr << "StreamManager.Start: NetMgr.Start failed rc "
00488                << (int *) rc << endl;
00489         return rc;
00490     }
00491 
00492     if( rc2 )
00493     {
00494         RioErr << "StreamManager.Start: NetInterface.Start failed rc2 "
00495                << (int *) rc2 << endl;
00496         return rc2;
00497     }
00498 
00499     m_TimerId = Timer.GetTimer( Timeout );
00500     if( m_TimerId < 0 )
00501     {
00502         RioErr << "Initialize(): Failed to Get timer" << endl;
00503         return m_TimerId;
00504     }
00505 
00506     // use existing and already initialized ObjectManager!
00507     m_ObjectManager = Config->ObjectManager;
00508     m_SystemManager = Config->SystemManager;
00509 
00510     // used existing Router
00511     m_Router = Config->Router;
00512     m_used = 0;
00513     m_BlockSize = Config->BlockSize;
00514     m_MaxStreams = Config->MaxStreams;
00515     m_TotalRate = Config->TotalRate;
00516     m_NRTReservedRate = Config->NRTReservedRate;
00517 
00518     m_FileRoot = new char[ strlen( Config->FileRoot ) + 1 ];
00519     strcpy( m_FileRoot, Config->FileRoot );
00520     m_MaxPendingRequests            = Config->MaxPending;
00521     m_NumberOfBuffersForEachClient  = Config->NumberOfBuffersForEachClient;
00522     m_BurstSizeOfEachClient         = Config->BurstSizeOfEachClient;
00523     m_EstimatedTimeParameter        = Config->EstimatedTimeParameter;
00524     m_NetworkRate                   = Config->NetworkRate; //in Bps
00525     m_UseServerSideBuffers          = Config->UseServerSideBuffers;
00526     m_UseNewCAC                     = Config->UseNewCAC;
00527     m_CollectMeasures               = Config->CollectMeasures;
00528     m_NumberOfEmptyTimes            = Config->NumberOfEmptyTimes;
00529     m_MaxIntervalEmpty              = Config->MaxIntervalEmpty;
00530     m_MaxNumberOfDisks              = m_Router->GetMaxNumberOfDisks();
00531 
00532     if( m_CollectMeasures )
00533     {
00534         // open files to save measures
00535         // Build the names of the other log files.
00536         strcpy( LogFileName, m_LogsDirectory );
00537         strcat( LogFileName, LOGSCACT );
00538         m_logSCACT.open( LogFileName );
00539         strcpy( LogFileName, m_LogsDirectory );
00540         strcat( LogFileName, LOGECACT );
00541         m_logECACT.open( LogFileName );
00542         strcpy( LogFileName, m_LogsDirectory );
00543         strcat( LogFileName, LOGSAPT );
00544         m_logSAPT.open( LogFileName );
00545         strcpy( LogFileName, m_LogsDirectory );
00546         strcat( LogFileName, LOGEAPT );
00547         m_logEAPT.open( LogFileName );
00548         strcpy( LogFileName, m_LogsDirectory );
00549         strcat( LogFileName, LOGSSIMULT );
00550         m_logSSIMULT.open( LogFileName );
00551         strcpy( LogFileName, m_LogsDirectory );
00552         strcat( LogFileName, LOGESIMULT );
00553         m_logESIMULT.open( LogFileName );
00554         strcpy( LogFileName, m_LogsDirectory );
00555         strcat( LogFileName, LOGSGENREQLT );
00556         m_logSGENREQLT.open( LogFileName );
00557         strcpy( LogFileName, m_LogsDirectory );
00558         strcat( LogFileName, LOGEGENREQLT );
00559         m_logEGENREQLT.open( LogFileName );
00560         strcpy( LogFileName, m_LogsDirectory );
00561         strcat( LogFileName, LOGSBUFFERT );
00562         m_logSBUFFERT.open( LogFileName );
00563         strcpy( LogFileName, m_LogsDirectory );
00564         strcat( LogFileName, LOGEBUFFERT );
00565         m_logEBUFFERT.open( LogFileName );
00566         strcpy( LogFileName, m_LogsDirectory );
00567         strcat( LogFileName, LOGSSORTLT );
00568         m_logSSORTLT.open( LogFileName );
00569         strcpy( LogFileName, m_LogsDirectory );
00570         strcat( LogFileName, LOGESORTLT );
00571         m_logESORTLT.open( LogFileName );
00572         strcpy( LogFileName, m_LogsDirectory );
00573         strcat( LogFileName, LOGSWAITINGT );
00574         m_logSWAITINGT.open( LogFileName );
00575         strcpy( LogFileName, m_LogsDirectory );
00576         strcat( LogFileName, LOGEWAITINGT );
00577         m_logEWAITINGT.open( LogFileName );
00578 
00579         m_logSCACT     << "# Measures in msec." << endl;
00580         m_logECACT     << "# Measures in msec." << endl;
00581         m_logSSIMULT   << "# Measures in msec." << endl;
00582         m_logESIMULT   << "# Measures in msec." << endl;
00583         m_logSGENREQLT << "# Measures in msec." << endl;
00584         m_logEGENREQLT << "# Measures in msec." << endl;
00585         m_logSBUFFERT  << "# Measures in msec." << endl;
00586         m_logEBUFFERT  << "# Measures in msec." << endl;
00587         m_logSSORTLT   << "# Measures in msec." << endl;
00588         m_logESORTLT   << "# Measures in msec." << endl;
00589         m_logSWAITINGT << "# Measures in msec." << endl;
00590         m_logEWAITINGT << "# Measures in msec." << endl;
00591     }
00592     // ------------------------------------------------------------------------
00593 
00594     m_UsedRate = 0.0;
00595     m_HoldingNonRealTime = false;
00596     m_ActiveRequests = 0;
00597     m_NextNRTStream = 0;
00598     m_nDisks = Config->nDisks;
00599 
00600     m_ExtraStreamRequests = Config->ExtraStreamRequests;
00601 
00602     // Compute Maximum number of active requests (per disk):
00603     //   requests queued at router disk queue +
00604     //   some (2) being transmitted/received to/from clients
00605     m_MaxActiveRequests = (Config->MaxDiskQueueSize + 2);
00606 
00607     // Active requests for all disks (multiply by number of disks)
00608     m_MaxActiveRequests *= Config->nDisks;
00609 
00610     // Total number of Non real-time requests needs to be larger to include
00611     // requests queued at StreamManager
00612     //   m_MaxNRTrequests = 2*m_MaxActiveRequests;
00613 
00614     // Total number of requests needs to be larger to include requests
00615     // requests queued at StreamManager
00616     // Keep real-time and non real-time requests in independent pools
00617     // such that non real-time streams can not consume all
00618     // real time requests
00619 
00620     m_MaxRTRequests += m_MaxActiveRequests * 3 +
00621                       ( m_ExtraStreamRequests * 2 * m_MaxStreams );
00622 
00623     m_MaxNRTRequests += m_MaxActiveRequests * 3 +
00624                        ( m_ExtraStreamRequests * 2 * m_MaxStreams );
00625 
00626     #ifdef RIO_DEBUG2
00627     if( m_log.is_open() )
00628     {
00629         m_log <<"m_MaxRTRequests = m_MaxNRTRequests = " << m_MaxRTRequests
00630               << endl <<"m_MaxActiveRequests = " << m_MaxActiveRequests << endl
00631               << "m_NumberOfEmptyTimes = " << m_NumberOfEmptyTimes * 1.0 << endl
00632               <<"m_MaxIntervalEmpty = " << m_MaxIntervalEmpty << endl;
00633     }
00634     #endif
00635     // ------------------------------------------------------------------------
00636 
00637     #ifdef RIO_DEBUG2
00638     RioErr << "### [CStreamManager - Initialize] Antes de criar os eventos " 
00639            << "EventTypeRTDataRequest - numero de eventos " << m_MaxRTRequests 
00640            << endl;
00641     #endif
00642 
00643     EventManager.Initialize( EventTypeRTDataRequest, m_MaxRTRequests );
00644 
00645     #ifdef RIO_DEBUG2
00646     RioErr << "### [CStreamManager - Initialize] Antes de criar os eventos " 
00647            << "EventTypeNRTDataRequest - numero de eventos " << m_MaxNRTRequests 
00648            << endl;
00649     #endif
00650 
00651     EventManager.Initialize( EventTypeNRTDataRequest, m_MaxNRTRequests );
00652 
00653     #ifdef RIO_DEBUG2
00654     RioErr << "### [CStreamManager - Initialize] Depois de criar os eventos" 
00655            << endl;
00656     #endif
00657 
00658     m_initialized = true;
00659 
00660     // Start Timer
00661     struct timeval interval;
00662     interval.tv_sec = 0;
00663     interval.tv_usec = STREAM_MANAGER_TIMER_INTERVAL;
00664     // ------------------------------------------------------------------------
00665 
00666     Timer.StartTimer( m_TimerId, interval, TimerTypePeriodic, this );
00667 
00668     #ifdef RIO_DEBUG1
00669     RioErr << "### [CStreamManager - Initialize] End" << endl;
00670     #endif
00671 
00672     return S_OK;
00673 }

int CStreamManager::insertevent ( PSEQEVENTLIST  eventblock,
EventTypeSimulation  eventtype,
int  block,
long double  time_when 
)

Definition at line 3149 of file server/StreamManager.cpp.

03152 {
03153 
03154     PSEQEVENTLIST auxblock = eventblock;
03155     PSEQEVENTLIST newevent;
03156 
03157     while(( auxblock->csystime <= time_when ) && ( auxblock->next != NULL ))
03158     {
03159         auxblock = auxblock->next;
03160     }
03161 
03162     newevent = ( PSEQEVENTLIST )malloc( sizeof( SEQEVENTLIST ) );
03163     if( newevent != NULL )
03164     {
03165         newevent->event     = eventtype;
03166         newevent->id        = eventblock->id;
03167         newevent->block     = block;
03168         newevent->disk      = eventblock->disk;
03169         newevent->csystime  = time_when;
03170 
03171         if( auxblock->csystime < time_when )
03172         {
03173             auxblock->next = newevent;
03174             newevent->prev = auxblock;
03175             newevent->next = NULL;
03176         }
03177         else
03178         {
03179             auxblock->prev->next = newevent;
03180             newevent->prev = auxblock->prev;
03181             auxblock->prev = newevent;
03182             newevent->next = auxblock;
03183         }
03184     }
03185     else
03186     {
03187         RioErr << "insertevent: Could not allocate memory" << endl;
03188         return -1;
03189     }
03190     return 1;
03191 }

PSTREAMLIST CStreamManager::InsertNewClient ( PSEQBLOCKLIST  NewClient,
RioStream *  stream 
)

// Appends a new entry to the tail of m_AdmittedClientsList while holding
// MutexModifyClientsList.
//
//   NewClient  request list of the client; stored in the new entry's reqlist,
//              nextBlockToBeRequested, nextBlockToBeFetched and aux fields
//   stream     stored in the new entry's stream field
//
// The new entry is linked through both the next/prev and the auxnext/auxprev
// pointers, and m_NumberOfAdmittedClients is incremented.
//
// Returns the new list entry, or NULL when it cannot be allocated. The mutex
// is released on both paths; returning on allocation failure without
// unlocking would block every later caller.

Definition at line 3559 of file server/StreamManager.cpp.

03561 {
03562     pthread_mutex_lock( &MutexModifyClientsList );
03563 
03564     PSEQBLOCKLIST reqlistaux;
03565     PSTREAMLIST curlist, prevlist;
03566 
03567     prevlist = m_AdmittedClientsList;
03568     while(( prevlist != NULL ) && ( prevlist->next != NULL ))
03569     {
03570         prevlist = prevlist->next;
03571     }
03572 
03573     reqlistaux = NewClient;
03574     curlist = ( PSTREAMLIST )malloc( sizeof( STREAMLIST ) );
03575     if( curlist )
03576     {
03577         curlist->stream = stream;
03578         curlist->reqlist = reqlistaux;
03579         curlist->next = NULL;
03580         curlist->nextBlockToBeRequested = reqlistaux;
03581         curlist->nextBlockToBeFetched   = reqlistaux;
03582 
03583         curlist->aux = reqlistaux;
03584         curlist->auxnext = NULL;
03585 
03586         if( prevlist )
03587         {
03588             curlist->prev = prevlist;
03589             prevlist->next = curlist;
03590 
03591             curlist->auxprev = prevlist;
03592             prevlist->auxnext = curlist;
03593         }
03594         else
03595         {
03596             m_AdmittedClientsList = curlist;
03597             curlist->prev = NULL;
03598             curlist->auxprev = NULL;
03599         }
03600         m_NumberOfAdmittedClients++;
03601     }
03602     else
03603     {
03604         #ifdef RIO_DEBUG2
03605         if( m_log.is_open() ) m_log << "InsertNewClient: Could not insert new client!" << endl;
03606         #endif
              // Release the list mutex before bailing out.
              pthread_mutex_unlock( &MutexModifyClientsList );
03607         return NULL;
03608     }
03609 
03610     #ifdef RIO_DEBUG2
03611     if( m_log.is_open() ) m_log << "InsertNewClient: OK!" << endl;
03612     #endif
03613     pthread_mutex_unlock( &MutexModifyClientsList );
03614 
03615     return curlist; //success
03616 }

HRESULT CStreamManager::MaxRequests ( RioStreamId  Stream  ) 

// Client-side proxy call: packs Stream.Version and Stream.Index into an ASCII
// parameter buffer, invokes the remote RioMethodStreamManagerMaxRequests
// method through m_TCPconnection->Call() and returns the HRESULT decoded from
// the reply.
//
// Returns:
//   ERROR_RIOPROXY + ERROR_MEMORY                if the parameter buffer cannot
//                                                be allocated, or Call() reports
//                                                ERROR_RIOTCP + ERROR_MEMORY;
//   ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW if the parameters cannot fit
//                                                in MaxTCPDataSize bytes;
//   the failing status of Call()                 for any other call failure;
//   ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT   if the reply is missing or
//                                                cannot be decoded;
//   otherwise the status value carried in the reply.
//
// Result and ResultSize are initialised so that the failure path never frees an
// indeterminate pointer when Call() returns without storing a reply buffer.

Definition at line 492 of file interface/StreamManager.cpp.

00493 {
00494     char          *Parameter = (char*) malloc( MaxTCPDataSize*sizeof(char) );
00495     char          *Result = NULL;   // stays NULL unless Call() stores a reply
00496     unsigned int ParameterSize;
00497     unsigned int ResultSize = 0;
00498     HRESULT       status;
00499 
00500     if( Parameter == NULL )
00501     {
00502         RioErr << "malloc error MaxRequests:" << strerror(errno) << endl;
00504 
00505         return ERROR_RIOPROXY + ERROR_MEMORY;
00506     }
00507 
00508     memset( Parameter, 0, MaxTCPDataSize*sizeof( char ) );
00509 
00510     ParameterSize = 2 * MAX_LONG_STRING_SIZE;
00511 
00512     // Check if Parameter buffer is large enough
00513     if(ParameterSize > (unsigned int) MaxTCPDataSize)
00514     {
00515         free( Parameter );
00516 
00517         return ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW;
00518     }
00519 
00520     int offset = 0;
00521 
00522     // Set parameters on ASCII buffer
00523     // Set Stream Id
00524     SetLong(Parameter,Stream.Version,&offset);
00525     SetLong(Parameter,Stream.Index,&offset);
00526 
00527     // Update Parameter size to actual size
00528     // (previously was maximum possible size)
00529     ParameterSize = offset;
00530 
00531     // Call remote method
00532     status = m_TCPconnection->Call(RioClassStreamManager,
00533                                    RioMethodStreamManagerMaxRequests,
00534                                    ParameterSize,
00535                                    Parameter,
00536                                    &ResultSize,
00537                                    &Result);
00538 
00539     // check if call failed
00540     if( FAILED(status) )
00541     {
00542         if( ( unsigned int ) status == ( ERROR_RIOTCP + ERROR_MEMORY ) )
00543             status = ERROR_RIOPROXY + ERROR_MEMORY;
00544         free( Parameter );
00545         if( Result != NULL )
00546             free( Result );
00547 
00548         return status;
00549     }
00550 
00551     offset = 0;
00552     // Get results from result buffer
00553     // Get method return value (a missing reply buffer is treated as malformed)
00554     if( Result == NULL || !GetLong( Result, ResultSize, &status, &offset ) )
00555     {
00556         free( Parameter );
00557         free( Result );
00558 
00559         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00560     }
00561 
00562     free( Parameter );
00563     free( Result );
00564 
00565     return status;
00566 }

int CStreamManager::Open ( RioStreamTraffic *  Traffic,
RioStream **  Stream,
SOCKADDR_IN  RemoteAddress 
)

Definition at line 676 of file server/StreamManager.cpp.

// Opens a new stream: validates the requested traffic, takes a RioStream
// from m_FreeList, initializes its fields from Traffic and RemoteAddress and
// hands it back through *Stream.
//
// Traffic       - requested traffic description (Type, Direction,
//                 LogicalBlockSize and the CBR/VBR rate are read).
// Stream        - out: receives the stream taken from the free list.
// RemoteAddress - stored in the stream's m_RemoteAddress.
//
// Returns S_OK on success, otherwise ERROR_STREAMMANAGER plus one of
// ERROR_NOT_INITIALIZED, ERROR_INVALID_DIRECTION, ERROR_MAX_STREAMS,
// ERROR_STREAM_REFUSED, ERROR_INVALID_TRAFFIC or ERROR_INVALID_BLOCKSIZE.
// m_MutexTable is acquired after the direction check and released on every
// return path that follows.
00678 {
00679     // Make sure we are initialized
00680     if( !m_initialized )
00681     {
00682         RioErr << "Open(): Component not initialized" << endl;
00683         return( ERROR_STREAMMANAGER + ERROR_NOT_INITIALIZED);
00684     }
00685 
00686     RioTrafficType type = Traffic->Type;
00687     RioStreamDirection direction = Traffic->Direction;
00688 
00689     // Make sure write streams are non real time
00690     if(( direction != RioStreamDirectionRead ) && ( type != RIO_TRAFFIC_NRT ))
00691     {
00692         return ERROR_STREAMMANAGER + ERROR_INVALID_DIRECTION;
00693     }
00694     // Get exclusive access to table
00695     m_MutexTable->Wait();
00696 
00697     // check if there is an entry available in table
00698     if( m_used >= m_MaxStreams )
00699     {
00700         m_MutexTable->Release();
00701         RioErr  << "StreamManager: max number of streams !!!" << endl;
00702         return ERROR_STREAMMANAGER + ERROR_MAX_STREAMS;
00703     }
00704 
00705     double rate = 0.0; // stays 0.0 for NRT traffic; set below for CBR and VBR
00706 
00707     // Check traffic type and do admission control if traffic is real time
00708     if( type == RIO_TRAFFIC_CBR )
00709     {
00710         rate = Traffic->TrafficCBR.Rate;
00711 
00712         if( m_log.is_open() )
00713         {
00714             char *info = myInfo();
00715             m_log << endl
00716                   << "-------------" << info << "---------------"
00717                   << endl
00718                   << "StreamManager: active streams  " << m_used
00719                   << ". New traffic CBR. Used rate " << m_UsedRate
00720                   << " request rate " << rate << endl;
00721             free( info );
00722         }
00723 
00724         // Do admission control (old version)
00725         //(Check if available rate is sufficient for new Stream)
00726         if( ( m_UsedRate + rate ) > ( m_TotalRate - m_NRTReservedRate ) )
00727         {
00728             m_MutexTable->Release();
00729 
00730             if( m_log.is_open() )
00731                 m_log << "StreamManager: Could not admit new user." << endl;
00732 
00733             return ERROR_STREAMMANAGER + ERROR_STREAM_REFUSED;
00734         }
00735     }
00736     else
00737     {
00738         // Only NRT and VBR are accepted besides CBR
00739         if( ( type != RIO_TRAFFIC_NRT ) && ( type != RIO_TRAFFIC_VBR ) )
00739         {
00740             m_MutexTable->Release();
00741             return ERROR_STREAMMANAGER + ERROR_INVALID_TRAFFIC;
00742         }
00743         if( type == RIO_TRAFFIC_VBR )
00744         {
00745             rate = Traffic->TrafficVBR.Rate;
00746         }
00747 
00748         if( m_log.is_open() )
00749         {
00750             char *info = myInfo();
00751             m_log << endl
00752                   << "-------------" << info << "---------------"
00753                   << endl
00754                   << "StreamManager: active streams  " << m_used
00755                   << ". New traffic NRT or VBR." << endl;
00756             free( info );
00757         }
00758     }
00759 
00760     // The stream must use the block size this manager was configured with
00760     if( Traffic->LogicalBlockSize != m_BlockSize )
00761     {
00762         if( m_log.is_open() )
00763             m_log << "[StreamManager] Error: Invalid BlockSize." << endl;
00764 
00765         m_MutexTable->Release();
00766         return ERROR_STREAMMANAGER + ERROR_INVALID_BLOCKSIZE;
00767     }
00768 
00769     // Get stream from free list
00770     RioStream *sp = m_FreeList.First(); // NOTE(review): sp is dereferenced below without a NULL check - confirm First() cannot fail while m_used < m_MaxStreams
00771     m_FreeList.Remove( sp );
00772 
00773     // Increment counter of active streams
00774     m_used++;
00775 
00776     // Maximum number of requests this stream may have active
00777     int MaxRequests;
00778     int UserMaxRequests;
00779 
00780     // Non real time requests can generate very high bandwidth traffic
00781     // and may require lots of requests
00782     // Accept 10% more than what user is told to avoid problems that happens
00783     // when user receives data from previous request but stream manager
00784     // has not freed request yet
00785     if( type == RIO_TRAFFIC_NRT )
00786     {
00787         MaxRequests = ( int ) ( m_MaxActiveRequests * 1.1 );
00788         UserMaxRequests = m_MaxActiveRequests;
00789     }
00790 
00791     // number of Real time requests depends on stream rate
00792     // Reserve at least twice the amount required to hold
00793     // requests above stream rate (also reserve a minimum,
00794     // constant for all streams)
00795     else
00796     {
00797         /*
00798         MaxRequests = ( int )( m_ExtraStreamRequests * 2 +
00799                       ( 3 * ( rate / m_TotalRate ) * m_MaxActiveRequests ));
00800         UserMaxRequests = ( int )( m_ExtraStreamRequests +
00801                       ( 2 * ( rate / m_TotalRate ) * m_MaxActiveRequests ));
00802         */
00803         // The formulas above must be re-examined and the assignments below
00804         // should then be removed: the formula takes into account neither the
00805         // size of the client's playout buffer nor whether the client is
00806         // multicast, which makes it unsatisfactory for those scenarios,
00807         // because clients tend to be terminated for exceeding the request
00808         // limit.
00809         MaxRequests = 50;
00810         UserMaxRequests = 50;
00811         // Update maximum number of requests across all streams
00812     }
00813 
00814     // Updates specific for real-time streams
00815     if(type == RIO_TRAFFIC_CBR )
00816     {
00817         // Increment Total Used Bandwidth by active streams
00818         m_UsedRate += rate;
00819     }
00820     //Traffic NRT and VBR does not allocate rate.
00821 
00822     // Update stream variables
00823     sp->s_Rate              = rate;
00824     sp->s_Direction         = Traffic->Direction;
00825 
00826     sp->m_RemoteAddress     = RemoteAddress;
00827     sp->m_Status            = RioStream::StreamStatusOpened;
00828     sp->m_Type              = type;
00829     sp->m_nRequests         = 0;
00830     sp->m_nQueue            = 0;
00831     sp->m_MaxRequests       = MaxRequests;
00832     sp->m_UserMaxRequests   = UserMaxRequests;
00833 
00834     sp->m_AverageRTTtoClient_msec    = 0;
00835     sp->m_ClientBufferSize           = 0;
00836     sp->m_ServerBufferStatus         = 0;
00837     sp->m_RequestList                = NULL;
00838     sp->m_PosAtClientsList           = NULL;
00839     sp->m_Buffer                     = NULL;
00840     sp->m_TokensToSend               = 0;
00841     sp->m_PrefetchedAllBlocks        = false;
00842     sp->m_StartingToSend             = true;
00843     sp->m_TokensToDisk               = 0;
00844     sp->m_WaitingQueueSize           = 0;
00845     sp->m_TotalBlocks                = 0;
00846     sp->m_SampleGenReqListTime       = 0;
00847     sp->m_SampleSortListTime         = 0;
00848     sp->m_SampleSimulationTime       = 0;
00849     sp->m_SampleCACTime              = 0;
00850     sp->m_SampleAdmissionProcessTime = 0;
00851     sp->m_SampleWaitingTime          = 0;
00852     sp->m_WaitingQueueSize           = 0; // repeats the assignment already made on line 00844
00853     // to change id!
00854     sp->m_Id                         = m_lastID = (m_lastID + 1 ) % 65000;
00855     // ------------------------------------------------------------------------
00856 
00857     sp->s_streamid            = (int) m_used;
00858     sp->m_RequestCount        = 0;
00859     sp->m_QueuedCount         = 0;
00860     sp->m_NextRelease.tv_sec  = 0;
00861     sp->m_NextRelease.tv_usec = 0;
00862 
00863     // Time between blocks at the stream rate, in microseconds, split into
00863     // seconds and microseconds below.
00863     // NOTE(review): rate is still 0.0 for NRT streams (and for a VBR rate of
00863     // zero), so this divides by zero and the u64 casts below receive a
00863     // non-finite value - confirm whether NRT streams ever use m_ArrivalInterval.
00863     long double arrival_interval  = (( m_BlockSize / rate ) * 1000000.0 );
00864     sp->m_ArrivalInterval.tv_sec  = (u64) ( arrival_interval / 1000000 );
00865     sp->m_ArrivalInterval.tv_usec = (u64) ( (u64) arrival_interval % 1000000 );
00866 
00867     #ifdef RIO_DEBUG2
00868     char Logfile[ MaxPathSize ]; // 100
00869     sprintf( Logfile, "%sStream.%d.log", m_LogsDirectory, sp->m_Id );
00870     sp->m_log.open( Logfile );
00871     if( sp->m_log.is_open() )
00872     {
00873         sp->m_log << "Arrival Interval " << arrival_interval/1000.0 << " msec"
00874                   << endl << "MaxRequests: " << sp->m_MaxRequests
00875                   << " UserMaxRequests: " << sp->m_UserMaxRequests << endl;
00876     }
00877     #endif
00878 
00879     #ifdef RIO_DEBUG2
00880     char LogRfile[ MaxPathSize ]; // 100
00881     sprintf( LogRfile, "%sStream.%d.requests", m_LogsDirectory, sp->m_Id);
00882     sp->m_log_requests.open( LogRfile );
00883     if( sp->m_log_requests.is_open() )
00884         sp->m_log_requests << "Block\tEstimatedTime\tTime"<<endl;
00885     #endif
00886 
00887     *Stream = sp;
00888 
00889     m_MutexTable->Release();
00890 
00891     return S_OK;
00892 }

HRESULT CStreamManager::OpenObject ( const RioStreamId  Stream,
const char *  ObjectName,
const RioAccess  Access,
struct timeval  RTT_average,
int  BufferSize,
ObjectHandle *  Handle,
RioObjectSize *  Size,
int *  m_FlagRequests,
int *  m_PID 
)

Definition at line 247 of file interface/StreamManager.cpp.

// Client-side proxy for the remote RioMethodStreamManagerOpenObjectSubmitToCAC
// call: packs the stream id, object name, access type, average RTT and buffer
// size into a parameter buffer, sends it through m_TCPconnection->Call() and
// unpacks the reply.
//
// Outputs read from the reply, in order: the remote status, Handle->Version,
// Handle->Index, *Size (sent as two unsigned 32-bit halves), *m_FlagRequests
// and - unless the flag is 2 - *m_PID.
//
// Returns the remote status on success. Local failures return
// ERROR_RIOPROXY + ERROR_MEMORY, ERROR_CALL_MESSAGE_OVERFLOW or
// ERROR_MESSAGE_DATA_FORMAT; a failed Call() returns its status (with
// ERROR_RIOTCP + ERROR_MEMORY remapped to ERROR_RIOPROXY + ERROR_MEMORY).
// Parameter and Result are freed on every path after allocation.
00258 {
00259     char          *Parameter = (char*) malloc( MaxTCPDataSize*sizeof(char) );
00260     char          *Result; // NOTE(review): not initialized, yet the failure path below compares it with NULL - relies on Call() always assigning it; confirm
00261     unsigned int ParameterSize;
00262     unsigned int ResultSize;
00263     HRESULT       status;
00264 
00265     if( Parameter == NULL )
00266     {
00267         RioErr << "malloc error OpenObject:" << strerror(errno) << endl;
00268         free( Parameter ); // Parameter is NULL here, so this has no effect
00269 
00270         return ERROR_RIOPROXY + ERROR_MEMORY;
00271     }
00272 
00273     memset( Parameter, 0, MaxTCPDataSize*sizeof( char ) );
00274 
00275     // Upper bound on the encoded size: five long fields, the name with its
00275     // terminator and one unsigned long field
00275     ParameterSize = 5 * MAX_LONG_STRING_SIZE +
00276                     strlen(ObjectName) + 1 +
00277                     MAX_ULONG_STRING_SIZE;
00278     // Check if Parameter buffer is large enough
00279     if(ParameterSize > (unsigned int) MaxTCPDataSize)
00280     {
00281         free( Parameter );
00282 
00283         return ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW;
00284     }
00285 
00286     int offset = 0;
00287 
00288     // Set parameters on ASCII buffer
00289     // Set Stream Id
00290     SetLong(Parameter,Stream.Version,&offset);
00291     SetLong(Parameter,Stream.Index,&offset);
00292     // Set Object Name
00293     SetString(Parameter,ObjectName,&offset);
00294     // Set Access type
00295     SetULong(Parameter,Access,&offset);
00296 
00297     // Set RTT
00298     #ifdef WINDOWS
00299     // Windows implementation
00300     SetLong(Parameter,RTT_average.wSecond,&offset);
00301     SetLong(Parameter,RTT_average.wMilliseconds,&offset);
00302     #else
00303     // Linux implementation
00304     SetLong(Parameter,RTT_average.tv_sec,&offset);
00305     SetLong(Parameter,RTT_average.tv_usec,&offset);
00306     #endif
00307     /* ------------------------------------------ */
00308 
00309     // Set buffersize
00310     SetLong(Parameter,BufferSize,&offset);
00311 
00312     // Update Parameter size to actual size
00313     // (previously was maximum possible size)
00314     ParameterSize = offset;
00315 
00316     // Call remote method
00317     status = m_TCPconnection->Call(RioClassStreamManager,
00318                                    RioMethodStreamManagerOpenObjectSubmitToCAC,
00319                                    ParameterSize,
00320                                    Parameter,
00321                                    &ResultSize,
00322                                    &Result);
00323 
00324     // check if call failed
00325     if(FAILED (status))
00326     {
00327         // Report a transport-level memory error as a proxy memory error
00327         if( ( unsigned int ) status == ( ERROR_RIOTCP + ERROR_MEMORY ) )
00328             status = ERROR_RIOPROXY + ERROR_MEMORY;
00329         free( Parameter );
00330         if( Result != NULL )
00331             free( Result );
00332 
00333         return status;
00334     }
00335 
00336     offset = 0;
00337     // Get results from result buffer
00338     // Get method return value
00339     if(!GetLong(Result,ResultSize,&status,&offset))
00340     {
00341         free( Parameter );
00342         free( Result );
00343 
00344         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00345     }
00346     // Get Object Handle
00347     if(!GetLong(Result,ResultSize,&(Handle->Version),&offset))
00348     {
00349         free( Parameter );
00350         free( Result );
00351 
00352         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00353     }
00354     if(!GetLong(Result,ResultSize,&(Handle->Index),&offset))
00355     {
00356         free( Parameter );
00357         free( Result );
00358 
00359         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00360     }
00361     // Get Object Size - reduce 1 RTT
00362     unsigned int highpart;
00363     unsigned int lowpart;
00364     if( !GetULong( Result,ResultSize,&highpart,&offset ))
00365     {
00366         free( Parameter );
00367         free( Result );
00368 
00369         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00370     }
00371     if( !GetULong( Result,ResultSize,&lowpart,&offset ))
00372     {
00373         free( Parameter );
00374         free( Result );
00375 
00376         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00377     }
00378     // Recombine the two halves; presumably RioObjectSize is a 64-bit type - confirm
00378     *Size = ((( RioObjectSize ) highpart ) << 32 ) | lowpart ;
00379 
00380     if(!GetLong(Result,ResultSize,m_FlagRequests,&offset))
00381     {
00382         free( Parameter );
00383         free( Result );
00384 
00385         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00386     }
00387 
00388     // Flag value 2 - ignore, since there will be no multicast transmission
00389     // Flag value 1 - must request the data blocks, since this client
00390     // is the leader of the multicast group
00391     // Flag value 0 - must not request the blocks, since this client
00392     // is not the leader of the group
00393 
00394     if( *m_FlagRequests != 2 )
00395     {
00396         if(!GetLong(Result,ResultSize,m_PID,&offset))
00397         {
00398             free( Parameter );
00399             free( Result );
00400 
00401             return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00402         }
00403     }
00404 
00405     free( Parameter );
00406     free( Result );
00407 
00408     return status;
00409 }

HRESULT CStreamManager::OpenObject ( const RioStreamId  Stream,
const char *  ObjectName,
const RioAccess  Access,
ObjectHandle *  Handle 
)

Definition at line 129 of file interface/StreamManager.cpp.

// Client-side proxy for the remote RioMethodStreamManagerOpenObject call:
// packs the stream id, object name and access type into a parameter buffer,
// sends it through m_TCPconnection->Call() and reads back the remote status
// followed by Handle->Version and Handle->Index.
//
// Returns the remote status on success. Local failures return
// ERROR_RIOPROXY + ERROR_MEMORY, ERROR_CALL_MESSAGE_OVERFLOW or
// ERROR_MESSAGE_DATA_FORMAT; a failed Call() returns its status (with
// ERROR_RIOTCP + ERROR_MEMORY remapped to ERROR_RIOPROXY + ERROR_MEMORY).
// Parameter and Result are freed on every path after allocation.
00133 {
00134     char          *Parameter = (char*) malloc( MaxTCPDataSize*sizeof(char) );
00135     char          *Result; // NOTE(review): not initialized, yet the failure path below compares it with NULL - relies on Call() always assigning it; confirm
00136     unsigned int ParameterSize;
00137     unsigned int ResultSize;
00138     HRESULT       status;
00139 
00140     if( Parameter == NULL )
00141     {
00142         RioErr << "malloc error OpenObject:" << strerror(errno) << endl;
00143         free( Parameter ); // Parameter is NULL here, so this has no effect
00144 
00145         return ERROR_RIOPROXY + ERROR_MEMORY;
00146     }
00147 
00148     memset( Parameter, 0, MaxTCPDataSize*sizeof( char ) );
00149 
00150     // Upper bound on the encoded size: two long fields, the name with its
00150     // terminator and one unsigned long field
00150     ParameterSize = 2 * MAX_LONG_STRING_SIZE +
00151                     strlen(ObjectName) + 1 +
00152                     MAX_ULONG_STRING_SIZE;
00153     // Check if Parameter buffer is large enough
00154     if(ParameterSize > (unsigned int) MaxTCPDataSize)
00155     {
00156         free( Parameter );
00157 
00158         return ERROR_RIOPROXY + ERROR_CALL_MESSAGE_OVERFLOW;
00159     }
00160 
00161     int offset = 0;
00162 
00163     // Set parameters on ASCII buffer
00164     // Set Stream Id
00165     SetLong(Parameter,Stream.Version,&offset);
00166     SetLong(Parameter,Stream.Index,&offset);
00167     // Set Object Name
00168     SetString(Parameter,ObjectName,&offset);
00169     // Set Access type
00170     SetULong(Parameter,Access,&offset);
00171 
00172     // Update Parameter size to actual size
00173     // (previously was maximum possible size)
00174     ParameterSize = offset;
00175 
00176     #ifdef RIO_DEBUG2
00177     RioErr << "RioMethodStreamManagerOpenObject "
00178            << RioMethodStreamManagerOpenObject << endl;
00179     #endif // RIO_DEBUG2
00180 
00181     // Call remote method
00182     status = m_TCPconnection->Call(RioClassStreamManager,
00183                                    RioMethodStreamManagerOpenObject,
00184                                    ParameterSize,
00185                                    Parameter,
00186                                    &ResultSize,
00187                                    &Result);
00188     // check if call failed
00189     if(FAILED (status))
00190     {
00191         // Report a transport-level memory error as a proxy memory error
00191         if( ( unsigned int ) status == ( ERROR_RIOTCP + ERROR_MEMORY ) )
00192             status = ERROR_RIOPROXY + ERROR_MEMORY;
00193         free( Parameter );
00194         if( Result != NULL )
00195             free( Result );
00196 
00197         return status;
00198     }
00199 
00200     offset = 0;
00201     // Get results from result buffer
00202     // Get method return value
00203     if(!GetLong(Result,ResultSize,&status,&offset))
00204     {
00205         free( Parameter );
00206         free( Result );
00207 
00208         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00209     }
00210     // Get Object Handle
00211     if(!GetLong(Result,ResultSize,&(Handle->Version),&offset))
00212     {
00213         free( Parameter );
00214         free( Result );
00215 
00216         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00217     }
00218     if(!GetLong(Result,ResultSize,&(Handle->Index),&offset))
00219     {
00220         free( Parameter );
00221         free( Result );
00222 
00223         return ERROR_RIOPROXY + ERROR_MESSAGE_DATA_FORMAT;
00224     }
00225 
00226     free( Parameter );
00227     free( Result );
00228 
00229     return status;
00230 }

void CStreamManager::PostITEvent ( MonitorEvent *  event  ) 

Definition at line 895 of file server/StreamManager.cpp.

// Forwards the event unchanged to m_SystemManager->PostITEvent().
00896 {
00897     m_SystemManager->PostITEvent( event );
00898 }

void CStreamManager::RemoveClient ( PSTREAMLIST  ClientPosition  ) 

Definition at line 3618 of file server/StreamManager.cpp.

// Removes ClientPosition from the admitted-clients list and releases it.
// Under MutexModifyClientsList it decrements m_NumberOfAdmittedClients,
// unlinks the node from m_AdmittedClientsList (updating both the prev/next
// and the auxprev/auxnext links of its neighbours), frees every node of
// ClientPosition->reqlist and finally frees ClientPosition itself.
03619 {
03620     #ifdef RIO_DEBUG2
03621     if( m_log.is_open() ) m_log << "Removing Client: " << ClientPosition->stream->GetId();
03622     #endif
03623 
03624     pthread_mutex_lock( &MutexModifyClientsList );
03625 
03626     m_NumberOfAdmittedClients--;
03627 
03628     // if it is the first
03629     if( ClientPosition == m_AdmittedClientsList )
03630     {
03631         m_AdmittedClientsList = ClientPosition->next;
03632 
03633         //if there is another stream, this is the first now
03634         if( m_AdmittedClientsList != NULL )
03635         {
03636             m_AdmittedClientsList->prev = NULL;
03637             m_AdmittedClientsList->auxprev = NULL;
03638 
03639         }
03640     }
03641     else
03642     {
03643        // Not the head: bridge the neighbours around ClientPosition.
03643        // NOTE(review): prev is dereferenced unchecked - assumes every non-head node has a predecessor; confirm
03643        if( ClientPosition->next != NULL )
03644        {
03645            ClientPosition->next->prev = ClientPosition->prev;
03646            ClientPosition->next->auxprev = ClientPosition->prev;
03647        }
03648        ClientPosition->prev->next = ClientPosition->next;
03649        ClientPosition->prev->auxnext = ClientPosition->next;
03650     }
03651 
03652     // Free the request list: step forward, then free the node just left
03652     // behind through prev; the last node is freed in the else branch.
03652     PSEQBLOCKLIST current = ClientPosition->reqlist;
03653     while( current != NULL )
03654     {
03655         if( current->next != NULL )
03656         {
03657             current = current->next;
03658             if( current->prev != NULL )
03659             {
03660                 free( current->prev );
03661                 current->prev = NULL;
03662 
03663             }
03664         }
03665         else
03666         {
03667             free( current );
03668             current = NULL;
03669         }
03670     }
03671 
03672     free( ClientPosition );
03673     ClientPosition = NULL; // clears only this function's copy of the pointer
03674 
03675     pthread_mutex_unlock( &MutexModifyClientsList );
03676 
03677     #ifdef RIO_DEBUG2
03678     if( m_log.is_open() ) m_log << " OK!" <<  endl;
03679     #endif
03680 }

int CStreamManager::SaveMeasures (  ) 

Definition at line 4385 of file server/StreamManager.cpp.

// Writes the collected timing statistics to log files in the current
// working directory.
//
// When m_CollectMeasures is false nothing is written and -1 is returned.
// Otherwise, while holding MutexUpdateMeasures, the function calls
// GetDiskServiceTime() and UpdateDiskQueueAndServiceTime(), then writes one
// file per measure (CAC, admission process, sort, simulation, waiting time
// per client count, waiting time per queue size) and two files per disk
// (service/response time and queue time).
//
// Returns S_OK when every file was written, -1 as soon as one fopen() fails
// (the mutex is released first).
04386 {
04387     FILE *log, *logDSRT, *logDQT;
04388     char logname[100]  = "";
04389 
04390     if( m_CollectMeasures )
04391     {
04392         pthread_mutex_lock( &MutexUpdateMeasures );
04393 
04394         GetDiskServiceTime();
04395         #ifdef RIO_DEBUG2
04396         if( m_log.is_open() ) m_log << "Updating disk service time and queue time info ... ";
04397         #endif
04398 
04399         UpdateDiskQueueAndServiceTime();
04400 
04401         #ifdef RIO_DEBUG2
04402         if( m_log.is_open() ) m_log << "OK." << endl;
04403         #endif
04404 
04405         // saving CAC information
04406         if( (log = fopen( "AverageCACTimeaccClients.log", "w" )) == NULL )
04407         {
04408             pthread_mutex_unlock( &MutexUpdateMeasures );
04409             return -1;
04410         }
04411         fprintf( log, "# AverageCACTime %f msec (Variance %f)\n", m_AverageCACTime, m_VarianceCACTime );
04412         fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04413         fprintf( log, "# Measures in msec\n" );
04414         // This loop stops at the first zero entry; the sort, simulation and
04414         // waiting loops below skip zero entries instead and keep going
04414         for( int j = 1; ( j < 1001 ) && ( m_AverageCACTimeAccClients[ j ] != 0); j++ )
04415         {
04416             fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageCACTimeAccClients[ j ], m_VarianceCACTimeAccClients[ j ], m_SamplesCACTimeAccClients[ j ]  );
04417         }
04418         fclose( log );
04419 
04420         // saving Admission information
04421         if( (log = fopen( "AverageAdmissionProcessTimeaccClients.log", "w" )) == NULL )
04422         {
04423             pthread_mutex_unlock( &MutexUpdateMeasures );
04424             return -1;
04425         }
04426         fprintf( log, "# AverageAdmissionProcessTime %f msec (Variance %f)\n", m_AverageAdmissionProcessTime, m_VarianceAdmissionProcessTime );
04427         fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04428         fprintf( log, "# Measures in msec\n" );
04429         // Also stops at the first zero entry
04429         for( int j = 1; ( j < 1001 ) && ( m_AverageAdmissionProcessTimeAccClients[ j ] != 0); j++ )
04430         {
04431             fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageAdmissionProcessTimeAccClients[ j ], m_VarianceAdmissionProcessTimeAccClients[ j ], m_SamplesAdmissionProcessTimeAccClients[ j ]  );
04432         }
04433         fclose( log );
04434 
04435         // saving Sort information
04436         if( (log = fopen( "AverageSortTimeaccClients.log", "w" )) == NULL )
04437         {
04438             pthread_mutex_unlock( &MutexUpdateMeasures );
04439             return -1;
04440         }
04441         fprintf( log, "# AverageSortTime %f msec (Variance %f)\n", m_AverageSortListTime, m_VarianceSortListTime);
04442         fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04443         fprintf( log, "# Measures in msec\n" );
04444         for( int j = 1; j < 1001; j++ )
04445         {
04446             if( m_AverageSortListTimeAccClients[ j ] != 0 )
04447                 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageSortListTimeAccClients[ j ], m_VarianceSortListTimeAccClients[ j ], m_SamplesSortListTimeAccClients[ j ] );
04448         }
04449         fclose( log );
04450 
04451         // saving Simulation information
04452         if( (log = fopen( "AverageSimulationTimeaccClients.log", "w" )) == NULL )
04453         {
04454             pthread_mutex_unlock( &MutexUpdateMeasures );
04455             return -1;
04456         }
04457         fprintf( log, "# AverageSimulationTime %f msec (Variance %f)\n", m_AverageSimulationTime, m_VarianceSimulationTime );
04458         fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04459         fprintf( log, "# Measures in msec\n" );
04460         for( int j = 1; j < 1001; j++ )
04461         {
04462             if( m_AverageSimulationTimeAccClients[ j ] != 0 )
04463                 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageSimulationTimeAccClients[ j ], m_VarianceSimulationTimeAccClients[ j ], m_SamplesSimulationTimeAccClients[ j ] );
04464         }
04465         fclose( log );
04466 
04467         // saving Waiting information
04468         if( (log = fopen( "AverageWaitingTimeaccClients.log", "w" )) == NULL )
04469         {
04470             pthread_mutex_unlock( &MutexUpdateMeasures );
04471             return -1;
04472         }
04473         fprintf( log, "# AverageWaitingTime %f msec (Variance %f)\n", m_AverageWaitingTime, m_VarianceWaitingTime );
04474         fprintf( log, "# Number of Samples %lld msec\n", m_NumberOfSamples );
04475         fprintf( log, "# Measures in msec\n" );
04476         for( int j = 1; j < 1001; j++ )
04477         {
04478             if( m_AverageWaitingTimeAccClients[ j ] != 0 )
04479                 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageWaitingTimeAccClients[ j ], m_VarianceWaitingTimeAccClients[ j ], m_SamplesWaitingTimeAccClients[ j ] );
04480         }
04481         fclose( log );
04482 
04483         // saving Waiting information (indexed by queue size; starts at j = 0)
04484         if( (log = fopen( "AverageWaitingTimeaccQueue.log", "w" )) == NULL )
04485         {
04486             pthread_mutex_unlock( &MutexUpdateMeasures );
04487             return -1;
04488         }
04489         fprintf( log, "# Measures in msec\n" );
04490         for( int j = 0; j < 1001; j++ )
04491         {
04492             if( m_AverageWaitingTimeAccQueue[ j ] != 0 )
04493                 fprintf( log, "%d\t%f\t# %f %d\n", j, m_AverageWaitingTimeAccQueue[ j ], m_VarianceWaitingTimeAccQueue[ j ], m_SamplesWaitingTimeAccQueue[ j ] );
04494         }
04495         fclose( log );
04496 
04497         if( m_log.is_open() ) m_log << "CStreamManager::SaveMeasures: number of disks "<< m_MaxNumberOfDisks << endl;
04498 
04499         // Per-disk files; disks whose estimated service time is -1 are skipped.
04499         // NOTE(review): the loop starts at index 1 - presumably disk ids are 1-based; confirm
04499         for( int i = 1; i < m_MaxNumberOfDisks; i++ )
04500         {
04501            if( m_EstimatedDiskServiceTimeOfDisk[ i ] != -1 )
04502            {
04503                 sprintf( logname, "EstimatedDisk_%d_ServiceAndResponseTime.log", i );
04504                 if( (logDSRT = fopen( logname, "w" )) == NULL )
04505                 {
04506                     pthread_mutex_unlock( &MutexUpdateMeasures );
04507                     return -1;
04508                 }
04509                 sprintf( logname, "EstimatedDisk_%d_QueueTime.log", i );
04510                 if( (logDQT = fopen( logname, "w" )) == NULL )
04511                 {
04512                     // NOTE(review): logDSRT is still open here and is not closed before returning
04512                     pthread_mutex_unlock( &MutexUpdateMeasures );
04513                     return -1;
04514                 }
04515                 fprintf( logDSRT, "# EstimatedDiskServiceTime %f msec\n",
04516                          m_EstimatedDiskServiceTimeOfDisk[ i ]);
04517 
04518                 fprintf( logDQT, "# EstimatedDiskQueueTime %f msec\n",
04519                          m_EstimatedDiskQueueTimeOfDisk[ i ]);
04520 
04521                 fprintf( logDSRT, "# Measures in msec \n#AccThreads\tServiceTime\t\t\tResponseAccThreads\n");
04522 
04523                 fprintf( logDQT, "# Measures in msec \n#AccQueueSize\tTime\n");
04524                 fprintf( logDQT, "%d\t\t%f\n", m_MaxPendingRequests, m_EstimatedDiskQueueTime[ i ][ 0 ] );
04525                 for( int j = 1; j < 301; j++ )
04526                 {
04527 
04528                     fprintf( logDSRT, "%d\t\t%f\t\t%f\t\t%f\n", j, m_EstimatedDiskServiceTime[ i ][ j ], m_EstimatedDiskResponseTime[ i ][ j ], m_DeviationDiskResponseTime[ i ][ j ]);
04529                     fprintf( logDQT, "%d\t\t%f\n", j + m_MaxPendingRequests, m_EstimatedDiskQueueTime[ i ][ j ]);
04530                 }
04531                 fclose(logDSRT);
04532                 fclose(logDQT);
04533             }
04534         }
04535         //m_log << "CStreamManager::SaveMeasures finished." << endl;
04536         pthread_mutex_unlock( &MutexUpdateMeasures );
04537         return S_OK;
04538     }
04539     else
04540     {
04541         return -1;
04542     }
04543 }

void CStreamManager::SetAverageAdmissionProcessTime ( double  sample,
RioStream *  stream,
int  index 
)

Definition at line 4151 of file server/StreamManager.cpp.

// Folds one admission-process-time sample into the running statistics.
//
// Updates the overall mean and variance (m_AverageAdmissionProcessTime,
// m_VarianceAdmissionProcessTime) using m_NumberOfSamples as the count of
// earlier samples, then does the same for the per-client-count slot
// [ m_NumberOfAdmittedClients + index ] and increments that slot's sample
// counter. m_NumberOfSamples itself is read but not incremented here.
// When m_CollectMeasures is set, the sample and the new overall mean/variance
// are appended to m_logSAPT / m_logEAPT together with stream->m_Id.
//
// NOTE(review): m_NumberOfAdmittedClients + index is used as an array index
// without a bounds check - confirm the callers keep it in range.
04152 {
04153     double average;
04154 
04155     average  = m_AverageAdmissionProcessTime; // mean before this sample
04156 
04157     m_AverageAdmissionProcessTime = m_AverageAdmissionProcessTime +
04158           (( sample - m_AverageAdmissionProcessTime )/( m_NumberOfSamples + 1 ));
04159 
04160     // The variance update divides by the previous count, so it is skipped
04160     // while that count is still zero
04160     if( m_NumberOfSamples >= 1 )
04161     {
04162         m_VarianceAdmissionProcessTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceAdmissionProcessTime
04163                            + (m_NumberOfSamples + 1) * pow((m_AverageAdmissionProcessTime - average), 2);
04164     }
04165 
04166     // Same update for the slot selected by the admitted-client count
04166     average  = m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ];
04167 
04168     m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] = m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]+
04169           (( sample - m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] )/( m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1 ));
04170 
04171     if( m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] >= 1 )
04172     {
04173         m_VarianceAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] = (1 - (float) 1/m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] ) * m_VarianceAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]
04174                    + (m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1) * pow((m_AverageAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]- average), 2);
04175     }
04176 
04177     m_SamplesAdmissionProcessTimeAccClients[ m_NumberOfAdmittedClients+index ]++;
04178     if( m_CollectMeasures )
04179     {
04180         if( m_logSAPT.is_open() )
04181             m_logSAPT << stream->m_Id << "\t"<< sample << endl;
04182         if( m_logEAPT.is_open() )
04183             m_logEAPT << stream->m_Id << "\t"<< m_AverageAdmissionProcessTime
04184                                       << "\t# "<< m_VarianceAdmissionProcessTime << endl;
04185     }
04186 }

void CStreamManager::SetAverageBufferTime ( double  sample,
RioStream *  stream 
)

Definition at line 4070 of file server/StreamManager.cpp.

// Folds one buffer-time sample into the running statistics.
//
// Updates the overall mean and variance (m_AverageBufferTime,
// m_VarianceBufferTime) using m_NumberOfSamples as the count of earlier
// samples, then does the same for the per-client-count slot
// [ m_NumberOfAdmittedClients ] and increments that slot's sample counter.
// m_NumberOfSamples itself is read but not incremented here.
// When m_CollectMeasures is set, the sample and the new overall mean/variance
// are appended to m_logSBUFFERT / m_logEBUFFERT together with stream->m_Id.
04071 {
04072     double average;
04073 
04074     average  = m_AverageBufferTime; // mean before this sample
04075 
04076     m_AverageBufferTime = m_AverageBufferTime +
04077           (( sample - m_AverageBufferTime )/( m_NumberOfSamples + 1 ));
04078 
04079     // The variance update divides by the previous count, so it is skipped
04079     // while that count is still zero
04079     if( m_NumberOfSamples >= 1 )
04080     {
04081         m_VarianceBufferTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceBufferTime
04082                            + (m_NumberOfSamples + 1) * pow((m_AverageBufferTime - average), 2);
04083     }
04084 
04085     // Same update for the slot selected by the admitted-client count
04085     average  = m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ];
04086 
04087     m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] = m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] +
04088           (( sample - m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] )/( m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] + 1 ));
04089 
04090     if( m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] >= 1 )
04091     {
04092         m_VarianceBufferTimeAccClients[ m_NumberOfAdmittedClients ] = (1 - (float) 1/m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] ) * m_VarianceBufferTimeAccClients[ m_NumberOfAdmittedClients ]
04093                            + (m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ] + 1) * pow((m_AverageBufferTimeAccClients[ m_NumberOfAdmittedClients ] - average), 2);
04094     }
04095 
04096     m_SamplesBufferTimeAccClients[ m_NumberOfAdmittedClients ]++;
04097 
04098     if( m_CollectMeasures )
04099     {
04100         if( m_logSBUFFERT.is_open() )
04101             m_logSBUFFERT << stream->m_Id << "\t" << sample << endl;
04102         if( m_logEBUFFERT.is_open() )
04103             m_logEBUFFERT << stream->m_Id << "\t" << m_AverageBufferTime
04104                                           <<"\t# "<< m_VarianceBufferTime<< endl;
04105     }
04106 }

void CStreamManager::SetAverageCACTime ( double  sample,
RioStream * stream,
int  index 
)

Definition at line 4109 of file server/StreamManager.cpp.

04110 {
          // Folds one CAC-time sample into the global running mean/variance and
          // into the per-client-count bucket at m_NumberOfAdmittedClients+index,
          // then advances the shared sample counter, decrements the number of
          // waiting clients, releases MutexRunCAC and optionally logs.
          //   sample - the new measurement to incorporate.
          //   stream - only its m_Id is read, to tag the log lines.
          //   index  - offset added to m_NumberOfAdmittedClients to select the
          //            per-client-count bucket.
04111     double average;
04112 
          // Keep the previous mean; the variance update needs the mean delta.
04113     average  = m_AverageCACTime;
04114 
          // Incremental mean: new = old + (sample - old) / (n + 1).
04115     m_AverageCACTime = m_AverageCACTime + (( sample - m_AverageCACTime )/( m_NumberOfSamples + 1 ));
04116 
          // Recursive variance: var = (1 - 1/n) * var + (n + 1) * (new - old)^2,
          // skipped while n == 0.
          // NOTE(review): 1/n is evaluated in single precision ((float) 1 / n).
04117     if( m_NumberOfSamples >= 1 )
04118     {
04119         m_VarianceCACTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceCACTime
04120                            + (m_NumberOfSamples + 1) * pow((m_AverageCACTime - average), 2);
04121     }
04122 
          // Same update for the selected per-client-count bucket.
          // NOTE(review): m_NumberOfAdmittedClients+index is not range-checked
          // here -- confirm against the array sizes and the callers' index.
04123     average  = m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ];
04124 
04125     m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] = m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] +
04126           (( sample - m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] )/( m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1 ));
04127 
04128     if( m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] >= 1 )
04129     {
04130         m_VarianceCACTimeAccClients[ m_NumberOfAdmittedClients+index ] = (1 - (float) 1/m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] ) * m_VarianceCACTimeAccClients[ m_NumberOfAdmittedClients+index ]
04131                            + (m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1) * pow((m_AverageCACTimeAccClients[ m_NumberOfAdmittedClients+index ] - average), 2);
04132     }
04133 
          // Advance the bucket counter and the global m_NumberOfSamples. Among
          // the SetAverage* listings shown here this is the only one that
          // increments m_NumberOfSamples, although the others read it as their n.
04134     m_SamplesCACTimeAccClients[ m_NumberOfAdmittedClients+index ]++;
04135     m_NumberOfSamples++;
04136 
          // One fewer client is waiting; the returned previous count is ignored.
04137     UpdateNumberOfWaitingClients(-1);
          // NOTE(review): MutexRunCAC is unlocked here but not locked anywhere in
          // this function; presumably the caller acquires it -- confirm.
04138     pthread_mutex_unlock( &MutexRunCAC );
04139 
          // Logging happens after the unlock above.
          // Log line formats: "<id>\t<sample>" and "<id>\t<mean>\t# <variance>".
04140     if( m_CollectMeasures )
04141     {
04142         if( m_logSCACT.is_open() )
04143             m_logSCACT << stream->m_Id << "\t"  << sample << endl;
04144         if( m_logECACT.is_open() )
04145             m_logECACT << stream->m_Id << "\t"  << m_AverageCACTime
04146                                        <<"\t# " << m_VarianceCACTime<< endl;
04147     }
04148 }

void CStreamManager::SetAverageGenReqListTime ( double  sample,
RioStream * stream 
)

Definition at line 4006 of file server/StreamManager.cpp.

04007 {
          // Folds one request-list-generation time sample into the global running
          // mean/variance (m_AverageGenReqListTime, m_VarianceGenReqListTime) and
          // optionally logs it. Unlike the sibling SetAverage* listings, no
          // per-client-count bucket is updated here.
          //   sample - the new measurement to incorporate.
          //   stream - only its m_Id is read, to tag the log lines.
04008     double average;
04009 
          // Keep the previous mean; the variance update needs the mean delta.
04010     average  = m_AverageGenReqListTime;
04011 
          // Incremental mean: new = old + (sample - old) / (n + 1).
          // m_NumberOfSamples is read but not incremented in this function.
04012     m_AverageGenReqListTime = m_AverageGenReqListTime +
04013           (( sample - m_AverageGenReqListTime )/( m_NumberOfSamples + 1 ));
04014 
          // Recursive variance: var = (1 - 1/n) * var + (n + 1) * (new - old)^2,
          // skipped while n == 0.
          // NOTE(review): 1/n is evaluated in single precision ((float) 1 / n).
04015     if( m_NumberOfSamples >= 1 )
04016     {
04017         m_VarianceGenReqListTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceGenReqListTime
04018                            + (m_NumberOfSamples + 1) * pow((m_AverageGenReqListTime - average), 2);
04019     }
04020 
          // Log line formats: "<id>\t<sample>" and "<id>\t<mean>\t# <variance>".
04021     if( m_CollectMeasures )
04022     {
04023         if( m_logSGENREQLT.is_open() )
04024             m_logSGENREQLT << stream->m_Id << "\t"   << sample << endl;
04025         if( m_logEGENREQLT.is_open() )
04026             m_logEGENREQLT << stream->m_Id << "\t"   << m_AverageGenReqListTime
04027                                            << "\t# " << m_VarianceGenReqListTime<<  endl;
04028     }
04029 }

void CStreamManager::SetAverageSimulationTime ( double  sample,
RioStream * stream 
)

Definition at line 4189 of file server/StreamManager.cpp.

04190 {
          // Folds one simulation-time sample into the global running
          // mean/variance and into the per-client-count bucket indexed by
          // m_NumberOfAdmittedClients, then optionally logs it.
          //   sample - the new measurement to incorporate.
          //   stream - only its m_Id is read, to tag the log lines.
04191     double average;
04192 
          // Keep the previous mean; the variance update needs the mean delta.
04193     average  = m_AverageSimulationTime;
04194 
          // Incremental mean: new = old + (sample - old) / (n + 1).
          // m_NumberOfSamples is read but not incremented in this function.
04195     m_AverageSimulationTime = m_AverageSimulationTime + (( sample - m_AverageSimulationTime )/( m_NumberOfSamples + 1 ));
04196 
          // Recursive variance: var = (1 - 1/n) * var + (n + 1) * (new - old)^2,
          // skipped while n == 0.
          // NOTE(review): 1/n is evaluated in single precision ((float) 1 / n).
04197     if( m_NumberOfSamples >= 1 )
04198     {
04199         m_VarianceSimulationTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceSimulationTime
04200                            + (m_NumberOfSamples + 1) * pow((m_AverageSimulationTime - average), 2);
04201     }
04202 
          // Same update for the bucket of the current number of admitted clients,
          // driven by that bucket's own sample counter.
          // NOTE(review): the index is not range-checked here -- confirm.
04203     average  = m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ];
04204 
04205     m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ] = m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ]+
04206           (( sample - m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ] )/( m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] + 1 ));
04207 
04208     if( m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] >= 1 )
04209     {
04210         m_VarianceSimulationTimeAccClients[ m_NumberOfAdmittedClients ] = (1 - (float) 1/m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] ) * m_VarianceSimulationTimeAccClients[ m_NumberOfAdmittedClients ]
04211                    + (m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ] + 1) * pow((m_AverageSimulationTimeAccClients[ m_NumberOfAdmittedClients ]- average), 2);
04212     }
04213 
          // The per-bucket counter is advanced only after both updates used it.
04214     m_SamplesSimulationTimeAccClients[ m_NumberOfAdmittedClients ]++;
04215 
          // Log line formats: "<id>\t<sample>" and "<id>\t<mean>\t# <variance>".
04216     if( m_CollectMeasures )
04217     {
04218         if( m_logSSIMULT.is_open() )
04219             m_logSSIMULT << stream->m_Id << "\t"   << sample << endl;
04220         if( m_logESIMULT.is_open() )
04221             m_logESIMULT << stream->m_Id << "\t"   << m_AverageSimulationTime
04222                                          << "\t# " << m_VarianceSimulationTime << endl;
04223     }
04224 }

void CStreamManager::SetAverageSortListTime ( double  sample,
RioStream * stream 
)

Definition at line 4032 of file server/StreamManager.cpp.

04033 {
          // Folds one list-sorting time sample into the global running
          // mean/variance and into the per-client-count bucket indexed by
          // m_NumberOfAdmittedClients, then optionally logs it.
          //   sample - the new measurement to incorporate.
          //   stream - only its m_Id is read, to tag the log lines.
04034     double average;
04035 
          // Keep the previous mean; the variance update needs the mean delta.
04036     average  = m_AverageSortListTime;
04037 
          // Incremental mean: new = old + (sample - old) / (n + 1).
          // m_NumberOfSamples is read but not incremented in this function.
04038     m_AverageSortListTime = m_AverageSortListTime +
04039           (( sample - m_AverageSortListTime )/( m_NumberOfSamples + 1 ));
04040 
          // Recursive variance: var = (1 - 1/n) * var + (n + 1) * (new - old)^2,
          // skipped while n == 0.
          // NOTE(review): 1/n is evaluated in single precision ((float) 1 / n).
04041     if( m_NumberOfSamples >= 1 )
04042     {
04043         m_VarianceSortListTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceSortListTime
04044                            + (m_NumberOfSamples + 1) * pow((m_AverageSortListTime - average), 2);
04045     }
04046 
          // Same update for the bucket of the current number of admitted clients,
          // driven by that bucket's own sample counter.
          // NOTE(review): the index is not range-checked here -- confirm.
04047     average  = m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ];
04048 
04049     m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] = m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] +
04050           (( sample - m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] )/( m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] + 1 ));
04051 
04052     if( m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] >= 1 )
04053     {
04054         m_VarianceSortListTimeAccClients[ m_NumberOfAdmittedClients ] = (1 - (float) 1/m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] ) * m_VarianceSortListTimeAccClients[ m_NumberOfAdmittedClients ]
04055                            + (m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ] + 1) * pow((m_AverageSortListTimeAccClients[ m_NumberOfAdmittedClients ] - average), 2);
04056     }
          // The per-bucket counter is advanced only after both updates used it.
04057     m_SamplesSortListTimeAccClients[ m_NumberOfAdmittedClients ]++;
04058 
          // Log line formats: "<id>\t<sample>" and "<id>\t<mean>\t# <variance>".
04059     if( m_CollectMeasures )
04060     {
04061         if( m_logSSORTLT.is_open() )
04062             m_logSSORTLT << stream->m_Id << "\t"  << sample << endl;
04063         if( m_logESORTLT.is_open() )
04064             m_logESORTLT << stream->m_Id << "\t"  << m_AverageSortListTime
04065                                          << "\t# "<< m_VarianceSortListTime <<  endl;
04066     }
04067 }

void CStreamManager::SetAverageWaitingTime ( double  sample,
RioStream * stream,
int  index,
int  queue 
)

Definition at line 4227 of file server/StreamManager.cpp.

04228 {
          // Folds one waiting-time sample into three sets of running statistics:
          // the global mean/variance, the per-client-count bucket at
          // m_NumberOfAdmittedClients+index, and the per-queue bucket at `queue`.
          // Optionally logs the sample and the new global estimate.
          //   sample - the new measurement to incorporate.
          //   stream - only its m_Id is read, to tag the log lines.
          //   index  - offset added to m_NumberOfAdmittedClients to select the
          //            per-client-count bucket.
          //   queue  - index of the *AccQueue bucket; also written to the sample
          //            log.
04229     double average;
04230 
          // Keep the previous mean; the variance update needs the mean delta.
04231     average  = m_AverageWaitingTime;
04232 
          // Incremental mean: new = old + (sample - old) / (n + 1).
          // m_NumberOfSamples is read but not incremented in this function.
04233     m_AverageWaitingTime = m_AverageWaitingTime + (( sample - m_AverageWaitingTime )/( m_NumberOfSamples + 1 ));
04234 
          // Recursive variance: var = (1 - 1/n) * var + (n + 1) * (new - old)^2,
          // skipped while n == 0.
          // NOTE(review): 1/n is evaluated in single precision ((float) 1 / n).
04235     if( m_NumberOfSamples >= 1 )
04236     {
04237         m_VarianceWaitingTime = (1 - (float) 1/m_NumberOfSamples ) * m_VarianceWaitingTime
04238                            + (m_NumberOfSamples + 1) * pow((m_AverageWaitingTime - average), 2);
04239     }
04240 
          // Same update for the selected per-client-count bucket.
          // NOTE(review): m_NumberOfAdmittedClients+index is not range-checked
          // here -- confirm against the array sizes and the callers' index.
04241     average  = m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ];
04242 
04243     m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]= m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]+
04244           (( sample - m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] )/( m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1 ));
04245 
04246     if( m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] >= 1 )
04247     {
04248         m_VarianceWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] = (1 - (float) 1/m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] ) * m_VarianceWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]
04249                    + (m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ] + 1) * pow((m_AverageWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]- average), 2);
04250     }
04251 
04252     m_SamplesWaitingTimeAccClients[ m_NumberOfAdmittedClients+index ]++;
04253 
          // Same update once more for the per-queue bucket, with its own counter.
          // NOTE(review): `queue` is not range-checked here -- confirm.
04254     average  = m_AverageWaitingTimeAccQueue[ queue ];
04255 
04256     m_AverageWaitingTimeAccQueue[ queue ]= m_AverageWaitingTimeAccQueue[ queue ]+
04257           (( sample - m_AverageWaitingTimeAccQueue[ queue ] )/( m_SamplesWaitingTimeAccQueue[ queue ] + 1 ));
04258 
04259     if( m_SamplesWaitingTimeAccQueue[ queue ] >= 1 )
04260     {
04261         m_VarianceWaitingTimeAccQueue[ queue ] = (1 - (float) 1/m_SamplesWaitingTimeAccQueue[ queue ] ) * m_VarianceWaitingTimeAccQueue[ queue ]
04262                    + (m_SamplesWaitingTimeAccQueue[ queue ] + 1) * pow((m_AverageWaitingTimeAccQueue[ queue ] - average), 2);
04263     }
04264 
04265     m_SamplesWaitingTimeAccQueue[ queue ]++;
04266 
          // Log line formats: "<id>\t<sample>\t# <queue>" and
          // "<id>\t<mean>\t# <variance>".
04267     if( m_CollectMeasures )
04268     {
04269         if( m_logSWAITINGT.is_open() )
04270             m_logSWAITINGT << stream->m_Id << "\t"   << sample << "\t# " << queue << endl;
04271         if( m_logEWAITINGT.is_open() )
04272             m_logEWAITINGT << stream->m_Id << "\t"   << m_AverageWaitingTime
04273                                            << "\t# " << m_VarianceWaitingTime  << endl;
04274     }
04275 }

void CStreamManager::SetNumberOfDisks ( unsigned int  NumberOfDisks  ) 

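Function that changes the value of the server's number of disks.

It was needed because the storage servers may initialize after the server. (Translated from Portuguese; the original sentence is cut off after "when the storage servers initialize after the server".)

Parameters:
NumberOfDisks new number of disks.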

Definition at line 953 of file server/StreamManager.cpp.

00954 {
          // Overwrites the stored disk count with the caller-supplied value.
          // NOTE(review): the parameter is unsigned int while m_nDisks is listed
          // as int in the field documentation, so the assignment converts
          // implicitly.
00955     m_nDisks = NumberOfDisks;
00956 }

PSEQEVENTLIST CStreamManager::sortAllStreamsGeneratingOneStream ( PSTREAMLIST  stream_list,
uint  newclientid 
) [private]

Definition at line 3683 of file server/StreamManager.cpp.

03686 {
          // Merges the pending block requests of every stream in the list into
          // one newly malloc'ed, doubly linked event list and returns its head.
          // Phases visible below:
          //   1. walk the stream list, unlink streams that already prefetched all
          //      their blocks, compute each remaining stream's current relative
          //      time in now[] and snapshot its state into the per-client arrays;
          //   2. repeatedly pick a stream with firstRequestOfAllStreams() and
          //      append one event for that stream's next block;
          //   3. restore aux/auxnext/auxprev on the original list, record the
          //      elapsed time as a sort-list sample and return the list.
          // Returns NULL when an event node cannot be allocated, and the list
          // built so far when firstRequestOfAllStreams() returns NULL.
          // NOTE(review): both early returns skip phase 3, so the aux links are
          // left modified; the NULL return also does not free the nodes already
          // linked into l_list.
          // NOTE(review): the body uses stream_reqlist, while the generated
          // signature names the first parameter stream_list; the definition's
          // own signature lines are not shown -- confirm which name is bound.
03687     PSEQEVENTLIST  l_list       = NULL;
03688     PSEQEVENTLIST  curblock     = NULL;
03689     PSEQEVENTLIST  prevblock    = NULL;
03690     PSTREAMLIST    auxStreamList;
03691     PSTREAMLIST    auxStream;
03692     PSTREAMLIST    original_stream_reqlist;
03693     RioStream     *stream       = NULL;
03694 
          // Server clock converted to milliseconds (seconds*1000 + usec/1000).
03695     struct timeval serverclock = Timer.CurrentTime();
03696     long double servertime = serverclock.tv_sec * 1000.0 + serverclock.tv_usec/1000.0;
03697     #ifdef RIO_DEBUG2
03698     if( m_log.is_open() ) m_log << "sortAllStreamsGeneratingOneStream: servertime now " << servertime << endl;
03699     #endif
03700     struct timeval streamlastarrivaltime;
03701     long double streamlastarrival, streamtime;
03702 
03703     struct timeval initial_time, final_time;
03704     long double time;
03705 
          // Start of the interval that is reported as the sort-list time sample.
03706     initial_time = serverclock;
03707 
03708     // ------------------------------------------------------------------------
03709     // sets nextBlockToBeFetched - load to disk
03710     // and gets actual time of each stream to calculate next 'relative' request
03711 
          // Per-client "current time" table: 1001 zero-filled entries, the same
          // length as the [1001] per-client member arrays; indexed by client id.
          // NOTE(review): the malloc result is passed to memset without a NULL
          // check, and the ids used as indices are not range-checked.
03712     long double *now = (long double*) malloc(1001*sizeof(long double));
03713     memset( now, 0 , 1001*sizeof(long double));
03714 
          // Remember the original head: stream_reqlist may be advanced below, but
          // phase 3 must walk the complete list again.
03715     original_stream_reqlist = auxStreamList = stream_reqlist;
03716 
03717     while( auxStreamList )
03718     {
          // aux is the working cursor into this stream's request list; it starts
          // at the next block to be fetched.
03719         auxStreamList->aux = auxStreamList->nextBlockToBeFetched;
03720 
03721         #ifdef RIO_DEBUG2
03722         if( m_log.is_open() ) m_log << "sortAll: id " << auxStreamList->stream->GetId();
03723         #endif
03724 
03725         if( auxStreamList->stream->getPrefetchedAllBlocks() )
03726         {
03727             #ifdef RIO_DEBUG2
03728             if( m_log.is_open() ) m_log << " finished!" << endl;
03729             #endif
03730             // if it is different of the first stream,
03731             // removes it from stream list
              // Only the auxnext/auxprev chain is edited; next/prev stay intact.
03732             if( auxStreamList != stream_reqlist )
03733             {
03734                 auxStreamList->auxprev->auxnext = auxStreamList->auxnext;
03735 
03736                 // if it is not at the end of the stream list
03737                 if( auxStreamList->auxnext != NULL )
03738                     auxStreamList->auxnext->auxprev = auxStreamList->auxprev;
03739             }
03740             else
03741             {
03742                 // if it was the stream list head
03743                 // removes it from the stream list
03744                 stream_reqlist = auxStreamList->auxnext;
03745             }
03746         }
03747         else
03748         {
03749             #ifdef RIO_DEBUG2
03750             if( m_log.is_open() )
03751                 m_log << " nbtbf " << auxStreamList->aux->block;
03752             if( auxStreamList->aux->prev != NULL )
03753             {
03754                 if( m_log.is_open() )
03755                     m_log << " prev time " << auxStreamList->aux->prev->csystime;
03756             }
03757             #endif
03758 
              // Already-running clients get a computed stream time; the client
              // identified by newclientid starts at time 0 (else branch).
03759             //if( auxStreamList->stream->GetId() != newclientid )
03760             if( auxStreamList->aux->id != newclientid )
03761             {
03762                 streamlastarrivaltime = auxStreamList->stream->getLastArrivalTime();
03763                 streamlastarrival = streamlastarrivaltime.tv_sec * 1000.0 + streamlastarrivaltime.tv_usec / 1000.0;
03764                 if( auxStreamList->aux->prev != NULL  )
03765                 {
                      // Stream time = time stamp of the previous block plus the
                      // wall-clock time elapsed since the stream's last arrival.
03766                     streamtime = auxStreamList->aux->prev->csystime +
03767                                  ( servertime - streamlastarrival );
03768                     if( auxStreamList->aux->csystime < streamtime )
03769                     {
03770                         //client paused - assumes he is restarting now.
03771                         streamtime = auxStreamList->aux->csystime;
03772                         #ifdef RIO_DEBUG2
03773                         if( m_log.is_open() ) m_log << " client paused.";
03774                         #endif
03775                     }
03776                 }
03777                 else
03778                     streamtime = 0;
03779 
03780                 #ifdef RIO_DEBUG2
03781                 if( m_log.is_open() )
03782                 {
03783                     m_log << " ( lastarrival " << streamlastarrival<< " ). "
03784                           << "streamtime " << streamtime << endl;
03785                     m_log << " nbtbf in "
03786                           << auxStreamList->aux->csystime - streamtime << endl;
03787                 }
03788                 #endif
03789                 now[ auxStreamList->aux->id ] = streamtime;
03790                 PlayBlock[ auxStreamList->aux->id ] =
03791                                         auxStreamList->stream->getPlayBlock();
03792             }
03793             else
03794             {
                  // This is the only assignment to `stream`; it stays NULL when
                  // no unfinished entry carries newclientid.
03795                 stream = auxStreamList->stream;
03796                 now[ newclientid ] = 0;
03797                 PlayBlock[ auxStreamList->aux->id ] = 0;
03798                 #ifdef RIO_DEBUG2
03799                 if( m_log.is_open() ) m_log << endl;
03800                 #endif
03801             }
03802 
03803             //get buffers, RTT
03804             TotalBlocks[ auxStreamList->aux->id ]  =
03805                                 auxStreamList->stream->getNumberOfBlocks();
03806             ClientBuffer[ auxStreamList->aux->id ] =
03807                                 auxStreamList->stream->getClientBufferSize();
03808             ServerBuffer[ auxStreamList->aux->id ] =
03809                                 m_NumberOfBuffersForEachClient;
03810             ClientRTT[ auxStreamList->aux->id] =
03811                                 auxStreamList->stream->getRTTtoClient();
03812             NextBlockToSend[ auxStreamList->aux->id ] =
03813                                 auxStreamList->nextBlockToBeRequested->block;
03814 
03815             #ifdef RIO_DEBUG2
03816             if( m_log.is_open() )
03817                 m_log   << "sortAll: Client " << auxStreamList->aux->id
03818                         << " RTT " << ClientRTT[ auxStreamList->aux->id ]
03819                         << " CB Size " << auxStreamList->stream->getClientBufferSize()
03820                         << " SB Status "  << ServerBuffer[ auxStreamList->aux->id ]
03821                         << " CB Status "  << ClientBuffer[ auxStreamList->aux->id ]
03822                         << " PlayBlock "  << PlayBlock[ auxStreamList->aux->id ]
03823                         << " NextBlockToSend " << NextBlockToSend[ auxStreamList->aux->id ]
03824                         << " now " << now[ auxStreamList->aux->id ]
03825                         << endl;
03826             #endif
03827         }
03828 
          // Advance along the real `next` chain, so unlinked entries are still
          // visited by this loop.
03829         auxStreamList = auxStreamList->next;
03830     }
03831     // ------------------------------------------------------------------------
03832 
          // Restart from the (possibly advanced) head of the remaining streams.
03833     auxStreamList = stream_reqlist;
03834     l_list = prevblock = NULL;
03835 
03836     int NumberOfEvents = 0;
03837 
03838     // ------------------------------------------------------------------------
03839     // creates event list
03840 
          // One event is appended per iteration; the loop ends when the head
          // stream runs out of blocks and no successor remains.
03841     while( auxStreamList )
03842     {
03843         auxStream = firstRequestOfAllStreams( auxStreamList, now );
03844         if( auxStream == NULL )
03845         {
03846             RioErr << "sortAllStreamsGeneratingOneStream: Could not get "
03847                    << "the minimum value." << endl;
03848 
03849             free( now );
03850 
03851             return l_list;
03852         }
03853         curblock = ( PSEQEVENTLIST )malloc( sizeof( SEQEVENTLIST ) );
03854         if( curblock != NULL )
03855         {
              // Event type by block index: below the per-client server buffer
              // count -> RequestFullBuffer; below server buffers plus client
              // buffer size -> RequestFullClientBuffer; otherwise RequestArrival.
03856             if( auxStream->aux->block < (u32) m_NumberOfBuffersForEachClient )
03857             {
03858                 curblock->event = RequestFullBuffer;
03859                 ServerBuffer[ auxStream->aux->id ]--;
03860             }
03861             else if( auxStream->aux->block  < ((u32 )
03862                      (m_NumberOfBuffersForEachClient+auxStream->stream->getClientBufferSize())))
03863             {
03864                 curblock->event = RequestFullClientBuffer;
03865                 ClientBuffer[ auxStream->aux->id ]--;
03866             }
03867             else
03868             {
03869                 curblock->event = RequestArrival;
03870             }
03871             //curblock->id        = auxStream->stream->GetId();
03872             curblock->id        = auxStream->aux->id;
03873             curblock->block     = auxStream->aux->block;
03874             curblock->disk      = auxStream->aux->disk;
              // Event time is made relative to the stream's current time.
              // NOTE(review): now[] is indexed by stream->GetId() here but was
              // filled by aux->id above -- confirm the two ids always agree.
03875             curblock->csystime  = auxStream->aux->csystime -
03876                                   now[auxStream->stream->GetId()];
03877             curblock->next      = NULL;
03878             curblock->prev      = NULL;
03879             NumberOfEvents++;
03880             auxStream->aux     = auxStream->aux->next;
03881 
03882             // if it was the last block of the stream (and it is different of
03883             // the first stream) removes it from stream list
03884             if( ( auxStream->aux == NULL ) && ( auxStream != auxStreamList ) )
03885             {
03886                 auxStream->auxprev->auxnext = auxStream->auxnext;
03887 
03888                 // if it is not at the end of the stream list
03889                 if( auxStream->auxnext != NULL )
03890                     auxStream->auxnext->auxprev = auxStream->auxprev;
03891             }
03892             else
03893             {
03894                 // if it was the last block of the stream and it was the
03895                 // stream list head removes it from the stream list
03896                 if( ( auxStream->aux == NULL ) && ( auxStream == auxStreamList ) )
03897                     auxStreamList = auxStream->auxnext;
03898             }
03899         }
03900         else {
03901                 RioErr << "sortAllStreamsGeneratingOneStream: Could not "
03902                        << "allocate memory " << endl;
03903 
03904                 free( now );
03905 
03906                 return NULL;
03907         }
          // Append the new node at the tail (prevblock tracks the tail).
03908         if( prevblock )
03909         {
03910             curblock->prev = prevblock;
03911             prevblock = prevblock->next = curblock;
03912         }
03913         else
03914         {
03915             l_list = prevblock = curblock;
03916         }
03917     }
03918     // ------------------------------------------------------------------------
03919 
03920     //sets aux and next, prev
          // Undo the temporary edits: reset every stream's cursor to its reqlist
          // head and make the aux chain mirror the real next/prev chain again.
03921     auxStreamList = original_stream_reqlist;
03922     while( auxStreamList ) {
03923        auxStreamList->aux = auxStreamList->reqlist;
03924        auxStreamList->auxnext = auxStreamList->next;
03925        auxStreamList->auxprev = auxStreamList->prev;
03926        auxStreamList = auxStreamList->next;
03927     }
03928 
03929     final_time = Timer.CurrentTime();
03930     time = getInterval( initial_time, final_time );
          // NOTE(review): `stream` is dereferenced unconditionally; it is still
          // NULL if phase 1 never reached the newclientid branch -- confirm the
          // caller guarantees that entry is present and unfinished.
03931     stream->m_SampleSortListTime = time;
03932     SetAverageSortListTime( time, stream );
03933     #ifdef RIO_DEBUG2
03934     if( m_log.is_open() )
03935         m_log   << "sortAll: Finished" << endl;
03936     #endif
03937     if( m_log.is_open() )
03938         m_log   << "(NumberOfEvents = "<< NumberOfEvents <<")" << endl;
03939 
03940     free( now );
03941 
03942     return l_list;
03943 }

void CStreamManager::Timeout ( void *  Param  )  [static, private]

Definition at line 900 of file server/StreamManager.cpp.

00901 {
          // Static callback: Param is cast back to the CStreamManager instance.
          // While holding m_MutexTable it processes the queue of every stream in
          // the real-time hold list, then services the non-real-time hold list
          // starting where the previous call stopped, for as long as
          // m_ActiveRequests stays below m_MaxActiveRequests.
00902     CStreamManager* Mgr = ( CStreamManager* ) Param;
00903 
00904     // Get exclusive access to stream manager variables
00905     Mgr->m_MutexTable->Wait();
00906 
00907     // Scan list of real-time streams holding requests
00908     // (Start from begining of list)
00909     RioStream* next;
00910     RioStream* stream = Mgr->m_RTHoldList.First();
00911 
00912     while( stream != 0 )
00913     {
              // Next is read before processing; presumably ProcessRTQueue() may
              // take the stream off the list -- confirm.
00914         next = stream->Next;
00915         stream->ProcessRTQueue();
00916         stream = next;
00917     }
00918 
00919     // Scan list of non real-time streams holding requests
00920     // (Start from position left last time)
00921     stream = Mgr->m_NextNRTStream;
00922     if( stream == 0 )
00923     {
00924         stream = Mgr->m_NRTHoldList.First();
00925     }
00926 
          // NOTE(review): because the scan wraps around, this loop ends only when
          // the list is empty (First() returns 0) or the active-request limit is
          // reached; it relies on ProcessNRTQueue() eventually causing one of
          // those -- confirm.
00927     while((stream != 0) && (Mgr->m_ActiveRequests < Mgr->m_MaxActiveRequests))
00928     {
00929         next = stream->Next;
00930         stream->ProcessNRTQueue();
00931         stream = next;
00932         // at end of list return to begining
00933         if( stream == 0 )
00934         {
00935             stream = Mgr->m_NRTHoldList.First();
00936         }
00937     }
00938 
00939     // Remember which stream should be processed first next time
00940     Mgr->m_NextNRTStream = stream;
00941 
00942     // Release exclusive access to stream manager variables
00943     Mgr->m_MutexTable->Release();
00944 }

void CStreamManager::UpdateDiskQueueAndServiceTime (  ) 
int CStreamManager::UpdateNumberOfWaitingClients ( int  client  ) 

Definition at line 3194 of file server/StreamManager.cpp.

03195 {
          // Adds `client` (a signed delta; SetAverageCACTime passes -1) to
          // m_NumberOfWaitingClients while holding
          // MutexUpdateNumberOfWaitingClients.
          // Returns the counter's value from before the update.
03196     int queue;
03197     pthread_mutex_lock( &MutexUpdateNumberOfWaitingClients );
          // Snapshot first, then apply the delta, both inside the lock.
03198     queue = m_NumberOfWaitingClients;
03199     m_NumberOfWaitingClients += client;
03200     pthread_mutex_unlock( &MutexUpdateNumberOfWaitingClients );
03201     return queue;
03202 }


Friends And Related Function Documentation

friend class RioStream [friend]

Definition at line 595 of file server/StreamManager.h.

friend class RioStreamObj [friend]

Definition at line 596 of file server/StreamManager.h.


Field Documentation

int CStreamManager::ClientBuffer[1001] [private]

Definition at line 567 of file server/StreamManager.h.

double CStreamManager::ClientRTT[1001] [private]

Definition at line 575 of file server/StreamManager.h.

long double CStreamManager::Initial_time[1001] [private]

Definition at line 574 of file server/StreamManager.h.

Definition at line 454 of file server/StreamManager.h.

Definition at line 562 of file server/StreamManager.h.

Definition at line 513 of file server/StreamManager.h.

Definition at line 535 of file server/StreamManager.h.

Definition at line 519 of file server/StreamManager.h.

Definition at line 555 of file server/StreamManager.h.

Definition at line 508 of file server/StreamManager.h.

Definition at line 531 of file server/StreamManager.h.

Definition at line 522 of file server/StreamManager.h.

Definition at line 516 of file server/StreamManager.h.

Definition at line 539 of file server/StreamManager.h.

Definition at line 525 of file server/StreamManager.h.

Definition at line 543 of file server/StreamManager.h.

Definition at line 528 of file server/StreamManager.h.

Definition at line 547 of file server/StreamManager.h.

Definition at line 551 of file server/StreamManager.h.

unsigned int CStreamManager::m_BlockSize [private]

Definition at line 442 of file server/StreamManager.h.

Definition at line 480 of file server/StreamManager.h.

Definition at line 461 of file server/StreamManager.h.

Definition at line 602 of file server/StreamManager.h.

double CStreamManager::m_DeviationDiskResponseTime[100][301] [private]

Definition at line 502 of file server/StreamManager.h.

Definition at line 503 of file server/StreamManager.h.

double CStreamManager::m_EstimatedDiskQueueTime[100][301] [private]

Definition at line 505 of file server/StreamManager.h.

Definition at line 506 of file server/StreamManager.h.

double CStreamManager::m_EstimatedDiskResponseTime[100][301] [private]

Definition at line 500 of file server/StreamManager.h.

Definition at line 501 of file server/StreamManager.h.

double CStreamManager::m_EstimatedDiskServiceTime[100][301] [private]

Definition at line 498 of file server/StreamManager.h.

Definition at line 499 of file server/StreamManager.h.

Definition at line 559 of file server/StreamManager.h.

Definition at line 451 of file server/StreamManager.h.

Definition at line 599 of file server/StreamManager.h.

Definition at line 462 of file server/StreamManager.h.

Definition at line 453 of file server/StreamManager.h.

Definition at line 474 of file server/StreamManager.h.

Definition at line 497 of file server/StreamManager.h.

ofstream CStreamManager::m_log [private]

Definition at line 475 of file server/StreamManager.h.

ofstream CStreamManager::m_logEAPT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logEBUFFERT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logECACT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logEGENREQLT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logESIMULT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logESORTLT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logEWAITINGT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logSAPT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logSBUFFERT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logSCACT [private]

Definition at line 489 of file server/StreamManager.h.

Definition at line 590 of file server/StreamManager.h.

ofstream CStreamManager::m_logSGENREQLT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logSSIMULT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logSSORTLT [private]

Definition at line 489 of file server/StreamManager.h.

ofstream CStreamManager::m_logSWAITINGT [private]

Definition at line 489 of file server/StreamManager.h.

Definition at line 448 of file server/StreamManager.h.

Definition at line 481 of file server/StreamManager.h.

Definition at line 450 of file server/StreamManager.h.

Definition at line 478 of file server/StreamManager.h.

Definition at line 477 of file server/StreamManager.h.

Definition at line 449 of file server/StreamManager.h.

Definition at line 443 of file server/StreamManager.h.

Definition at line 468 of file server/StreamManager.h.

int CStreamManager::m_nDisks [private]

Definition at line 455 of file server/StreamManager.h.

Definition at line 593 of file server/StreamManager.h.

Definition at line 472 of file server/StreamManager.h.

Definition at line 560 of file server/StreamManager.h.

Definition at line 466 of file server/StreamManager.h.

Definition at line 460 of file server/StreamManager.h.

Definition at line 447 of file server/StreamManager.h.

Definition at line 563 of file server/StreamManager.h.

Definition at line 479 of file server/StreamManager.h.

Definition at line 482 of file server/StreamManager.h.

long long int CStreamManager::m_NumberOfSamples [private]

Definition at line 511 of file server/StreamManager.h.

Definition at line 564 of file server/StreamManager.h.

Definition at line 469 of file server/StreamManager.h.

Definition at line 471 of file server/StreamManager.h.

Definition at line 459 of file server/StreamManager.h.

Definition at line 537 of file server/StreamManager.h.

Definition at line 557 of file server/StreamManager.h.

Definition at line 533 of file server/StreamManager.h.

Definition at line 541 of file server/StreamManager.h.

Definition at line 545 of file server/StreamManager.h.

Definition at line 549 of file server/StreamManager.h.

Definition at line 553 of file server/StreamManager.h.

Definition at line 470 of file server/StreamManager.h.

Definition at line 65 of file interface/StreamManager.h.

Definition at line 457 of file server/StreamManager.h.

double CStreamManager::m_TotalRate [private]

Definition at line 446 of file server/StreamManager.h.

int CStreamManager::m_used [private]

Definition at line 444 of file server/StreamManager.h.

double CStreamManager::m_UsedRate [private]

Definition at line 445 of file server/StreamManager.h.

Definition at line 601 of file server/StreamManager.h.

Definition at line 600 of file server/StreamManager.h.

Definition at line 514 of file server/StreamManager.h.

Definition at line 536 of file server/StreamManager.h.

Definition at line 520 of file server/StreamManager.h.

Definition at line 556 of file server/StreamManager.h.

Definition at line 509 of file server/StreamManager.h.

Definition at line 532 of file server/StreamManager.h.

Definition at line 523 of file server/StreamManager.h.

Definition at line 517 of file server/StreamManager.h.

Definition at line 540 of file server/StreamManager.h.

Definition at line 526 of file server/StreamManager.h.

Definition at line 544 of file server/StreamManager.h.

Definition at line 529 of file server/StreamManager.h.

Definition at line 548 of file server/StreamManager.h.

Definition at line 552 of file server/StreamManager.h.

Definition at line 464 of file server/StreamManager.h.

long double CStreamManager::MaxIntervalBufferEmpty[1001] [private]

Definition at line 577 of file server/StreamManager.h.

pthread_mutex_t CStreamManager::MutexModifyClientsList [private]

Definition at line 484 of file server/StreamManager.h.

pthread_mutex_t CStreamManager::MutexRunCAC [private]

Definition at line 484 of file server/StreamManager.h.

pthread_mutex_t CStreamManager::MutexUpdateMeasures [private]

Definition at line 484 of file server/StreamManager.h.

pthread_mutex_t CStreamManager::MutexUpdateNumberOfWaitingClients [private]

Definition at line 484 of file server/StreamManager.h.

unsigned int CStreamManager::NextBlockToSend[1001] [private]

Definition at line 570 of file server/StreamManager.h.

unsigned int CStreamManager::NHiccups[1001] [private]

Definition at line 573 of file server/StreamManager.h.

unsigned int CStreamManager::NReqWhenEmptyBuffer[1001] [private]

Definition at line 572 of file server/StreamManager.h.

unsigned int CStreamManager::PlayBlock[1001] [private]

Definition at line 568 of file server/StreamManager.h.

int CStreamManager::ServerBuffer[1001] [private]

Definition at line 571 of file server/StreamManager.h.

long double CStreamManager::TimeBufferEmpty[1001] [private]

Definition at line 576 of file server/StreamManager.h.

unsigned int CStreamManager::TotalBlocks[1001] [private]

Definition at line 569 of file server/StreamManager.h.


The documentation for this class was generated from the following files:
Generated on Wed Jul 4 16:03:34 2012 for RIO by  doxygen 1.6.3