RioStream Class Reference

#include <StreamManager.h>

Public Member Functions

 ~RioStream ()
int Close ()
void Initialize (CStreamManager *)
int MaxRequests ()
int OpenObject (char *ObjectName, RioAccess Access, RioStreamObj **StreamObj)
int Get_streamid ()
int OpenObject (char *ObjectName, RioAccess Access, struct timeval RTT, int BufferSize, RioStreamObj **StreamObj)
RioTrafficType GetType ()
uint GetId ()
void releaseMutexUpdateEvent ()
void getMutexUpdateEvent ()
int getDirection ()
void updateNextBlockToBeFetched (PSEQBLOCKLIST Next)
void prefetchBlock (int posAtBuffer, EventDataRequest *event, RioStreamObj *StreamObject)
int getTokensToSend ()
void incTokensToSend ()
void decTokensToSend ()
void setTokensToSend (int value)
void decTokensToDisk ()
int getTokensToDisk ()
void updateTokensToDisk (struct timeval time)
u32 getMinimumBlockToSend ()
void incMinimumBlockToSend ()
void setMinimumBlockToSend (u32 Block)
u32 getMaximumBlockToSend ()
struct timeval getLastArrivalTime ()
double getRTTtoClient ()
int getClientBufferStatus ()
int getPlayBlock ()
int getNumberOfBlocks ()
int getServerBufferStatus ()
void decServerBufferStatus ()
int getNumberOfBuffersForEachClient ()
double getAverageCACTime ()
double getEstimatedDiskQueueTime (int disk)
double getEstimatedDiskServiceTime (int disk)
double getEstimatedDiskResponseTime (int disk)
long double getNetworkRate ()
unsigned int getBlockSize ()
int getQueueSize ()
int getClientBufferSize ()
void cancelBlockAtBuffer (int positionAtBuffer)
bool getPrefetchedAllBlocks ()
int ClientCanStart ()
int startingStream ()
void NewBlockArrival ()

Data Fields

ofstream m_log
ofstream m_log_requests

Private Types

enum  StreamStatus { StreamStatusClosed = 0, StreamStatusOpened = 1 }

Private Member Functions

 RioStream ()
 RioStream (const RioStream &x)
RioStream & operator= (const RioStream &x)
int DataReq (u32 reqid, u16 operation, u32 ipaddr, u16 port, u32 block, u32 repbits, RioStreamObj *StreamObject)
void RTHold (EventDataRequest *event)
void NRTHold (EventDataRequest *event)
void ProcessRTQueue ()
void ProcessNRTQueue ()
void RequestCompleted (Event *Request)
PSEQBLOCKLIST getInfoAboutRequestList (char *filename, RioObject *object)

Private Attributes

RioStream * Next
RioStream * Previous
CStreamManager * s_mgr
int s_streamid
RioStreamDirection s_Direction
double s_Rate
SOCKADDR_IN m_RemoteAddress
StreamStatus m_Status
RioTrafficType m_Type
int m_nRequests
int m_MaxRequests
int m_UserMaxRequests
int m_nQueue
struct timeval m_NextRelease
struct timeval m_ArrivalInterval
u64 m_RequestCount
u64 m_QueuedCount
CRequestQueue m_Queue
struct timeval m_LastArrivalTime
struct timeval m_TokensToDiskClock
struct timeval m_TimeToBuffer
double m_FirstArrivalTime
double m_SampleGenReqListTime
double m_SampleSortListTime
double m_SampleSimulationTime
double m_SampleCACTime
double m_SampleAdmissionProcessTime
double m_SampleWaitingTime
int m_WaitingQueueSize
int m_TotalBlocks
PSEQBLOCKLIST m_RequestList
uint m_Id
PSTREAMLIST m_PosAtClientsList
ClientBuffer * m_Buffer
double m_AverageRTTtoClient_msec
int m_ClientBufferSize
int m_ServerBufferStatus
int m_TokensToSend
bool m_PrefetchedAllBlocks
bool m_FinishedMovie
int m_TokensToDisk
bool m_StartingToSend
u32 m_MinimumBlockToSend
pthread_mutex_t MutexTokensToSend
pthread_mutex_t MutexTokensToDisk
pthread_mutex_t MutexNextBlockToBeFetched
pthread_mutex_t MutexMinBlock
pthread_mutex_t MutexBlock
pthread_mutex_t MutexUpdateEvent
pthread_cond_t ConditionBlockArrival

Friends

class CStreamManager
class RioStreamObj

Detailed Description

Definition at line 143 of file server/StreamManager.h.


Member Enumeration Documentation

enum RioStream::StreamStatus [private]
Enumerator:
StreamStatusClosed 
StreamStatusOpened 

Definition at line 237 of file server/StreamManager.h.

00238         {
00239             StreamStatusClosed  = 0, // closed or closing
00240             StreamStatusOpened  = 1  // Opened and active
00241         } StreamStatus;


Constructor & Destructor Documentation

RioStream::RioStream (  )  [private]

Definition at line 961 of file server/StreamManager.cpp.

// Default constructor (private). Puts every attribute into a neutral
// "closed, empty" state and initializes the pthread mutexes and the
// condition variable with default attributes (NULL attribute argument).
00962 {
          // Manager back-pointer and per-stream identity/rate: nothing attached yet.
00963     s_mgr             = NULL;
00964     s_streamid        = 0;
00965     s_Direction       = RioStreamDirectionRead;
00966     s_Rate            = 0.0;
          // Starting as StreamStatusClosed makes Close() return immediately and
          // makes DataReq() reject requests with ERROR_STREAM_NOT_OPENED.
00967     m_Status          = StreamStatusClosed;
00968     m_Type            = RIO_TRAFFIC_INVALID;
          // Request accounting and queue counters.
00969     m_nRequests       = 0;
00970     m_MaxRequests     = 0;
00971     m_UserMaxRequests = 0;
00972     m_nQueue          = 0;
          // struct timeval members are cleared byte-wise.
00973     memset( (void *)&m_NextRelease,     0, sizeof( m_NextRelease )     );
00974     memset( (void *)&m_ArrivalInterval, 0, sizeof( m_ArrivalInterval ) );
00975     m_RequestCount    = 0;
00976     m_QueuedCount     = 0;
00977     // m_Queue does not need to be initialized
00978     memset( (void *)&m_LastArrivalTime  , 0, sizeof( m_LastArrivalTime ) );
00979     memset( (void *)&m_TokensToDiskClock, 0, sizeof( m_TokensToDiskClock ) );
00980     memset( (void *)&m_TimeToBuffer     , 0, sizeof( m_TimeToBuffer ) );
00981
          // Timing samples and admission-related counters.
00982     m_FirstArrivalTime           = 0.0;
00983     m_SampleGenReqListTime       = 0.0;
00984     m_SampleSortListTime         = 0.0;
00985     m_SampleSimulationTime       = 0.0;
00986     m_SampleCACTime              = 0.0;
00987     m_SampleAdmissionProcessTime = 0.0;
00988     m_SampleWaitingTime          = 0.0;
00989     m_WaitingQueueSize           = 0;
00990     m_TotalBlocks                = 0;
00991     m_RequestList                = NULL;
00992     m_Id                         = 0;
          // m_PosAtClientsList == NULL is tested by Close(), ClientCanStart() and
          // DataReq() before they touch the server-side buffer state.
00993     m_PosAtClientsList           = NULL;
00994     m_Buffer                     = NULL;
00995     m_AverageRTTtoClient_msec    = 0.0;
00996     m_ClientBufferSize           = 0;
00997     m_ServerBufferStatus         = 0;
00998     m_TokensToSend               = 0;
00999     m_PrefetchedAllBlocks        = false;
01000     m_FinishedMovie              = false;
01001     m_TokensToDisk               = 0;
          // The only flag that starts out true; ClientCanStart() resets it to false.
01002     m_StartingToSend             = true;
01003     m_MinimumBlockToSend         = 0;
          // Synchronization objects, all created with default attributes.
          // NOTE(review): the return values of the pthread_*_init calls are ignored.
01004     pthread_mutex_init( &MutexTokensToSend        , NULL );
01005     pthread_mutex_init( &MutexTokensToDisk        , NULL );
01006     pthread_mutex_init( &MutexNextBlockToBeFetched, NULL );
01007     pthread_mutex_init( &MutexMinBlock            , NULL );
01008     pthread_mutex_init( &MutexBlock               , NULL );
01009     pthread_mutex_init( &MutexUpdateEvent         , NULL );
01010     pthread_cond_init(  &ConditionBlockArrival    , NULL );
01011
          // List links start detached.
01012     Next     = NULL;
01013     Previous = NULL;
01014     // m_log          does not need to be initialized
01015     // m_log_requests does not need to be initialized
01016 }

RioStream::RioStream ( const RioStream & x  )  [private]
RioStream::~RioStream (  ) 

Definition at line 1019 of file server/StreamManager.cpp.

// Destructor. Delegates all teardown to Close(), which returns immediately
// when the stream is already in StreamStatusClosed.
// NOTE(review): the mutexes and the condition variable initialized by the
// constructor are not destroyed here or in the visible part of Close() --
// confirm whether pthread_mutex_destroy/pthread_cond_destroy is needed.
01020 {
01021     Close();
01022 }


Member Function Documentation

void RioStream::cancelBlockAtBuffer ( int  positionAtBuffer  ) 

Definition at line 2496 of file server/StreamManager.cpp.

// Cancels the request (if any) held in slot positionAtBuffer of m_Buffer.
//
// positionAtBuffer: index into m_Buffer. It is not range-checked here; the
//                   callers shown on this page iterate it from 0 up to
//                   s_mgr->GetNumberOfBuffersForEachClient().
//
// The slot's event is marked RealTimeCancelBlock. When its BufferId is not -1
// the event is also forwarded to the router and the server buffer count is
// decremented; otherwise only the operation flag is changed. In both cases
// the slot is emptied afterwards. An already empty slot is a no-op.
02497 {
          // Presumably locks MutexUpdateEvent (helper body not shown here); the
          // comment in DataReq() says the router may be updating the same event.
02498     getMutexUpdateEvent();
02499
02500     if( m_Buffer[positionAtBuffer].event != NULL )
02501     {
02502         m_Buffer[positionAtBuffer].event->Request.Operation=RealTimeCancelBlock;
              // BufferId == -1 is treated elsewhere on this page as "not yet
              // served by Disk"; any other value takes the router path below.
02503         if( m_Buffer[positionAtBuffer].event->Request.BufferId != -1 )
02504         {
02505             releaseMutexUpdateEvent();
02506             decServerBufferStatus();
02507             //if it was served by Disk
02508             s_mgr->m_Router->Put( (Event*)m_Buffer[positionAtBuffer].event );
02509             #ifdef RIO_DEBUG2
02510             if( m_log.is_open() ) m_log << "cancelBlockAtBuffer: block "
02511                 << m_Buffer[positionAtBuffer].blockInfo->block
02512                 <<" canceled!"<< endl;
02513             #endif
02514         }
02515         else
02516         {
                  // Only the cancel flag set above is left on the event; nothing
                  // is sent to the router on this path.
02517             releaseMutexUpdateEvent();
02518             #ifdef RIO_DEBUG2
02519             if( m_log.is_open() )
02520                 m_log << "cancelBlockAtBuffer: setting cancel flag of block "
02521                       << m_Buffer[positionAtBuffer].blockInfo->block << endl;
02522             #endif
02523         }
              // NOTE(review): the slot is cleared after the mutex has been
              // released -- confirm no other thread reads this slot concurrently.
02524         m_Buffer[positionAtBuffer].event = NULL;
02525     }
02526     else
02527     {
02528         releaseMutexUpdateEvent();
02529     }
02530 }

int RioStream::ClientCanStart (  ) 

Definition at line 2569 of file server/StreamManager.cpp.

