#include <StreamManager.h>
Definition at line 143 of file server/StreamManager.h.
// enum RioStream::StreamStatus [private]
// Definition at line 237 of file server/StreamManager.h.
//
// Life-cycle state of a stream.
// NOTE(review): the listing begins at the opening brace; the `typedef enum`
// introducer is reconstructed from the trailing `} StreamStatus;` - confirm
// against the header.
typedef enum
{
    StreamStatusClosed = 0,  // closed or closing
    StreamStatusOpened = 1   // opened and active
} StreamStatus;
RioStream::RioStream | ( | ) | [private] |
Definition at line 961 of file server/StreamManager.cpp.
00962 { 00963 s_mgr = NULL; 00964 s_streamid = 0; 00965 s_Direction = RioStreamDirectionRead; 00966 s_Rate = 0.0; 00967 m_Status = StreamStatusClosed; 00968 m_Type = RIO_TRAFFIC_INVALID; 00969 m_nRequests = 0; 00970 m_MaxRequests = 0; 00971 m_UserMaxRequests = 0; 00972 m_nQueue = 0; 00973 memset( (void *)&m_NextRelease, 0, sizeof( m_NextRelease ) ); 00974 memset( (void *)&m_ArrivalInterval, 0, sizeof( m_ArrivalInterval ) ); 00975 m_RequestCount = 0; 00976 m_QueuedCount = 0; 00977 // m_Queue does not need to be initialized 00978 memset( (void *)&m_LastArrivalTime , 0, sizeof( m_LastArrivalTime ) ); 00979 memset( (void *)&m_TokensToDiskClock, 0, sizeof( m_TokensToDiskClock ) ); 00980 memset( (void *)&m_TimeToBuffer , 0, sizeof( m_TimeToBuffer ) ); 00981 00982 m_FirstArrivalTime = 0.0; 00983 m_SampleGenReqListTime = 0.0; 00984 m_SampleSortListTime = 0.0; 00985 m_SampleSimulationTime = 0.0; 00986 m_SampleCACTime = 0.0; 00987 m_SampleAdmissionProcessTime = 0.0; 00988 m_SampleWaitingTime = 0.0; 00989 m_WaitingQueueSize = 0; 00990 m_TotalBlocks = 0; 00991 m_RequestList = NULL; 00992 m_Id = 0; 00993 m_PosAtClientsList = NULL; 00994 m_Buffer = NULL; 00995 m_AverageRTTtoClient_msec = 0.0; 00996 m_ClientBufferSize = 0; 00997 m_ServerBufferStatus = 0; 00998 m_TokensToSend = 0; 00999 m_PrefetchedAllBlocks = false; 01000 m_FinishedMovie = false; 01001 m_TokensToDisk = 0; 01002 m_StartingToSend = true; 01003 m_MinimumBlockToSend = 0; 01004 pthread_mutex_init( &MutexTokensToSend , NULL ); 01005 pthread_mutex_init( &MutexTokensToDisk , NULL ); 01006 pthread_mutex_init( &MutexNextBlockToBeFetched, NULL ); 01007 pthread_mutex_init( &MutexMinBlock , NULL ); 01008 pthread_mutex_init( &MutexBlock , NULL ); 01009 pthread_mutex_init( &MutexUpdateEvent , NULL ); 01010 pthread_cond_init( &ConditionBlockArrival , NULL ); 01011 01012 Next = NULL; 01013 Previous = NULL; 01014 // m_log does not need to be initialized 01015 // m_log_requests does not need to be initialized 01016 }
RioStream::RioStream | ( | const RioStream & | x | ) | [private] |
// RioStream::~RioStream()
// Definition at line 1019 of file server/StreamManager.cpp.
//
// Closes the stream (a no-op when it is already closed) and then releases
// the pthread objects the constructor initialised, which were previously
// never destroyed.
//
// NOTE(review): Close() may hand `this` to the manager's free/close lists;
// confirm that streams are never destroyed while still opened.
RioStream::~RioStream()
{
    Close();

    // Mirror of the pthread_*_init() calls in the constructor.
    pthread_cond_destroy( &ConditionBlockArrival );
    pthread_mutex_destroy( &MutexUpdateEvent );
    pthread_mutex_destroy( &MutexBlock );
    pthread_mutex_destroy( &MutexMinBlock );
    pthread_mutex_destroy( &MutexNextBlockToBeFetched );
    pthread_mutex_destroy( &MutexTokensToDisk );
    pthread_mutex_destroy( &MutexTokensToSend );
}
void RioStream::cancelBlockAtBuffer | ( | int | positionAtBuffer | ) |
Definition at line 2496 of file server/StreamManager.cpp.
02497 { 02498 getMutexUpdateEvent(); 02499 02500 if( m_Buffer[positionAtBuffer].event != NULL ) 02501 { 02502 m_Buffer[positionAtBuffer].event->Request.Operation=RealTimeCancelBlock; 02503 if( m_Buffer[positionAtBuffer].event->Request.BufferId != -1 ) 02504 { 02505 releaseMutexUpdateEvent(); 02506 decServerBufferStatus(); 02507 //if it was served by Disk 02508 s_mgr->m_Router->Put( (Event*)m_Buffer[positionAtBuffer].event ); 02509 #ifdef RIO_DEBUG2 02510 if( m_log.is_open() ) m_log << "cancelBlockAtBuffer: block " 02511 << m_Buffer[positionAtBuffer].blockInfo->block 02512 <<" canceled!"<< endl; 02513 #endif 02514 } 02515 else 02516 { 02517 releaseMutexUpdateEvent(); 02518 #ifdef RIO_DEBUG2 02519 if( m_log.is_open() ) 02520 m_log << "cancelBlockAtBuffer: setting cancel flag of block " 02521 << m_Buffer[positionAtBuffer].blockInfo->block << endl; 02522 #endif 02523 } 02524 m_Buffer[positionAtBuffer].event = NULL; 02525 } 02526 else 02527 { 02528 releaseMutexUpdateEvent(); 02529 } 02530 }
int RioStream::ClientCanStart | ( | ) |
Definition at line 2569 of file server/StreamManager.cpp.
02570 { 02571 struct timeval final_time; 02572 double time; 02573 // After the client sent the requests, he asks to continue 02574 // but only after the server side buffer is full he can start visualization 02575 #ifdef RIO_DEBUG2 02576 if( m_log.is_open() ) m_log << " Client asked 'can I start'? " << endl; 02577 #endif 02578 if( m_PosAtClientsList != NULL ) 02579 { 02580 while(getServerBufferStatus()<s_mgr->GetNumberOfBuffersForEachClient()) 02581 { 02582 pthread_mutex_lock( &MutexBlock ); 02583 pthread_cond_wait( &ConditionBlockArrival, &MutexBlock ); 02584 pthread_mutex_unlock( &MutexBlock ); 02585 } 02586 final_time = Timer.CurrentTime(); 02587 time = getInterval( m_TimeToBuffer, final_time ); 02588 s_mgr->SetAverageBufferTime( time, this ); 02589 m_TimeToBuffer.tv_sec =(u64)(( time * 1000.0 ) / 1000000 ); 02590 m_TimeToBuffer.tv_usec =(u64)((u64) ( time * 1000.0 ) % 1000000 ); 02591 #ifdef RIO_DEBUG2 02592 if( m_log.is_open() ) m_log << "Time to buffer " << time << endl; 02593 #endif 02594 02595 // after this point, the client will be controlled by tokenstodisk 02596 m_StartingToSend = false; 02597 //initializes the tokenstodisk 02598 m_TokensToDisk = s_mgr->GetBurstSizeOfEachClient(); 02599 02600 //initializes the clock to be used to accumulate tokenstodisk 02601 struct timeval clocknow = Timer.CurrentTime(); 02602 m_TokensToDiskClock.tv_sec = clocknow.tv_sec; 02603 m_TokensToDiskClock.tv_usec = clocknow.tv_usec; 02604 } 02605 #ifdef RIO_DEBUG2 02606 if( m_log.is_open() ) m_log << " Yes. You can start now. m_TokensToDisk : " 02607 << m_TokensToDisk << endl; 02608 #endif 02609 return S_OK; 02610 }
int RioStream::Close | ( | void | ) |
Definition at line 1031 of file server/StreamManager.cpp.
01032 { 01033 int i = 0; 01034 01035 // Close any opened object on this stream 01036 // ### m_ObjectManager->CloseStream(Stream); 01037 01038 // just return if already closed 01039 if( m_Status == StreamStatusClosed ) 01040 return S_OK; 01041 01042 s_mgr->m_MutexTable->Wait(); 01043 01044 m_Status = StreamStatusClosed; 01045 01046 if( s_mgr->m_log.is_open() ) 01047 s_mgr->m_log << "Closing Stream ( " << m_Id << " )" 01048 << " Requests " << m_nRequests 01049 << " Queue " << m_nQueue 01050 << " Request Count " << m_RequestCount 01051 << " Queued Count " << m_QueuedCount << endl; 01052 01053 EventDataRequest* request; 01054 01055 // empty stream queue 01056 while( m_nQueue > 0 ) 01057 { 01058 request = m_Queue.Get(); 01059 m_nQueue--; 01060 m_nRequests--; 01061 EventManager.Free( ( Event* )request ); 01062 } 01063 01064 if( s_mgr->m_RTHoldList.First() == this ) 01065 s_mgr->m_RTHoldList.Remove( s_mgr->m_RTHoldList.First() ); 01066 01067 if( ( m_PosAtClientsList != NULL ) && ( s_mgr->m_UseServerSideBuffers ) ) 01068 { 01069 // need to cancel blocks at Storage 01070 // for each position at buffer, cancel block 01071 for( i = 0 ; i < s_mgr->GetNumberOfBuffersForEachClient(); i++ ) 01072 { 01073 cancelBlockAtBuffer ( i ); 01074 } 01075 } 01076 // ------------------------------------------------------------------------ 01077 01078 // If there are pending requests should wait request completion 01079 // before finishing close 01080 if( m_nRequests > 0 ) 01081 { 01082 if( s_mgr->m_log.is_open() ) 01083 s_mgr->m_log << "Should wait for request completion before closing: " 01084 << m_nRequests << " requests pending" << endl; 01085 s_mgr->m_CloseList.Put( this ); 01086 } 01087 else 01088 { 01089 // If no pending request finish closing 01090 s_mgr->m_used--; 01091 // specific for real-time streams 01092 if( m_Type == RIO_TRAFFIC_CBR ) 01093 { 01094 s_mgr->m_UsedRate -= s_Rate; 01095 } 01096 01097 if( ( m_PosAtClientsList != NULL ) && 01098 ( s_mgr->m_UseServerSideBuffers ) 
01099 ) 01100 { 01101 s_mgr->RemoveClient( m_PosAtClientsList ); 01102 if( m_Buffer ) 01103 { 01104 delete m_Buffer; 01105 m_Buffer = NULL; 01106 } 01107 } 01108 #ifdef RIO_DEBUG2 01109 RioErr << "Closing mlog." << endl; 01110 m_log.close(); 01111 #endif 01112 #ifdef RIO_DEBUG2 01113 RioErr << "Closing mlogreq." << endl; 01114 m_log_requests.close(); 01115 #endif 01116 // -------------------------------------------------------------------- 01117 01118 s_mgr->m_FreeList.Put( this ); 01119 if( s_mgr->m_log.is_open() ) 01120 s_mgr->m_log << "Close: Stream CLOSED. Requests: " << m_RequestCount 01121 << ". Queued requests: " << m_QueuedCount 01122 << ". Active streams " << s_mgr->m_used<< endl; 01123 } 01124 01125 s_mgr->m_MutexTable->Release(); 01126 01127 struct in_addr clientip; 01128 clientip.s_addr = m_RemoteAddress.sin_addr.s_addr; 01129 UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip), 01130 htons( m_RemoteAddress.sin_port ), m_nRequests ); 01131 s_mgr->PostITEvent( (MonitorEvent *)it_event ); 01132 01133 return S_OK; 01134 }
int RioStream::DataReq | ( | u32 | reqid, | |
u16 | operation, | |||
u32 | ipaddr, | |||
u16 | port, | |||
u32 | block, | |||
u32 | repbits, | |||
RioStreamObj * | StreamObject | |||
) | [private] |
Definition at line 1215 of file server/StreamManager.cpp.
01219 { 01220 // Set Arrival time with current time 01221 struct timeval ArrivalTime = Timer.CurrentTime(); 01222 01223 #ifdef RIO_DEBUG2 01224 if( m_log.is_open() ) 01225 { 01226 struct in_addr clientip; 01227 double interval = getInterval( m_LastArrivalTime, ArrivalTime ); 01228 clientip.s_addr = ipaddr; 01229 char *info = myInfo( false ); 01230 m_log << endl << info 01231 << "Stream:DataReq => Req ip " << inet_ntoa(clientip) 01232 << " port " << port << " reqid " << reqid 01233 << " block " << block << " TokensToSend " << m_TokensToSend 01234 << " (m_nRequests " << m_nRequests + 1 << ")" 01235 << " arrival " << ArrivalTime.tv_sec << " sec " 01236 << ArrivalTime.tv_usec << " usec " 01237 << "interval " << interval << " msec" 01238 << " m_MaxRequests " << m_MaxRequests << endl; 01239 free( info ); 01240 } 01241 #endif 01242 01243 // Do not accept request if Stream is closed 01244 if( m_Status == StreamStatusClosed ) 01245 { 01246 #ifdef RIO_DEBUG2 01247 if( m_log.is_open() ) 01248 m_log << "Stream:DataReq:ERROR_STREAM_NOT_OPENED!!!" << endl; 01249 #endif 01250 01251 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED; 01252 } 01253 01254 // Check if number of stream active requests does not exceed maximum 01255 if( m_nRequests >= m_MaxRequests ) 01256 { 01257 #ifdef RIO_DEBUG2 01258 if( m_log.is_open() ) 01259 m_log << "Stream:DataReq:Max Number of Active Requests!!! " 01260 << m_nRequests << ">=" << m_MaxRequests << endl; 01261 #endif 01262 01263 RioErr << "Stream:DataReq:Max Number of Active Requests!!! 
" 01264 << m_nRequests << ">=" << m_MaxRequests << endl; 01265 01266 return ERROR_STREAMMANAGER + ERROR_MAX_STREAM_REQUESTS; 01267 } 01268 // Increment number of stream active requests 01269 m_nRequests++; 01270 m_RequestCount++; 01271 01272 // Get Request slot for new request 01273 EventDataRequest* event; 01274 01275 if( m_Type == RIO_TRAFFIC_NRT ) 01276 { 01277 event = (EventDataRequest*)EventManager.New( EventTypeNRTDataRequest ); 01278 } 01279 else 01280 { 01281 event = (EventDataRequest*)EventManager.New( EventTypeRTDataRequest ); 01282 } 01283 01284 if( operation == READ ) 01285 { 01286 if( m_Type == RIO_TRAFFIC_NRT ) 01287 { 01288 event->Request.Operation = NonRealTimeRead; 01289 } 01290 else 01291 { 01292 event->Request.Operation = RealTimeRead; 01293 } 01294 } 01295 else 01296 { 01297 event->Request.Operation = NonRealTimeWrite; 01298 } 01299 01300 event->Request.streamobj = StreamObject; 01301 event->Request.rioobject = StreamObject->o_object; 01302 event->Request.Reqid = reqid; 01303 event->Request.Target.IPaddress = ipaddr; 01304 event->Request.Target.Port = port; 01305 event->Request.Block = block; 01306 event->Request.RepBits = repbits; 01307 event->Request.Status = 0; // ok so far 01308 event->Request.BufferId = -2; // identifies normal use 01309 01310 // Process request differently depending if it is real-time or not 01311 if( m_Type == RIO_TRAFFIC_NRT ) 01312 { 01313 // Should hold request if there are others being hold or 01314 // if reached maximum number of pending non real-time requests 01315 if( s_mgr->m_HoldingNonRealTime || 01316 ( s_mgr->m_ActiveRequests >= s_mgr->m_MaxActiveRequests )) 01317 { 01318 // Put request on hold until time out processing decides 01319 // it is time to proceess it 01320 NRTHold( event ); 01321 } 01322 // Otherwise send request direct to router 01323 else 01324 { 01325 // Increment total number of requests sent to router 01326 s_mgr->m_ActiveRequests++; 01327 01328 // Send request to router 01329 #ifdef RIO_DEBUG2 
01330 if( m_log.is_open() ) 01331 m_log << "Stream:DataReq:Enviando pedido de NRT ao router" 01332 << endl; 01333 #endif 01334 s_mgr->m_Router->Put( (Event*)event ); 01335 } 01336 } 01337 // Real-time case 01338 else 01339 { 01340 // Check if the object was submited to new cac or 01341 // if it is using server side buffers. 01342 if( ( m_PosAtClientsList != NULL ) && 01343 ( s_mgr->m_UseServerSideBuffers ) 01344 ) 01345 { 01346 m_LastArrivalTime = ArrivalTime; 01347 01348 bool changeSequence = false; 01349 bool blockWasAtBuffer = false; 01350 bool usedEvent = false; 01351 01352 incTokensToSend(); // available buffer at client 01353 updateTokensToDisk( ArrivalTime ); // available requests to Disk 01354 01355 #ifdef RIO_DEBUG2 01356 if( m_log.is_open() ) 01357 m_log << " Looking for block at buffer ... ( tokenstodisk " 01358 << getTokensToDisk() << ")"<< endl; 01359 #endif 01360 01361 01362 // verifies if the block is at the buffer and send 01363 for( int i = 0; i < s_mgr->GetNumberOfBuffersForEachClient(); i++ ) 01364 { 01365 if( ( m_Buffer[i].event != NULL ) && 01366 ( (u32)m_Buffer[i].event->Request.Block == block )) 01367 { 01368 #ifdef RIO_DEBUG2 01369 if( m_log.is_open() ) 01370 m_log << " Found it with BufferId " 01371 << m_Buffer[i].event->Request.BufferId 01372 << " updates ip, port and reqid... 
" << endl; 01373 #endif 01374 01375 blockWasAtBuffer = true; 01376 01377 // because Router could be using to update storage reply 01378 getMutexUpdateEvent(); 01379 01380 // updates information to send 01381 m_Buffer[i].event->Request.Reqid = reqid; 01382 m_Buffer[i].event->Request.Target.IPaddress = ipaddr; 01383 m_Buffer[i].event->Request.Target.Port = port; 01384 01385 incMinimumBlockToSend(); 01386 01387 if( m_Buffer[i].event->Request.BufferId == -1 ) 01388 { 01389 #ifdef RIO_DEBUG2 01390 //if it was not served by Disk, just wait 01391 if( m_log.is_open() ) 01392 m_log << " prefetching and waiting "<< endl; 01393 #endif 01394 releaseMutexUpdateEvent(); 01395 } 01396 else if( m_Buffer[i].event->Request.Status != 0 ) 01397 { 01398 //if error at prefetch, cancel the block 01399 m_Buffer[i].event->Request.Operation = 01400 RealTimeCancelBlock; 01401 releaseMutexUpdateEvent(); 01402 s_mgr->m_Router->Put( (Event*)m_Buffer[i].event ); 01403 decTokensToSend(); //can not send block 01404 decServerBufferStatus(); 01405 01406 #ifdef RIO_DEBUG2 01407 if( m_log.is_open() ) 01408 m_log << " sent to Router -> error at prefetch, " 01409 << " cancel block" << endl; 01410 #endif 01411 } 01412 else 01413 { 01414 //if it was prefetched, just send the block 01415 m_Buffer[i].event->Request.Operation=RealTimeSendBlock; 01416 releaseMutexUpdateEvent(); 01417 s_mgr->m_Router->Put( (Event*)m_Buffer[i].event ); 01418 #ifdef RIO_DEBUG2 01419 if( m_log.is_open() ) 01420 m_log << " sent to Router "<< endl; 01421 #endif 01422 } 01423 01424 m_Buffer[i].event = NULL; 01425 } 01426 } 01427 01428 // updates next block to be requested 01429 if( block != m_PosAtClientsList->nextBlockToBeRequested->block ) 01430 { 01431 //the sequence was changed, have to arrange everything 01432 changeSequence = true; 01433 01434 #ifdef RIO_DEBUG2 01435 if( m_log.is_open() ) 01436 m_log << " sequence changed from " 01437 << m_PosAtClientsList->nextBlockToBeRequested->block 01438 << " to " << block << endl; 01439 
#endif 01440 01441 while( ( block > 01442 m_PosAtClientsList->nextBlockToBeRequested->block ) && 01443 ( m_PosAtClientsList->nextBlockToBeRequested->next != 01444 NULL ) 01445 ) 01446 m_PosAtClientsList->nextBlockToBeRequested = 01447 m_PosAtClientsList->nextBlockToBeRequested->next; 01448 01449 while( ( block < 01450 m_PosAtClientsList->nextBlockToBeRequested->block ) && 01451 ( m_PosAtClientsList->nextBlockToBeRequested->prev != 01452 NULL ) 01453 ) 01454 m_PosAtClientsList->nextBlockToBeRequested = 01455 m_PosAtClientsList->nextBlockToBeRequested->prev; 01456 01457 #ifdef RIO_DEBUG2 01458 if( m_log.is_open() ) 01459 m_log << " nbtoberequested " 01460 << m_PosAtClientsList->nextBlockToBeRequested->block 01461 << endl; 01462 #endif 01463 01464 if( blockWasAtBuffer == false ) 01465 { 01466 #ifdef RIO_DEBUG2 01467 if( m_log.is_open() ) 01468 m_log << " block was not at buffer. "<< endl; 01469 #endif 01470 01471 //if block was not buffered, send the block 01472 usedEvent = true; 01473 decTokensToSend(); 01474 if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) ) 01475 { 01476 #ifdef RIO_DEBUG2 01477 if( m_log.is_open() ) 01478 m_log << " (to Router) send block " 01479 << event->Request.Block 01480 << " bufferid " 01481 << event->Request.BufferId 01482 << " m_ActiveRequests " 01483 << s_mgr->m_ActiveRequests + 1 01484 << endl; 01485 #endif 01486 01487 s_mgr->m_Router->Put( (Event*)event ); 01488 s_mgr->m_ActiveRequests++; 01489 decTokensToDisk(); 01490 } 01491 else 01492 { 01493 #ifdef RIO_DEBUG2 01494 if( m_log.is_open() ) 01495 m_log << " (to queue) send block " 01496 << event->Request.Block 01497 << " bufferid " 01498 << event->Request.BufferId << endl; 01499 #endif 01500 01501 RTHold( event ); 01502 } 01503 } 01504 } 01505 01506 if( m_PosAtClientsList->nextBlockToBeRequested->next != NULL ) 01507 { 01508 m_PosAtClientsList->nextBlockToBeRequested = 01509 m_PosAtClientsList->nextBlockToBeRequested->next; 01510 } 01511 else 01512 { 01513 // Doesn't have any more 
requests 01514 #ifdef RIO_DEBUG2 01515 if( m_log.is_open() ) 01516 m_log << " last block - prefetch all true "<< endl; 01517 #endif 01518 01519 m_FinishedMovie = true; 01520 m_PrefetchedAllBlocks = true; 01521 01522 if( s_mgr->GetNumberOfBuffersForEachClient() != 0 ) 01523 { 01524 EventManager.Free( (Event *) event ); 01525 } 01526 } 01527 01528 #ifdef RIO_DEBUG2 01529 if( m_log.is_open() ) m_log << " updated nbtoberequested " 01530 << m_PosAtClientsList->nextBlockToBeRequested->block 01531 << endl; 01532 #endif 01533 01534 if( m_PrefetchedAllBlocks == true ) 01535 { 01536 m_nRequests--; 01537 } 01538 01539 // updates next block to be fetched 01540 if( s_mgr->GetNumberOfBuffersForEachClient() == 0 ) 01541 { 01542 updateNextBlockToBeFetched( 01543 m_PosAtClientsList->nextBlockToBeRequested); 01544 } 01545 else 01546 { 01547 if( changeSequence && ( m_FinishedMovie == false ) ) 01548 { 01549 #ifdef RIO_DEBUG2 01550 if( m_log.is_open() ) 01551 m_log << " updating nbtobefetched ... "; 01552 #endif 01553 01554 if( ( block >= 01555 m_PosAtClientsList->nextBlockToBeFetched->block ) || 01556 ( ( block + s_mgr->GetNumberOfBuffersForEachClient()) < 01557 ( m_PosAtClientsList->nextBlockToBeFetched->block - 01558 s_mgr->GetNumberOfBuffersForEachClient() 01559 ) 01560 ) 01561 ) 01562 { 01563 //for each position at buffer, cancel block 01564 for( int i=0; 01565 i < s_mgr->GetNumberOfBuffersForEachClient(); 01566 i++ 01567 ) 01568 { 01569 cancelBlockAtBuffer ( i ); 01570 m_nRequests++; 01571 } 01572 updateNextBlockToBeFetched( 01573 m_PosAtClientsList->nextBlockToBeRequested ); 01574 } 01575 else 01576 if( ( block + s_mgr->GetNumberOfBuffersForEachClient() ) < 01577 m_PosAtClientsList->nextBlockToBeFetched->block ) 01578 { 01579 updateNextBlockToBeFetched( 01580 m_PosAtClientsList->nextBlockToBeRequested ); 01581 } 01582 setMinimumBlockToSend( 01583 m_PosAtClientsList->nextBlockToBeRequested->block ); 01584 if( m_PosAtClientsList->nextBlockToBeFetched->next == NULL ) 01585 { 01586 
m_PrefetchedAllBlocks = true; 01587 } 01588 else 01589 { 01590 m_PrefetchedAllBlocks = false; 01591 } 01592 } 01593 } 01594 01595 // prefetch blocks 01596 01597 if( ( s_mgr->GetNumberOfBuffersForEachClient() == 0 ) && 01598 ( changeSequence == false )) 01599 { 01600 decTokensToSend(); 01601 if( m_StartingToSend == true ) 01602 { 01603 #ifdef RIO_DEBUG2 01604 if( m_log.is_open() ) 01605 m_log << "Stream:DataReq:Sending prefetching blocks to" 01606 << " router"<< endl; 01607 #endif 01608 01609 s_mgr->m_Router->Put( (Event*)event ); 01610 s_mgr->m_ActiveRequests++; 01611 } 01612 else 01613 { 01614 if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) ) 01615 { 01616 #ifdef RIO_DEBUG2 01617 if( m_log.is_open() ) 01618 m_log << "Stream:DataReq:Sending prefetching blocks " 01619 << "to router 2"<< endl; 01620 #endif 01621 01622 s_mgr->m_Router->Put( (Event*)event ); 01623 s_mgr->m_ActiveRequests++; 01624 decTokensToDisk(); 01625 } 01626 else 01627 { 01628 RTHold( event ); 01629 } 01630 } 01631 } 01632 int i = 0; 01633 int pos = -1; 01634 bool prefetch; 01635 01636 while(( i < s_mgr->GetNumberOfBuffersForEachClient()) && 01637 ( m_PrefetchedAllBlocks == false )) 01638 { 01639 prefetch = false; 01640 if( m_Buffer[i].event == NULL ) 01641 { 01642 prefetch = true; 01643 } 01644 else 01645 if( (( m_Buffer[i].blockInfo->block < getMinimumBlockToSend() ) 01646 ||( m_Buffer[i].blockInfo->block >= 01647 ( m_PosAtClientsList->nextBlockToBeRequested->block 01648 + s_mgr->GetNumberOfBuffersForEachClient()))) 01649 && changeSequence ) 01650 { 01651 cancelBlockAtBuffer( i ); 01652 m_nRequests++; 01653 prefetch = true; 01654 } 01655 else if( ( m_Buffer[i].blockInfo->block >= 01656 m_PosAtClientsList->nextBlockToBeFetched->block ) 01657 && ( changeSequence )) 01658 { 01659 if( pos == -1 ) 01660 { 01661 pos = i; 01662 } 01663 else 01664 { 01665 if( m_Buffer[pos].blockInfo->block < 01666 m_Buffer[i].blockInfo->block ) 01667 pos = i; 01668 } 01669 } 01670 if( prefetch == true ) 01671 { 01672 
if( usedEvent ) 01673 { 01674 event = (EventDataRequest*) 01675 EventManager.New( EventTypeRTDataRequest ); 01676 } 01677 else 01678 usedEvent = true; 01679 01680 prefetchBlock( i, event, StreamObject ); 01681 updateNextBlockToBeFetched( NULL ); 01682 } 01683 i++; 01684 } 01685 01686 if( ( pos != -1 ) && 01687 ( m_Buffer[pos].blockInfo->block >= 01688 m_PosAtClientsList->nextBlockToBeFetched->block )) 01689 { 01690 updateNextBlockToBeFetched( m_Buffer[pos].blockInfo->next ); 01691 } 01692 01693 if( usedEvent == false ) 01694 { 01695 //have to free (it was not used) 01696 EventManager.Free( (Event *) event ); 01697 } 01698 // ---------------------------------------------------------------- 01699 } 01700 else 01701 { 01702 01703 /////////////////////////////////////////////////////////////////// 01704 // Check if request satisfy rate requirement and there are no other 01705 // request queued 01706 01707 struct timeval tolerance_time; 01708 tolerance_time.tv_sec =ArrivalTime.tv_sec; 01709 tolerance_time.tv_usec=ArrivalTime.tv_usec+RELEASE_TIME_TOLERANCE; 01710 if( tolerance_time.tv_usec > 1000000 ) 01711 { 01712 tolerance_time.tv_sec += 1; 01713 tolerance_time.tv_usec -= 1000000; 01714 } 01715 01716 #ifdef RIO_DEBUG2 01717 if( m_log.is_open() ) m_log << "Stream:DataReq " 01718 << " m_nQueue " << m_nQueue << endl 01719 << " tolerance_time.tv_sec " 01720 << ( unsigned ) tolerance_time.tv_sec 01721 << " m_NextRelease.tv_sec " 01722 << ( unsigned ) m_NextRelease.tv_sec 01723 << endl 01724 << " tolerance_time.tv_usec " 01725 << ( unsigned ) tolerance_time.tv_usec 01726 << " m_NextRelease.tv_usec " 01727 << ( unsigned ) m_NextRelease.tv_usec 01728 << endl; 01729 #endif 01730 01731 //This part makes the control of client's requests queue 01732 //Now, server only sends requests to router 01733 if( ( m_nQueue == 0 ) && 01734 (( tolerance_time.tv_sec > m_NextRelease.tv_sec ) || 01735 (( tolerance_time.tv_sec == m_NextRelease.tv_sec ) && 01736 ( tolerance_time.tv_usec >= 
m_NextRelease.tv_usec )))) 01737 { 01738 // Yes: Send request directly to Router 01739 01740 // First update new release time for next request 01741 if( ( ArrivalTime.tv_sec > m_NextRelease.tv_sec ) || 01742 (( ArrivalTime.tv_sec == m_NextRelease.tv_sec) && 01743 (ArrivalTime.tv_usec > m_NextRelease.tv_usec)) ) 01744 { 01745 m_NextRelease.tv_sec = ArrivalTime.tv_sec + 01746 m_ArrivalInterval.tv_sec; 01747 m_NextRelease.tv_usec = ArrivalTime.tv_usec + 01748 m_ArrivalInterval.tv_usec; 01749 } 01750 else 01751 { 01752 m_NextRelease.tv_sec += m_ArrivalInterval.tv_sec; 01753 m_NextRelease.tv_usec += m_ArrivalInterval.tv_usec; 01754 } 01755 if( m_NextRelease.tv_usec > 1000000 ) 01756 { 01757 m_NextRelease.tv_sec += 1; 01758 m_NextRelease.tv_usec -= 1000000; 01759 } 01760 // ---------------------------------------------------------------- 01761 01762 // Increment total number of requests sent to router 01763 s_mgr->m_ActiveRequests++; 01764 01765 // Send request to router 01766 #ifdef RIO_DEBUG2 01767 if( m_log.is_open() ) 01768 m_log << "Stream:DataReq:Enviando pedido de bloco ao router" 01769 << endl; 01770 #endif 01771 s_mgr->m_Router->Put( (Event*)event ); 01772 } 01773 // Rate violation: Hold request 01774 else 01775 { 01776 // Put request on hold until time out processing decides 01777 // it is time to process it 01778 #ifdef RIO_DEBUG2 01779 if( m_log.is_open() ) 01780 m_log << "Stream:DataReq:Putting request on hold" << endl; 01781 #endif 01782 01783 RTHold( event ); 01784 } 01785 01786 m_LastArrivalTime = ArrivalTime; 01787 } 01788 } 01789 01790 struct in_addr clientip; 01791 clientip.s_addr = m_RemoteAddress.sin_addr.s_addr; 01792 UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip), 01793 htons( m_RemoteAddress.sin_port ), m_nRequests ); 01794 s_mgr->PostITEvent( (MonitorEvent *)it_event ); 01795 01796 return S_OK; 01797 }
void RioStream::decServerBufferStatus | ( | ) |
Definition at line 2720 of file server/StreamManager.cpp.
02721 { 02722 pthread_mutex_lock( &MutexBlock ); 02723 m_ServerBufferStatus--; 02724 pthread_mutex_unlock( &MutexBlock ); 02725 #ifdef RIO_DEBUG2 02726 if( m_log.is_open() ) m_log << "BufferStatus decremented : " << m_ServerBufferStatus << endl; 02727 #endif 02728 }
void RioStream::decTokensToDisk | ( | ) |
Definition at line 2644 of file server/StreamManager.cpp.
02645 { 02646 pthread_mutex_lock( &MutexTokensToDisk ); 02647 if( m_TokensToDisk > 0 ) 02648 { 02649 m_TokensToDisk--; 02650 } 02651 pthread_mutex_unlock( &MutexTokensToDisk ); 02652 #ifdef RIO_DEBUG2 02653 if( m_log.is_open() ) m_log << "TokensToDisk decremented : " << m_TokensToDisk << endl; 02654 #endif 02655 }
void RioStream::decTokensToSend | ( | ) |
Definition at line 2821 of file server/StreamManager.cpp.
02822 { 02823 pthread_mutex_lock( &MutexTokensToSend ); 02824 m_TokensToSend--; 02825 pthread_mutex_unlock( &MutexTokensToSend ); 02826 #ifdef RIO_DEBUG2 02827 if( m_log.is_open() ) m_log << "TokensToSend decremented : " << m_TokensToSend << endl; 02828 #endif 02829 }
int RioStream::Get_streamid | ( | ) |
Definition at line 1208 of file server/StreamManager.cpp.
01209 { 01210 return s_streamid; 01211 }
double RioStream::getAverageCACTime | ( | ) |
Definition at line 2737 of file server/StreamManager.cpp.
02738 { 02739 return s_mgr->GetAverageCACTime(); 02740 }
unsigned int RioStream::getBlockSize | ( | ) |
Definition at line 2758 of file server/StreamManager.cpp.
02759 { 02760 return s_mgr->GetBlockSize(); 02761 }
int RioStream::getClientBufferSize | ( | ) |
Definition at line 2699 of file server/StreamManager.cpp.
02700 { 02701 return m_ClientBufferSize; 02702 }
int RioStream::getClientBufferStatus | ( | ) |
Definition at line 2669 of file server/StreamManager.cpp.
02670 { 02671 if( m_StartingToSend ) 02672 { 02673 return 0; 02674 } 02675 else 02676 { 02677 return m_ClientBufferSize - m_TokensToSend; 02678 } 02679 }
int RioStream::getDirection | ( | ) |
Definition at line 1136 of file server/StreamManager.cpp.
01137 { 01138 return s_Direction; 01139 }
double RioStream::getEstimatedDiskQueueTime | ( | int | disk | ) |
Definition at line 2753 of file server/StreamManager.cpp.
02754 { 02755 return s_mgr->GetEstimatedDiskQueueTime( disk ); 02756 }
double RioStream::getEstimatedDiskResponseTime | ( | int | disk | ) |
Definition at line 2748 of file server/StreamManager.cpp.
02749 { 02750 return s_mgr->GetEstimatedDiskResponseTime( disk ); 02751 }
double RioStream::getEstimatedDiskServiceTime | ( | int | disk | ) |
Definition at line 2743 of file server/StreamManager.cpp.
02744 { 02745 return s_mgr->GetEstimatedDiskServiceTime( disk ); 02746 }
// uint RioStream::GetId()
// Definition at line 1202 of file server/StreamManager.cpp.
//
// Returns the value of m_Id.
uint RioStream::GetId()
{
    return m_Id;
}
PSEQBLOCKLIST RioStream::getInfoAboutRequestList | ( | char * | filename, | |
RioObject * | object | |||
) | [private] |
Definition at line 2889 of file server/StreamManager.cpp.
02891 { 02892 FILE *file; 02893 PSEQBLOCKLIST curblock, prevblock, init_stream; 02894 struct timeval initial_time, final_time; 02895 02896 float duration = 0; 02897 uint block = 0; 02898 int rc = 0; 02899 int numreps; 02900 long double time = 0; // event time 02901 02902 RioDiskBlock Reps[MaxReplications]; 02903 02904 initial_time = Timer.CurrentTime(); 02905 02906 m_FirstArrivalTime = initial_time.tv_sec * 1000.0 + 02907 initial_time.tv_usec / 1000.0; 02908 02909 uint SBufferSize = getNumberOfBuffersForEachClient(); 02910 uint CBufferSize = m_ClientBufferSize; 02911 02912 uint buffersize = CBufferSize + SBufferSize; 02913 02914 #ifdef RIO_DEBUG2 02915 if( m_log.is_open() ) m_log <<"---------------------------------------------------------------- "<<endl; 02916 if( m_log.is_open() ) m_log <<" getInfoAboutRequestList "<<endl; 02917 if( m_log.is_open() ) m_log <<"---------------------------------------------------------------- "<<endl; 02918 if( m_log.is_open() ) m_log << "Getting disk service time info ... "; 02919 #endif 02920 02921 pthread_mutex_lock( &s_mgr->MutexUpdateMeasures ); 02922 s_mgr->GetDiskServiceTime(); 02923 s_mgr->UpdateDiskQueueAndServiceTime(); 02924 pthread_mutex_unlock( &s_mgr->MutexUpdateMeasures ); 02925 02926 #ifdef RIO_DEBUG2 02927 if( m_log.is_open() ) m_log << "OK." 
<< endl; 02928 #endif 02929 02930 double msecRunCAC = getAverageCACTime(); 02931 double NetworkRate = getNetworkRate(); 02932 02933 unsigned int BlockSize = getBlockSize(); 02934 02935 double timeToTransmit_msec = ( BlockSize/NetworkRate ) * 1000.0; 02936 02937 #ifdef RIO_DEBUG2 02938 if( m_log.is_open() ) m_log << "Client ID " << m_Id << endl; 02939 if( m_log.is_open() ) m_log << "SBufferSize " << SBufferSize << endl; 02940 if( m_log.is_open() ) m_log << "CBufferSize " << CBufferSize << endl; 02941 if( m_log.is_open() ) m_log << "RTT (msec) " << m_AverageRTTtoClient_msec << endl; 02942 if( m_log.is_open() ) m_log << "msecRunCAC " << msecRunCAC << endl; 02943 if( m_log.is_open() ) m_log << "NetworkRate " << NetworkRate << "B/sec" << endl; 02944 if( m_log.is_open() ) m_log << "BlockSize " << BlockSize << "Bytes" << endl; 02945 if( m_log.is_open() ) m_log << "timeToTransmit_msec " << timeToTransmit_msec << " msec" << endl; 02946 #endif 02947 02948 // Open the file with times of the requests 02949 if( (file = fopen( filename, "r" )) == NULL ) 02950 { 02951 if( m_log.is_open() ) 02952 m_log<<"getInfoAboutRequestList: "<<filename<<": File not found."<<endl; 02953 return NULL; 02954 } 02955 02956 // Start to construct request list 02957 init_stream = prevblock = NULL; 02958 02959 float *req_time = (float *) malloc( buffersize * sizeof(float) ); 02960 float *disk_block = (float *) malloc( buffersize * sizeof(float) ); 02961 // get number of disks 02962 int disks = s_mgr->m_MaxNumberOfDisks; 02963 //time to service next request 02964 long double *disk_time = (long double*) malloc( disks * sizeof(long double) ); 02965 for( int i = 0 ; i < disks ; i++ ) 02966 disk_time[ i ] = msecRunCAC; 02967 02968 memset( req_time, 0, buffersize*sizeof(float) ); 02969 memset( disk_block, 0, buffersize*sizeof(float) ); 02970 memset( disk_time, 0, disks*sizeof(long double) ); 02971 02972 #ifdef RIO_DEBUG2 02973 if( m_log.is_open() ) m_log << "getInfoAboutRequestList: Starting reading ... 
" << endl; 02974 #endif 02975 02976 // Read next request until the end of file 02977 while( !feof( file ) ) 02978 { 02979 if( fscanf( file, "%f\n", &duration ) == 1 ) 02980 { 02981 duration = duration * 1000.0; // duration in msec 02982 02983 curblock = ( PSEQBLOCKLIST )malloc( sizeof( SEQBLOCKLIST ) ); 02984 if( curblock != NULL ) 02985 { 02986 curblock->id = m_Id; //to change 02987 curblock->block = block; 02988 curblock->duration = duration; 02989 02990 rc = object->MapBlock( (RioBlock) block, 0, &numreps, Reps); 02991 if( rc ) 02992 { 02993 RioErr << "getInfoAboutRequestList: Error at MapBlock" 02994 << endl; 02995 } 02996 else 02997 { 02998 curblock->disk = Reps[0].disk; 02999 curblock->physicalblock = Reps[0].block; 03000 } 03001 03002 curblock->next = NULL; 03003 curblock->prev = NULL; 03004 03005 if( block >= buffersize ) 03006 { 03007 curblock->csystime = req_time[block % buffersize] + time; 03008 time += req_time[ block % buffersize ]; 03009 } 03010 else 03011 { 03012 if( block < SBufferSize ) 03013 { 03014 // When the requests to full server side buffer will be: 03015 // shift msec to run cac 03016 curblock->csystime = msecRunCAC; 03017 } 03018 else 03019 { 03020 // When the requests to full client buffer will be: 03021 // shift msec to run CAC + 1 RTT 03022 curblock->csystime = msecRunCAC + 03023 m_AverageRTTtoClient_msec; 03024 } 03025 03026 if( disk_time[ curblock->disk ] < curblock->csystime ) 03027 disk_time[ curblock->disk ] = curblock->csystime; 03028 03029 disk_time[ curblock->disk ] += 03030 getEstimatedDiskServiceTime( curblock->disk ); 03031 03032 disk_block[ block ] = disk_time[ curblock->disk ]; 03033 03034 #ifdef RIO_DEBUG2 03035 if( m_log.is_open() ) m_log << " id " << curblock->id 03036 << " block " << curblock->block 03037 << " disk " << curblock->disk 03038 << " servicetime " 03039 << getEstimatedDiskServiceTime( curblock->disk ) 03040 << " disk_time " 03041 << disk_time[ curblock->disk] << endl; 03042 #endif 03043 03044 if( (block + 1) 
== buffersize ) 03045 { 03046 double maxdisktime = disk_time[ 0 ]; 03047 for( int i = 1; i < disks; i++ ) 03048 { 03049 if( disk_time [ i ] > maxdisktime ) 03050 maxdisktime = disk_time[ i ]; 03051 } 03052 double timetotx = msecRunCAC + 03053 m_AverageRTTtoClient_msec; 03054 for( uint i = 0; i < CBufferSize; i++ ) 03055 { 03056 if( disk_block [ i ] > timetotx ) 03057 timetotx = disk_block[ i ]; 03058 timetotx += timeToTransmit_msec; 03059 } 03060 03061 //time is set to when the requests for Totalbuffers 03062 //blocks will be: 03063 //max between until tx all BC blocks and read BS + BC blocks 03064 double maxtime; 03065 if( timetotx > maxdisktime ) 03066 maxtime = timetotx; 03067 else 03068 maxtime = maxdisktime; 03069 time = maxtime + m_AverageRTTtoClient_msec; 03070 03071 #ifdef RIO_DEBUG2 03072 if( m_log.is_open() ) 03073 { 03074 m_log << "maxdisktime " << maxdisktime << endl; 03075 m_log << "timetotx " << timetotx << endl; 03076 m_log << "max + RTT = time => " << time << endl; 03077 } 03078 #endif 03079 } 03080 } 03081 req_time[ block % buffersize ] = duration; 03082 } 03083 else 03084 { 03085 RioErr << "getInfoAboutRequestList: Could not allocate memory. " 03086 << endl; 03087 03088 free( req_time ); 03089 free( disk_block ); 03090 free( disk_time ); 03091 03092 return NULL; 03093 } 03094 if( prevblock ) { 03095 curblock->prev = prevblock; 03096 prevblock = prevblock->next = curblock; 03097 } 03098 else 03099 prevblock = init_stream = curblock; 03100 duration = 0; 03101 } 03102 else 03103 { 03104 if( !feof( file ) ) 03105 { 03106 RioErr << "getInfoAboutRequestList: Could not read from file." 03107 << endl; 03108 03109 free( req_time ); 03110 free( disk_block ); 03111 free( disk_time ); 03112 03113 return NULL; 03114 } 03115 } 03116 block++; 03117 } 03118 03119 m_TotalBlocks = block; 03120 03121 fclose( file ); 03122 03123 #ifdef RIO_DEBUG2 03124 if( m_log.is_open() ) m_log << "getInfoAboutRequestList: Closing file." 
<< endl; 03125 #endif 03126 03127 final_time = Timer.CurrentTime(); 03128 time = getInterval( initial_time, final_time ); 03129 m_SampleGenReqListTime = time; 03130 s_mgr->SetAverageGenReqListTime( time , this ); 03131 03132 #ifdef RIO_DEBUG2 03133 if( m_log.is_open() ) 03134 { 03135 m_log<<"---------------------------------------------------------------- "<<endl; 03136 m_log<<"getInfoAboutRequestList Time = " << time << " msec " <<endl; 03137 m_log<<"---------------------------------------------------------------- "<<endl; 03138 } 03139 #endif 03140 03141 free( req_time ); 03142 free( disk_block ); 03143 free( disk_time ); 03144 03145 return init_stream; 03146 }
struct timeval RioStream::getLastArrivalTime | ( | ) | [read] |
Definition at line 2664 of file server/StreamManager.cpp.
02665 { 02666 return m_LastArrivalTime; 02667 }
// Returns the block number of the client's next block to be requested,
// plus one once m_FinishedMovie is set.
// Dereferences m_PosAtClientsList without a NULL check.
// Source: server/StreamManager.cpp, line 2844.
u32 RioStream::getMaximumBlockToSend()
{
    u32 upperBound = m_PosAtClientsList->nextBlockToBeRequested->block;

    if( m_FinishedMovie )
    {
        upperBound += 1;
    }

    return upperBound;
}
// Returns m_MinimumBlockToSend (read without taking MutexMinBlock).
// Source: server/StreamManager.cpp, line 2856.
u32 RioStream::getMinimumBlockToSend()
{
    return m_MinimumBlockToSend;
}
void RioStream::getMutexUpdateEvent | ( | ) |
Definition at line 2395 of file server/StreamManager.cpp.
02396 { 02397 pthread_mutex_lock( &MutexUpdateEvent ); 02398 }
long double RioStream::getNetworkRate | ( | ) |
Definition at line 2764 of file server/StreamManager.cpp.
02765 { 02766 return s_mgr->GetNetworkRate(); 02767 }
int RioStream::getNumberOfBlocks | ( | ) |
Definition at line 2694 of file server/StreamManager.cpp.
02695 { 02696 return m_TotalBlocks; 02697 }
int RioStream::getNumberOfBuffersForEachClient | ( | ) |
Definition at line 2731 of file server/StreamManager.cpp.
02732 { 02733 return s_mgr->GetNumberOfBuffersForEachClient(); 02734 }
int RioStream::getPlayBlock | ( | ) |
Definition at line 2681 of file server/StreamManager.cpp.
02682 { 02683 if( m_StartingToSend ) 02684 { 02685 return 0; 02686 } 02687 else 02688 { 02689 return(m_PosAtClientsList->nextBlockToBeRequested->block - 02690 getClientBufferSize()); 02691 } 02692 }
bool RioStream::getPrefetchedAllBlocks | ( | ) |
Definition at line 2882 of file server/StreamManager.cpp.
02883 { 02884 return m_PrefetchedAllBlocks; 02885 }
int RioStream::getQueueSize | ( | ) |
Definition at line 2770 of file server/StreamManager.cpp.
02771 { 02772 return m_nQueue; 02773 }
double RioStream::getRTTtoClient | ( | ) |
Definition at line 2776 of file server/StreamManager.cpp.
02777 { 02778 return m_AverageRTTtoClient_msec; 02779 }
int RioStream::getServerBufferStatus | ( | ) |
Definition at line 2705 of file server/StreamManager.cpp.
02706 { 02707 #ifdef RIO_DEBUG2 02708 if( m_log.is_open() ) m_log << " m_ServerBufferStatus " << m_ServerBufferStatus 02709 << " m_TokensToSend " << m_TokensToSend 02710 << endl; 02711 #endif 02712 02713 if( m_TokensToSend > 0 ) 02714 return ( m_ServerBufferStatus - m_TokensToSend ); 02715 else 02716 return m_ServerBufferStatus; 02717 }
int RioStream::getTokensToDisk | ( | ) |
Definition at line 2658 of file server/StreamManager.cpp.
02659 { 02660 return m_TokensToDisk; 02661 }
int RioStream::getTokensToSend | ( | ) |
Definition at line 2804 of file server/StreamManager.cpp.
02805 { 02806 return m_TokensToSend; 02807 }
// Returns the stream's traffic type, m_Type.
// Source: server/StreamManager.cpp, line 1196.
RioTrafficType RioStream::GetType()
{
    return m_Type;
}
void RioStream::incMinimumBlockToSend | ( | ) |
Definition at line 2861 of file server/StreamManager.cpp.
02862 { 02863 pthread_mutex_lock( &MutexMinBlock ); 02864 m_MinimumBlockToSend++; 02865 pthread_mutex_unlock( &MutexMinBlock ); 02866 #ifdef RIO_DEBUG2 02867 if( m_log.is_open() ) m_log << "MinimumBlockToSend incremented : " << m_MinimumBlockToSend 02868 << endl; 02869 #endif 02870 }
void RioStream::incTokensToSend | ( | ) |
Definition at line 2810 of file server/StreamManager.cpp.
02811 { 02812 pthread_mutex_lock( &MutexTokensToSend ); 02813 m_TokensToSend++; 02814 pthread_mutex_unlock( &MutexTokensToSend ); 02815 #ifdef RIO_DEBUG2 02816 if( m_log.is_open() ) m_log << "TokensToSend incremented : " << m_TokensToSend << endl; 02817 #endif 02818 }
void RioStream::Initialize | ( | CStreamManager * | mgr | ) |
Definition at line 1025 of file server/StreamManager.cpp.
01026 { 01027 s_mgr = mgr; 01028 }
int RioStream::MaxRequests | ( | ) |
Definition at line 1184 of file server/StreamManager.cpp.
01185 { 01186 if( m_Status != StreamStatusOpened ) 01187 { 01188 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED; 01189 } 01190 01191 return m_UserMaxRequests; 01192 }
void RioStream::NewBlockArrival | ( | ) |
Definition at line 2791 of file server/StreamManager.cpp.
02792 { 02793 pthread_mutex_lock( &MutexBlock ); 02794 m_ServerBufferStatus++; 02795 pthread_cond_broadcast(&ConditionBlockArrival); 02796 pthread_mutex_unlock( &MutexBlock ); 02797 #ifdef RIO_DEBUG2 02798 if( m_log.is_open() ) m_log << "BufferStatus incremented : " << m_ServerBufferStatus << endl; 02799 #endif 02800 }
void RioStream::NRTHold | ( | EventDataRequest * | event | ) | [private] |
Definition at line 1819 of file server/StreamManager.cpp.
01820 { 01821 // If this is the first hold request for this stream 01822 // put stream on list of stream which are currently holding requests 01823 if( m_nQueue == 0 ) 01824 { 01825 s_mgr->m_NRTHoldList.Put( this ); 01826 } 01827 01828 m_Queue.Put( event ); 01829 m_nQueue++; 01830 m_QueuedCount++; 01831 }
int RioStream::OpenObject | ( | char * | ObjectName, | |
RioAccess | Access, | |||
struct timeval | RTT, | |||
int | BufferSize, | |||
RioStreamObj ** | StreamObj | |||
) |
Definition at line 2157 of file server/StreamManager.cpp.
02162 { 02163 int rc; 02164 02165 // Check if object access matches stream direction 02166 if( (s_Direction == RioStreamDirectionRead )&&( Access & RIO_WRITE_ACCESS) ) 02167 { 02168 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE; 02169 } 02170 if( (s_Direction == RioStreamDirectionWrite )&&( Access & RIO_READ_ACCESS) ) 02171 { 02172 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE; 02173 } 02174 02175 if( m_Status != StreamStatusOpened ) 02176 { 02177 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED; 02178 } 02179 02180 RioStreamObj *op = new RioStreamObj( s_mgr, this ); 02181 if( op == 0 ) 02182 { 02183 return ERROR_STREAMMANAGER + ERROR_MEMORY; 02184 } 02185 02186 op->o_object = 0; 02187 02188 rc = s_mgr->m_ObjectManager->Open( ObjectName, Access, &op->o_object ); 02189 02190 if( rc == 0 ) 02191 { 02192 *StreamObj = op; 02193 02194 // if it is VBR stream, if it is using buffer, initialize it 02195 if( (GetType() == RIO_TRAFFIC_VBR )&&( s_mgr->m_UseServerSideBuffers ) ) 02196 { 02197 struct timeval initial_time, final_time; 02198 double time; 02199 02200 initial_time = Timer.CurrentTime(); 02201 02202 m_ClientBufferSize = BufferSize; 02203 m_AverageRTTtoClient_msec = RTT.tv_sec * 1000.0 + 02204 RTT.tv_usec / 1000.0; 02205 02206 // get file name (with information to admission control) 02207 char infoFileName[255]="", mmobjectname[255]=""; 02208 strcpy( mmobjectname, ObjectName ); 02209 char *pos = strrchr( mmobjectname, '.' 
); 02210 strcpy( pos+1, "cactimes" ); 02211 strcpy( infoFileName, s_mgr->m_FileRoot ); 02212 strcat( infoFileName, "/" ) ; 02213 strcat( infoFileName, mmobjectname ) ; 02214 02215 m_FirstArrivalTime = initial_time.tv_sec * 1000.0 + 02216 initial_time.tv_usec / 1000.0; 02217 02218 #ifdef RIO_DEBUG2 02219 if( m_log.is_open() ) 02220 m_log << "OpenObject: Video : " << ObjectName 02221 << endl << "OpenObject: CAC info: " << infoFileName 02222 << endl << "OpenObject: FirstArrivalTime " 02223 << m_FirstArrivalTime << endl; 02224 #endif 02225 02226 m_WaitingQueueSize = s_mgr->UpdateNumberOfWaitingClients( 1 ); 02227 02228 pthread_mutex_lock( &s_mgr->MutexRunCAC ); 02229 02230 if( ( m_RequestList = getInfoAboutRequestList( infoFileName, 02231 op->o_object )) != NULL ) 02232 { 02233 if( s_mgr->m_UseNewCAC ) 02234 { 02235 02236 m_PosAtClientsList = s_mgr->CanAdmit( m_RequestList, 02237 this, 02238 m_AverageRTTtoClient_msec, 02239 m_ClientBufferSize ); 02240 final_time = Timer.CurrentTime(); 02241 time = getInterval( initial_time, final_time ); 02242 m_SampleCACTime = time; 02243 int index; 02244 02245 if( m_PosAtClientsList != NULL ) 02246 { 02247 index = 0; 02248 } 02249 else 02250 { 02251 index = 1; 02252 } 02253 02254 m_SampleAdmissionProcessTime = m_SampleGenReqListTime+ 02255 m_SampleSortListTime+ 02256 m_SampleSimulationTime; 02257 02258 m_SampleWaitingTime = m_SampleCACTime - 02259 m_SampleAdmissionProcessTime; 02260 02261 s_mgr->SetAverageWaitingTime( m_SampleWaitingTime, 02262 this, index, 02263 m_WaitingQueueSize ); 02264 s_mgr->SetAverageAdmissionProcessTime( 02265 m_SampleAdmissionProcessTime, 02266 this, index ); 02267 02268 s_mgr->SetAverageCACTime( time, this, index ); 02269 02270 if( s_mgr->m_log.is_open() ) 02271 { 02272 s_mgr->m_log << "Time to generate list: " 02273 << m_SampleGenReqListTime << " msec." 02274 << endl; 02275 s_mgr->m_log << "Time to sort list: " 02276 << m_SampleSortListTime << " msec." 
02277 << endl; 02278 s_mgr->m_log << "Time to simulate list: " 02279 << m_SampleSimulationTime << " msec." 02280 << endl; 02281 s_mgr->m_log << "Time to admission: " 02282 << m_SampleCACTime << " msec." 02283 << endl; 02284 s_mgr->m_log << "Waiting time: " 02285 << m_SampleWaitingTime << " msec." 02286 << endl; 02287 s_mgr->m_log << "Waiting queue size: " 02288 << m_WaitingQueueSize << endl; 02289 } 02290 02291 #ifdef RIO_DEBUG2 02292 if( m_log.is_open() ) 02293 m_log << "Time to admission: " << time << " msec." 02294 << endl; 02295 #endif 02296 if( m_PosAtClientsList != NULL ) 02297 { 02298 if( s_mgr->m_log.is_open() ) 02299 s_mgr->m_log << "OpenObject: New client admitted. " 02300 << "(RTT " << m_AverageRTTtoClient_msec 02301 << " msec)" << endl; 02302 02303 #ifdef RIO_DEBUG2 02304 if( m_log.is_open() ) 02305 m_log << "OpenObject: New client admitted." << endl; 02306 #endif 02307 } 02308 else 02309 { 02310 if( s_mgr->m_log.is_open() ) 02311 s_mgr->m_log << "OpenObject: Could not admit new " 02312 << "client" << endl; 02313 02314 #ifdef RIO_DEBUG2 02315 if( m_log.is_open() ) 02316 m_log << "OpenObject: Could not admit new client " 02317 << endl; 02318 #endif 02319 if( s_mgr->m_log.is_open() ) 02320 s_mgr->m_log << "Deleting StreamObj " << op << endl; 02321 02322 delete op; 02323 return -1; 02324 } 02325 } 02326 else 02327 { 02328 m_PosAtClientsList = s_mgr->InsertNewClient( m_RequestList, 02329 this ); 02330 s_mgr->UpdateNumberOfWaitingClients(-1); 02331 pthread_mutex_unlock( &s_mgr->MutexRunCAC ); 02332 } 02333 02334 // initialize StartingToSend 02335 m_StartingToSend = true; 02336 02337 setMinimumBlockToSend( 02338 m_PosAtClientsList->nextBlockToBeRequested->block ); 02339 m_FinishedMovie = false; 02340 02341 // now it creates the server side buffer equals to 02342 // s_mgr->GetNumberOfBuffersForEachClient() 02343 if( s_mgr->GetNumberOfBuffersForEachClient() > 0 ) 02344 { 02345 m_Buffer = new ClientBuffer[ 02346 s_mgr->GetNumberOfBuffersForEachClient()]; 02347 
if( m_Buffer == 0 ) 02348 { 02349 RioErr << "Could not allocate buffer."<< endl; 02350 return -1; 02351 } 02352 else 02353 { 02354 for( int i = 0; 02355 i < s_mgr->GetNumberOfBuffersForEachClient(); 02356 i++ 02357 ) 02358 { 02359 prefetchBlock( i, 02360 (EventDataRequest*)EventManager.New( 02361 EventTypeRTDataRequest ), 02362 op); 02363 m_nRequests++; 02364 updateNextBlockToBeFetched( NULL ); 02365 } 02366 } 02367 } 02368 m_TimeToBuffer = Timer.CurrentTime(); 02369 //After this point, send Ok to client 02370 } 02371 else 02372 { 02373 RioErr << "Could not find " << infoFileName 02374 << ", allocate memory or read from file." 02375 << "Not using new control admission and buffers." 02376 << endl; 02377 02378 pthread_mutex_unlock( &s_mgr->MutexRunCAC ); 02379 return S_OK; 02380 } 02381 } 02382 return S_OK; 02383 } 02384 02385 if( s_mgr->m_log.is_open() ) 02386 s_mgr->m_log << "Deleting StreamObj " << op << endl; 02387 02388 delete op; 02389 return rc; 02390 }
int RioStream::OpenObject | ( | char * | ObjectName, | |
RioAccess | Access, | |||
RioStreamObj ** | StreamObj | |||
) |
Definition at line 1142 of file server/StreamManager.cpp.
01144 { 01145 int rc; 01146 01147 // Check if object access matches stream direction 01148 if( ( s_Direction == RioStreamDirectionRead ) && 01149 ( Access & RIO_WRITE_ACCESS )) 01150 { 01151 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE; 01152 } 01153 if( ( s_Direction == RioStreamDirectionWrite ) && 01154 ( Access & RIO_READ_ACCESS )) 01155 { 01156 return ERROR_STREAMMANAGER + ERROR_INVALID_ACCESS_TYPE; 01157 } 01158 01159 if( m_Status != StreamStatusOpened ) 01160 { 01161 return ERROR_STREAMMANAGER + ERROR_STREAM_NOT_OPENED; 01162 } 01163 01164 RioStreamObj *op = new RioStreamObj( s_mgr, this ); 01165 if( op == 0 ) 01166 { 01167 return ERROR_STREAMMANAGER + ERROR_MEMORY; 01168 } 01169 01170 op->o_object = 0; 01171 01172 rc = s_mgr->m_ObjectManager->Open( ObjectName, Access, &op->o_object ); 01173 01174 if( rc == 0 ) 01175 { 01176 *StreamObj = op; 01177 return S_OK; 01178 } 01179 delete op; 01180 return rc; 01181 }
void RioStream::prefetchBlock | ( | int | posAtBuffer, | |
EventDataRequest * | event, | |||
RioStreamObj * | StreamObject | |||
) |
Definition at line 2408 of file server/StreamManager.cpp.
02411 { 02412 02413 m_Buffer[posAtBuffer].blockInfo = m_PosAtClientsList->nextBlockToBeFetched; 02414 02415 #ifdef RIO_DEBUG2 02416 struct timeval time = Timer.CurrentTime(); 02417 long double time_msec = time.tv_sec * 1000.0 + time.tv_usec/1000.0 ; 02418 if( m_log_requests.is_open() ) 02419 { 02420 m_log_requests << m_Buffer[posAtBuffer].blockInfo->block << "\t\t" 02421 << m_Buffer[posAtBuffer].blockInfo->csystime << "\t\t" 02422 << time_msec - m_FirstArrivalTime << "\t\t" 02423 << ( time_msec - m_FirstArrivalTime ) - 02424 m_Buffer[posAtBuffer].blockInfo->csystime 02425 << endl; 02426 } 02427 #endif 02428 02429 m_Buffer[posAtBuffer].event = event; 02430 m_Buffer[posAtBuffer].event->Request.BufferId = -1; 02431 // to be used to confirm if the request was submit to 02432 // storage before cancel it 02433 m_Buffer[posAtBuffer].event->Request.RequestCounter = -1; 02434 m_Buffer[posAtBuffer].event->Request.Reqid = 0; 02435 m_Buffer[posAtBuffer].event->Request.Target.IPaddress = 0; 02436 m_Buffer[posAtBuffer].event->Request.Target.Port = 0; 02437 m_Buffer[posAtBuffer].event->Request.Block = 02438 m_Buffer[posAtBuffer].blockInfo->block; 02439 m_Buffer[posAtBuffer].event->Request.Reps[0].block = 02440 m_Buffer[posAtBuffer].blockInfo->physicalblock; 02441 m_Buffer[posAtBuffer].event->Request.Reps[0].disk = 02442 m_Buffer[posAtBuffer].blockInfo->disk; 02443 m_Buffer[posAtBuffer].event->Request.RepBits = 0; 02444 m_Buffer[posAtBuffer].event->Request.Status = 0; 02445 m_Buffer[posAtBuffer].event->Request.Operation = RealTimePrefetchBlock; 02446 m_Buffer[posAtBuffer].event->Request.streamobj = StreamObject; 02447 m_Buffer[posAtBuffer].event->Request.rioobject = StreamObject->o_object; 02448 02449 StreamObject->m_PendingRequests++; 02450 02451 if( m_StartingToSend == true ) 02452 { 02453 #ifdef RIO_DEBUG2 02454 if( m_log.is_open() ) 02455 m_log << "prefetchBlock:starting prefetching block " 02456 << m_Buffer[posAtBuffer].event->Request.Block 02457 << " (m_nRequests " << 
m_nRequests << ")" 02458 << " m_ActiveRequests " 02459 << s_mgr->m_ActiveRequests + 1 02460 << endl; 02461 #endif 02462 s_mgr->m_Router->Put( (Event*)m_Buffer[posAtBuffer].event ); 02463 s_mgr->m_ActiveRequests++; 02464 } 02465 else 02466 { 02467 if( ( m_nQueue == 0 ) && ( getTokensToDisk() > 0 ) ) 02468 { 02469 #ifdef RIO_DEBUG2 02470 if( m_log.is_open() ) 02471 m_log << "prefetchBlock:prefetching block (Router) " 02472 << m_Buffer[posAtBuffer].event->Request.Block 02473 << " (m_nRequests " << m_nRequests << ")" 02474 << " m_ActiveRequests " 02475 << s_mgr->m_ActiveRequests + 1 02476 << endl; 02477 #endif 02478 s_mgr->m_Router->Put( (Event*)m_Buffer[posAtBuffer].event ); 02479 s_mgr->m_ActiveRequests++; 02480 decTokensToDisk(); 02481 } 02482 else 02483 { 02484 #ifdef RIO_DEBUG2 02485 if( m_log.is_open() ) 02486 m_log << "prefetchBlock:prefetching block (RTQueue) " 02487 << m_Buffer[posAtBuffer].event->Request.Block 02488 << " (m_nRequests " << m_nRequests << ")" 02489 << endl; 02490 #endif 02491 RTHold( m_Buffer[posAtBuffer].event ); 02492 } 02493 } 02494 }
void RioStream::ProcessNRTQueue | ( | ) | [private] |
Definition at line 1930 of file server/StreamManager.cpp.
01931 { 01932 EventDataRequest* request; 01933 01934 // Process just one queued request at time 01935 // (round robin across all non real-time streams) 01936 if( m_nQueue > 0 ) // Just in case: Not really necessary 01937 { 01938 // retrieve request from queue 01939 request = m_Queue.Get(); 01940 m_nQueue--; 01941 01942 // Increment total number of requests sent to router 01943 s_mgr->m_ActiveRequests++; 01944 01945 // Send request to router 01946 s_mgr->m_Router->Put( (Event*)request ); 01947 } 01948 // If no requests left on queue, remove stream from list of streams 01949 // with hold requests 01950 if( m_nQueue == 0 ) 01951 s_mgr->m_NRTHoldList.Remove( this ); 01952 }
void RioStream::ProcessRTQueue | ( | ) | [private] |
Definition at line 1834 of file server/StreamManager.cpp.
01835 { 01836 01837 EventDataRequest* request; 01838 01839 struct timeval t = Timer.CurrentTime(); 01840 t.tv_usec += RELEASE_TIME_TOLERANCE; 01841 if( t.tv_usec > 1000000 ) 01842 { 01843 t.tv_sec += 1; 01844 t.tv_usec -= 1000000; 01845 } 01846 01847 if( m_PosAtClientsList != NULL ) 01848 { 01849 struct timeval clocknow = Timer.CurrentTime(); 01850 01851 // updates number of tokensToDisk 01852 updateTokensToDisk( clocknow ); 01853 01854 // Process maximum queued requests without violating stream rate 01855 while(( m_nQueue > 0 ) && ( getTokensToDisk() > 0 )) 01856 { 01857 #ifdef RIO_DEBUG2 01858 if( m_log.is_open() ) 01859 m_log << "Stream:DataReq 1 - ProcessRTQueue " 01860 << " Recuperando da fila." 01861 << endl; 01862 #endif 01863 // retrieve request from queue 01864 request = m_Queue.Get(); 01865 m_nQueue--; 01866 01867 // Increment total number of requests sent to router 01868 s_mgr->m_ActiveRequests++; 01869 01870 // Send request to router 01871 #ifdef RIO_DEBUG2 01872 if( m_log.is_open() ) m_log << "Stream:DataReq - ProcessRTQueue " 01873 << " Enviando pedidos recuperados da fila para o router111 " 01874 << endl; 01875 #endif 01876 s_mgr->m_Router->Put( (Event*)request ); 01877 decTokensToDisk(); 01878 } 01879 } 01880 else 01881 { 01882 // Process maximum queued requests without violating stream rate 01883 while( ( m_nQueue > 0 ) && 01884 ( ( t.tv_sec > m_NextRelease.tv_sec) || 01885 ( ( t.tv_sec == m_NextRelease.tv_sec ) && 01886 ( t.tv_usec >= m_NextRelease.tv_usec) 01887 ) 01888 ) 01889 ) 01890 { 01891 // retrieve request from queue 01892 #ifdef RIO_DEBUG2 01893 if( m_log.is_open() ) 01894 m_log << "Stream:DataReq 2 - ProcessRTQueue " 01895 << " Recuperando da fila." 
01896 << endl; 01897 #endif 01898 request = m_Queue.Get(); 01899 m_nQueue--; 01900 01901 // update new release time for next request 01902 m_NextRelease.tv_sec += m_ArrivalInterval.tv_sec; 01903 m_NextRelease.tv_usec += m_ArrivalInterval.tv_usec; 01904 if( m_NextRelease.tv_usec > 1000000 ) 01905 { 01906 m_NextRelease.tv_sec += 1; 01907 m_NextRelease.tv_usec -= 1000000; 01908 } 01909 01910 // Increment total number of requests sent to router 01911 s_mgr->m_ActiveRequests++; 01912 01913 // Send request to router 01914 #ifdef RIO_DEBUG2 01915 if( m_log.is_open() ) 01916 m_log << "Stream:DataReq - ProcessRTQueue " 01917 << " Enviando pedidos recuperados da fila para o router " 01918 << endl; 01919 #endif 01920 s_mgr->m_Router->Put( (Event*)request ); 01921 } 01922 } 01923 // If no requests left on queue, remove stream from list of streams 01924 // with hold requests 01925 if( m_nQueue == 0 ) 01926 s_mgr->m_RTHoldList.Remove( this ); 01927 }
void RioStream::releaseMutexUpdateEvent | ( | ) |
Definition at line 2400 of file server/StreamManager.cpp.
02401 { 02402 pthread_mutex_unlock( &MutexUpdateEvent ); 02403 }
void RioStream::RequestCompleted | ( | Event * | Request | ) | [private] |
Definition at line 1955 of file server/StreamManager.cpp.
01956 { 01957 DataRequest* Req = & ((( EventDataRequest* )Request )->Request ); 01958 01959 // Save request info for sending reply to client after request is free 01960 DataRequestOperation Operation = Req->Operation; 01961 CommunicationAddress Target = Req->Target; 01962 int Reqid = Req->Reqid; 01963 RioResult Status = Req->Status; 01964 NATData result; 01965 // TODO: Remover depois, quando a RioNeti e a NetMgr nao forem mais usadas. 01966 unsigned int StorageNumber; 01967 01968 // TODO: Remover depois, quando a RioNeti e a NetMgr nao forem mais usadas 01969 // (obtem o numero de storages). 01970 s_mgr->m_Router->GetNumberOfStorageNodes( &StorageNumber ); 01971 01972 // Free request 01973 EventManager.Free( Request ); 01974 01975 m_nRequests--; 01976 s_mgr->m_ActiveRequests--; 01977 01978 #ifdef RIO_DEBUG2 01979 struct in_addr client_address; 01980 client_address.s_addr = Target.IPaddress; 01981 RioErr << "[RioStream] RequestCompleted Target: " 01982 << "IPAddr = " << inet_ntoa(client_address) 01983 << ", Port = " << Target.Port << endl; 01984 #endif 01985 01986 // Check if stream was closed 01987 if( m_Status == StreamStatusClosed ) 01988 { 01989 if( m_nRequests == 0 ) 01990 { 01991 s_mgr->m_CloseList.Remove( this ); 01992 s_mgr->m_used--; 01993 // specific for real-time streams 01994 if( m_Type == RIO_TRAFFIC_CBR ) 01995 { 01996 s_mgr->m_UsedRate -= s_Rate; 01997 } 01998 if( (m_PosAtClientsList != NULL) && 01999 (s_mgr->m_UseServerSideBuffers) 02000 ) 02001 { 02002 s_mgr->RemoveClient( m_PosAtClientsList ); 02003 if( m_Buffer ) 02004 { 02005 delete m_Buffer; 02006 m_Buffer = NULL; 02007 } 02008 } 02009 #ifdef RIO_DEBUG2 02010 RioErr << "Closing mlog" << endl; 02011 m_log.close(); 02012 #endif 02013 #ifdef RIO_DEBUG2 02014 RioErr << "Closing mlogreq" << endl; 02015 m_log_requests.close(); 02016 #endif 02017 s_mgr->m_FreeList.Put( this ); 02018 if( s_mgr->m_log.is_open() ) 02019 s_mgr->m_log << "RequestCompleted: Stream CLOSED. 
" 02020 << " Requests: " << m_RequestCount 02021 << ". Queued requests: " << m_QueuedCount << endl; 02022 } 02023 } 02024 else // else do if( m_Status == StreamStatusClosed ) 02025 { 02026 02027 #ifdef RIO_DEBUG2 02028 RioErr << "RioStream::RequestCompleted Status = " << Status 02029 << ", Operation = " << Operation << ", ReqID = " << Reqid 02030 << endl; 02031 #endif 02032 02033 // Send reply to client if required 02034 if( ( Operation == NonRealTimeRead ) || 02035 ( Operation == RealTimeRead ) || 02036 ( Operation == RealTimeCancelBlock ) 02037 ) 02038 { 02039 // For read, just send result to client if there was an error 02040 if( Status != 0 ) 02041 { 02042 #ifdef RIO_DEBUG2 02043 if( m_log.is_open() ) 02044 m_log << "RequestCompleted SendResult Error " << endl; 02045 #endif 02046 RioErr << "RequestCompleted SendResult Error " << endl; 02047 02048 NATData input( Target.IPaddress, Target.Port ); 02049 02050 // Verifica se existe o mapeamento do servidor para a nova 02051 // implementacao. 02052 // TODO: Remover este if quando a RioNeti e a NetMgr deixarem de 02053 // ser usadas. 02054 if( s_mgr->m_Router->m_NAT_map.findElement( input, 02055 StorageNumber + 1 ) ) 02056 result = s_mgr->m_Router->m_NAT_map.getElement( input, 02057 StorageNumber + 1 ); 02058 else 02059 result = s_mgr->m_Router->m_NAT_map.getElement( input, 0 ); 02060 if( result.nat_addr != 0 ) 02061 { 02062 // A nova chamada a funcao NetInterface.FindIPAndPort e para 02063 // verificarmos se devemos usar a funcao do objeto da nova 02064 // classe ou a funcao do objeto da classe antiga. 
02065 if( s_mgr->m_NetInterface->FindIPAndPort( result.nat_addr, 02066 result.nat_port ) 02067 ) 02068 s_mgr->m_NetInterface->SendResult( result.nat_addr, 02069 result.nat_port, 02070 Reqid, Status ); 02071 else 02072 s_mgr->m_NetMgr->SendResult( result.nat_addr, 02073 result.nat_port, 02074 Reqid, Status ); 02075 } 02076 else 02077 { 02078 // A nova chamada a funcao NetInterface.FindIPAndPort e para 02079 // verificarmos se devemos usar a funcao do objeto da nova 02080 // classe ou a funcao do objeto da classe antiga. 02081 if( s_mgr->m_NetInterface->FindIPAndPort( Target.IPaddress, 02082 Target.Port ) ) 02083 s_mgr->m_NetInterface->SendResult( Target.IPaddress, 02084 Target.Port, 02085 Reqid, Status ); 02086 else 02087 s_mgr->m_NetMgr->SendResult( Target.IPaddress, 02088 Target.Port, 02089 Reqid, Status ); 02090 } 02091 } 02092 } 02093 // write case 02094 else if( Operation == NonRealTimeWrite ) 02095 { 02096 NATData input( Target.IPaddress, Target.Port ); 02097 // Verifica se existe o mapeamento do servidor para a nova 02098 // implementacao. 02099 // TODO: Remover este if quando a RioNeti e a NetMgr deixarem de ser 02100 // usadas. 02101 if( s_mgr->m_Router->m_NAT_map.findElement( input, 02102 StorageNumber + 1 ) ) 02103 result = s_mgr->m_Router->m_NAT_map.getElement( input, 02104 StorageNumber + 1 ); 02105 else 02106 result = s_mgr->m_Router->m_NAT_map.getElement( input, 0 ); 02107 if( result.nat_addr != 0 ) 02108 { 02109 Target.IPaddress = result.nat_addr; 02110 Target.Port = result.nat_port; 02111 } 02112 if( result.nat_addr != 0 ) 02113 { 02114 // A nova chamada a funcao NetInterface.FindIPAndPort e para 02115 // verificarmos se devemos usar a funcao do objeto da nova 02116 // classe ou a funcao do objeto da classe antiga. 
02117 if( s_mgr->m_NetInterface->FindIPAndPort( result.nat_addr, 02118 result.nat_port ) ) 02119 s_mgr->m_NetInterface->SendResult( result.nat_addr, 02120 result.nat_port, 02121 Reqid, Status ); 02122 else 02123 s_mgr->m_NetMgr->SendResult( result.nat_addr, 02124 result.nat_port, 02125 Reqid, Status ); 02126 } 02127 else 02128 { 02129 if( s_mgr->m_NetInterface->FindIPAndPort( Target.IPaddress, 02130 Target.Port ) ) 02131 s_mgr->m_NetInterface->SendResult( Target.IPaddress, 02132 Target.Port, Reqid, 02133 Status ); 02134 else 02135 s_mgr->m_NetMgr->SendResult( Target.IPaddress, Target.Port, 02136 Reqid, Status ); 02137 } 02138 } 02139 } // fim do else do if( m_Status == StreamStatusClosed ) 02140 02141 struct in_addr clientip; 02142 clientip.s_addr = m_RemoteAddress.sin_addr.s_addr; 02143 UpdateQueueEvent *it_event = new UpdateQueueEvent( inet_ntoa(clientip), 02144 htons( m_RemoteAddress.sin_port ), m_nRequests ); 02145 s_mgr->PostITEvent( (MonitorEvent *)it_event ); 02146 }
void RioStream::RTHold | ( | EventDataRequest * | event | ) | [private] |
Definition at line 1800 of file server/StreamManager.cpp.
01801 { 01802 // If this is the first hold request for this stream 01803 // put stream on list of stream which are currently holding requests 01804 if( m_nQueue == 0 ) 01805 { 01806 #ifdef RIO_DEBUG2 01807 if( m_log.is_open() ) 01808 m_log << "Stream:DataReq:colocando first na fila " << endl; 01809 #endif 01810 s_mgr->m_RTHoldList.Put( this ); 01811 } 01812 01813 m_Queue.Put( event ); 01814 m_nQueue++; 01815 m_QueuedCount++; 01816 }
void RioStream::setMinimumBlockToSend | ( | u32 | Block | ) |
Definition at line 2871 of file server/StreamManager.cpp.
02872 { 02873 pthread_mutex_lock( &MutexMinBlock ); 02874 m_MinimumBlockToSend = Block; 02875 pthread_mutex_unlock( &MutexMinBlock ); 02876 #ifdef RIO_DEBUG2 02877 if( m_log.is_open() ) m_log << "MinimumBlockToSend set : " << m_MinimumBlockToSend << endl; 02878 #endif 02879 }
void RioStream::setTokensToSend | ( | int | value | ) |
Definition at line 2831 of file server/StreamManager.cpp.
02832 { 02833 pthread_mutex_lock( &MutexTokensToSend ); 02834 m_TokensToSend = value; 02835 pthread_mutex_unlock( &MutexTokensToSend ); 02836 #ifdef RIO_DEBUG2 02837 if( m_log.is_open() ) m_log << "TokensToSend set : " << m_TokensToSend << endl; 02838 #endif 02839 }
int RioStream::startingStream | ( | ) |
Definition at line 2784 of file server/StreamManager.cpp.
02785 { 02786 return m_StartingToSend; 02787 }
void RioStream::updateNextBlockToBeFetched | ( | PSEQBLOCKLIST | Next | ) |
Definition at line 2535 of file server/StreamManager.cpp.
02536 { 02537 if( Next == NULL ) 02538 { 02539 pthread_mutex_lock( &MutexNextBlockToBeFetched ); 02540 if( m_PosAtClientsList->nextBlockToBeFetched->next == NULL ) 02541 { 02542 #ifdef RIO_DEBUG2 02543 if( m_log.is_open() ) m_log << "updateNextBlockToBeFetched: prefetchAll true"<< endl; 02544 #endif 02545 m_PrefetchedAllBlocks = true; 02546 } 02547 else 02548 { 02549 m_PosAtClientsList->nextBlockToBeFetched = 02550 m_PosAtClientsList->nextBlockToBeFetched->next; 02551 } 02552 pthread_mutex_unlock( &MutexNextBlockToBeFetched ); 02553 } 02554 else 02555 { 02556 pthread_mutex_lock( &MutexNextBlockToBeFetched ); 02557 m_PosAtClientsList->nextBlockToBeFetched = Next; 02558 pthread_mutex_unlock( &MutexNextBlockToBeFetched ); 02559 } 02560 #ifdef RIO_DEBUG2 02561 if( m_log.is_open() ) m_log << "updateNextBlockToBeFetched: " 02562 << m_PosAtClientsList->nextBlockToBeFetched->block<< endl; 02563 #endif 02564 }
void RioStream::updateTokensToDisk | ( | struct timeval | time | ) |
Definition at line 2615 of file server/StreamManager.cpp.
02616 { 02617 pthread_mutex_lock( &MutexTokensToDisk ); 02618 while( ( (m_TokensToDiskClock.tv_sec < time.tv_sec ) || 02619 ( ( m_TokensToDiskClock.tv_sec == time.tv_sec) && 02620 (m_TokensToDiskClock.tv_usec < time.tv_usec ) 02621 ) 02622 ) 02623 && (m_StartingToSend == false )) 02624 { 02625 m_TokensToDiskClock.tv_sec += m_ArrivalInterval.tv_sec; 02626 m_TokensToDiskClock.tv_usec += m_ArrivalInterval.tv_usec; 02627 if( m_TokensToDiskClock.tv_usec > 1000000 ) 02628 { 02629 m_TokensToDiskClock.tv_sec += 1; 02630 m_TokensToDiskClock.tv_usec -= 1000000; 02631 } 02632 02633 if( m_TokensToDisk < s_mgr->GetBurstSizeOfEachClient() ) 02634 { 02635 m_TokensToDisk++; 02636 } 02637 } 02638 pthread_mutex_unlock( &MutexTokensToDisk ); 02639 #ifdef RIO_DEBUG2 02640 if( m_log.is_open() ) m_log << "TokensToDisk updated : " << m_TokensToDisk << endl; 02641 #endif 02642 }
friend class CStreamManager [friend] |
Definition at line 300 of file server/StreamManager.h.
friend class RioStreamObj [friend] |
Definition at line 301 of file server/StreamManager.h.
pthread_cond_t RioStream::ConditionBlockArrival [private] |
Definition at line 297 of file server/StreamManager.h.
struct timeval RioStream::m_ArrivalInterval [private] |
Definition at line 257 of file server/StreamManager.h.
double RioStream::m_AverageRTTtoClient_msec [private] |
Definition at line 282 of file server/StreamManager.h.
ClientBuffer* RioStream::m_Buffer [private] |
Definition at line 281 of file server/StreamManager.h.
int RioStream::m_ClientBufferSize [private] |
Definition at line 283 of file server/StreamManager.h.
bool RioStream::m_FinishedMovie [private] |
Definition at line 287 of file server/StreamManager.h.
double RioStream::m_FirstArrivalTime [private] |
Definition at line 268 of file server/StreamManager.h.
uint RioStream::m_Id [private] |
Definition at line 279 of file server/StreamManager.h.
struct timeval RioStream::m_LastArrivalTime [private] |
Definition at line 264 of file server/StreamManager.h.
ofstream RioStream::m_log |
Definition at line 233 of file server/StreamManager.h.
ofstream RioStream::m_log_requests |
Definition at line 233 of file server/StreamManager.h.
int RioStream::m_MaxRequests [private] |
Definition at line 253 of file server/StreamManager.h.
u32 RioStream::m_MinimumBlockToSend [private] |
Definition at line 290 of file server/StreamManager.h.
struct timeval RioStream::m_NextRelease [private] |
Definition at line 256 of file server/StreamManager.h.
int RioStream::m_nQueue [private] |
Definition at line 255 of file server/StreamManager.h.
int RioStream::m_nRequests [private] |
Definition at line 252 of file server/StreamManager.h.
PSTREAMLIST RioStream::m_PosAtClientsList [private] |
Definition at line 280 of file server/StreamManager.h.
bool RioStream::m_PrefetchedAllBlocks [private] |
Definition at line 286 of file server/StreamManager.h.
CRequestQueue RioStream::m_Queue [private] |
Definition at line 261 of file server/StreamManager.h.
u64 RioStream::m_QueuedCount [private] |
Definition at line 260 of file server/StreamManager.h.
SOCKADDR_IN RioStream::m_RemoteAddress [private] |
Definition at line 249 of file server/StreamManager.h.
u64 RioStream::m_RequestCount [private] |
Definition at line 259 of file server/StreamManager.h.
PSEQBLOCKLIST RioStream::m_RequestList [private] |
Definition at line 278 of file server/StreamManager.h.
double RioStream::m_SampleAdmissionProcessTime [private] |
Definition at line 273 of file server/StreamManager.h.
double RioStream::m_SampleCACTime [private] |
Definition at line 272 of file server/StreamManager.h.
double RioStream::m_SampleGenReqListTime [private] |
Definition at line 269 of file server/StreamManager.h.
double RioStream::m_SampleSimulationTime [private] |
Definition at line 271 of file server/StreamManager.h.
double RioStream::m_SampleSortListTime [private] |
Definition at line 270 of file server/StreamManager.h.
double RioStream::m_SampleWaitingTime [private] |
Definition at line 274 of file server/StreamManager.h.
int RioStream::m_ServerBufferStatus [private] |
Definition at line 284 of file server/StreamManager.h.
bool RioStream::m_StartingToSend [private] |
Definition at line 289 of file server/StreamManager.h.
StreamStatus RioStream::m_Status [private] |
Definition at line 250 of file server/StreamManager.h.
struct timeval RioStream::m_TimeToBuffer [private] |
Definition at line 266 of file server/StreamManager.h.
int RioStream::m_TokensToDisk [private] |
Definition at line 288 of file server/StreamManager.h.
struct timeval RioStream::m_TokensToDiskClock [private] |
Definition at line 265 of file server/StreamManager.h.
int RioStream::m_TokensToSend [private] |
Definition at line 285 of file server/StreamManager.h.
int RioStream::m_TotalBlocks [private] |
Definition at line 276 of file server/StreamManager.h.
RioTrafficType RioStream::m_Type [private] |
Definition at line 251 of file server/StreamManager.h.
int RioStream::m_UserMaxRequests [private] |
Definition at line 254 of file server/StreamManager.h.
int RioStream::m_WaitingQueueSize [private] |
Definition at line 275 of file server/StreamManager.h.
pthread_mutex_t RioStream::MutexBlock [private] |
Definition at line 295 of file server/StreamManager.h.
pthread_mutex_t RioStream::MutexMinBlock [private] |
Definition at line 294 of file server/StreamManager.h.
pthread_mutex_t RioStream::MutexNextBlockToBeFetched [private] |
Definition at line 293 of file server/StreamManager.h.
pthread_mutex_t RioStream::MutexTokensToDisk [private] |
Definition at line 292 of file server/StreamManager.h.
pthread_mutex_t RioStream::MutexTokensToSend [private] |
Definition at line 291 of file server/StreamManager.h.
pthread_mutex_t RioStream::MutexUpdateEvent [private] |
Definition at line 296 of file server/StreamManager.h.
RioStream* RioStream::Next [private] |
Definition at line 147 of file server/StreamManager.h.
RioStream* RioStream::Previous [private] |
Definition at line 148 of file server/StreamManager.h.
RioStreamDirection RioStream::s_Direction [private] |
Definition at line 247 of file server/StreamManager.h.
CStreamManager* RioStream::s_mgr [private] |
Definition at line 243 of file server/StreamManager.h.
double RioStream::s_Rate [private] |
Definition at line 248 of file server/StreamManager.h.
int RioStream::s_streamid [private] |
Definition at line 245 of file server/StreamManager.h.