00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #ifndef __STREAMMANAGER_H_
00028 #define __STREAMMANAGER_H_
00029
00030 #include <fstream>
00031 #include <sys/time.h>
00032
00033
00034 #include "StreamManagerTypes.h"
00035
00036 #include "ServerTypes.h"
00037
00038 #include "RioMutex.h"
00039 #include "RioTCP.h"
00040 #include "ObjectManager.h"
00041 #include "Router.h"
00042 #include "MonitorTable.h"
00043 #include "NetInterface.h"
00044
00045
00046
00047 class CStreamManager;
00048 class RioStreamObj;
00049 class NetMgr;
00050 class RioStream;
00051 class RioSession;
00052
00053 // Event kinds stored in SEQEVENTLIST::event and passed as `eventtype` to CStreamManager::insertevent(). Values are fixed explicitly (0..4).
00054 typedef enum
00055 {
00056 RequestArrival = 0,
00057 RequestFullBuffer = 1,
00058 RequestFullClientBuffer = 2,
00059 DiskService = 3,
00060 BlockArrival = 4
00061 } EventTypeSimulation;
00062
00063 // Node of a doubly linked list (next/prev) of per-block records; element type of STREAMLIST::reqlist and RioStream::m_RequestList. PSEQBLOCKLIST is the pointer typedef, SEQBLOCKLIST the struct typedef.
00064 typedef struct SEQBLOCKLISTTAG {
00065 uint id;
00066 u32 block;
00067 short disk;
00068 unsigned int physicalblock;
00069 float duration;
00070 // NOTE(review): units and time base of `duration` and `csystime` are not visible in this header - confirm against the code that fills them.
00071 long double csystime;
00072 struct SEQBLOCKLISTTAG *next;
00073 struct SEQBLOCKLISTTAG *prev;
00074 } *PSEQBLOCKLIST, SEQBLOCKLIST;
00075 // Node of a doubly linked list (next/prev) of events tagged with an EventTypeSimulation; list type returned by CStreamManager::sortAllStreamsGeneratingOneStream() and taken by insertevent().
00076 typedef struct SEQEVENTLISTTAG {
00077 EventTypeSimulation event;
00078 uint id;
00079 u32 block;
00080 short disk;
00081 long double csystime;
00082 struct SEQEVENTLISTTAG *next;
00083 struct SEQEVENTLISTTAG *prev;
00084 } *PSEQEVENTLIST, SEQEVENTLIST;
00085
00086 // Node of a doubly linked list (next/prev) that associates a RioStream with block-list pointers. PSTREAMLIST is the pointer typedef; CStreamManager::m_AdmittedClientsList has this type.
00087 typedef struct STREAMLIST {
00088 // Stream this node refers to.
00089 RioStream* stream;
00090 // Block list associated with the stream (SEQBLOCKLISTTAG nodes).
00091 struct SEQBLOCKLISTTAG *reqlist;
00092 // Presumably a cursor into reqlist for the next block to request - confirm.
00093 struct SEQBLOCKLISTTAG *nextBlockToBeRequested;
00094 // Presumably a cursor into reqlist for the next block to fetch - confirm.
00095 struct SEQBLOCKLISTTAG *nextBlockToBeFetched;
00096
00097 struct STREAMLIST *next;
00098 struct STREAMLIST *prev;
00099 // Auxiliary block pointer and a second pair of list links; their use is not visible in this header.
00100 struct SEQBLOCKLISTTAG *aux;
00101 struct STREAMLIST *auxnext;
00102 struct STREAMLIST *auxprev;
00103
00104
00105 } *PSTREAMLIST, STREAMLIST;
00106
00107 // Pairs a block-list node pointer with an EventDataRequest pointer; RioStream::m_Buffer points to this type (presumably one entry per buffer position - confirm).
00108 struct ClientBuffer {
00109 PSEQBLOCKLIST blockInfo;
00110 EventDataRequest* event;
00111 };
00112
00113
00114
00115 // Wrapper around a private LocalQueue whose interface deals in EventDataRequest pointers.
00116 // Member definitions are not in this header, so ordering and empty-queue behaviour cannot be stated here.
00117 class CRequestQueue
00118 {
00119 public:
00120 // Presumably appends DiskRequest to m_Queue - confirm against the definition.
00121 void Put( EventDataRequest *DiskRequest );
00122 // Presumably removes and returns a queued request; result on an empty queue is not visible here.
00123 EventDataRequest* Get() ;
00124 private:
00125 LocalQueue m_Queue;
00126 };
00127
00128 // Wrapper around a private LocalQueue whose interface deals in RioStream pointers; CStreamManager keeps four of these (RT hold, NRT hold, close, free).
00129 // Member definitions are not in this header.
00130 class CStreamList
00131 {
00132 public:
00133 RioStream* First();
00134 // Presumably insert / unlink the given stream; whether RioStream::Next/Previous are used as the links is not visible here.
00135 void Put( RioStream* Stream );
00136 void Remove( RioStream* Stream );
00137 private:
00138 LocalQueue m_Queue;
00139 };
00140
00141 // One stream associated with a CStreamManager (see s_mgr and Initialize()). The default constructor, copy constructor and copy assignment are private,
00142 // and CStreamManager and RioStreamObj are friends, so only the class itself and those friends can create or copy instances.
00143 class RioStream
00144 {
00145 // Private section: list links, constructors and request-handling internals.
00146 private:
00147 RioStream *Next;
00148 RioStream *Previous;
00149 // Declared private; whether the copy operations are actually defined is not visible in this header.
00150 RioStream();
00151 RioStream( const RioStream &x );
00152 RioStream &operator = ( const RioStream &x );
00153 int DataReq( u32 reqid, u16 operation, u32 ipaddr, u16 port, u32 block,
00154 u32 repbits, RioStreamObj* StreamObject );
00155 void RTHold( EventDataRequest* event );
00156 void NRTHold( EventDataRequest* event );
00157 void ProcessRTQueue();
00158
00159 void ProcessNRTQueue();
00160
00161 // Takes the completed request as an Event*; what is done with it is defined outside this header.
00162 void RequestCompleted( Event* Request );
00163 // Returns a PSEQBLOCKLIST for `filename` / `object`; presumably the block request list of that object - confirm against the definition.
00164 PSEQBLOCKLIST getInfoAboutRequestList( char *filename,
00165 RioObject* object );
00166
00167 // Public interface.
00168 public:
00169 ~RioStream();
00170 int Close();
00171 void Initialize( CStreamManager * );
00172 int MaxRequests();
00173 int OpenObject( char *ObjectName, RioAccess Access,
00174 RioStreamObj **StreamObj );
00175 // Returns an int stream id; presumably s_streamid - confirm.
00176 int Get_streamid();
00177 // Overload of OpenObject that additionally takes an RTT (struct timeval) and a buffer size.
00178 int OpenObject( char* ObjectName, RioAccess Access, struct timeval RTT,
00179 int BufferSize, RioStreamObj **StreamObj );
00180 // GetId() returns a uint (there is an m_Id member), distinct from the int returned by Get_streamid().
00181 RioTrafficType GetType();
00182 uint GetId();
00183 // Named after the MutexUpdateEvent member; presumably unlock / lock it - confirm.
00184 void releaseMutexUpdateEvent();
00185 void getMutexUpdateEvent();
00186
00187 int getDirection();
00188 // NOTE(review): the valid range of posAtBuffer / positionAtBuffer is not visible in this header.
00189 void updateNextBlockToBeFetched( PSEQBLOCKLIST Next );
00190 void prefetchBlock( int posAtBuffer, EventDataRequest* event,
00191 RioStreamObj *StreamObject );
00192 // Send-token counter interface (see m_TokensToSend and MutexTokensToSend below).
00193 int getTokensToSend();
00194 void incTokensToSend();
00195 void decTokensToSend();
00196 void setTokensToSend( int value );
00197 // Disk-token counter interface (see m_TokensToDisk, MutexTokensToDisk and m_TokensToDiskClock below).
00198 void decTokensToDisk();
00199 int getTokensToDisk();
00200 void updateTokensToDisk( struct timeval time );
00201 // Block-to-send bounds (see m_MinimumBlockToSend and MutexMinBlock below).
00202 u32 getMinimumBlockToSend();
00203 void incMinimumBlockToSend();
00204 void setMinimumBlockToSend( u32 Block );
00205 u32 getMaximumBlockToSend();
00206 // Getters for per-stream state; units of the time values are not visible here.
00207 struct timeval getLastArrivalTime();
00208 double getRTTtoClient();
00209 int getClientBufferStatus();
00210 int getPlayBlock();
00211 int getNumberOfBlocks();
00212 int getServerBufferStatus();
00213 void decServerBufferStatus();
00214 // These share names with CStreamManager accessors; presumably forwarded to s_mgr - confirm.
00215 int getNumberOfBuffersForEachClient();
00216 double getAverageCACTime();
00217 double getEstimatedDiskQueueTime( int disk );
00218 double getEstimatedDiskServiceTime( int disk );
00219 double getEstimatedDiskResponseTime( int disk );
00220
00221 long double getNetworkRate();
00222 unsigned int getBlockSize();
00223 int getQueueSize();
00224
00225 int getClientBufferSize();
00226 void cancelBlockAtBuffer( int positionAtBuffer );
00227 bool getPrefetchedAllBlocks();
00228
00229 int ClientCanStart();
00230 int startingStream();
00231 void NewBlockArrival();
00232 // Public log streams. NOTE(review): `ofstream` is unqualified while this header only includes <fstream>, so it relies on a using-declaration/directive for std being in effect before this point.
00233 ofstream m_log, m_log_requests;
00234
00235 // Private state.
00236 private:
00237 typedef enum
00238 {
00239 StreamStatusClosed = 0,
00240 StreamStatusOpened = 1
00241 } StreamStatus;
00242 // Owning manager pointer (type CStreamManager*); presumably set by Initialize() - confirm.
00243 CStreamManager *s_mgr;
00244
00245 int s_streamid;
00246
00247 RioStreamDirection s_Direction;
00248 double s_Rate;
00249 SOCKADDR_IN m_RemoteAddress;
00250 StreamStatus m_Status;
00251 RioTrafficType m_Type;
00252 int m_nRequests;
00253 int m_MaxRequests;
00254 int m_UserMaxRequests;
00255 int m_nQueue;
00256 struct timeval m_NextRelease;
00257 struct timeval m_ArrivalInterval;
00258 // 64-bit counters and the per-stream request queue.
00259 u64 m_RequestCount;
00260 u64 m_QueuedCount;
00261 CRequestQueue m_Queue;
00262
00263 // Timestamps (struct timeval).
00264 struct timeval m_LastArrivalTime;
00265 struct timeval m_TokensToDiskClock;
00266 struct timeval m_TimeToBuffer;
00267 // Timing samples stored as double; units are not visible in this header.
00268 double m_FirstArrivalTime;
00269 double m_SampleGenReqListTime;
00270 double m_SampleSortListTime;
00271 double m_SampleSimulationTime;
00272 double m_SampleCACTime;
00273 double m_SampleAdmissionProcessTime;
00274 double m_SampleWaitingTime;
00275 int m_WaitingQueueSize;
00276 int m_TotalBlocks;
00277 // Block list, client-list position (PSTREAMLIST) and buffer bookkeeping.
00278 PSEQBLOCKLIST m_RequestList;
00279 uint m_Id;
00280 PSTREAMLIST m_PosAtClientsList;
00281 ClientBuffer* m_Buffer;
00282 double m_AverageRTTtoClient_msec;
00283 int m_ClientBufferSize;
00284 int m_ServerBufferStatus;
00285 int m_TokensToSend;
00286 bool m_PrefetchedAllBlocks;
00287 bool m_FinishedMovie;
00288 int m_TokensToDisk;
00289 bool m_StartingToSend;
00290 u32 m_MinimumBlockToSend;
00291 pthread_mutex_t MutexTokensToSend;
00292 pthread_mutex_t MutexTokensToDisk;
00293 pthread_mutex_t MutexNextBlockToBeFetched;
00294 pthread_mutex_t MutexMinBlock;
00295 pthread_mutex_t MutexBlock;
00296 pthread_mutex_t MutexUpdateEvent;
00297 pthread_cond_t ConditionBlockArrival;
00298 // NOTE(review): initialisation and destruction of the pthread mutexes and condition variable above are not visible in this header.
00299 // Friends get access to the private constructors and state above.
00300 friend class CStreamManager;
00301 friend class RioStreamObj;
00302 };
00303
00304
00305
00306 // An object handle tied to a RioStream and a CStreamManager (o_stream, o_mgr) and to a RioObject (o_object).
00307 // All constructors and copy assignment are private and RioStream is a friend, so outside code receives instances only through RioStream (e.g. the RioStreamObj** out-parameter of RioStream::OpenObject).
00308 class RioStreamObj
00309 {
00310 private:
00311 typedef enum
00312 {
00313 StreamObjStatusClosed = 0,
00314 StreamObjStatusOpened = 1
00315 } StreamObjStatus;
00316
00317 StreamObjStatus m_Status;
00318 int m_PendingRequests;
00319 CStreamManager *o_mgr;
00320 RioStream *o_stream;
00321 RioObject *o_object;
00322
00323 friend class RioStream;
00324 // Declared private; whether the copy operations are actually defined is not visible in this header.
00325 private:
00326 RioStreamObj( CStreamManager *mgr, RioStream *stream );
00327 RioStreamObj( const RioStreamObj &x);
00328 RioStreamObj &operator = ( const RioStreamObj &x );
00329 void CompleteClose();
00330
00331 public:
00332 ~RioStreamObj();
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343 // RemoveStreamObj is a bool out-parameter; presumably tells the caller whether this object should be deleted - confirm against the definition.
00344 int Close( bool *RemoveStreamObj );
00345 int GetSize( RioObjectSize *ObjectSize );
00346
00347
00348
00349 // NOTE(review): the expected format/length of md5sum and the bit layout of ExcludeStorages are not visible in this header.
00350 int SetSize( RioObjectSize ObjectSize, char *md5sum,
00351 unsigned long long int ExcludeStorages );
00352
00353 int GetType( short *Type );
00354 int GetnBlocks( RioBlock *nBlocks );
00355 int DataRequest( u32 reqid, u16 operation, u32 ipaddr, u16 port,
00356 u32 block, u32 repbits );
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370 // Same bool out-parameter convention as Close(); there is an m_PendingRequests counter, presumably updated here - confirm.
00371 void RequestCompleted( Event* Request, bool *RemoveStreamObj );
00372 // Returns a RioStream*; presumably o_stream - confirm.
00373 RioStream* Stream();
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 // ObjectName is a non-const char*; presumably an output buffer supplied by the caller. NOTE(review): the required buffer size is not visible here.
00388 int GetObjectName( char *ObjectName );
00389
00390 int GetQueueSize();
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402 // VideoRate is returned through the pointer; units are not visible in this header.
00403 int GetVideoRate( unsigned int *VideoRate );
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413 // Counterpart of GetVideoRate() taking the rate by value.
00414 int SetVideoRate( unsigned int VideoRate );
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430 // Takes a block number and the same ExcludeStorages parameter type as SetSize(); semantics are defined outside this header.
00431 int ReallocBlocks( unsigned int Block,
00432 unsigned long long int ExcludeStorages );
00433
00434 };
00435
00436
00437 // Manager that opens RioStream instances (Open()), keeps lists of streams and admitted clients, and holds disk-time estimates and timing statistics.
00438 // RioStream and RioStreamObj are friends and can access everything declared private here. Member definitions are not in this header.
00439 class CStreamManager
00440 {
00441 private:
00442 unsigned int m_BlockSize;
00443 int m_MaxStreams;
00444 int m_used;
00445 double m_UsedRate;
00446 double m_TotalRate;
00447 double m_NRTReservedRate;
00448 int m_MaxActiveRequests;
00449 int m_MaxRTRequests;
00450 int m_MaxNRTRequests;
00451 int m_ExtraStreamRequests;
00452
00453 int m_HoldingNonRealTime;
00454 int m_ActiveRequests;
00455 int m_nDisks;
00456
00457 int m_TimerId;
00458 // Four stream lists, named for RT hold, NRT hold, close and free.
00459 CStreamList m_RTHoldList;
00460 CStreamList m_NRTHoldList;
00461 CStreamList m_CloseList;
00462 CStreamList m_FreeList;
00463 // RioStream pointer; presumably an array of m_MaxStreams streams - confirm against Initialize().
00464 RioStream *m_VectorStream;
00465
00466 RioStream *m_NextNRTStream;
00467 // Pointers to collaborating components; ownership is not visible in this header.
00468 CMutex *m_MutexTable;
00469 CObjectManager *m_ObjectManager;
00470 CSystemManager *m_SystemManager;
00471 CRouter *m_Router;
00472 NetMgr *m_NetMgr;
00473 // NOTE(review): `ofstream` is unqualified while this header only includes <fstream>, so it relies on a using-declaration/directive for std being in effect before this point.
00474 bool m_initialized;
00475 ofstream m_log;
00476
00477 int m_MaxPendingRequests;
00478 int m_MaxNumberOfDisks;
00479 int m_NumberOfBuffersForEachClient;
00480 int m_BurstSizeOfEachClient;
00481 double m_MaxIntervalEmpty;
00482 int m_NumberOfEmptyTimes;
00483 // NOTE(review): initialisation/destruction of these pthread mutexes is not visible in this header.
00484 pthread_mutex_t MutexModifyClientsList,
00485 MutexRunCAC,
00486 MutexUpdateMeasures,
00487 MutexUpdateNumberOfWaitingClients;
00488 // Fourteen log streams in S*/E* pairs, one pair per measured quantity.
00489 ofstream m_logSCACT, m_logECACT,
00490 m_logSAPT, m_logEAPT,
00491 m_logSSIMULT, m_logESIMULT,
00492 m_logSWAITINGT, m_logEWAITINGT,
00493 m_logSGENREQLT, m_logEGENREQLT,
00494 m_logSBUFFERT, m_logEBUFFERT,
00495 m_logSSORTLT, m_logESORTLT;
00496 // Disk-time tables with fixed bounds: first dimension 100, second 301. NOTE(review): how these bounds relate to m_nDisks / m_MaxNumberOfDisks and to queue sizes is not visible here; callers must keep indices inside them.
00497 uint m_lastID ;
00498 double m_EstimatedDiskServiceTime[ 100 ][ 301 ];
00499 double m_EstimatedDiskServiceTimeOfDisk[ 100 ];
00500 double m_EstimatedDiskResponseTime[ 100 ][ 301 ];
00501 double m_EstimatedDiskResponseTimeOfDisk[ 100 ];
00502 double m_DeviationDiskResponseTime[100][301];
00503 double m_DeviationDiskResponseTimeOfDisk[100];
00504
00505 double m_EstimatedDiskQueueTime[ 100 ][ 301 ];
00506 double m_EstimatedDiskQueueTimeOfDisk[ 100 ];
00507 // Average/variance pairs for the measured times; presumably m_NumberOfSamples is the shared sample count - confirm.
00508 double m_AverageCACTime;
00509 double m_VarianceCACTime;
00510
00511 long long int m_NumberOfSamples;
00512
00513 double m_AverageAdmissionProcessTime;
00514 double m_VarianceAdmissionProcessTime;
00515
00516 double m_AverageSimulationTime;
00517 double m_VarianceSimulationTime;
00518
00519 double m_AverageBufferTime;
00520 double m_VarianceBufferTime;
00521
00522 double m_AverageGenReqListTime;
00523 double m_VarianceGenReqListTime;
00524
00525 double m_AverageSortListTime;
00526 double m_VarianceSortListTime;
00527
00528 double m_AverageWaitingTime;
00529 double m_VarianceWaitingTime;
00530 // Average/variance/sample-count triples, each array sized 1001; the *AccClients / *AccQueue suffixes suggest indexing by number of clients / queue size - confirm.
00531 double m_AverageCACTimeAccClients[ 1001 ];
00532 double m_VarianceCACTimeAccClients[ 1001 ];
00533 int m_SamplesCACTimeAccClients[ 1001 ];
00534
00535 double m_AverageAdmissionProcessTimeAccClients[ 1001 ];
00536 double m_VarianceAdmissionProcessTimeAccClients[ 1001 ];
00537 int m_SamplesAdmissionProcessTimeAccClients[ 1001 ];
00538
00539 double m_AverageSimulationTimeAccClients[ 1001 ];
00540 double m_VarianceSimulationTimeAccClients[ 1001 ];
00541 int m_SamplesSimulationTimeAccClients[ 1001 ];
00542
00543 double m_AverageSortListTimeAccClients[ 1001 ];
00544 double m_VarianceSortListTimeAccClients[ 1001 ];
00545 int m_SamplesSortListTimeAccClients[ 1001 ];
00546
00547 double m_AverageWaitingTimeAccClients[ 1001 ];
00548 double m_VarianceWaitingTimeAccClients[ 1001 ];
00549 int m_SamplesWaitingTimeAccClients[ 1001 ];
00550
00551 double m_AverageWaitingTimeAccQueue[ 1001 ];
00552 double m_VarianceWaitingTimeAccQueue[ 1001 ];
00553 int m_SamplesWaitingTimeAccQueue[ 1001 ];
00554
00555 double m_AverageBufferTimeAccClients[ 1001 ];
00556 double m_VarianceBufferTimeAccClients[ 1001 ];
00557 int m_SamplesBufferTimeAccClients[ 1001 ];
00558
00559 double m_EstimatedTimeParameter;
00560 double m_NetworkRate;
00561 // Head of the admitted-clients list (STREAMLIST nodes) and client counters.
00562 PSTREAMLIST m_AdmittedClientsList;
00563 int m_NumberOfAdmittedClients;
00564 int m_NumberOfWaitingClients;
00565 // Fixed-size (1001) arrays without the m_ prefix. NOTE(review): the data member named ClientBuffer hides the global struct ClientBuffer inside this class's scope;
00566 // code in CStreamManager that needs the type must write `struct ClientBuffer` or `::ClientBuffer`.
00567 int ClientBuffer[ 1001 ];
00568 unsigned int PlayBlock[ 1001 ];
00569 unsigned int TotalBlocks[ 1001 ];
00570 unsigned int NextBlockToSend[ 1001];
00571 int ServerBuffer[ 1001 ];
00572 unsigned int NReqWhenEmptyBuffer[ 1001 ];
00573 unsigned int NHiccups[ 1001 ];
00574 long double Initial_time[ 1001 ];
00575 double ClientRTT[ 1001 ];
00576 long double TimeBufferEmpty[ 1001 ];
00577 long double MaxIntervalBufferEmpty[ 1001 ];
00578 // Static callback taking an opaque void*; presumably Param is the CStreamManager instance and m_TimerId identifies the timer - confirm.
00579 static void Timeout( void* Param );
00580 // Private helpers operating on the STREAMLIST / SEQEVENTLIST structures.
00581 PSTREAMLIST firstRequestOfAllStreams( PSTREAMLIST stream_list,
00582 long double *now );
00583 PSEQEVENTLIST sortAllStreamsGeneratingOneStream(
00584 PSTREAMLIST stream_list,
00585 uint newclientid );
00586
00587
00588
00589 // Raw char* path; ownership and lifetime of the string are not visible in this header.
00590 char *m_LogsDirectory;
00591
00592
00593 CNetInterface *m_NetInterface;
00594
00595 friend class RioStream;
00596 friend class RioStreamObj;
00597 // Public data members (configuration flags and a raw char* whose ownership is not visible here).
00598 public:
00599 char *m_FileRoot;
00600 bool m_UseServerSideBuffers;
00601 bool m_UseNewCAC;
00602 bool m_CollectMeasures;
00603
00604 public:
00605 CStreamManager();
00606 ~CStreamManager();
00607
00608
00609
00610 // Initialize() takes a StreamManagerConfig*; Open() returns the new stream through the RioStream** out-parameter. Return-code conventions are not visible here.
00611 int Initialize( StreamManagerConfig *Config );
00612 int Open( RioStreamTraffic *Traffic, RioStream **Stream,
00613 SOCKADDR_IN RemoteAddress );
00614
00615 void PostITEvent( MonitorEvent *event );
00616 // Per-disk estimate getters; NOTE(review): `disk` presumably indexes the 100-entry tables above, so it must be in range - confirm bounds checking in the definitions.
00617 int GetDiskServiceTime();
00618 double GetEstimatedDiskServiceTime( int disk );
00619 double GetEstimatedDiskResponseTime( int disk );
00620 double GetEstimatedDiskQueueTime( int disk );
00621 // Two-index variants; presumably index the [100][301] tables - confirm.
00622 double GetDiskServiceTime( int disk, int index );
00623 double GetDiskResponseTime( int disk, int index );
00624 double GetDiskQueueTime( int disk, int queuesize );
00625 // Client admission and measurement interface.
00626 void UpdateDiskQueueAndServiceTime();
00627 int SaveMeasures();
00628 PSTREAMLIST InsertNewClient(PSEQBLOCKLIST NewClient, RioStream* stream);
00629 void RemoveClient( PSTREAMLIST ClientPosition );
00630 PSTREAMLIST CanAdmit ( PSEQBLOCKLIST NewClient, RioStream* stream,
00631 double RTT, int clientbuffer );
00632 PSTREAMLIST GetAdmittedClientsList();
00633 int GetNumberOfBuffersForEachClient();
00634 int insertevent( PSEQEVENTLIST eventblock,
00635 EventTypeSimulation eventtype,
00636 int block, long double time_when);
00637 int GetBurstSizeOfEachClient();
00638 unsigned int GetBlockSize();
00639 double GetAverageCACTime();
00640 void SetAverageBufferTime( double sample, RioStream *stream );
00641 void SetAverageCACTime( double sample, RioStream *stream, int index );
00642 void SetAverageAdmissionProcessTime( double sample, RioStream *stream,
00643 int index );
00644 void SetAverageGenReqListTime( double sample, RioStream *stream );
00645 void SetAverageSortListTime( double sample, RioStream *stream );
00646 void SetAverageSimulationTime( double sample, RioStream *stream );
00647 void SetAverageWaitingTime( double sample, RioStream *stream, int index,
00648 int queue );
00649 long double GetNetworkRate();
00650 int UpdateNumberOfWaitingClients( int client );
00651 void generateTraceOfAllStreams( double time );
00652
00653
00654
00655
00656
00657
00658 // Returns an int; presumably the IP address associated with m_NetMgr - byte order is not visible here.
00659 int GetNetMgrIPAddr();
00660
00661
00662
00663
00664
00665
00666
00667
00668 // Returns an int; presumably the IP address associated with m_NetInterface - byte order is not visible here.
00669 int GetNetInterfaceIPAddr();
00670
00671
00672
00673
00674
00675 // Returns a NetMgr*; presumably m_NetMgr - confirm.
00676 NetMgr *getNetMgr( void );
00677
00678
00679
00680
00681 // Returns a CNetInterface*; presumably m_NetInterface - confirm.
00682 CNetInterface *getNetInterface( void );
00683
00684
00685
00686
00687
00688 // Takes an unsigned count while m_nDisks / m_MaxNumberOfDisks are int; which member is set is not visible here.
00689 void SetNumberOfDisks( unsigned int NumberOfDisks );
00690 };
00691
00692 #endif //__STREAMMANAGER_H_