// Answers the client's "can I start?" request. When the stream has an entry
// in the clients list (m_PosAtClientsList != NULL) the call blocks until
// getServerBufferStatus() reaches s_mgr->GetNumberOfBuffersForEachClient(),
// then records the buffering time and switches the stream to token-to-disk
// control. When m_PosAtClientsList is NULL nothing is waited for.
//
// Returns: always S_OK.
02570 {
02571     struct timeval final_time;
02572     double time;
02573     // After the client sent the requests, he asks to continue
02574     // but only after the server side buffer is full he can start visualization
02575     #ifdef RIO_DEBUG2
02576     if( m_log.is_open() ) m_log << " Client asked 'can I start'? " << endl;
02577     #endif
02578     if( m_PosAtClientsList != NULL )
02579     {
              // NOTE(review): the predicate is evaluated before MutexBlock is
              // locked, so a signal on ConditionBlockArrival delivered between the
              // test and pthread_cond_wait() would be missed -- confirm how the
              // signalling side (presumably NewBlockArrival) takes MutexBlock.
02580         while(getServerBufferStatus()<s_mgr->GetNumberOfBuffersForEachClient())
02581         {
02582             pthread_mutex_lock( &MutexBlock );
02583             pthread_cond_wait( &ConditionBlockArrival, &MutexBlock );
02584             pthread_mutex_unlock( &MutexBlock );
02585         }
              // Elapsed time between m_TimeToBuffer and now, reported to the manager.
02586         final_time = Timer.CurrentTime();
02587         time = getInterval( m_TimeToBuffer, final_time );
02588         s_mgr->SetAverageBufferTime( time, this );
              // m_TimeToBuffer is overwritten with the elapsed interval itself.
              // time * 1000.0 is split as microseconds, so time appears to be in
              // milliseconds (DataReq() logs getInterval() results as msec) --
              // TODO confirm.
02589         m_TimeToBuffer.tv_sec  =(u64)(( time * 1000.0 ) / 1000000 );
02590         m_TimeToBuffer.tv_usec =(u64)((u64) ( time * 1000.0 ) % 1000000 );
02591         #ifdef RIO_DEBUG2
02592         if( m_log.is_open() ) m_log << "Time to buffer " << time << endl;
02593         #endif
02594
02595         // after this point, the client will be controlled by tokenstodisk
02596         m_StartingToSend = false;
02597         //initializes the tokenstodisk
              // Written directly, without going through MutexTokensToDisk.
02598         m_TokensToDisk = s_mgr->GetBurstSizeOfEachClient();
02599
02600         //initializes the clock to be used to accumulate tokenstodisk
02601         struct timeval clocknow = Timer.CurrentTime();
02602         m_TokensToDiskClock.tv_sec  = clocknow.tv_sec;
02603         m_TokensToDiskClock.tv_usec = clocknow.tv_usec;
02604     }
02605     #ifdef RIO_DEBUG2
02606     if( m_log.is_open() ) m_log << " Yes. You can start now. m_TokensToDisk : "
02607           << m_TokensToDisk << endl;
02608     #endif
02609     return S_OK;
02610 }

int RioStream::Close ( void   ) 

Definition at line 1031 of file server/StreamManager.cpp.

// Closes the stream: marks it closed, frees every queued request, cancels
// blocks held in the server-side buffer, and then either finishes the close
// (no requests pending) or parks the stream on the manager's close list until
// the outstanding requests complete. A monitor event carrying the client
// address and the remaining request count is posted at the end.
//
// Returns: always S_OK. If the stream is already closed it returns at once,
//          before taking the table mutex and without posting the event.
01032 {
01033     int i = 0;
01034
01035     // Close any opened object on this stream
01036     // ###    m_ObjectManager->CloseStream(Stream);
01037
01038     // just return if already closed
          // This check is made before the table mutex is acquired.
01039     if( m_Status == StreamStatusClosed )
01040         return S_OK;
01041
          // Everything up to the matching Release() below runs under the
          // manager's m_MutexTable.
01042     s_mgr->m_MutexTable->Wait();
01043
01044     m_Status = StreamStatusClosed;
01045
01046     if( s_mgr->m_log.is_open() )
01047         s_mgr->m_log << "Closing Stream ( " << m_Id << " )"
01048         << " Requests "      << m_nRequests
01049         << " Queue "         << m_nQueue
01050         << " Request Count " << m_RequestCount
01051         << " Queued Count "  << m_QueuedCount << endl;
01052
01053     EventDataRequest* request;
01054
01055     // empty stream queue
          // Each queued request also counts as an active request, so both
          // counters are decremented before the event is freed.
01056     while( m_nQueue > 0 )
01057     {
01058         request = m_Queue.Get();
01059         m_nQueue--;
01060         m_nRequests--;
01061         EventManager.Free( ( Event* )request );
01062     }
01063
          // NOTE(review): the stream is removed from the real-time hold list only
          // when it is the first element -- confirm it cannot sit further down.
01064     if( s_mgr->m_RTHoldList.First() == this )
01065         s_mgr->m_RTHoldList.Remove( s_mgr->m_RTHoldList.First() );
01066
01067     if( ( m_PosAtClientsList != NULL ) && ( s_mgr->m_UseServerSideBuffers ) )
01068     {
01069         // need to cancel blocks at Storage
01070         // for each position at buffer, cancel block
01071         for( i = 0 ; i < s_mgr->GetNumberOfBuffersForEachClient(); i++ )
01072         {
01073             cancelBlockAtBuffer ( i );
01074         }
01075     }
01076     // ------------------------------------------------------------------------
01077
01078     // If there are pending requests should wait request completion
01079     // before finishing close
01080     if( m_nRequests > 0 )
01081     {
01082         if( s_mgr->m_log.is_open() )
01083             s_mgr->m_log << "Should wait for request completion before closing: "
01084                          << m_nRequests << " requests pending" << endl;
              // Deferred close: counters, client entry, buffer and logs are left
              // untouched on this path.
01085         s_mgr->m_CloseList.Put( this );
01086     }
01087     else
01088     {
01089         // If no pending request finish closing
01090         s_mgr->m_used--;
01091         // specific for real-time streams
01092         if( m_Type == RIO_TRAFFIC_CBR )
01093         {
01094             s_mgr->m_UsedRate -= s_Rate;
01095         }
01096
01097         if( ( m_PosAtClientsList != NULL ) &&
01098             ( s_mgr->m_UseServerSideBuffers )
01099           )
01100         {
01101             s_mgr->RemoveClient( m_PosAtClientsList );
01102             if( m_Buffer )
01103             {
                      // NOTE(review): m_Buffer is indexed as an array elsewhere
                      // (m_Buffer[i]); if it is allocated with new[] this must be
                      // delete[] -- the allocation site is not shown here.
01104                 delete m_Buffer;
01105                 m_Buffer = NULL;
01106             }
01107         }
01108         #ifdef RIO_DEBUG2
01109         RioErr << "Closing mlog." << endl;
01110         m_log.close();
01111         #endif
01112         #ifdef RIO_DEBUG2
01113         RioErr << "Closing mlogreq." << endl;
01114         m_log_requests.close();
01115         #endif
01116         // --------------------------------------------------------------------
01117
              // Hand the stream object back to the manager's free list.
01118         s_mgr->m_FreeList.Put( this );
01119         if( s_mgr->m_log.is_open() )
01120             s_mgr->m_log << "Close: Stream CLOSED. Requests: " << m_RequestCount
01121                          << ". Queued requests: "       << m_QueuedCount
01122                          << ". Active streams "         << s_mgr->m_used<< endl;
01123     }
01124
01125     s_mgr->m_MutexTable->Release();
01126
          // Post a monitor event with the client's address and request count.
          // NOTE(review): m_nRequests and m_RemoteAddress are read after the table
          // mutex has been released and after the stream may already be on the
          // free list -- confirm this cannot race with reuse of the object.
          // NOTE(review): htons() is applied to a network-order port; ntohs() is
          // the conventional call for this direction (both perform the same swap).
          // Ownership of it_event presumably passes to PostITEvent -- TODO confirm.
01127     struct in_addr clientip;
01128     clientip.s_addr = m_RemoteAddress.sin_addr.s_addr;
01129     UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip),
01130             htons( m_RemoteAddress.sin_port ), m_nRequests );
01131     s_mgr->PostITEvent( (MonitorEvent *)it_event );
01132
01133     return S_OK;
01134 }

int RioStream::DataReq ( u32  reqid,
u16  operation,
u32  ipaddr,
u16  port,
u32  block,
u32  repbits,
RioStreamObj * StreamObject
) [private]

Definition at line 1215 of file server/StreamManager.cpp.

01219 {
01220     // Set Arrival time with current time
01221     struct timeval ArrivalTime = Timer.CurrentTime();
01222 
01223     #ifdef RIO_DEBUG2
01224     if( m_log.is_open() )
01225     {
01226         struct in_addr clientip;
01227         double interval = getInterval( m_LastArrivalTime, ArrivalTime );
01228         clientip.s_addr = ipaddr;
01229         char *info = myInfo( false );
01230         m_log << endl << info
01231               << "Stream:DataReq => Req ip " << inet_ntoa(clientip)
01232               << " port "  << port   << " reqid "        << reqid
01233               << " block " << block  << " TokensToSend " << m_TokensToSend
01234               << " (m_nRequests " << m_nRequests + 1 << ")"
01235               << " arrival " << ArrivalTime.tv_sec  << " sec "
01236               << ArrivalTime.tv_usec << " usec "
01237               << "interval " << interval << " msec"
01238               << " m_MaxRequests " << m_MaxRequests << endl;
01239         free( info );
01240     }
01241     #endif
01242 
01243     // Do not accept request if Stream is closed
01244     if( m_Status == StreamStatusClosed )
01245     {
01246         #ifdef RIO_DEBUG2
01247         if( m_log.is_open() )
01248             m_log << "Stream:DataReq:ERROR_STREAM_NOT_OPENED!!!" << endl;
01249         #endif
01250 
01251         return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
01252     }
01253 
01254     // Check if number of stream active requests does not exceed maximum
01255     if( m_nRequests >= m_MaxRequests )
01256     {
01257         #ifdef RIO_DEBUG2
01258         if( m_log.is_open() )
01259             m_log << "Stream:DataReq:Max Number of Active Requests!!! "
01260                   << m_nRequests << ">=" << m_MaxRequests << endl;
01261         #endif
01262 
01263         RioErr << "Stream:DataReq:Max Number of Active Requests!!! "
01264                << m_nRequests << ">=" << m_MaxRequests << endl;
01265 
01266         return ERROR_STREAMMANAGER + ERROR_MAX_STREAM_REQUESTS;
01267     }
01268     // Increment number of stream active requests
01269     m_nRequests++;
01270     m_RequestCount++;
01271 
01272     // Get Request slot for new request
01273     EventDataRequest* event;
01274 
01275     if( m_Type == RIO_TRAFFIC_NRT )
01276     {
01277         event = (EventDataRequest*)EventManager.New( EventTypeNRTDataRequest );
01278     }
01279     else
01280     {
01281         event = (EventDataRequest*)EventManager.New( EventTypeRTDataRequest );
01282     }
01283 
01284     if( operation == READ )
01285     {
01286         if( m_Type == RIO_TRAFFIC_NRT )
01287         {
01288             event->Request.Operation = NonRealTimeRead;
01289         }
01290         else
01291         {
01292             event->Request.Operation = RealTimeRead;
01293         }
01294     }
01295     else
01296     {
01297         event->Request.Operation = NonRealTimeWrite;
01298     }
01299 
01300     event->Request.streamobj        = StreamObject;
01301     event->Request.rioobject        = StreamObject->o_object;
01302     event->Request.Reqid            = reqid;
01303     event->Request.Target.IPaddress = ipaddr;
01304     event->Request.Target.Port      = port;
01305     event->Request.Block            = block;
01306     event->Request.RepBits          = repbits;
01307     event->Request.Status           = 0;  // ok so far
01308     event->Request.BufferId         = -2; // identifies normal use
01309 
01310     // Process request differently depending if it is real-time or not
01311     if( m_Type == RIO_TRAFFIC_NRT )
01312     {
01313         // Should hold request if there are others being hold or
01314         // if reached maximum number of pending non real-time requests
01315         if( s_mgr->m_HoldingNonRealTime ||
01316           ( s_mgr->m_ActiveRequests >= s_mgr->m_MaxActiveRequests ))
01317         {
01318             // Put request on hold until time out processing decides
01319             // it is time to proceess it
01320             NRTHold( event );
01321         }
01322         // Otherwise send request direct to router
01323         else
01324         {
01325             // Increment total number of requests sent to router
01326             s_mgr->m_ActiveRequests++;
01327 
01328             // Send request to router
01329             #ifdef RIO_DEBUG2
01330             if( m_log.is_open() )
01331                 m_log << "Stream:DataReq:Enviando pedido de NRT ao router"
01332                       << endl;
01333             #endif
01334             s_mgr->m_Router->Put( (Event*)event );
01335         }
01336     }
01337     // Real-time case
01338     else
01339     {
01340         // Check if the object was submited to new cac or
01341         // if it is using server side buffers.
01342         if( ( m_PosAtClientsList != NULL ) &&
01343             ( s_mgr->m_UseServerSideBuffers )
01344           )
01345         {
01346             m_LastArrivalTime = ArrivalTime;
01347 
01348             bool    changeSequence   = false;
01349             bool    blockWasAtBuffer = false;
01350             bool    usedEvent        = false;
01351 
01352             incTokensToSend();                  // available buffer at client
01353             updateTokensToDisk( ArrivalTime );  // available requests to Disk
01354 
01355             #ifdef RIO_DEBUG2
01356             if( m_log.is_open() )
01357                 m_log << " Looking for block at buffer ... ( tokenstodisk "
01358                       << getTokensToDisk() << ")"<< endl;
01359             #endif
01360 
01361 
01362             // verifies if the block is at the buffer and send
01363             for( int i = 0; i < s_mgr->GetNumberOfBuffersForEachClient(); i++ )
01364             {
01365                 if( ( m_Buffer[i].event != NULL ) &&
01366                    ( (u32)m_Buffer[i].event->Request.Block == block ))
01367                 {
01368                     #ifdef RIO_DEBUG2
01369                     if( m_log.is_open() )
01370                         m_log << " Found it with BufferId "
01371                               << m_Buffer[i].event->Request.BufferId
01372                               << " updates ip, port and reqid... " << endl;
01373                     #endif
01374 
01375                     blockWasAtBuffer = true;
01376 
01377                     // because Router could be using to update storage reply
01378                     getMutexUpdateEvent();
01379 
01380                     // updates information to send
01381                     m_Buffer[i].event->Request.Reqid            = reqid;
01382                     m_Buffer[i].event->Request.Target.IPaddress = ipaddr;
01383                     m_Buffer[i].event->Request.Target.Port      = port;
01384 
01385                     incMinimumBlockToSend();
01386 
01387                     if( m_Buffer[i].event->Request.BufferId == -1 )
01388                     {
01389                         #ifdef RIO_DEBUG2
01390                         //if it was not served by Disk, just wait
01391                         if( m_log.is_open() )
01392                             m_log << " prefetching and waiting "<< endl;
01393                         #endif
01394                         releaseMutexUpdateEvent();
01395                     }
01396                     else if( m_Buffer[i].event->Request.Status != 0 )
01397                     {
01398                         //if error at prefetch, cancel the block
01399                         m_Buffer[i].event->Request.Operation = 
01400                                                         RealTimeCancelBlock;
01401                         releaseMutexUpdateEvent();
01402                         s_mgr->m_Router->Put( (Event*)m_Buffer[i].event );
01403                         decTokensToSend(); //can not send block
01404                         decServerBufferStatus();
01405 
01406                         #ifdef RIO_DEBUG2
01407                         if( m_log.is_open() )
01408                             m_log << " sent to Router -> error at prefetch, "
01409                                   << " cancel block" << endl;
01410                         #endif
01411                     }
01412                     else
01413                     {
01414                         //if it was prefetched, just send the block
01415                         m_Buffer[i].event->Request.Operation=RealTimeSendBlock;
01416                         releaseMutexUpdateEvent();
01417                         s_mgr->m_Router->Put( (Event*)m_Buffer[i].event );
01418                         #ifdef RIO_DEBUG2
01419                         if( m_log.is_open() )
01420                             m_log << " sent to Router "<< endl;
01421                         #endif
01422                     }
01423 
01424                     m_Buffer[i].event = NULL;
01425                 }
01426             }
01427 
01428             // updates next block to be requested
01429             if( block != m_PosAtClientsList->nextBlockToBeRequested->block )
01430             {
01431                 //the sequence was changed, have to arrange everything
01432                 changeSequence = true;
01433 
01434                 #ifdef RIO_DEBUG2
01435                 if( m_log.is_open() )
01436                     m_log << " sequence changed from "
01437                           << m_PosAtClientsList->nextBlockToBeRequested->block
01438                           << " to " << block << endl;
01439                 #endif
01440 
01441                 while( ( block >
01442                          m_PosAtClientsList->nextBlockToBeRequested->block ) &&
01443                        ( m_PosAtClientsList->nextBlockToBeRequested->next !=
01444                          NULL )
01445                      )
01446                     m_PosAtClientsList->nextBlockToBeRequested =
01447                                m_PosAtClientsList->nextBlockToBeRequested->next;
01448 
01449                 while( ( block <
01450                          m_PosAtClientsList->nextBlockToBeRequested->block ) &&
01451                        ( m_PosAtClientsList->nextBlockToBeRequested->prev !=
01452                          NULL )
01453                      )
01454                     m_PosAtClientsList->nextBlockToBeRequested =
01455                                m_PosAtClientsList->nextBlockToBeRequested->prev;
01456 
01457                 #ifdef RIO_DEBUG2
01458                 if( m_log.is_open() )
01459                     m_log << " nbtoberequested "
01460                           << m_PosAtClientsList->nextBlockToBeRequested->block
01461                           << endl;
01462                 #endif
01463 
01464                 if( blockWasAtBuffer == false )
01465                 {
01466                     #ifdef RIO_DEBUG2
01467                     if( m_log.is_open() )
01468                         m_log << " block was not at buffer. "<< endl;
01469                     #endif
01470 
01471                     //if block was not buffered, send the block
01472                     usedEvent = true;
01473                     decTokensToSend();
01474                     if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) )
01475                     {
01476                         #ifdef RIO_DEBUG2
01477                         if( m_log.is_open() )
01478                             m_log << " (to Router) send block "
01479                                   << event->Request.Block
01480                                   << " bufferid "
01481                                   << event->Request.BufferId
01482                                   << " m_ActiveRequests "
01483                                   << s_mgr->m_ActiveRequests + 1
01484                                   << endl;
01485                         #endif
01486 
01487                         s_mgr->m_Router->Put( (Event*)event );
01488                         s_mgr->m_ActiveRequests++;
01489                         decTokensToDisk();
01490                     }
01491                     else
01492                     {
01493                         #ifdef RIO_DEBUG2
01494                         if( m_log.is_open() )
01495                             m_log << " (to queue) send block "
01496                                   << event->Request.Block
01497                                   << " bufferid "
01498                                   << event->Request.BufferId << endl;
01499                         #endif
01500 
01501                         RTHold( event );
01502                     }
01503                 }
01504             }
01505 
01506             if( m_PosAtClientsList->nextBlockToBeRequested->next != NULL )
01507             {
01508                 m_PosAtClientsList->nextBlockToBeRequested =
01509                          m_PosAtClientsList->nextBlockToBeRequested->next;
01510             }
01511             else
01512             {
01513                 // Doesn't have any more requests
01514                 #ifdef RIO_DEBUG2
01515                 if( m_log.is_open() )
01516                     m_log << " last block - prefetch all true "<< endl;
01517                 #endif
01518 
01519                 m_FinishedMovie = true;
01520                 m_PrefetchedAllBlocks = true;
01521 
01522                 if( s_mgr->GetNumberOfBuffersForEachClient() != 0 )
01523                 {
01524                     EventManager.Free( (Event *) event );
01525                 }
01526             }
01527 
01528             #ifdef RIO_DEBUG2
01529             if( m_log.is_open() ) m_log << " updated nbtoberequested "
01530                   << m_PosAtClientsList->nextBlockToBeRequested->block
01531                   << endl;
01532             #endif
01533 
01534             if( m_PrefetchedAllBlocks == true )
01535             {
01536                 m_nRequests--;
01537             }
01538 
01539             // updates next block to be fetched
01540             if( s_mgr->GetNumberOfBuffersForEachClient() == 0 )
01541             {
01542                 updateNextBlockToBeFetched(
01543                                    m_PosAtClientsList->nextBlockToBeRequested);
01544             }
01545             else
01546             {
01547                 if( changeSequence && ( m_FinishedMovie == false ) )
01548                 {
01549                     #ifdef RIO_DEBUG2
01550                     if( m_log.is_open() )
01551                         m_log << " updating nbtobefetched ... ";
01552                     #endif
01553 
01554                     if( ( block >=
01555                           m_PosAtClientsList->nextBlockToBeFetched->block ) ||
01556                         ( ( block + s_mgr->GetNumberOfBuffersForEachClient()) <
01557                           ( m_PosAtClientsList->nextBlockToBeFetched->block -
01558                             s_mgr->GetNumberOfBuffersForEachClient()
01559                           )
01560                         )
01561                       )
01562                     {
01563                         //for each position at buffer, cancel block
01564                         for( int i=0;
01565                              i < s_mgr->GetNumberOfBuffersForEachClient();
01566                              i++
01567                            )
01568                         {
01569                             cancelBlockAtBuffer ( i );
01570                             m_nRequests++;
01571                         }
01572                         updateNextBlockToBeFetched(
01573                                   m_PosAtClientsList->nextBlockToBeRequested );
01574                     }
01575                     else
01576                     if( ( block + s_mgr->GetNumberOfBuffersForEachClient() ) <
01577                               m_PosAtClientsList->nextBlockToBeFetched->block )
01578                     {
01579                         updateNextBlockToBeFetched(
01580                              m_PosAtClientsList->nextBlockToBeRequested );
01581                     }
01582                     setMinimumBlockToSend(
01583                            m_PosAtClientsList->nextBlockToBeRequested->block );
01584                     if( m_PosAtClientsList->nextBlockToBeFetched->next == NULL )
01585                     {
01586                         m_PrefetchedAllBlocks = true;
01587                     }
01588                     else
01589                     {
01590                         m_PrefetchedAllBlocks = false;
01591                     }
01592                 }
01593             }
01594 
01595             // prefetch blocks
01596 
01597             if( ( s_mgr->GetNumberOfBuffersForEachClient() == 0 ) &&
01598                ( changeSequence == false ))
01599             {
01600                 decTokensToSend();
01601                 if( m_StartingToSend == true )
01602                 {
01603                     #ifdef RIO_DEBUG2
01604                     if( m_log.is_open() )
01605                         m_log << "Stream:DataReq:Sending prefetching blocks to"
01606                               << " router"<< endl;
01607                     #endif
01608 
01609                     s_mgr->m_Router->Put( (Event*)event );
01610                     s_mgr->m_ActiveRequests++;
01611                 }
01612                 else
01613                 {
01614                     if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) )
01615                     {
01616                         #ifdef RIO_DEBUG2
01617                         if( m_log.is_open() )
01618                             m_log << "Stream:DataReq:Sending prefetching blocks "
01619                                   << "to router 2"<< endl;
01620                         #endif
01621 
01622                         s_mgr->m_Router->Put( (Event*)event );
01623                         s_mgr->m_ActiveRequests++;
01624                         decTokensToDisk();
01625                     }
01626                     else
01627                     {
01628                         RTHold( event );
01629                     }
01630                 }
01631             }
01632             int i   = 0;
01633             int pos = -1;
01634             bool prefetch;
01635 
01636             while(( i < s_mgr->GetNumberOfBuffersForEachClient()) &&
01637                   ( m_PrefetchedAllBlocks == false ))
01638             {
01639                 prefetch = false;
01640                 if( m_Buffer[i].event == NULL )
01641                 {
01642                     prefetch = true;
01643                 }
01644                 else
01645                 if( (( m_Buffer[i].blockInfo->block < getMinimumBlockToSend() )
01646                      ||( m_Buffer[i].blockInfo->block >=
01647                        ( m_PosAtClientsList->nextBlockToBeRequested->block
01648                          + s_mgr->GetNumberOfBuffersForEachClient())))
01649                    && changeSequence )
01650                 {
01651                     cancelBlockAtBuffer( i );
01652                     m_nRequests++;
01653                     prefetch = true;
01654                 }
01655                 else if( ( m_Buffer[i].blockInfo->block >=
01656                           m_PosAtClientsList->nextBlockToBeFetched->block )
01657                         && ( changeSequence ))
01658                 {
01659                     if( pos == -1 )
01660                     {
01661                         pos = i;
01662                     }
01663                     else
01664                     {
01665                         if( m_Buffer[pos].blockInfo->block <
01666                             m_Buffer[i].blockInfo->block )
01667                             pos = i;
01668                     }
01669                 }
01670                 if( prefetch == true )
01671                 {
01672                     if( usedEvent )
01673                     {
01674                         event = (EventDataRequest*)
01675                                 EventManager.New( EventTypeRTDataRequest );
01676                     }
01677                     else
01678                         usedEvent = true;
01679 
01680                     prefetchBlock( i, event, StreamObject );
01681                     updateNextBlockToBeFetched( NULL );
01682                 }
01683                 i++;
01684             }
01685 
01686             if( ( pos != -1 ) &&
01687                ( m_Buffer[pos].blockInfo->block >=
01688                         m_PosAtClientsList->nextBlockToBeFetched->block ))
01689             {
01690                  updateNextBlockToBeFetched( m_Buffer[pos].blockInfo->next );
01691             }
01692 
01693             if( usedEvent == false )
01694             {
01695                 //have to free (it was not used)
01696                 EventManager.Free( (Event *) event );
01697             }
01698             // ----------------------------------------------------------------
01699         }
01700         else
01701         {
01702 
01703             ///////////////////////////////////////////////////////////////////
01704             // Check if request satisfy rate requirement and there are no other
01705             // request queued
01706 
01707             struct timeval tolerance_time;
01708             tolerance_time.tv_sec =ArrivalTime.tv_sec;
01709             tolerance_time.tv_usec=ArrivalTime.tv_usec+RELEASE_TIME_TOLERANCE;
01710             if( tolerance_time.tv_usec > 1000000 )
01711             {
01712                 tolerance_time.tv_sec  += 1;
01713                 tolerance_time.tv_usec -= 1000000;
01714             }
01715 
01716             #ifdef RIO_DEBUG2
01717             if( m_log.is_open() ) m_log << "Stream:DataReq "
01718                   << " m_nQueue " << m_nQueue << endl
01719                   << " tolerance_time.tv_sec "
01720                   << ( unsigned ) tolerance_time.tv_sec
01721                   << " m_NextRelease.tv_sec "
01722                   << ( unsigned ) m_NextRelease.tv_sec
01723                   << endl
01724                   << " tolerance_time.tv_usec "
01725                   << ( unsigned ) tolerance_time.tv_usec
01726                   << " m_NextRelease.tv_usec "
01727                   << ( unsigned ) m_NextRelease.tv_usec
01728                   << endl;
01729             #endif
01730 
01731             //This part makes the control of client's requests queue
01732             //Now, server only sends requests to router
01733             if( ( m_nQueue == 0 ) &&
01734                (( tolerance_time.tv_sec > m_NextRelease.tv_sec ) ||
01735                (( tolerance_time.tv_sec == m_NextRelease.tv_sec ) &&
01736                 ( tolerance_time.tv_usec >= m_NextRelease.tv_usec ))))
01737             {
01738                 // Yes: Send request directly to Router
01739 
01740                 // First update new release time for next request
01741                 if( ( ArrivalTime.tv_sec > m_NextRelease.tv_sec ) ||
01742                   (( ArrivalTime.tv_sec == m_NextRelease.tv_sec) &&
01743                     (ArrivalTime.tv_usec > m_NextRelease.tv_usec)) )
01744                 {
01745                     m_NextRelease.tv_sec  = ArrivalTime.tv_sec  +
01746                                             m_ArrivalInterval.tv_sec;
01747                     m_NextRelease.tv_usec = ArrivalTime.tv_usec +
01748                                             m_ArrivalInterval.tv_usec;
01749                 }
01750                 else
01751                 {
01752                     m_NextRelease.tv_sec  += m_ArrivalInterval.tv_sec;
01753                     m_NextRelease.tv_usec += m_ArrivalInterval.tv_usec;
01754                 }
01755                 if( m_NextRelease.tv_usec > 1000000 )
01756                 {
01757                     m_NextRelease.tv_sec  += 1;
01758                     m_NextRelease.tv_usec -= 1000000;
01759                 }
01760             // ----------------------------------------------------------------
01761 
01762                 // Increment total number of requests sent to router
01763                 s_mgr->m_ActiveRequests++;
01764 
01765                 // Send request to router
01766                 #ifdef RIO_DEBUG2
01767                 if( m_log.is_open() )
01768                     m_log << "Stream:DataReq:Enviando pedido de bloco ao router"
01769                           << endl;
01770                 #endif
01771                 s_mgr->m_Router->Put( (Event*)event );
01772             }
01773             // Rate violation: Hold request
01774             else
01775             {
01776                 // Put request on hold until time out processing decides
01777                 // it is time to process it
01778                 #ifdef RIO_DEBUG2
01779                 if( m_log.is_open() )
01780                     m_log << "Stream:DataReq:Putting request on hold" << endl;
01781                 #endif
01782 
01783                 RTHold( event );
01784             }
01785 
01786             m_LastArrivalTime = ArrivalTime;
01787         }
01788     }
01789 
01790     struct in_addr clientip;
01791     clientip.s_addr = m_RemoteAddress.sin_addr.s_addr;
01792     UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip),
01793             htons( m_RemoteAddress.sin_port ), m_nRequests );
01794     s_mgr->PostITEvent( (MonitorEvent *)it_event );
01795 
01796     return S_OK;
01797 }

void RioStream::decServerBufferStatus (  ) 

Definition at line 2720 of file server/StreamManager.cpp.

// Decrements m_ServerBufferStatus by one while holding MutexBlock.
// NOTE(review): the RIO_DEBUG2 trace reads m_ServerBufferStatus after the
// mutex has been released, so the logged value may not be the one this call
// produced.
02721 {
02722     pthread_mutex_lock( &MutexBlock );
02723     m_ServerBufferStatus--;
02724     pthread_mutex_unlock( &MutexBlock );
02725     #ifdef RIO_DEBUG2
02726     if( m_log.is_open() ) m_log << "BufferStatus decremented : " << m_ServerBufferStatus << endl;
02727     #endif
02728 }

void RioStream::decTokensToDisk (  ) 

Definition at line 2644 of file server/StreamManager.cpp.

// Decrements m_TokensToDisk by one while holding MutexTokensToDisk.
// The counter is only decremented when it is strictly positive, so this
// function never drives it below zero.
02645 {
02646     pthread_mutex_lock( &MutexTokensToDisk );
02647     if( m_TokensToDisk > 0 )   // saturate at zero
02648     {
02649         m_TokensToDisk--;
02650     }
02651     pthread_mutex_unlock( &MutexTokensToDisk );
02652     #ifdef RIO_DEBUG2
02653     if( m_log.is_open() ) m_log << "TokensToDisk decremented : " << m_TokensToDisk << endl;
02654     #endif
02655 }

void RioStream::decTokensToSend (  ) 

Definition at line 2821 of file server/StreamManager.cpp.

// Decrements m_TokensToSend by one while holding MutexTokensToSend.
// Unlike decTokensToDisk(), no lower bound is checked here, so the counter
// can become negative; getServerBufferStatus() treats values <= 0 specially.
02822 {
02823     pthread_mutex_lock( &MutexTokensToSend );
02824     m_TokensToSend--;
02825     pthread_mutex_unlock( &MutexTokensToSend );
02826     #ifdef RIO_DEBUG2
02827     if( m_log.is_open() ) m_log << "TokensToSend decremented : " << m_TokensToSend << endl;
02828     #endif
02829 }

int RioStream::Get_streamid (  ) 

Definition at line 1208 of file server/StreamManager.cpp.

// Returns the value of s_streamid.
01209 {
01210     return s_streamid;
01211 }

double RioStream::getAverageCACTime (  ) 

Definition at line 2737 of file server/StreamManager.cpp.

// Forwards to the stream manager and returns s_mgr->GetAverageCACTime().
// getInfoAboutRequestList() stores the result in a variable named
// msecRunCAC, so the value is presumably in milliseconds - TODO confirm.
02738 {
02739     return s_mgr->GetAverageCACTime();
02740 }

unsigned int RioStream::getBlockSize (  ) 

Definition at line 2758 of file server/StreamManager.cpp.

// Forwards to the stream manager and returns s_mgr->GetBlockSize().
// getInfoAboutRequestList() logs this value with the unit "Bytes".
02759 {
02760     return s_mgr->GetBlockSize();
02761 }

int RioStream::getClientBufferSize (  ) 

Definition at line 2699 of file server/StreamManager.cpp.

// Returns m_ClientBufferSize (assigned from the BufferSize argument of
// OpenObject() for VBR streams using server-side buffers).
02700 {
02701     return m_ClientBufferSize;
02702 }

int RioStream::getClientBufferStatus (  ) 

Definition at line 2669 of file server/StreamManager.cpp.

// Returns 0 while m_StartingToSend is set; otherwise returns
// m_ClientBufferSize - m_TokensToSend.
// NOTE(review): m_TokensToSend is read here without MutexTokensToSend.
02670 {
02671     if( m_StartingToSend )
02672     {
02673         return 0;
02674     }
02675     else
02676     {
02677         return m_ClientBufferSize - m_TokensToSend;
02678     }
02679 }

int RioStream::getDirection (  ) 

Definition at line 1136 of file server/StreamManager.cpp.

// Returns s_Direction (the value OpenObject() compares against
// RioStreamDirectionRead / RioStreamDirectionWrite).
01137 {
01138     return s_Direction;
01139 }

double RioStream::getEstimatedDiskQueueTime ( int  disk  ) 

Definition at line 2753 of file server/StreamManager.cpp.

// Forwards to the stream manager: returns
// s_mgr->GetEstimatedDiskQueueTime( disk ) for the given disk index.
02754 {
02755     return s_mgr->GetEstimatedDiskQueueTime( disk );
02756 }

double RioStream::getEstimatedDiskResponseTime ( int  disk  ) 

Definition at line 2748 of file server/StreamManager.cpp.

// Forwards to the stream manager: returns
// s_mgr->GetEstimatedDiskResponseTime( disk ) for the given disk index.
02749 {
02750     return s_mgr->GetEstimatedDiskResponseTime( disk );
02751 }

double RioStream::getEstimatedDiskServiceTime ( int  disk  ) 

Definition at line 2743 of file server/StreamManager.cpp.

// Forwards to the stream manager: returns
// s_mgr->GetEstimatedDiskServiceTime( disk ) for the given disk index.
// getInfoAboutRequestList() adds this value to millisecond timestamps, so it
// is presumably expressed in milliseconds - TODO confirm.
02744 {
02745     return s_mgr->GetEstimatedDiskServiceTime( disk );
02746 }

uint RioStream::GetId (  ) 

Definition at line 1202 of file server/StreamManager.cpp.

// Returns m_Id, the identifier getInfoAboutRequestList() copies into every
// list node and logs as "Client ID".
01203 {
01204     return m_Id;
01205 }

PSEQBLOCKLIST RioStream::getInfoAboutRequestList ( char *  filename,
RioObject *  object 
) [private]

Definition at line 2889 of file server/StreamManager.cpp.

02891 {
02892     FILE *file;
02893     PSEQBLOCKLIST curblock, prevblock, init_stream;
02894     struct timeval initial_time, final_time;
02895 
02896     float       duration = 0;
02897     uint        block    = 0;
02898     int         rc       = 0;
02899     int         numreps;
02900     long double time = 0; // event time
02901 
02902     RioDiskBlock Reps[MaxReplications];
02903 
02904     initial_time = Timer.CurrentTime();
02905 
02906     m_FirstArrivalTime = initial_time.tv_sec  * 1000.0 +
02907                          initial_time.tv_usec / 1000.0;
02908 
02909     uint SBufferSize  = getNumberOfBuffersForEachClient();
02910     uint CBufferSize  = m_ClientBufferSize;
02911 
02912     uint buffersize   = CBufferSize + SBufferSize;
02913 
02914     #ifdef RIO_DEBUG2
02915     if( m_log.is_open() ) m_log <<"---------------------------------------------------------------- "<<endl;
02916     if( m_log.is_open() ) m_log <<"                      getInfoAboutRequestList                    "<<endl;
02917     if( m_log.is_open() ) m_log <<"---------------------------------------------------------------- "<<endl;
02918     if( m_log.is_open() ) m_log << "Getting disk service time info ... ";
02919     #endif
02920 
02921     pthread_mutex_lock( &s_mgr->MutexUpdateMeasures );
02922     s_mgr->GetDiskServiceTime();
02923     s_mgr->UpdateDiskQueueAndServiceTime();
02924     pthread_mutex_unlock( &s_mgr->MutexUpdateMeasures );
02925 
02926     #ifdef RIO_DEBUG2
02927     if( m_log.is_open() ) m_log << "OK." << endl;
02928     #endif
02929 
02930     double msecRunCAC  = getAverageCACTime();
02931     double NetworkRate = getNetworkRate();
02932 
02933     unsigned int BlockSize = getBlockSize();
02934 
02935     double timeToTransmit_msec = ( BlockSize/NetworkRate ) * 1000.0;
02936 
02937     #ifdef RIO_DEBUG2
02938     if( m_log.is_open() ) m_log << "Client ID   " << m_Id << endl;
02939     if( m_log.is_open() ) m_log << "SBufferSize " << SBufferSize << endl;
02940     if( m_log.is_open() ) m_log << "CBufferSize " << CBufferSize << endl;
02941     if( m_log.is_open() ) m_log << "RTT (msec)  " << m_AverageRTTtoClient_msec << endl;
02942     if( m_log.is_open() ) m_log << "msecRunCAC  " << msecRunCAC << endl;
02943     if( m_log.is_open() ) m_log << "NetworkRate " << NetworkRate << "B/sec" << endl;
02944     if( m_log.is_open() ) m_log << "BlockSize   " << BlockSize << "Bytes" << endl;
02945     if( m_log.is_open() ) m_log << "timeToTransmit_msec " << timeToTransmit_msec << " msec" << endl;
02946     #endif
02947 
02948     // Open the file with times of the requests
02949     if( (file = fopen( filename, "r" )) == NULL )
02950     {
02951         if( m_log.is_open() )
02952             m_log<<"getInfoAboutRequestList: "<<filename<<": File not found."<<endl;
02953         return NULL;
02954     }
02955 
02956     // Start to construct request list
02957     init_stream = prevblock = NULL;
02958 
02959     float *req_time   = (float *) malloc( buffersize * sizeof(float) );
02960     float *disk_block = (float *) malloc( buffersize * sizeof(float) );
02961     // get number of disks
02962     int disks = s_mgr->m_MaxNumberOfDisks;
02963     //time to service next request
02964     long double *disk_time = (long double*) malloc( disks * sizeof(long double) );
02965     for( int i = 0 ; i < disks ; i++ )
02966         disk_time[ i ] = msecRunCAC;
02967 
02968     memset( req_time, 0, buffersize*sizeof(float) );
02969     memset( disk_block, 0, buffersize*sizeof(float) );
02970     memset( disk_time, 0, disks*sizeof(long double) );
02971 
02972     #ifdef RIO_DEBUG2
02973     if( m_log.is_open() ) m_log << "getInfoAboutRequestList: Starting reading ... " << endl;
02974     #endif
02975 
02976     // Read next request until the end of file
02977     while( !feof( file ) )
02978     {
02979         if( fscanf( file, "%f\n", &duration ) == 1 )
02980         {
02981             duration = duration * 1000.0; // duration in msec
02982 
02983             curblock = ( PSEQBLOCKLIST )malloc( sizeof( SEQBLOCKLIST ) );
02984             if( curblock != NULL )
02985             {
02986                 curblock->id       = m_Id; //to change
02987                 curblock->block    = block;
02988                 curblock->duration = duration;
02989 
02990                 rc = object->MapBlock( (RioBlock) block, 0, &numreps, Reps);
02991                 if( rc )
02992                 {
02993                     RioErr << "getInfoAboutRequestList: Error at MapBlock"
02994                            << endl;
02995                 }
02996                 else
02997                 {
02998                     curblock->disk          = Reps[0].disk;
02999                     curblock->physicalblock = Reps[0].block;
03000                 }
03001 
03002                 curblock->next     = NULL;
03003                 curblock->prev     = NULL;
03004 
03005                 if( block >=  buffersize )
03006                 {
03007                     curblock->csystime = req_time[block % buffersize] + time;
03008                     time += req_time[ block % buffersize ];
03009                 }
03010                 else
03011                 {
03012                     if( block < SBufferSize )
03013                     {
03014                         // When the requests to full server side buffer will be:
03015                         // shift msec to run cac
03016                         curblock->csystime = msecRunCAC;
03017                     }
03018                     else
03019                     {
03020                         // When the requests to full client buffer will be:
03021                         // shift msec to run CAC + 1 RTT
03022                         curblock->csystime = msecRunCAC +
03023                                              m_AverageRTTtoClient_msec;
03024                     }
03025 
03026                     if( disk_time[ curblock->disk ] < curblock->csystime )
03027                         disk_time[ curblock->disk ] = curblock->csystime;
03028 
03029                     disk_time[ curblock->disk ] +=
03030                             getEstimatedDiskServiceTime( curblock->disk );
03031 
03032                     disk_block[ block ] = disk_time[ curblock->disk ];
03033 
03034                     #ifdef RIO_DEBUG2
03035                     if( m_log.is_open() ) m_log << " id "     <<  curblock->id
03036                           << " block "  <<  curblock->block
03037                           << " disk "   << curblock->disk
03038                           << " servicetime "
03039                           << getEstimatedDiskServiceTime( curblock->disk )
03040                           << " disk_time "
03041                           << disk_time[ curblock->disk]  << endl;
03042                     #endif
03043 
03044                     if( (block + 1) == buffersize )
03045                     {
03046                         double maxdisktime = disk_time[ 0 ];
03047                         for( int i = 1; i < disks; i++ )
03048                         {
03049                             if( disk_time [ i ] > maxdisktime )
03050                                 maxdisktime = disk_time[ i ];
03051                         }
03052                         double timetotx = msecRunCAC +
03053                                           m_AverageRTTtoClient_msec;
03054                         for( uint i = 0; i < CBufferSize; i++ )
03055                         {
03056                             if( disk_block [ i ] > timetotx )
03057                                 timetotx = disk_block[ i ];
03058                             timetotx += timeToTransmit_msec;
03059                         }
03060 
03061                         //time is set to when the requests for Totalbuffers
03062                         //blocks will be:
03063                         //max between until tx all BC blocks and read BS + BC blocks
03064                         double maxtime;
03065                         if( timetotx > maxdisktime )
03066                             maxtime = timetotx;
03067                         else
03068                             maxtime = maxdisktime;
03069                         time = maxtime + m_AverageRTTtoClient_msec;
03070 
03071                         #ifdef RIO_DEBUG2
03072                         if( m_log.is_open() )
03073                         {
03074                             m_log << "maxdisktime " << maxdisktime << endl;
03075                             m_log << "timetotx " << timetotx << endl;
03076                             m_log << "max + RTT = time => " << time << endl;
03077                         }
03078                         #endif
03079                     }
03080                 }
03081                 req_time[ block % buffersize ] = duration;
03082             }
03083             else 
03084             {
03085                 RioErr << "getInfoAboutRequestList: Could not allocate memory. "
03086                        << endl;
03087     
03088                 free( req_time );
03089                 free( disk_block );
03090                 free( disk_time );
03091 
03092                 return NULL;
03093             }
03094             if( prevblock ) {
03095                 curblock->prev = prevblock;
03096                 prevblock = prevblock->next = curblock;
03097             }
03098             else
03099                 prevblock = init_stream = curblock;
03100             duration = 0;
03101         }
03102         else
03103         {
03104             if( !feof( file ) )
03105             {
03106                 RioErr << "getInfoAboutRequestList: Could not read from file."
03107                        << endl;
03108 
03109                 free( req_time );
03110                 free( disk_block );
03111                 free( disk_time );
03112 
03113                 return NULL;
03114             }
03115         }
03116         block++;
03117     }
03118 
03119     m_TotalBlocks = block;
03120 
03121     fclose( file );
03122 
03123     #ifdef RIO_DEBUG2
03124     if( m_log.is_open() ) m_log << "getInfoAboutRequestList: Closing file." << endl;
03125     #endif
03126 
03127     final_time = Timer.CurrentTime();
03128     time = getInterval( initial_time, final_time );
03129     m_SampleGenReqListTime = time;
03130     s_mgr->SetAverageGenReqListTime( time , this );
03131 
03132     #ifdef RIO_DEBUG2
03133     if( m_log.is_open() )
03134     {
03135         m_log<<"---------------------------------------------------------------- "<<endl;
03136         m_log<<"getInfoAboutRequestList Time = " << time << " msec "              <<endl;
03137         m_log<<"---------------------------------------------------------------- "<<endl;
03138     }
03139     #endif
03140 
03141     free( req_time );
03142     free( disk_block );
03143     free( disk_time );
03144 
03145     return init_stream;
03146 }

struct timeval RioStream::getLastArrivalTime (  )  [read]

Definition at line 2664 of file server/StreamManager.cpp.

// Returns a copy of m_LastArrivalTime.
02665 {
02666     return m_LastArrivalTime;
02667 }

u32 RioStream::getMaximumBlockToSend (  ) 

Definition at line 2844 of file server/StreamManager.cpp.

// Returns the block number of m_PosAtClientsList->nextBlockToBeRequested,
// plus one once m_FinishedMovie is set.
// NOTE(review): m_PosAtClientsList and nextBlockToBeRequested are
// dereferenced without a NULL check - confirm callers guarantee both.
02845 {
02846     if( m_FinishedMovie == true )
02847     {
02848         return m_PosAtClientsList->nextBlockToBeRequested->block + 1;
02849     }
02850     else
02851     {
02852         return m_PosAtClientsList->nextBlockToBeRequested->block;
02853     }
02854 }

u32 RioStream::getMinimumBlockToSend (  ) 

Definition at line 2856 of file server/StreamManager.cpp.

// Returns m_MinimumBlockToSend.
// NOTE(review): read without MutexMinBlock, whereas incMinimumBlockToSend()
// updates the field under that mutex.
02857 {
02858     return m_MinimumBlockToSend;
02859 }

void RioStream::getMutexUpdateEvent (  ) 

Definition at line 2395 of file server/StreamManager.cpp.

// Locks MutexUpdateEvent and returns with the mutex still held.
// The class also declares releaseMutexUpdateEvent(), presumably the matching
// unlock - its body is not shown here.
02396 {
02397     pthread_mutex_lock( &MutexUpdateEvent );
02398 }

long double RioStream::getNetworkRate (  ) 

Definition at line 2764 of file server/StreamManager.cpp.

// Forwards to the stream manager and returns s_mgr->GetNetworkRate().
// getInfoAboutRequestList() logs this value with the unit "B/sec".
02765 {
02766     return s_mgr->GetNetworkRate();
02767 }

int RioStream::getNumberOfBlocks (  ) 

Definition at line 2694 of file server/StreamManager.cpp.

// Returns m_TotalBlocks, the block count recorded by
// getInfoAboutRequestList() after reading the request file.
02695 {
02696     return m_TotalBlocks;
02697 }

int RioStream::getNumberOfBuffersForEachClient (  ) 

Definition at line 2731 of file server/StreamManager.cpp.

// Forwards to the stream manager and returns
// s_mgr->GetNumberOfBuffersForEachClient().
02732 {
02733     return s_mgr->GetNumberOfBuffersForEachClient();
02734 }

int RioStream::getPlayBlock (  ) 

Definition at line 2681 of file server/StreamManager.cpp.

// Returns 0 while m_StartingToSend is set; otherwise returns the block
// number of m_PosAtClientsList->nextBlockToBeRequested minus
// getClientBufferSize().
// NOTE(review): the pointer chain is dereferenced without a NULL check, and
// the result is negative when fewer blocks than the client buffer size have
// been requested - confirm callers expect that.
02682 {
02683     if( m_StartingToSend )
02684     {
02685         return 0;
02686     }
02687     else
02688     {
02689         return(m_PosAtClientsList->nextBlockToBeRequested->block -
02690                getClientBufferSize());
02691     }
02692 }

bool RioStream::getPrefetchedAllBlocks (  ) 

Definition at line 2882 of file server/StreamManager.cpp.

// Returns m_PrefetchedAllBlocks.
02883 {
02884     return m_PrefetchedAllBlocks;
02885 }

int RioStream::getQueueSize (  ) 

Definition at line 2770 of file server/StreamManager.cpp.

// Returns m_nQueue, the counter NRTHold() increments for every event it
// puts on m_Queue.
02771 {
02772     return m_nQueue;
02773 }

double RioStream::getRTTtoClient (  ) 

Definition at line 2776 of file server/StreamManager.cpp.

// Returns m_AverageRTTtoClient_msec, which OpenObject() computes from its
// RTT argument as tv_sec * 1000.0 + tv_usec / 1000.0 (milliseconds).
02777 {
02778     return m_AverageRTTtoClient_msec;
02779 }

int RioStream::getServerBufferStatus (  ) 

Definition at line 2705 of file server/StreamManager.cpp.

// Returns m_ServerBufferStatus reduced by m_TokensToSend when the token
// count is positive; otherwise returns m_ServerBufferStatus unchanged.
// NOTE(review): both counters are read without MutexBlock /
// MutexTokensToSend, which guard their updates elsewhere.
02706 {
02707     #ifdef RIO_DEBUG2
02708     if( m_log.is_open() ) m_log << " m_ServerBufferStatus " << m_ServerBufferStatus
02709           << " m_TokensToSend " << m_TokensToSend
02710           << endl;
02711     #endif
02712 
02713     if( m_TokensToSend > 0 )
02714         return ( m_ServerBufferStatus - m_TokensToSend );
02715     else
02716         return m_ServerBufferStatus;   // zero or negative tokens are ignored
02717 }

int RioStream::getTokensToDisk (  ) 

Definition at line 2658 of file server/StreamManager.cpp.

// Returns m_TokensToDisk.
// NOTE(review): read without MutexTokensToDisk, which decTokensToDisk()
// holds while updating the field.
02659 {
02660     return m_TokensToDisk;
02661 }

int RioStream::getTokensToSend (  ) 

Definition at line 2804 of file server/StreamManager.cpp.

// Returns m_TokensToSend.
// NOTE(review): read without MutexTokensToSend, which incTokensToSend() and
// decTokensToSend() hold while updating the field.
02805 {
02806     return m_TokensToSend;
02807 }

RioTrafficType RioStream::GetType (  ) 

Definition at line 1196 of file server/StreamManager.cpp.

// Returns m_Type (OpenObject() compares it against RIO_TRAFFIC_VBR).
01197 {
01198     return m_Type;
01199 }

void RioStream::incMinimumBlockToSend (  ) 

Definition at line 2861 of file server/StreamManager.cpp.

// Increments m_MinimumBlockToSend by one while holding MutexMinBlock.
// NOTE(review): the RIO_DEBUG2 trace reads the field after the mutex has
// been released.
02862 {
02863     pthread_mutex_lock( &MutexMinBlock );
02864     m_MinimumBlockToSend++;
02865     pthread_mutex_unlock( &MutexMinBlock );
02866     #ifdef RIO_DEBUG2
02867     if( m_log.is_open() ) m_log << "MinimumBlockToSend incremented : " << m_MinimumBlockToSend
02868           << endl;
02869     #endif
02870 }

void RioStream::incTokensToSend (  ) 

Definition at line 2810 of file server/StreamManager.cpp.

// Increments m_TokensToSend by one while holding MutexTokensToSend.
// NOTE(review): the RIO_DEBUG2 trace reads the field after the mutex has
// been released.
02811 {
02812     pthread_mutex_lock( &MutexTokensToSend );
02813     m_TokensToSend++;
02814     pthread_mutex_unlock( &MutexTokensToSend );
02815     #ifdef RIO_DEBUG2
02816     if( m_log.is_open() ) m_log << "TokensToSend incremented : " << m_TokensToSend << endl;
02817     #endif
02818 }

void RioStream::Initialize ( CStreamManager *  mgr  ) 

Definition at line 1025 of file server/StreamManager.cpp.

// Stores the given stream manager in s_mgr, the handle the other members
// use to reach manager-wide settings and services.
01026 {
01027     s_mgr = mgr;
01028 }

int RioStream::MaxRequests (  ) 

Definition at line 1184 of file server/StreamManager.cpp.

// Returns m_UserMaxRequests when the stream is open.
// When m_Status is not StreamStatusOpened, returns
// ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED instead.
// NOTE(review): the error code and the request limit share one int return
// value; callers presumably tell them apart by range - confirm.
01185 {
01186     if( m_Status != StreamStatusOpened )
01187     {
01188         return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
01189     }
01190 
01191     return m_UserMaxRequests;
01192 }

void RioStream::NewBlockArrival (  ) 

Definition at line 2791 of file server/StreamManager.cpp.

// Increments m_ServerBufferStatus by one and broadcasts
// ConditionBlockArrival, both while holding MutexBlock.
// NOTE(review): the RIO_DEBUG2 trace reads m_ServerBufferStatus after the
// mutex has been released.
02792 {
02793     pthread_mutex_lock( &MutexBlock );
02794     m_ServerBufferStatus++;
02795     pthread_cond_broadcast(&ConditionBlockArrival);   // wake every waiter on the condition
02796     pthread_mutex_unlock( &MutexBlock );
02797     #ifdef RIO_DEBUG2
02798     if( m_log.is_open() ) m_log << "BufferStatus incremented : " << m_ServerBufferStatus << endl;
02799     #endif
02800 }

void RioStream::NRTHold ( EventDataRequest *  event  )  [private]

Definition at line 1819 of file server/StreamManager.cpp.

// Puts `event` on this stream's hold queue (m_Queue) and increments both
// m_nQueue and m_QueuedCount.
// When the queue was empty beforehand (m_nQueue == 0), the stream is first
// added to s_mgr->m_NRTHoldList.
// NOTE(review): no locking is visible here; confirm the caller serialises
// access to m_Queue and the manager's hold list.
01820 {
01821     // If this is the first hold request for this stream
01822     // put stream on list of stream which are currently holding requests
01823     if( m_nQueue == 0 )
01824     {
01825         s_mgr->m_NRTHoldList.Put( this );
01826     }
01827 
01828     m_Queue.Put( event );
01829     m_nQueue++;
01830     m_QueuedCount++;
01831 }

int RioStream::OpenObject ( char *  ObjectName,
RioAccess  Access,
struct timeval  RTT,
int  BufferSize,
RioStreamObj **  StreamObj 
)

Definition at line 2157 of file server/StreamManager.cpp.

02162 {
02163     int rc;
02164 
02165     // Check if object access matches stream direction
02166     if( (s_Direction == RioStreamDirectionRead )&&( Access & RIO_WRITE_ACCESS) )
02167     {
02168         return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
02169     }
02170     if( (s_Direction == RioStreamDirectionWrite )&&( Access & RIO_READ_ACCESS) )
02171     {
02172         return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
02173     }
02174 
02175     if( m_Status != StreamStatusOpened )
02176     {
02177         return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
02178     }
02179     
02180     RioStreamObj *op = new RioStreamObj( s_mgr, this );
02181     if( op == 0 )
02182     {
02183         return ERROR_STREAMMANAGER + ERROR_MEMORY;
02184     }
02185 
02186     op->o_object = 0;
02187 
02188     rc = s_mgr->m_ObjectManager->Open( ObjectName, Access, &op->o_object );
02189 
02190     if( rc == 0 )
02191     {
02192         *StreamObj = op;
02193 
02194         // if it is VBR stream, if it is using buffer, initialize it
02195         if( (GetType() == RIO_TRAFFIC_VBR )&&( s_mgr->m_UseServerSideBuffers ) )
02196         {
02197             struct timeval initial_time, final_time;
02198             double time;
02199 
02200             initial_time = Timer.CurrentTime();
02201 
02202             m_ClientBufferSize = BufferSize;
02203             m_AverageRTTtoClient_msec = RTT.tv_sec * 1000.0 +
02204                                         RTT.tv_usec / 1000.0;
02205 
02206             // get file name (with information to admission control)
02207             char infoFileName[255]="", mmobjectname[255]="";
02208             strcpy( mmobjectname, ObjectName );
02209             char *pos = strrchr( mmobjectname, '.' );
02210             strcpy( pos+1, "cactimes" );
02211             strcpy( infoFileName, s_mgr->m_FileRoot );
02212             strcat( infoFileName, "/" ) ;
02213             strcat( infoFileName, mmobjectname ) ;
02214 
02215             m_FirstArrivalTime = initial_time.tv_sec  * 1000.0 +
02216                                  initial_time.tv_usec / 1000.0;
02217 
02218             #ifdef RIO_DEBUG2
02219             if( m_log.is_open() )
02220                 m_log << "OpenObject: Video : " << ObjectName
02221                       << endl << "OpenObject: CAC info: " << infoFileName
02222                       << endl << "OpenObject: FirstArrivalTime "
02223                       << m_FirstArrivalTime << endl;
02224             #endif
02225 
02226             m_WaitingQueueSize = s_mgr->UpdateNumberOfWaitingClients( 1 );
02227 
02228             pthread_mutex_lock( &s_mgr->MutexRunCAC );
02229 
02230             if( ( m_RequestList = getInfoAboutRequestList( infoFileName,
02231                                                       op->o_object )) != NULL )
02232             {
02233                 if( s_mgr->m_UseNewCAC )
02234                 {
02235 
02236                     m_PosAtClientsList = s_mgr->CanAdmit( m_RequestList,
02237                                                      this,
02238                                                      m_AverageRTTtoClient_msec,
02239                                                      m_ClientBufferSize );
02240                     final_time = Timer.CurrentTime();
02241                     time = getInterval( initial_time, final_time );
02242                     m_SampleCACTime = time;
02243                     int index;
02244 
02245                     if( m_PosAtClientsList != NULL )
02246                     {
02247                         index = 0;
02248                     }
02249                     else
02250                     {
02251                         index = 1;
02252                     }
02253 
02254                     m_SampleAdmissionProcessTime =  m_SampleGenReqListTime+
02255                                                     m_SampleSortListTime+
02256                                                     m_SampleSimulationTime;
02257 
02258                     m_SampleWaitingTime =  m_SampleCACTime - 
02259                                            m_SampleAdmissionProcessTime;
02260 
02261                     s_mgr->SetAverageWaitingTime( m_SampleWaitingTime,
02262                                                   this, index,
02263                                                   m_WaitingQueueSize );
02264                     s_mgr->SetAverageAdmissionProcessTime(
02265                                             m_SampleAdmissionProcessTime,
02266                                             this, index );
02267 
02268                     s_mgr->SetAverageCACTime( time, this, index );
02269 
02270                     if( s_mgr->m_log.is_open() )
02271                     {
02272                         s_mgr->m_log << "Time to generate list: "
02273                                      << m_SampleGenReqListTime << " msec."
02274                                      << endl;
02275                         s_mgr->m_log << "Time to sort list: "
02276                                      << m_SampleSortListTime << " msec."
02277                                      << endl;
02278                         s_mgr->m_log << "Time to simulate list: "
02279                                      << m_SampleSimulationTime << " msec."
02280                                      << endl;
02281                         s_mgr->m_log << "Time to admission: "
02282                                      << m_SampleCACTime << " msec."
02283                                      << endl;
02284                         s_mgr->m_log << "Waiting time: "
02285                                      <<  m_SampleWaitingTime << " msec."
02286                                      << endl;
02287                         s_mgr->m_log << "Waiting queue size: "
02288                                      <<  m_WaitingQueueSize << endl;
02289                     }
02290 
02291                     #ifdef RIO_DEBUG2
02292                     if( m_log.is_open() )
02293                         m_log << "Time to admission: " << time << " msec."
02294                               << endl;
02295                     #endif
02296                     if( m_PosAtClientsList != NULL )
02297                     {
02298                         if( s_mgr->m_log.is_open() )
02299                             s_mgr->m_log << "OpenObject: New client admitted. "
02300                                          << "(RTT " << m_AverageRTTtoClient_msec
02301                                          << " msec)" << endl;
02302 
02303                         #ifdef RIO_DEBUG2
02304                         if( m_log.is_open() )
02305                             m_log << "OpenObject: New client admitted." << endl;
02306                         #endif
02307                     }
02308                     else
02309                     {
02310                         if( s_mgr->m_log.is_open() )
02311                             s_mgr->m_log << "OpenObject: Could not admit new "
02312                                          << "client" << endl;
02313 
02314                         #ifdef RIO_DEBUG2
02315                         if( m_log.is_open() )
02316                             m_log << "OpenObject: Could not admit new client "
02317                                   << endl;
02318                         #endif
02319                         if( s_mgr->m_log.is_open() )
02320                             s_mgr->m_log << "Deleting StreamObj " << op << endl;
02321 
02322                         delete op;
02323                         return -1;
02324                     }
02325                 }
02326                 else
02327                 {
02328                     m_PosAtClientsList = s_mgr->InsertNewClient( m_RequestList,
02329                                                                  this );
02330                     s_mgr->UpdateNumberOfWaitingClients(-1);
02331                     pthread_mutex_unlock( &s_mgr->MutexRunCAC );
02332                 }
02333 
02334                 // initialize StartingToSend
02335                 m_StartingToSend = true;
02336 
02337                 setMinimumBlockToSend(
02338                         m_PosAtClientsList->nextBlockToBeRequested->block );
02339                 m_FinishedMovie = false;
02340 
02341                 // now it creates the server side buffer equals to
02342                 // s_mgr->GetNumberOfBuffersForEachClient()
02343                 if( s_mgr->GetNumberOfBuffersForEachClient() > 0 )
02344                 {
02345                     m_Buffer = new ClientBuffer[
02346                                     s_mgr->GetNumberOfBuffersForEachClient()];
02347                     if( m_Buffer == 0 )
02348                     {
02349                         RioErr << "Could not allocate buffer."<< endl;
02350                         return -1;
02351                     }
02352                     else
02353                     {
02354                         for( int i = 0;
02355                              i < s_mgr->GetNumberOfBuffersForEachClient();
02356                              i++
02357                            )
02358                         {
02359                             prefetchBlock( i,
02360                               (EventDataRequest*)EventManager.New( 
02361                                                        EventTypeRTDataRequest ),
02362                                                        op);
02363                             m_nRequests++;
02364                             updateNextBlockToBeFetched( NULL );
02365                         }
02366                     }
02367                 }
02368                 m_TimeToBuffer = Timer.CurrentTime();
02369                 //After this point, send Ok to client
02370             }
02371             else
02372             {
02373                 RioErr << "Could not find " << infoFileName
02374                        << ", allocate memory or read from file."
02375                        << "Not using new control admission and buffers."
02376                        << endl;
02377 
02378                 pthread_mutex_unlock( &s_mgr->MutexRunCAC );
02379                 return S_OK;
02380             }
02381         }
02382         return S_OK;
02383     }
02384 
02385     if( s_mgr->m_log.is_open() )
02386         s_mgr->m_log << "Deleting StreamObj " << op << endl;
02387 
02388     delete op;
02389     return rc;
02390 }

int RioStream::OpenObject ( char *  ObjectName,
RioAccess  Access,
RioStreamObj **  StreamObj 
)

Definition at line 1142 of file server/StreamManager.cpp.

01144 {
01145     int rc;
01146 
01147     // Check if object access matches stream direction
01148     if( ( s_Direction == RioStreamDirectionRead ) &&
01149        ( Access & RIO_WRITE_ACCESS ))
01150     {
01151         return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
01152     }
01153     if( ( s_Direction == RioStreamDirectionWrite ) &&
01154        ( Access & RIO_READ_ACCESS ))
01155     {
01156         return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE;
01157     }
01158 
01159     if( m_Status != StreamStatusOpened )
01160     {
01161         return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED;
01162     }
01163 
01164     RioStreamObj *op = new RioStreamObj( s_mgr, this );
01165     if( op == 0 )
01166     {
01167         return ERROR_STREAMMANAGER + ERROR_MEMORY;
01168     }
01169 
01170     op->o_object = 0;
01171 
01172     rc = s_mgr->m_ObjectManager->Open( ObjectName, Access, &op->o_object );
01173 
01174     if( rc == 0 )
01175     {
01176         *StreamObj = op;
01177         return S_OK;
01178     }
01179     delete op;
01180     return rc;
01181 }

RioStream& RioStream::operator= ( const RioStream &  x  )  [private]
void RioStream::prefetchBlock ( int  posAtBuffer,
EventDataRequest *  event,
RioStreamObj *  StreamObject 
)

Definition at line 2408 of file server/StreamManager.cpp.

02411 {
02412 
02413     m_Buffer[posAtBuffer].blockInfo = m_PosAtClientsList->nextBlockToBeFetched;
02414 
02415     #ifdef RIO_DEBUG2
02416     struct timeval time = Timer.CurrentTime();
02417     long double time_msec = time.tv_sec * 1000.0 + time.tv_usec/1000.0 ;
02418     if( m_log_requests.is_open() )
02419     {
02420         m_log_requests << m_Buffer[posAtBuffer].blockInfo->block << "\t\t"
02421                        << m_Buffer[posAtBuffer].blockInfo->csystime << "\t\t"
02422                        << time_msec - m_FirstArrivalTime << "\t\t"
02423                        << ( time_msec - m_FirstArrivalTime ) -
02424                           m_Buffer[posAtBuffer].blockInfo->csystime
02425                        << endl;
02426     }
02427     #endif
02428 
02429     m_Buffer[posAtBuffer].event = event;
02430     m_Buffer[posAtBuffer].event->Request.BufferId         = -1;
02431     // to be used to confirm if the request was submit to
02432     // storage before cancel it
02433     m_Buffer[posAtBuffer].event->Request.RequestCounter   = -1;
02434     m_Buffer[posAtBuffer].event->Request.Reqid            = 0;
02435     m_Buffer[posAtBuffer].event->Request.Target.IPaddress = 0;
02436     m_Buffer[posAtBuffer].event->Request.Target.Port      = 0;
02437     m_Buffer[posAtBuffer].event->Request.Block            =
02438                                  m_Buffer[posAtBuffer].blockInfo->block;
02439     m_Buffer[posAtBuffer].event->Request.Reps[0].block    =
02440                                  m_Buffer[posAtBuffer].blockInfo->physicalblock;
02441     m_Buffer[posAtBuffer].event->Request.Reps[0].disk     =
02442                                  m_Buffer[posAtBuffer].blockInfo->disk;
02443     m_Buffer[posAtBuffer].event->Request.RepBits   = 0;
02444     m_Buffer[posAtBuffer].event->Request.Status    = 0;
02445     m_Buffer[posAtBuffer].event->Request.Operation = RealTimePrefetchBlock;
02446     m_Buffer[posAtBuffer].event->Request.streamobj = StreamObject;
02447     m_Buffer[posAtBuffer].event->Request.rioobject = StreamObject->o_object;
02448 
02449     StreamObject->m_PendingRequests++;
02450 
02451     if( m_StartingToSend == true )
02452     {
02453         #ifdef RIO_DEBUG2
02454         if( m_log.is_open() )
02455             m_log << "prefetchBlock:starting prefetching block "
02456                   << m_Buffer[posAtBuffer].event->Request.Block
02457                   << " (m_nRequests " << m_nRequests << ")"
02458                   << " m_ActiveRequests "
02459                   << s_mgr->m_ActiveRequests + 1
02460                   << endl;
02461         #endif
02462         s_mgr->m_Router->Put( (Event*)m_Buffer[posAtBuffer].event );
02463         s_mgr->m_ActiveRequests++;
02464     }
02465     else
02466     {
02467         if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) )
02468         {
02469             #ifdef RIO_DEBUG2
02470             if( m_log.is_open() )
02471                 m_log << "prefetchBlock:prefetching block (Router) "
02472                       << m_Buffer[posAtBuffer].event->Request.Block
02473                       << " (m_nRequests " << m_nRequests << ")"
02474                       << " m_ActiveRequests "
02475                       << s_mgr->m_ActiveRequests + 1
02476                       << endl;
02477             #endif
02478             s_mgr->m_Router->Put( (Event*)m_Buffer[posAtBuffer].event );
02479             s_mgr->m_ActiveRequests++;
02480             decTokensToDisk();
02481         }
02482         else
02483         {
02484             #ifdef RIO_DEBUG2
02485             if( m_log.is_open() )
02486                 m_log << "prefetchBlock:prefetching block (RTQueue) "
02487                       << m_Buffer[posAtBuffer].event->Request.Block
02488                       << " (m_nRequests " << m_nRequests << ")"
02489                       << endl;
02490             #endif
02491             RTHold( m_Buffer[posAtBuffer].event );
02492         }
02493     }
02494 }

void RioStream::ProcessNRTQueue (  )  [private]

Definition at line 1930 of file server/StreamManager.cpp.

01931 {
01932     EventDataRequest* request;
01933 
01934     // Process just one queued request at time
01935     // (round robin across all non real-time streams)
01936     if( m_nQueue > 0 ) // Just in case: Not really necessary
01937     {
01938         // retrieve request from queue
01939         request = m_Queue.Get();
01940         m_nQueue--;
01941 
01942         // Increment total number of requests sent to router
01943         s_mgr->m_ActiveRequests++;
01944 
01945         // Send request to router
01946         s_mgr->m_Router->Put( (Event*)request );
01947     }
01948     // If no requests left on queue, remove stream from list of streams
01949     // with hold requests
01950     if( m_nQueue == 0 )
01951         s_mgr->m_NRTHoldList.Remove( this );
01952 }

void RioStream::ProcessRTQueue (  )  [private]

Definition at line 1834 of file server/StreamManager.cpp.

01835 {
01836 
01837     EventDataRequest* request;
01838 
01839     struct timeval t = Timer.CurrentTime();
01840     t.tv_usec += RELEASE_TIME_TOLERANCE;
01841     if( t.tv_usec > 1000000 )
01842     {
01843         t.tv_sec  += 1;
01844         t.tv_usec -= 1000000;
01845     }
01846 
01847     if( m_PosAtClientsList != NULL )
01848     {
01849         struct timeval clocknow = Timer.CurrentTime();
01850 
01851         // updates number of tokensToDisk
01852         updateTokensToDisk( clocknow );
01853 
01854         // Process maximum queued requests without violating stream rate
01855         while(( m_nQueue > 0 ) && ( getTokensToDisk() > 0 ))
01856         {
01857             #ifdef RIO_DEBUG2
01858             if( m_log.is_open() )
01859                 m_log << "Stream:DataReq 1 - ProcessRTQueue "
01860                       << " Recuperando da fila."
01861                       << endl;
01862             #endif
01863             // retrieve request from queue
01864             request = m_Queue.Get();
01865             m_nQueue--;
01866 
01867             // Increment total number of requests sent to router
01868             s_mgr->m_ActiveRequests++;
01869 
01870             // Send request to router
01871             #ifdef RIO_DEBUG2
01872             if( m_log.is_open() ) m_log << "Stream:DataReq - ProcessRTQueue "
01873                   << " Enviando pedidos recuperados da fila para o router111 "
01874                   << endl;
01875             #endif
01876             s_mgr->m_Router->Put( (Event*)request );
01877             decTokensToDisk();
01878         }
01879     }
01880     else
01881     {
01882         // Process maximum queued requests without violating stream rate
01883         while( ( m_nQueue > 0 ) &&
01884                ( ( t.tv_sec > m_NextRelease.tv_sec) ||
01885                  ( ( t.tv_sec == m_NextRelease.tv_sec  ) &&
01886                    ( t.tv_usec >= m_NextRelease.tv_usec)
01887                  )
01888                )
01889              )
01890         {
01891             // retrieve request from queue
01892             #ifdef RIO_DEBUG2
01893             if( m_log.is_open() )
01894                 m_log << "Stream:DataReq 2 - ProcessRTQueue "
01895                       << " Recuperando da fila."
01896                       << endl;
01897             #endif
01898             request = m_Queue.Get();
01899             m_nQueue--;
01900 
01901             // update new release time for next request
01902             m_NextRelease.tv_sec  += m_ArrivalInterval.tv_sec;
01903             m_NextRelease.tv_usec += m_ArrivalInterval.tv_usec;
01904             if( m_NextRelease.tv_usec > 1000000 )
01905             {
01906                 m_NextRelease.tv_sec  += 1;
01907                 m_NextRelease.tv_usec -= 1000000;
01908             }
01909 
01910             // Increment total number of requests sent to router
01911             s_mgr->m_ActiveRequests++;
01912 
01913             // Send request to router
01914             #ifdef RIO_DEBUG2
01915             if( m_log.is_open() )
01916                 m_log << "Stream:DataReq - ProcessRTQueue "
01917                       << " Enviando pedidos recuperados da fila para o router "
01918                       << endl;
01919             #endif
01920             s_mgr->m_Router->Put( (Event*)request );
01921         }
01922     }
01923     // If no requests left on queue, remove stream from list of streams
01924     // with hold requests
01925     if( m_nQueue == 0 )
01926         s_mgr->m_RTHoldList.Remove( this );
01927 }

void RioStream::releaseMutexUpdateEvent (  ) 

Definition at line 2400 of file server/StreamManager.cpp.

02401 {
02402     pthread_mutex_unlock( &MutexUpdateEvent );
02403 }

void RioStream::RequestCompleted ( Event *  Request  )  [private]

Definition at line 1955 of file server/StreamManager.cpp.

01956 {
01957     DataRequest* Req = & ((( EventDataRequest* )Request )->Request );
01958 
01959     // Save request info for sending reply to client after request is free
01960     DataRequestOperation Operation = Req->Operation;
01961     CommunicationAddress Target    = Req->Target;
01962     int Reqid                     = Req->Reqid;
01963     RioResult Status               = Req->Status;
01964     NATData result;
01965     // TODO: Remover depois, quando a RioNeti e a NetMgr nao forem mais usadas.
01966     unsigned int StorageNumber;
01967 
01968     // TODO: Remover depois, quando a RioNeti e a NetMgr nao forem mais usadas
01969     // (obtem o numero de storages).
01970     s_mgr->m_Router->GetNumberOfStorageNodes( &StorageNumber );
01971 
01972     // Free request
01973     EventManager.Free( Request );
01974 
01975     m_nRequests--;
01976     s_mgr->m_ActiveRequests--;
01977 
01978     #ifdef RIO_DEBUG2
01979     struct in_addr client_address;
01980     client_address.s_addr = Target.IPaddress;
01981     RioErr << "[RioStream] RequestCompleted Target: "
01982            << "IPAddr = " << inet_ntoa(client_address)
01983            << ", Port = " << Target.Port << endl;
01984     #endif
01985 
01986     // Check if stream was closed
01987     if( m_Status == StreamStatusClosed )
01988     {
01989         if( m_nRequests == 0 )
01990         {
01991             s_mgr->m_CloseList.Remove( this );
01992             s_mgr->m_used--;
01993             // specific for real-time streams
01994             if( m_Type == RIO_TRAFFIC_CBR )
01995             {
01996                 s_mgr->m_UsedRate -= s_Rate;
01997             }
01998             if( (m_PosAtClientsList != NULL) &&
01999                 (s_mgr->m_UseServerSideBuffers)
02000               )
02001             {
02002                 s_mgr->RemoveClient( m_PosAtClientsList );
02003                 if( m_Buffer )
02004                 {
02005                     delete m_Buffer;
02006                     m_Buffer = NULL;
02007                 }
02008             }
02009             #ifdef RIO_DEBUG2
02010             RioErr << "Closing mlog"    << endl;
02011             m_log.close();
02012             #endif
02013             #ifdef RIO_DEBUG2
02014             RioErr << "Closing mlogreq" << endl;
02015             m_log_requests.close();
02016             #endif
02017             s_mgr->m_FreeList.Put( this );
02018             if( s_mgr->m_log.is_open() )
02019                 s_mgr->m_log << "RequestCompleted: Stream CLOSED. "
02020                              << " Requests: " << m_RequestCount
02021                              << ". Queued requests: " << m_QueuedCount << endl;
02022         }
02023     }
02024     else // else do if( m_Status == StreamStatusClosed )
02025     {
02026         
02027         #ifdef RIO_DEBUG2
02028         RioErr << "RioStream::RequestCompleted Status = " << Status 
02029                << ", Operation = " << Operation << ", ReqID = " << Reqid 
02030                << endl;
02031         #endif       
02032                
02033         // Send reply to client if required
02034         if( ( Operation == NonRealTimeRead ) ||
02035             ( Operation == RealTimeRead    ) ||
02036             ( Operation == RealTimeCancelBlock )
02037           )
02038         {
02039             // For read, just send result to client if there was an error
02040             if( Status != 0 )
02041             {
02042                 #ifdef RIO_DEBUG2
02043                 if( m_log.is_open() )
02044                     m_log << "RequestCompleted SendResult Error " << endl;
02045                 #endif
02046                 RioErr << "RequestCompleted SendResult Error " << endl;
02047 
02048                 NATData input( Target.IPaddress, Target.Port );
02049                 
02050                 // Verifica se existe o mapeamento do servidor para a nova 
02051                 // implementacao.
02052                 // TODO: Remover este if quando a RioNeti e a NetMgr deixarem de
02053                 // ser usadas.
02054                 if( s_mgr->m_Router->m_NAT_map.findElement( input, 
02055                     StorageNumber + 1 ) )
02056                     result = s_mgr->m_Router->m_NAT_map.getElement( input, 
02057                                                             StorageNumber + 1 );
02058                 else                    
02059                     result = s_mgr->m_Router->m_NAT_map.getElement( input, 0 );
02060                 if( result.nat_addr != 0 )
02061                 {    
02062                     // A nova chamada a funcao NetInterface.FindIPAndPort e para 
02063                     // verificarmos se devemos usar a funcao do objeto da nova 
02064                     // classe ou a funcao do objeto da classe antiga.
02065                     if( s_mgr->m_NetInterface->FindIPAndPort( result.nat_addr, 
02066                                                               result.nat_port ) 
02067                       )
02068                         s_mgr->m_NetInterface->SendResult( result.nat_addr, 
02069                                                            result.nat_port,
02070                                                            Reqid, Status );
02071                     else
02072                         s_mgr->m_NetMgr->SendResult( result.nat_addr, 
02073                                                      result.nat_port,
02074                                                      Reqid, Status );
02075                 }
02076                 else
02077                 {                
02078                     // A nova chamada a funcao NetInterface.FindIPAndPort e para 
02079                     // verificarmos se devemos usar a funcao do objeto da nova 
02080                     // classe ou a funcao do objeto da classe antiga.
02081                     if( s_mgr->m_NetInterface->FindIPAndPort( Target.IPaddress, 
02082                                                               Target.Port ) )
02083                         s_mgr->m_NetInterface->SendResult( Target.IPaddress, 
02084                                                            Target.Port,
02085                                                            Reqid, Status );
02086                     else                                         
02087                         s_mgr->m_NetMgr->SendResult( Target.IPaddress, 
02088                                                      Target.Port,
02089                                                      Reqid, Status );
02090                 }
02091             }
02092         }
02093         // write case
02094         else if( Operation == NonRealTimeWrite )
02095         {
02096             NATData input( Target.IPaddress, Target.Port );
02097             // Verifica se existe o mapeamento do servidor para a nova 
02098             // implementacao.
02099             // TODO: Remover este if quando a RioNeti e a NetMgr deixarem de ser
02100             // usadas.
02101             if( s_mgr->m_Router->m_NAT_map.findElement( input, 
02102                                                            StorageNumber + 1 ) )
02103                 result = s_mgr->m_Router->m_NAT_map.getElement( input, 
02104                                                             StorageNumber + 1 );
02105             else
02106                 result = s_mgr->m_Router->m_NAT_map.getElement( input, 0 );
02107             if( result.nat_addr != 0 )
02108             {
02109                 Target.IPaddress = result.nat_addr;
02110                 Target.Port = result.nat_port;
02111             }                
02112             if( result.nat_addr != 0 )
02113             {
02114                 // A nova chamada a funcao NetInterface.FindIPAndPort e para 
02115                 // verificarmos se devemos usar a funcao do objeto da nova 
02116                 // classe ou a funcao do objeto da classe antiga.
02117                 if( s_mgr->m_NetInterface->FindIPAndPort( result.nat_addr, 
02118                                                           result.nat_port ) )
02119                     s_mgr->m_NetInterface->SendResult( result.nat_addr, 
02120                                                        result.nat_port,
02121                                                        Reqid, Status );
02122                 else                                        
02123                     s_mgr->m_NetMgr->SendResult( result.nat_addr, 
02124                                                  result.nat_port,
02125                                                  Reqid, Status );
02126             }
02127             else
02128             {          
02129                 if( s_mgr->m_NetInterface->FindIPAndPort( Target.IPaddress, 
02130                                                           Target.Port ) )
02131                     s_mgr->m_NetInterface->SendResult( Target.IPaddress, 
02132                                                        Target.Port, Reqid, 
02133                                                        Status );
02134                 else                                              
02135                     s_mgr->m_NetMgr->SendResult( Target.IPaddress, Target.Port,
02136                                                  Reqid, Status );
02137             }
02138         }
02139     } // fim do else do if( m_Status == StreamStatusClosed )
02140 
02141     struct in_addr clientip;
02142     clientip.s_addr = m_RemoteAddress.sin_addr.s_addr;
02143     UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip),
02144                             htons( m_RemoteAddress.sin_port ), m_nRequests );
02145     s_mgr->PostITEvent( (MonitorEvent *)it_event );
02146 }

void RioStream::RTHold ( EventDataRequest *  event  )  [private]

Definition at line 1800 of file server/StreamManager.cpp.

01801 {
01802     // If this is the first hold request for this stream
01803     // put stream on list of stream which are currently holding requests
01804     if( m_nQueue == 0 )
01805     {
01806         #ifdef RIO_DEBUG2
01807         if( m_log.is_open() )
01808             m_log << "Stream:DataReq:colocando first na fila " << endl;
01809         #endif
01810         s_mgr->m_RTHoldList.Put( this );
01811     }
01812 
01813     m_Queue.Put( event );
01814     m_nQueue++;
01815     m_QueuedCount++;
01816 }

void RioStream::setMinimumBlockToSend ( u32  Block  ) 

Definition at line 2871 of file server/StreamManager.cpp.

02872 {
02873     pthread_mutex_lock( &MutexMinBlock );
02874     m_MinimumBlockToSend = Block;
02875     pthread_mutex_unlock( &MutexMinBlock );
02876     #ifdef RIO_DEBUG2
02877     if( m_log.is_open() ) m_log << "MinimumBlockToSend set : " << m_MinimumBlockToSend << endl;
02878     #endif
02879 }

void RioStream::setTokensToSend ( int  value  ) 

Definition at line 2831 of file server/StreamManager.cpp.

02832 { // Stores value in m_TokensToSend while holding MutexTokensToSend.
02833     pthread_mutex_lock( &MutexTokensToSend );
02834     m_TokensToSend = value;
02835     pthread_mutex_unlock( &MutexTokensToSend );
02836     #ifdef RIO_DEBUG2
02837     if( m_log.is_open() ) m_log << "TokensToSend set : " << value << endl; // log the argument: the member must not be re-read once the mutex is released
02838     #endif
02839 }

int RioStream::startingStream (  ) 

Definition at line 2784 of file server/StreamManager.cpp.

02785 { // Returns m_StartingToSend as the function's int result; no mutex is taken for this read.
02786     return m_StartingToSend;
02787 }

void RioStream::updateNextBlockToBeFetched ( PSEQBLOCKLIST  Next  ) 

Definition at line 2535 of file server/StreamManager.cpp.

02536 { // Moves m_PosAtClientsList->nextBlockToBeFetched: to Next when one is given, otherwise one node forward.
02537     if( Next == NULL ) // no explicit target: step to the following node of the list
02538     {
02539         pthread_mutex_lock( &MutexNextBlockToBeFetched );
02540         if( m_PosAtClientsList->nextBlockToBeFetched->next == NULL ) // NOTE(review): assumes both pointers are non-NULL here -- confirm
02541         {
02542             #ifdef RIO_DEBUG2
02543             if( m_log.is_open() ) m_log << "updateNextBlockToBeFetched: prefetchAll true"<< endl;
02544             #endif
02545             m_PrefetchedAllBlocks = true; // no successor: only the flag is set, the pointer stays on the current node
02546         }
02547         else
02548         {
02549             m_PosAtClientsList->nextBlockToBeFetched =
02550                            m_PosAtClientsList->nextBlockToBeFetched->next;
02551         }
02552         pthread_mutex_unlock( &MutexNextBlockToBeFetched );
02553     }
02554     else
02555     {
02556         pthread_mutex_lock( &MutexNextBlockToBeFetched );
02557         m_PosAtClientsList->nextBlockToBeFetched = Next; // jump straight to the caller-supplied node
02558         pthread_mutex_unlock( &MutexNextBlockToBeFetched );
02559     }
02560     #ifdef RIO_DEBUG2
02561     if( m_log.is_open() ) m_log << "updateNextBlockToBeFetched: " // NOTE(review): this read happens after the mutex was released
02562           << m_PosAtClientsList->nextBlockToBeFetched->block<< endl;
02563     #endif
02564 }

void RioStream::updateTokensToDisk ( struct timeval  time  ) 

Definition at line 2615 of file server/StreamManager.cpp.

02616 { // Refills disk tokens: one per m_ArrivalInterval step until the token clock reaches `time`, capped at the burst size.
02617     pthread_mutex_lock( &MutexTokensToDisk );
02618     while( ( (m_TokensToDiskClock.tv_sec < time.tv_sec ) || // loop while the token clock is still earlier than `time` ...
02619              ( ( m_TokensToDiskClock.tv_sec == time.tv_sec) &&
02620                (m_TokensToDiskClock.tv_usec < time.tv_usec )
02621              )
02622            )
02623             && (m_StartingToSend == false )) // ... and m_StartingToSend is not set
02624     {
02625         m_TokensToDiskClock.tv_sec += m_ArrivalInterval.tv_sec;
02626         m_TokensToDiskClock.tv_usec += m_ArrivalInterval.tv_usec;
02627         if( m_TokensToDiskClock.tv_usec >= 1000000 ) // carry into tv_sec; >= (not >) keeps tv_usec inside 0..999999
02628         {
02629             m_TokensToDiskClock.tv_sec  += 1;
02630             m_TokensToDiskClock.tv_usec -= 1000000;
02631         }
02632 
02633         if( m_TokensToDisk < s_mgr->GetBurstSizeOfEachClient() ) // the clock always advances; the token is added only below the burst limit
02634         {
02635             m_TokensToDisk++;
02636         }
02637     }
02638     pthread_mutex_unlock( &MutexTokensToDisk );
02639     #ifdef RIO_DEBUG2
02640     if( m_log.is_open() ) m_log << "TokensToDisk updated : " << m_TokensToDisk << endl; // NOTE(review): m_TokensToDisk is read here after the unlock
02641     #endif
02642 }


Friends And Related Function Documentation

friend class CStreamManager [friend]

Definition at line 300 of file server/StreamManager.h.

friend class RioStreamObj [friend]

Definition at line 301 of file server/StreamManager.h.


Field Documentation

pthread_cond_t RioStream::ConditionBlockArrival [private]

Definition at line 297 of file server/StreamManager.h.

struct timeval RioStream::m_ArrivalInterval [private]

Definition at line 257 of file server/StreamManager.h.

Definition at line 282 of file server/StreamManager.h.

Definition at line 281 of file server/StreamManager.h.

Definition at line 283 of file server/StreamManager.h.

Definition at line 287 of file server/StreamManager.h.

Definition at line 268 of file server/StreamManager.h.

uint RioStream::m_Id [private]

Definition at line 279 of file server/StreamManager.h.

struct timeval RioStream::m_LastArrivalTime [private]

Definition at line 264 of file server/StreamManager.h.

ofstream RioStream::m_log

Definition at line 233 of file server/StreamManager.h.

ofstream RioStream::m_log_requests

Definition at line 233 of file server/StreamManager.h.

int RioStream::m_MaxRequests [private]

Definition at line 253 of file server/StreamManager.h.

Definition at line 290 of file server/StreamManager.h.

struct timeval RioStream::m_NextRelease [private]

Definition at line 256 of file server/StreamManager.h.

int RioStream::m_nQueue [private]

Definition at line 255 of file server/StreamManager.h.

int RioStream::m_nRequests [private]

Definition at line 252 of file server/StreamManager.h.

Definition at line 280 of file server/StreamManager.h.

Definition at line 286 of file server/StreamManager.h.

Definition at line 261 of file server/StreamManager.h.

Definition at line 260 of file server/StreamManager.h.

Definition at line 249 of file server/StreamManager.h.

Definition at line 259 of file server/StreamManager.h.

Definition at line 278 of file server/StreamManager.h.

Definition at line 273 of file server/StreamManager.h.

double RioStream::m_SampleCACTime [private]

Definition at line 272 of file server/StreamManager.h.

Definition at line 269 of file server/StreamManager.h.

Definition at line 271 of file server/StreamManager.h.

Definition at line 270 of file server/StreamManager.h.

Definition at line 274 of file server/StreamManager.h.

Definition at line 284 of file server/StreamManager.h.

Definition at line 289 of file server/StreamManager.h.

Definition at line 250 of file server/StreamManager.h.

struct timeval RioStream::m_TimeToBuffer [private]

Definition at line 266 of file server/StreamManager.h.

Definition at line 288 of file server/StreamManager.h.

struct timeval RioStream::m_TokensToDiskClock [private]

Definition at line 265 of file server/StreamManager.h.

Definition at line 285 of file server/StreamManager.h.

int RioStream::m_TotalBlocks [private]

Definition at line 276 of file server/StreamManager.h.

Definition at line 251 of file server/StreamManager.h.

Definition at line 254 of file server/StreamManager.h.

Definition at line 275 of file server/StreamManager.h.

pthread_mutex_t RioStream::MutexBlock [private]

Definition at line 295 of file server/StreamManager.h.

pthread_mutex_t RioStream::MutexMinBlock [private]

Definition at line 294 of file server/StreamManager.h.

pthread_mutex_t RioStream::MutexNextBlockToBeFetched [private]

Definition at line 293 of file server/StreamManager.h.

pthread_mutex_t RioStream::MutexTokensToDisk [private]

Definition at line 292 of file server/StreamManager.h.

pthread_mutex_t RioStream::MutexTokensToSend [private]

Definition at line 291 of file server/StreamManager.h.

pthread_mutex_t RioStream::MutexUpdateEvent [private]

Definition at line 296 of file server/StreamManager.h.

Definition at line 147 of file server/StreamManager.h.

Definition at line 148 of file server/StreamManager.h.

Definition at line 247 of file server/StreamManager.h.

Definition at line 243 of file server/StreamManager.h.

double RioStream::s_Rate [private]

Definition at line 248 of file server/StreamManager.h.

int RioStream::s_streamid [private]

Definition at line 245 of file server/StreamManager.h.


The documentation for this class was generated from the following files:

server/StreamManager.h
server/StreamManager.cpp

Generated on Wed Jul 4 16:03:37 2012 for RIO by doxygen 1.6.3