00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "Router.h"
00025 #include "RouterTypes.h"
00026
00027 #include "ObjMapMgr.h"
00028 #include "td4list.h"
00029 #include "NATMap.h"
00030
00031 #include "RioError.h"
00032
00033 #include "DiskMgr.h"
00034 #include "StreamManager.h"
00035 #include "SystemManager.h"
00036 #include "MonitorTable.h"
00037
00038 #include <string.h>
00039 #include <errno.h>
00040 #include <pthread.h>
00041
00042 #include <stdio.h>
00043 #include "timer.h"
00044 #include <math.h>
00045
00046 #include <sys/socket.h>
00047 #include <netinet/in.h>
00048 #include <arpa/inet.h>
00049
00050
00051 #include <unistd.h>
00052 #include <sys/syscall.h>
00053
00054 extern CTimer Timer;
00055 double getInterval( struct timeval initial, struct timeval final );
00056
00057 #ifdef __QUEUE_RESP_LOG
00058 const char* const LOGRESP = "RouterEstimatedResponseTime.log";
00059 const char* const LOGEQT = "RouterEstimatedQueueTime.log";
00060 const char* const LOGSQT = "RouterSampleQueueTime.log";
00061 #endif
00062
00063
00064 extern CEventManager EventManager;
00065
00066
00067
00068 CRouter::CRouter()
00069 {
00070 m_UpdatedEstimatedDiskServiceTime = 0;
00071 pthread_mutex_init( &m_MutexUpdated, NULL );
00072 pthread_cond_init( &m_ConditionUpdated, NULL );
00073
00074
00075
00076 m_thread = 0;
00077
00078 m_initialized = false;
00079 m_started = false;
00080 m_DiskMgr = NULL;
00081 m_SystemManager = NULL;
00082 m_nDisks = 0;
00083 m_BlockSize = 0;
00084 m_MaxPending = 0;
00085 m_MaxQueueSize = 0;
00086
00087 m_RTqueue = NULL;
00088 m_NRTqueue = NULL;
00089 m_Pending = NULL;
00090 m_nRT = NULL;
00091 m_nNRT = NULL;
00092 m_nPending = NULL;
00093
00094
00095 m_TempDiskRequest = NULL;
00096
00097 m_EstimatedParameter = 0;
00098 for( int i = 0; i < 100; i++ )
00099 {
00100 for( int j = 0; j < 301; j++ )
00101 {
00102
00103 m_EstDiskServiceTime [ i ][ j ] = 0;
00104 m_EstDiskResponseTime[ i ][ j ] = 0;
00105 m_DevDiskResponseTime[ i ][ j ] = 0;
00106
00107
00108 m_EstDiskQueueTime[ i ][ j ] = 0;
00109 }
00110
00111 m_EstDiskQueueTimeOfDisk [ i ] = 0;
00112 m_EstDiskServiceTimeOfDisk [ i ] = 0;
00113 m_EstDiskResponseTimeOfDisk[ i ] = 0;
00114 m_DevDiskResponseTimeOfDisk[ i ] = 0;
00115 }
00116 m_CollectMeasures = false;
00117
00118
00119
00120
00121
00122 m_StoragesStatus = 0;
00123
00124
00125 pthread_mutex_init( &m_StoragesStatusMutex, NULL );
00126 }
00127
00128
00129 CRouter::~CRouter()
00130 {
00131 CleanUp();
00132
00133
00134
00135 pthread_mutex_destroy( &m_StoragesStatusMutex );
00136 }
00137
00138
00139 void CRouter::CleanUp()
00140 {
00141 m_initialized = false;
00142 m_started = false;
00143
00144 m_DiskMgr = 0;
00145 m_nDisks = 0;
00146 m_BlockSize = 0;
00147 m_MaxPending = 0;
00148 m_MaxQueueSize = 0;
00149
00150
00151
00152
00153 if( m_RTqueue != 0 )
00154 {
00155 delete[] m_RTqueue;
00156 m_RTqueue = 0;
00157 }
00158
00159 if( m_NRTqueue != 0 )
00160 {
00161 delete[] m_NRTqueue;
00162 m_NRTqueue = 0;
00163 }
00164
00165 if( m_Pending != 0 )
00166 {
00167 delete[] m_Pending;
00168 m_Pending = 0;
00169 }
00170
00171 if( m_nRT != 0 )
00172 {
00173 delete[] m_nRT;
00174 m_nRT = 0;
00175 }
00176
00177 if( m_nNRT != 0 )
00178 {
00179 delete[] m_nNRT;
00180 m_nNRT = 0;
00181 }
00182
00183 if( m_nPending != 0 )
00184 {
00185 delete[] m_nPending;
00186 m_nPending = 0;
00187 }
00188
00189 if( m_TempDiskRequest != 0 )
00190 {
00191 delete[] m_TempDiskRequest;
00192 m_TempDiskRequest = 0;
00193 }
00194 }
00195
00196
00197
00198
00199 int CRouter::Initialize( RouterConfig * Config )
00200 {
00201 int i;
00202
00203 if( m_initialized )
00204 {
00205 RioErr << "Router.Initialize: already initialized" << endl;
00206 return ERROR_ROUTER + ERROR_INITIALIZED;
00207 }
00208
00209
00210 m_nDisks = Config->nDisks;
00211
00212
00213 m_BlockSize = Config->BlockSize;
00214
00215
00216 m_MaxPending = Config->MaxPending;
00217
00218
00219 m_CollectMeasures = Config->CollectMeasures;
00220
00221 m_EstimatedParameter = Config->EstimatedTimeParameter;
00222 m_UpdatedEstimatedDiskServiceTime = 0;
00223
00224 #ifdef __QUEUE_RESP_LOG
00225 if( m_CollectMeasures )
00226 {
00227
00228
00229 char LogFileName[ MaxPathSize ];
00230
00231 strcpy( LogFileName, Config->LogsDirectory );
00232 strcat( LogFileName, LOGRESP );
00233 m_logRESP.open( LogFileName );
00234 strcpy( LogFileName, Config->LogsDirectory );
00235 strcat( LogFileName, LOGSQT );
00236 m_logSQT.open ( LogFileName );
00237 m_logRESP << "#Measures in msec" << endl;
00238 m_logSQT << "#Measures in msec" << endl;
00239 }
00240 #endif
00241
00242
00243 m_DiskMgr = Config->DiskManager;
00244 m_SystemManager = Config->SystemManager;
00245
00246
00247
00248
00249 m_RTqueue = new CDiskRequestQueue[m_nDisks];
00250 if( m_RTqueue == 0 )
00251 {
00252 RioErr << "Router.Initialize: Out of memory" << endl;
00253 CleanUp();
00254 return ERROR_ROUTER + ERROR_MEMORY;
00255 }
00256
00257 m_NRTqueue = new CDiskRequestQueue[m_nDisks];
00258 if( m_NRTqueue == 0 )
00259 {
00260 RioErr << "Router.Initialize: Out of memory" << endl;
00261 CleanUp();
00262 return ERROR_ROUTER + ERROR_MEMORY;
00263 }
00264
00265 m_Pending = new CDiskRequestQueue[m_nDisks];
00266 if( m_Pending == 0 )
00267 {
00268 RioErr << "Router.Initialize: Out of memory" << endl;
00269 CleanUp();
00270 return ERROR_ROUTER + ERROR_MEMORY;
00271 }
00272 m_nRT = new int [m_nDisks];
00273 if( m_nRT == 0 )
00274 {
00275 RioErr << "Router.Initialize: Out of memory" << endl;
00276 CleanUp();
00277 return ERROR_ROUTER + ERROR_MEMORY;
00278 }
00279
00280 m_nNRT = new int [m_nDisks];
00281 if( m_nNRT == 0 )
00282 {
00283 RioErr << "Router.Initialize: Out of memory" << endl;
00284 CleanUp();
00285 return ERROR_ROUTER + ERROR_MEMORY;
00286 }
00287
00288 m_nPending = new int [m_nDisks];
00289 if( m_nPending == 0 )
00290 {
00291 RioErr << "Router.Initialize: Out of memory" << endl;
00292 CleanUp();
00293 return ERROR_ROUTER + ERROR_MEMORY;
00294 }
00295
00296 m_MaxQueueSize = ( Config->MaxDataRequests * MaxReplications );
00297
00298 for( i = 0; i < m_nDisks; i++ )
00299 {
00300
00301
00302 m_nRT[i] = m_MaxQueueSize + 1;
00303 m_nNRT[i] = 0;
00304 m_nPending[i] = 0;
00305 }
00306
00307 m_TempDiskRequest = new StrDiskRequest* [MaxReplications];
00308 if( m_TempDiskRequest == 0 )
00309 {
00310 RioErr << "Router.Initialize: Out of memory" << endl;
00311 CleanUp();
00312 return ERROR_ROUTER + ERROR_MEMORY;
00313 }
00314
00315
00316
00317 unsigned int maxrequests = 0xffff;
00318
00319 #ifdef RIO_DEBUG2
00320 RioErr << "Router: Max number of Requests " << maxrequests << endl;
00321 #endif
00322
00323
00324 int hResult;
00325 hResult = m_Request.Initialize( maxrequests );
00326 if( hResult )
00327 {
00328 RioErr << "Could not initialize m_Requests at Router"
00329 << ". (" << maxrequests << ")" << endl;
00330 return hResult;
00331 }
00332
00333 m_initialized = true;
00334
00335 return S_OK;
00336 }
00337
00338 int CRouter::Start()
00339 {
00340 pthread_attr_t attrib;
00341
00342 if( !m_initialized )
00343 {
00344 RioErr << "Router.Start: not initiallized" << endl;
00345 return ERROR_ROUTER + ERROR_NOT_INITIALIZED;
00346 }
00347 if( m_started )
00348 {
00349 RioErr << "Router.Start: already started" << endl;
00350 return ERROR_ROUTER + ERROR_STARTED;
00351 }
00352
00353 pthread_attr_init( &attrib );
00354 pthread_attr_setstacksize( &attrib, 3*PTHREAD_STACK_MIN );
00355
00356 if( pthread_create( &m_thread, &attrib, &routerthreadep, (void *)this ) )
00357 {
00358 RioErr << "Router.Start pthread_create failed "
00359 << strerror(errno) << endl;
00360 return (ERROR_ROUTER + ERROR_CREATE_THREAD);
00361 }
00362
00363 m_started = true;
00364 return S_OK;
00365 }
00366
00367 int CRouter::Stop()
00368 {
00369 Event *event;
00370
00371 if( !m_started )
00372 {
00373 RioErr << "Router.Stop: not started" << endl;
00374 return( ERROR_ROUTER + ERROR_NOT_STARTED );
00375 }
00376
00377
00378 event = EventManager.New( EventTypeFinalizeThread );
00379 Put( event );
00380
00381
00382
00383
00384 pthread_join( m_thread, NULL );
00385
00386 m_started = false;
00387 return S_OK;
00388 }
00389
00390
00391 void *CRouter::routerthreadep( void *parm )
00392 {
00393 ( (CRouter *) parm)->routerthread();
00394 return 0;
00395 }
00396
00397
00398 void CRouter::routerthread()
00399 {
00400 Event* event;
00401
00402 RioErr << "ROUTERTHREADID " << syscall( SYS_gettid ) << endl;
00403
00404 while( 1 )
00405 {
00406 event = Get();
00407
00408 #ifdef RIO_DEBUG2
00409 RioErr << "CRouter::routerthread processando o evento com o tipo "
00410 << event->Type << endl;
00411 #endif
00412
00413 switch( event->Type )
00414 {
00415 case EventTypeRTDataRequest:
00416 case EventTypeNRTDataRequest:
00417 DataReq( event );
00418 break;
00419
00420 case EventTypeStorageReply:
00421
00422 StorageReply(& (((EventStorageReply*)(event))->StorageReply ));
00423 EventManager.Free(event);
00424 break;
00425
00426 case EventTypeAddDisk:
00427 AddDisk((( EventAddDisk* )( event ))->Disk );
00428 EventManager.Free(event);
00429 break;
00430
00431
00432
00433
00434 case EventTypeStorageDown:
00435
00436 #ifdef RIO_DEBUG2
00437 RioErr << "CRouter::routerthread recebida a mensagem "
00438 << "EventTypeStorageDown da classe DiskMgr informando "
00439 << "que o servidor de armazanamento com a ID "
00440 << ( ( EventStorageDown* ) ( event ) )->StorageId
00441 << " parou de funcionar" << endl;
00442 #endif
00443
00444 StorageDown( ( ( EventStorageDown* ) ( event ) )->StorageId,
00445 ( ( EventStorageDown* ) ( event ) )->
00446 EmptyStorageQueue );
00447 EventManager.Free( event );
00448 break;
00449
00450 case EventTypeStorageUp:
00451
00452 #ifdef RIO_DEBUG2
00453 RioErr << "CRouter::routerthread recebida a mensagem "
00454 << "EventTypeStorageUp da classe DiskMgr informando "
00455 << "que o servidor de armazanamento com a ID "
00456 << ( ( EventStorageDown* ) ( event ) )->StorageId
00457 << " voltou a funcionar" << endl;
00458 #endif
00459
00460 StorageUp( ( ( EventStorageUp* )( event ) )->StorageId );
00461 EventManager.Free( event );
00462 break;
00463
00464
00465 case EventTypeFinalizeThread:
00466 #ifdef RIO_DEBUG2
00467 RioErr << "CRouter::routerthread recebida a mensagem "
00468 << "EventTypeFinalizeThread informando que a thread "
00469 << "deve ser finalizada" << endl;
00470 #endif
00471 EventManager.Free( event );
00472
00473 return;
00474 break;
00475
00476 default:
00477 RioErr << "Router.Routerthread unknown element type "
00478 << (int *) event->Type << endl;
00479 EventManager.Free(event);
00480 }
00481 }
00482 }
00483
00484
00485
00486
00487 void CRouter::DataPrefetch( Event *event )
00488 {
00489 NATData ClientAddr, ServerAddr;
00490 int storageid;
00491
00492
00493
00494
00495 unsigned totalStorages;
00496
00497 DataRequest& Request = (( EventDataRequest* )( event ))->Request;
00498 StrDiskRequest* m_TempDiskRequest = m_Request.New();
00499
00500
00501 ClientAddr.nat_addr = Request.Target.IPaddress;
00502 ClientAddr.nat_port = Request.Target.Port;
00503
00504 if( m_TempDiskRequest == 0 )
00505 {
00506 RioErr << "Router ERROR: Too many disk requests (UNEXPECTED)" << endl;
00507 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00508 RequestCompleted( &Request.streamobj, event );
00509 return;
00510 }
00511
00512 if( !CheckStorageStatus( m_TempDiskRequest->Disk ) )
00513 {
00514 RioErr << "Router ERROR (DataPrefetch): Cannot access disk: storage "
00515 << "is down" << endl;
00516 Request.Status = ERROR_ROUTER + ERROR_SERVICE_TEMPORARY_UNAVAILABLE;
00517 RequestCompleted( &Request.streamobj, event );
00518 return;
00519 }
00520
00521
00522
00523
00524
00525
00526 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
00527
00528
00529 storageid = ( m_TempDiskRequest->Disk - 1 ) / SNode::sn_maxdisks + 1;
00530
00531
00532
00533
00534
00535
00536
00537
00538 if( ( ( Request.Operation == NonRealTimeRead ) ||
00539 ( Request.Operation == NonRealTimeWrite ) ) &&
00540 ( FindNATMapping( ClientAddr, storageid + totalStorages + 1 ) ) )
00541 {
00542 #ifdef RIO_DEBUG2
00543 RioErr << "Router:DataPrefetch usando o indice "
00544 << storageid + totalStorages + 1 << " ao inves do indice "
00545 << storageid << "." << endl;
00546 #endif
00547
00548 storageid = storageid + totalStorages + 1;
00549 }
00550 #ifdef RIO_DEBUG2
00551 else
00552 RioErr << "Router:DataPrefetch o novo indice "
00553 << storageid + totalStorages + 1 << " nao foi usado. "
00554 << "Request.Operation = " << Request.Operation << "." << endl;
00555 #endif
00556
00557
00558
00559
00560 ServerAddr = GetNATMapping( ClientAddr, storageid );
00561 if( ( ServerAddr.nat_addr == 0 ) && ( ServerAddr.nat_port == 0 ) )
00562 {
00563 RioErr << "Router ERROR (DataPrefetch): Cannot access disk: storage "
00564 << "is disabled for this client" << endl;
00565 Request.Status = ERROR_ROUTER + ERROR_SERVICE_TEMPORARY_UNAVAILABLE;
00566 RequestCompleted( &Request.streamobj, event );
00567 return;
00568 }
00569
00570
00571 #ifdef RIO_DEBUG2
00572 RioErr << "Router:DataPrefetch => block " << Request.Block
00573 << " disk " << Request.Reps[0].disk << endl;
00574 #endif
00575
00576 m_TempDiskRequest->Disk = Request.Reps[0].disk;
00577 m_TempDiskRequest->Pos = (((u64) Request.Reps[0].block ) * m_BlockSize );
00578 m_TempDiskRequest->StorageId = 0xffffffff;
00579 m_TempDiskRequest->Size = m_BlockSize;
00580 m_TempDiskRequest->event = ( EventDataRequest* ) event;
00581
00582 if( m_CollectMeasures )
00583 {
00584
00585 Request.QueueSize = m_nRT[m_TempDiskRequest->Disk];
00586 }
00587 if( m_nPending[m_TempDiskRequest->Disk] < m_MaxPending )
00588 {
00589 m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00590
00591 m_nPending[m_TempDiskRequest->Disk]++;
00592 m_nRT[m_TempDiskRequest->Disk]++;
00593 SendStorage(m_TempDiskRequest);
00594 }
00595
00596 else
00597 {
00598 m_RTqueue[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00599 m_nRT[m_TempDiskRequest->Disk]++;
00600 }
00601
00602 char name[1025];
00603 m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name );
00604
00605 RioStorageNodeInfo sn_info;
00606 m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info );
00607
00608 int queue_sum = m_nRT[ m_TempDiskRequest->Disk ]
00609 + m_nNRT[ m_TempDiskRequest->Disk ]
00610 + m_nPending[ m_TempDiskRequest->Disk ];
00611
00612 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( sn_info.Hostname, name,
00613 queue_sum );
00614 m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
00615 }
00616
00617
00618
00619
00620 void CRouter::DataSendToClient( Event *event )
00621 {
00622 NATData ClientAddr, ServerAddr;
00623 bool StorageUp, StorageEnabled;
00624 int storageid;
00625
00626
00627
00628
00629 unsigned int totalStorages;
00630
00631 DataRequest& Request = (( EventDataRequest* )( event ))->Request;
00632 StrDiskRequest* m_TempDiskRequest = m_Request.Get( Request.BufferId );
00633
00634 StorageUp = CheckStorageStatus( m_TempDiskRequest->Disk );
00635
00636
00637 ClientAddr.nat_addr = Request.Target.IPaddress;
00638 ClientAddr.nat_port = Request.Target.Port;
00639
00640
00641
00642
00643
00644
00645 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
00646
00647
00648 storageid = ( m_TempDiskRequest->Disk - 1 ) / SNode::sn_maxdisks + 1;
00649
00650
00651
00652
00653
00654
00655
00656
00657 if( ( ( Request.Operation == NonRealTimeRead ) ||
00658 ( Request.Operation == NonRealTimeWrite ) ) &&
00659 ( FindNATMapping( ClientAddr, storageid + totalStorages + 1 ) ) )
00660 {
00661 #ifdef RIO_DEBUG2
00662 RioErr << "Router:DataSendToClient usando o indice "
00663 << storageid + totalStorages + 1 << " ao inves do indice "
00664 << storageid << "." << endl;
00665 #endif
00666
00667 storageid = storageid + totalStorages + 1;
00668 }
00669 #ifdef RIO_DEBUG2
00670 else
00671 RioErr << "Router:DataSendToClient o novo indice "
00672 << storageid + totalStorages + 1 << " nao foi usado. "
00673 << "Request.Operation = " << Request.Operation << "." << endl;
00674 #endif
00675
00676
00677
00678 ServerAddr = GetNATMapping( ClientAddr, storageid );
00679 StorageEnabled = ( ( ServerAddr.nat_addr != 0 ) ||
00680 ( ServerAddr.nat_port != 0 ) );
00681
00682 if( ( m_TempDiskRequest ) && ( StorageUp ) && ( StorageEnabled ) )
00683 {
00684
00685 #ifdef RIO_DEBUG2
00686 RioErr << "Router:DataSendToClient => block "<< Request.Block
00687 << " bufferid " << Request.BufferId
00688 << " disk " << m_TempDiskRequest->Disk << endl;
00689 #endif
00690
00691
00692 Request.streamobj->Stream()->decServerBufferStatus();
00693
00694 Request.streamobj->Stream()->decTokensToSend();
00695
00696 m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00697
00698 if( m_CollectMeasures )
00699 {
00700
00701 Request.QueueSize = -1;
00702 }
00703 m_nPending[m_TempDiskRequest->Disk]++;
00704 SendStorage(m_TempDiskRequest);
00705
00706 char name[1025];
00707 m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name );
00708
00709 RioStorageNodeInfo sn_info;
00710 m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info );
00711
00712 int queue_sum = m_nRT[ m_TempDiskRequest->Disk ]
00713 + m_nNRT[ m_TempDiskRequest->Disk ]
00714 + m_nPending[ m_TempDiskRequest->Disk ];
00715
00716 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( sn_info.Hostname, name,
00717 queue_sum );
00718 m_SystemManager->PostITEvent( (MonitorEvent *) it_event);
00719 }
00720 else if( m_TempDiskRequest == 0 )
00721 {
00722 RioErr << "ERROR: Router:DataSendToClient => Could not find request to send"
00723 << endl;
00724 }
00725 else if( StorageEnabled )
00726 {
00727 RioErr << "Router ERROR (DataSendToClient): Cannot access disk: "
00728 << "storage is down" << endl;
00729 }
00730 else
00731 {
00732 RioErr << "Router ERROR (DataSendToClient): Cannot access disk: "
00733 << "storage is disabled for this client" << endl;
00734 }
00735
00736 }
00737
00738
00739 void CRouter::DataCancel( Event *event )
00740 {
00741 DataRequest& Request = (( EventDataRequest* )( event ))->Request;
00742
00743 #ifdef RIO_DEBUG2
00744 RioErr << "Router:DataCancel => block "<< Request.Block
00745 << " BufferId " << Request.BufferId << endl;
00746 #endif
00747
00748 if( Request.RequestCounter == -1 )
00749 {
00750 #ifdef RIO_DEBUG2
00751 RioErr << "Router:DataCancel => did not request this block "
00752 << "yet, so just free it" << endl;
00753 #endif
00754
00755
00756 Request.Status = 0;
00757 RequestCompleted( &Request.streamobj, event );
00758 }
00759 else
00760 {
00761 StrDiskRequest* m_TempDiskRequest = m_Request.Get( Request.BufferId );
00762
00763 if( m_TempDiskRequest )
00764 {
00765 if( ( m_TempDiskRequest->StorageId != 0 ) &&
00766 ( CheckStorageStatus( m_TempDiskRequest->Disk ) ) )
00767 {
00768 if( m_CollectMeasures )
00769 {
00770
00771 Request.QueueSize = m_nRT[m_TempDiskRequest->Disk];
00772 }
00773
00774 if( m_nPending[m_TempDiskRequest->Disk] < m_MaxPending )
00775 {
00776 m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00777
00778 m_nPending[m_TempDiskRequest->Disk]++;
00779 m_nRT[m_TempDiskRequest->Disk]++;
00780 SendStorage(m_TempDiskRequest);
00781 }
00782
00783 else
00784 {
00785 m_RTqueue[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00786 m_nRT[m_TempDiskRequest->Disk]++;
00787 }
00788 char name[1025];
00789 m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name );
00790
00791 RioStorageNodeInfo sn_info;
00792 m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info );
00793
00794 int queue_sum = m_nRT[ m_TempDiskRequest->Disk ]
00795 + m_nNRT[ m_TempDiskRequest->Disk ]
00796 + m_nPending[ m_TempDiskRequest->Disk ];
00797
00798 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
00799 sn_info.Hostname, name, queue_sum );
00800 m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
00801 }
00802 else
00803 {
00804 RioErr << " Router:Datacancel => StorageId == 0 "
00805 << "(RequestCompleted) or storage is down" << endl;
00806 m_Request.Free(m_TempDiskRequest);
00807 RequestCompleted( &Request.streamobj, event );
00808 }
00809 }
00810 else if ( m_TempDiskRequest->StorageId == 0 )
00811 {
00812 RioErr << "ERROR: Router:Datacancel => Could not find resquest to cancel"
00813 << endl;
00814 }
00815 else
00816
00817 {
00818 RioErr << "ERROR: Router:Datacancel => Could not send message to "
00819 << "storage to cancel the resquest" << endl;
00820 }
00821 }
00822 }
00823
00824
00825 void CRouter::DataReq( Event *event )
00826 {
00827 int i, j, rc;
00828
00829
00830 DataRequest& Request = (( EventDataRequest* )( event ))->Request;
00831
00832 #ifdef RIO_DEBUG2
00833 struct in_addr clientip;
00834 clientip.s_addr = Request.Target.IPaddress;
00835 RioErr << "No DataReq do Router: "
00836 << "IP " << inet_ntoa(clientip)
00837 << " port " << ntohs( Request.Target.Port )
00838 << " reqid " << Request.Reqid
00839 << " block " << Request.Block
00840 << endl;
00841 #endif
00842
00843 Request.streamobj->Stream()->getMutexUpdateEvent();
00844
00845 if( Request.Operation == RealTimePrefetchBlock )
00846 {
00847 DataPrefetch( event );
00848 Request.streamobj->Stream()->releaseMutexUpdateEvent();
00849 }
00850 else if( Request.Operation == RealTimeCancelBlock )
00851 {
00852 DataCancel( event );
00853 Request.streamobj->Stream()->releaseMutexUpdateEvent();
00854 }
00855 else if( Request.Operation == RealTimeSendBlock )
00856 {
00857 DataSendToClient( event );
00858 Request.streamobj->Stream()->releaseMutexUpdateEvent();
00859 }
00860
00861 else
00862 {
00863 Request.streamobj->Stream()->releaseMutexUpdateEvent();
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876 rc = Request.rioobject->MapBlock( Request.Block, Request.RepBits,
00877 &Request.RepNum, Request.Reps );
00878
00879 if( rc )
00880 {
00881
00882
00883
00884 RioErr << "Router ERROR: Failed mapping logical block." << endl;
00885
00886 Request.Status = ERROR_ROUTER + ERROR_INVALID_BLOCK;
00887 RequestCompleted( &Request.streamobj, event );
00888 return;
00889 }
00890
00891
00892 int nDiskRequests;
00893 if( Request.Operation == NonRealTimeWrite )
00894 {
00895
00896 nDiskRequests = Request.RepNum;
00897 }
00898 else if( ( Request.Operation == RealTimeRead ) ||
00899 ( Request.Operation == NonRealTimeRead ))
00900 {
00901
00902 nDiskRequests = 1;
00903 }
00904 else
00905 {
00906 RioErr << "Router ERROR: Got data request with invalid operation: "
00907 << (int) Request.Operation << endl;
00908 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00909 RequestCompleted( &Request.streamobj, event );
00910 return;
00911 }
00912
00913 if( nDiskRequests > MaxReplications )
00914 {
00915 RioErr << "Router ERROR: Got data request with too many replicas: "
00916 << nDiskRequests << endl;
00917 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00918 RequestCompleted( &Request.streamobj, event );
00919 return;
00920 }
00921
00922
00923 for( i = 0; i < nDiskRequests; i++ )
00924 {
00925 m_TempDiskRequest[i] = m_Request.New();
00926
00927
00928
00929 if( m_TempDiskRequest[i] == 0 )
00930 {
00931
00932
00933 for( j = 0; j < i; j++ )
00934 {
00935 m_Request.Free( m_TempDiskRequest[j] );
00936 }
00937 RioErr << "Router ERROR: Too many disk requests (UNEXPECTED)"
00938 << endl;
00939 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00940 RequestCompleted( &Request.streamobj, event );
00941 return;
00942 }
00943 }
00944
00945
00946 if( ( Request.Operation == RealTimeRead ) ||
00947 ( Request.Operation == NonRealTimeRead ))
00948 {
00949
00950 int Min = m_MaxQueueSize + 1;
00951 int Rep = -1;
00952 int nrep = Request.RepNum;
00953 int dk;
00954 int nQueue;
00955 int storageid;
00956
00957
00958 unsigned int totalStorages;
00959
00960
00961 NATData input( Request.Target.IPaddress, Request.Target.Port );
00962
00963
00964
00965 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
00966
00967 for( i = 0; i < nrep ; i++ )
00968 {
00969
00970
00971 dk = Request.Reps[i].disk;
00972 if( CheckStorageStatus( dk ) )
00973 {
00974
00975 storageid = ( dk - 1 ) / ( SNode::sn_maxdisks ) + 1;
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988
00989
00990
00991 if( ( Request.Operation == NonRealTimeRead ) &&
00992 ( FindNATMapping( input, storageid +
00993 totalStorages + 1 ) ) )
00994 {
00995 #ifdef RIO_DEBUG2
00996 RioErr << "CRouter::DataReq usando o indice "
00997 << storageid + totalStorages + 1 << " ao inves "
00998 << "do indice " << storageid << "." << endl;
00999 #endif
01000
01001 storageid = storageid + totalStorages + 1;
01002 }
01003 #ifdef RIO_DEBUG2
01004 else
01005 RioErr << "Router::DataReq o novo indice "
01006 << storageid + totalStorages + 1 << " nao foi "
01007 << "usado. Request.Operation = "
01008 << Request.Operation << "." << endl;
01009 #endif
01010
01011 NATData result = m_NAT_map.getElement( input, storageid );
01012 if( ( result.nat_addr != -1 ) || ( result.nat_port != 0 ) )
01013 {
01014 if( Request.Operation == RealTimeRead )
01015 nQueue = m_nRT[dk];
01016 else
01017 nQueue = m_nNRT[dk] + m_nRT[dk];
01018 if( nQueue < Min )
01019 {
01020 Min = nQueue;
01021 Rep = i;
01022
01023 }
01024 }
01025 #ifdef RIO_DEBUG2
01026 else
01027 RioErr << "Router WARNING: storage with ID "
01028 << ( ( dk - 1 ) / SNode::sn_maxdisks )
01029 << " cannot be used" << endl;
01030 #endif
01031 }
01032 #ifdef RIO_DEBUG2
01033 else
01034 RioErr << "Router WARNING: storage with ID "
01035 << ( ( dk - 1 ) / SNode::sn_maxdisks )
01036 << " is down" << endl;
01037 #endif
01038 }
01039
01040 if( Rep == -1 )
01041 {
01042
01043 RioErr << "Router ERROR: Unavailable disk on read block "
01044 << Request.Block << " with reqid " << Request.Reqid
01045 << endl;
01046 m_Request.Free( m_TempDiskRequest[0] );
01047 Request.Status = ERROR_ROUTER + ERROR_INVALID_DISK;
01048 RequestCompleted( &Request.streamobj, event );
01049 return;
01050 }
01051
01052 m_TempDiskRequest[0]->Disk = Request.Reps[Rep].disk;
01053 m_TempDiskRequest[0]->Pos =
01054 ( ( (u64) Request.Reps[Rep].block ) * m_BlockSize );
01055 }
01056
01057 else
01058 {
01059 int nQueue;
01060 int dk;
01061 int storageid;
01062
01063
01064 unsigned int totalStorages;
01065
01066
01067 NATData input( Request.Target.IPaddress, Request.Target.Port );
01068
01069
01070
01071 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
01072
01073 for( i = 0; i < nDiskRequests ; i++ )
01074 {
01075 dk = Request.Reps[i].disk;
01076 if( Request.Operation == NonRealTimeWrite )
01077 nQueue = m_nNRT[dk] + m_nRT[dk];
01078 else
01079 nQueue = m_nRT[dk];
01080
01081
01082 storageid = ( dk - 1 ) / ( SNode::sn_maxdisks ) + 1;
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099 if( ( Request.Operation == NonRealTimeWrite ) &&
01100 ( FindNATMapping( input, storageid + totalStorages + 1 ) ) )
01101 {
01102 #ifdef RIO_DEBUG2
01103 RioErr << "CRouter::DataReq usando o indice "
01104 << storageid + totalStorages + 1 << " ao inves do "
01105 << "indice " << storageid << "." << endl;
01106 #endif
01107
01108 storageid = storageid + totalStorages + 1;
01109 }
01110 #ifdef RIO_DEBUG2
01111 else
01112 RioErr << "Router::DataReq o novo indice "
01113 << storageid + totalStorages + 1 << " nao foi "
01114 << "usado. Request.Operation = "
01115 << Request.Operation << "." << endl;
01116 #endif
01117
01118 NATData result = m_NAT_map.getElement( input, storageid );
01119
01120 if( ( nQueue > m_MaxQueueSize ) ||
01121 ( !CheckStorageStatus( dk ) ) ||
01122 ( ( result.nat_addr == -1 ) && ( result.nat_port == 0 ) ) )
01123 {
01124 RioErr << "Router ERROR: Unavailable disk " << dk
01125 << " on write block " << Request.Block
01126 << " with reqid " << Request.Reqid << endl;
01127 RioErr << "nQueue " << nQueue << " m_MaxQueueSize "
01128 << m_MaxQueueSize << endl;
01129 for( j = 0; j < nDiskRequests; j++ )
01130 {
01131 m_Request.Free( m_TempDiskRequest[j] );
01132 }
01133 Request.Status = ERROR_ROUTER + ERROR_INVALID_DISK;
01134 RequestCompleted( &Request.streamobj, event );
01135 return;
01136 }
01137
01138 m_TempDiskRequest[i]->Disk = dk;
01139 m_TempDiskRequest[i]->Pos =
01140 (((u64) Request.Reps[i].block ) * m_BlockSize );
01141 }
01142 }
01143
01144 Request.RequestCounter = nDiskRequests;
01145
01146
01147 Request.RequestCancelled = false;
01148
01149
01150 int dk;
01151
01152 for( i = 0; i < nDiskRequests ; i++ )
01153 {
01154
01155 #ifdef RIO_DEBUG2
01156 RioErr << "CRouter::DataReq colocando a requicao para a "
01157 << "replicacao " << i+1 << " na fila do disco "
01158 << m_TempDiskRequest[ i ]->Disk << endl;
01159 #endif
01160
01161 m_TempDiskRequest[i]->StorageId = 0xffffffff;
01162 m_TempDiskRequest[i]->Size = m_BlockSize;
01163 m_TempDiskRequest[i]->event = ( EventDataRequest* ) event;
01164 dk = m_TempDiskRequest[i]->Disk;
01165
01166
01167
01168 if( m_nPending[dk] < m_MaxPending )
01169 {
01170 m_Pending[dk].Put( m_TempDiskRequest[i] );
01171
01172 m_nPending[dk]++;
01173 if( ( Request.Operation == NonRealTimeWrite ) ||
01174 ( Request.Operation == NonRealTimeRead ))
01175 {
01176 m_nNRT[dk]++;
01177 }
01178 else
01179 {
01180 if( m_CollectMeasures )
01181 {
01182
01183 Request.QueueSize = m_nRT[dk];
01184 }
01185
01186 m_nRT[dk]++;
01187 }
01188
01189 SendStorage(m_TempDiskRequest[i]);
01190 }
01191
01192 else
01193 {
01194
01195 if( ( Request.Operation == NonRealTimeWrite ) ||
01196 ( Request.Operation == NonRealTimeRead ))
01197 {
01198 m_NRTqueue[dk].Put( m_TempDiskRequest[i]);
01199 m_nNRT[dk]++;
01200 }
01201 else
01202 {
01203 m_RTqueue[dk].Put( m_TempDiskRequest[i]);
01204 if( m_CollectMeasures )
01205 {
01206
01207 Request.QueueSize = m_nRT[dk];
01208 }
01209
01210 m_nRT[dk]++;
01211
01212 }
01213 }
01214
01215 char name[1025];
01216 m_DiskMgr->GetDiskName( dk, name );
01217
01218 RioStorageNodeInfo sn_info;
01219 m_DiskMgr->GetDiskStorageNodeInfo( dk, &sn_info );
01220
01221 int queue_sum = m_nRT[ dk ]
01222 + m_nNRT[ dk ]
01223 + m_nPending[ dk ];
01224
01225 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
01226 sn_info.Hostname, name, queue_sum );
01227 m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
01228 }
01229 }
01230 }
01231
01232
01233 void CRouter::UpdateMeasures( int disk,
01234 int nThreads,
01235 struct timeval initial_time,
01236 struct timeval final_time )
01237 {
01238 double sample_msec = getInterval( initial_time, final_time );
01239
01240 if( nThreads > m_MaxPending )
01241 {
01242 nThreads = m_MaxPending;
01243 }
01244 else if( nThreads < 1 )
01245 {
01246 RioErr << " ActiveThreads " << nThreads << endl;
01247 nThreads = 1;
01248 }
01249
01250 if( m_EstDiskResponseTime[ disk ][ nThreads ] == 0 )
01251 {
01252 m_EstDiskResponseTime[ disk ][ nThreads ] = sample_msec;
01253 }
01254 else
01255 {
01256 m_EstDiskResponseTime[ disk ][ nThreads ] =
01257 (( 1 - m_EstimatedParameter) *
01258 m_EstDiskResponseTime[ disk ][ nThreads ] ) +
01259 m_EstimatedParameter * sample_msec;
01260 }
01261
01262
01263 m_DevDiskResponseTime[ disk ][ nThreads ] = ( 1 - m_EstimatedParameter ) *
01264 m_DevDiskResponseTime[ disk ][ nThreads ] +
01265 m_EstimatedParameter *
01266 fabs( sample_msec - m_EstDiskResponseTime[ disk ][ nThreads ] );
01267
01268 if( m_EstDiskResponseTimeOfDisk[ disk ] == 0 )
01269 {
01270 m_EstDiskResponseTimeOfDisk[ disk ] = sample_msec;
01271 }
01272 else
01273 {
01274 m_EstDiskResponseTimeOfDisk[ disk ] = ( 1 - m_EstimatedParameter) *
01275 m_EstDiskResponseTimeOfDisk[ disk ] +
01276 m_EstimatedParameter * sample_msec;
01277 }
01278
01279 m_DevDiskResponseTimeOfDisk[ disk ] = ( 1 - m_EstimatedParameter ) *
01280 m_DevDiskResponseTimeOfDisk[ disk ] +
01281 m_EstimatedParameter *
01282 fabs( sample_msec - m_EstDiskResponseTimeOfDisk[ disk ]);
01283
01284 #ifdef __QUEUE_RESP_LOG
01285 m_logRESP << " disk " << disk
01286 << " sample " << sample_msec
01287 << " est " << m_EstDiskResponseTimeOfDisk[ disk ]
01288 << endl;
01289 #endif
01290 }
01291
01292
01293 void CRouter::StorageReply(MsgRSSstorage* Message)
01294 {
01295 StrDiskRequest* Request;
01296 EventDataRequest* event;
01297 DataRequest* DataRequest;
01298
01299 StrDiskRequest* NewRequest;
01300 int Disk;
01301 u16 Type;
01302 u32 Id;
01303 s32 Error;
01304 int ActiveThreads;
01305
01306
01307 switch( Message->Header.Type )
01308 {
01309 case MSG_RSS_READCOMPLETE:
01310 case MSG_RSS_WRITECOMPLETE:
01311 case MSG_RSS_RECEIVECOMPLETE:
01312 case MSG_RSS_SENDCOMPLETE:
01313 case MSG_RSS_READY:
01314 case MSG_RSS_CANCELCOMPLETE:
01315 {
01316 Type = Message->Status.Type;
01317 Id = Message->Status.RouterId;
01318 Error = Message->Status.Error;
01319 ActiveThreads = Message->Status.ActiveThreads;
01320
01321
01322 Request = m_Request.Get( Id );
01323 if( Request == 0 )
01324 {
01325 RioErr << "Router Error: Message from storage node"
01326 " has invalid Router Id " << Id << endl;
01327 return;
01328 }
01329
01330
01331 event = ( EventDataRequest* ) Request->event;
01332 DataRequest = & ( event->Request );
01333
01334
01335
01336 if( DataRequest->Status == 0 )
01337 {
01338 DataRequest->Status = Error;
01339 }
01340
01341 #ifdef RIO_DEBUG2
01342 struct in_addr clientip;
01343 clientip.s_addr = DataRequest->Target.IPaddress;
01344 RioErr << "Router:StorageReply => RouterId " << Id
01345 << " StorageId "<< Message->Status.StorageId
01346 << " Status " << DataRequest->Status
01347 << " client IP " << inet_ntoa(clientip)
01348 << " client Port " << ntohs( DataRequest->Target.Port )
01349 << " block " << DataRequest->Block
01350 << " BufferId " << DataRequest->BufferId
01351 << " Msg->Error " << Error
01352 << " RequestCounter " << DataRequest->RequestCounter
01353 << endl;
01354 #endif
01355
01356 Disk = Request->Disk;
01357
01358
01359
01360 if( !CheckStorageStatus( Disk ) )
01361 {
01362 RioErr << "Router Error: Message from storage node that is "
01363 "down " << Id << endl;
01364
01365
01366 DataRequest->RequestCounter--;
01367 return;
01368
01369 }
01370
01371 if( ( Type == MSG_RSS_READCOMPLETE ) && ( m_CollectMeasures ) )
01372 {
01373 struct timeval now;
01374 gettimeofday( &now, 0 );
01375 UpdateMeasures( Disk, ActiveThreads,
01376 Request->event->Request.ArrivalTime, now );
01377 }
01378
01379
01380
01381 m_Pending[Disk].Remove(Request);
01382
01383
01384
01385 if( (( m_nRT[Disk] + m_nNRT[Disk] ) > m_nPending[Disk] ) &&
01386 ( m_nPending[Disk] < m_MaxPending ))
01387 {
01388
01389 NewRequest = m_RTqueue[Disk].Get();
01390
01391
01392 if( NewRequest == 0 )
01393 {
01394 RioErr <<" Could not get next request from RT queue"
01395 << endl;
01396 NewRequest = m_NRTqueue[Disk].Get();
01397 }
01398 m_Pending[Disk].Put( NewRequest );
01399 SendStorage( NewRequest );
01400 }
01401
01402
01403 else
01404 {
01405 m_nPending[Disk]--;
01406 }
01407
01408
01409 if( ( DataRequest->Operation == NonRealTimeRead ) ||
01410 ( DataRequest->Operation == NonRealTimeWrite) )
01411 {
01412 m_nNRT[Disk]--;
01413 }
01414 else
01415 {
01416 if( DataRequest->Operation != RealTimeSendBlock )
01417 {
01418 m_nRT[Disk]--;
01419 }
01420 }
01421
01422 char name[1025];
01423 m_DiskMgr->GetDiskName( Disk, name );
01424
01425 RioStorageNodeInfo sn_info;
01426 m_DiskMgr->GetDiskStorageNodeInfo( Disk, &sn_info );
01427
01428 int queue_sum = m_nRT[ Disk ]
01429 + m_nNRT[ Disk ]
01430 + m_nPending[ Disk ];
01431
01432 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
01433 sn_info.Hostname, name, queue_sum );
01434 m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
01435
01436
01437
01438
01439
01440 if( ( Type == MSG_RSS_SENDCOMPLETE ) ||
01441 ( Type == MSG_RSS_WRITECOMPLETE ) ||
01442 ( Type == MSG_RSS_CANCELCOMPLETE )||
01443 ( Error != 0) )
01444 {
01445
01446
01447 if( ( DataRequest->BufferId == -1 ) &&
01448 ( Type == MSG_RSS_READCOMPLETE ) )
01449 {
01450 RioErr << "error at prefetch!! Error: "<< Error << endl;
01451
01452 DataRequest->streamobj->Stream()->getMutexUpdateEvent();
01453
01454 Request->StorageId = Message->Status.StorageId;
01455 DataRequest->BufferId = Id;
01456 #ifdef RIO_DEBUG2
01457 RioErr << "Router:StorageReply => new BufferId " << Id
01458 << " and StorageId " << Request->StorageId << endl;
01459 #endif
01460
01461 if( DataRequest->Operation == RealTimeCancelBlock )
01462 {
01463 #ifdef RIO_DEBUG2
01464 RioErr << "Block was canceled." << endl;
01465 #endif
01466 DataCancel( (Event *) event );
01467 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01468 }
01469 else
01470 {
01471
01472 DataRequest->streamobj->Stream()->NewBlockArrival();
01473
01474 if( ( DataRequest->Target.IPaddress != 0 ) &&
01475 ( DataRequest->Target.Port != 0 ))
01476 {
01477 #ifdef RIO_DEBUG2
01478 RioErr << "Block was requested by client (cancel it)."
01479 << endl;
01480 #endif
01481 DataRequest->Operation = RealTimeCancelBlock;
01482 DataCancel( (Event *) event );
01483
01484
01485 DataRequest->streamobj->Stream()->decTokensToSend();
01486
01487 DataRequest->streamobj->Stream()->decServerBufferStatus();
01488 }
01489 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01490 }
01491 }
01492
01493 else
01494 {
01495 m_Request.Free(Request);
01496
01497 #ifdef RIO_DEBUG2
01498 RioErr << "CRouter::StorageReply requisicao "
01499 << DataRequest->RequestCounter << " recebida!"
01500 << endl;
01501 #endif
01502
01503 DataRequest->RequestCounter--;
01504
01505
01506
01507
01508
01509
01510
01511 if( ( DataRequest->RequestCounter == 0 ) &&
01512 ( !DataRequest->RequestCancelled ) )
01513 {
01514
01515 #ifdef RIO_DEBUG2
01516 RioErr << "CRouter::StorageReply operation "
01517 << DataRequest->Operation << " completed "
01518 << "(DataRequest->RequestCounter == 0)" << endl;
01519 #endif
01520
01521 RequestCompleted( &DataRequest->streamobj,
01522 ( Event* ) event );
01523 }
01524 }
01525 }
01526
01527 else
01528 {
01529 if( DataRequest->BufferId == -1 )
01530 {
01531 DataRequest->streamobj->Stream()->getMutexUpdateEvent();
01532
01533 Request->StorageId = Message->Status.StorageId;
01534 DataRequest->BufferId = Id;
01535 #ifdef RIO_DEBUG2
01536 RioErr << "Router:StorageReply => new BufferId " << Id
01537 << " and StorageId " << Request->StorageId << endl;
01538 #endif
01539
01540
01541 if( DataRequest->Operation == RealTimeCancelBlock )
01542 {
01543 #ifdef RIO_DEBUG2
01544 RioErr << "Block was canceled." << endl;
01545 #endif
01546
01547 DataCancel( (Event *) event );
01548 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01549 }
01550 else
01551 {
01552
01553 DataRequest->streamobj->Stream()->NewBlockArrival();
01554
01555
01556 if( ( DataRequest->Target.IPaddress != 0 )&&
01557 ( DataRequest->Target.Port != 0))
01558 {
01559 #ifdef RIO_DEBUG2
01560 RioErr << "Block was requested by client." << endl;
01561 #endif
01562 DataRequest->Operation = RealTimeSendBlock;
01563 DataSendToClient( (Event *) event );
01564 }
01565 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01566 }
01567 }
01568
01569 }
01570 break;
01571 }
01572 default:
01573 RioErr << "Router ERROR: Received invalid message from storage node."
01574 << endl;
01575 }
01576 }
01577
01578
01579 void CRouter::AddDisk( int Disk )
01580 {
01581 RioErr << " Router: Adding disk: " << Disk << endl;
01582 if( ( Disk < 0 ) || ( Disk >= m_nDisks ) )
01583 {
01584 RioErr << "Router ERROR: Tried to add invalid disk " << Disk << endl;
01585 return;
01586 }
01587
01588 if( m_nRT[Disk] != ( m_MaxQueueSize + 1 ) )
01589 {
01590 RioErr << "Router ERROR: Tried to add disk already active "
01591 << Disk << endl;
01592 return;
01593 }
01594
01595 char name[1025];
01596 m_DiskMgr->GetDiskName( Disk, name );
01597
01598 RioStorageNodeInfo sn_info;
01599 m_DiskMgr->GetDiskStorageNodeInfo( Disk, &sn_info );
01600
01601 AddDiskEvent *it_event = new AddDiskEvent( sn_info.Hostname, name );
01602 m_SystemManager->PostITEvent( (MonitorEvent *) it_event );
01603
01604
01605 m_nRT[Disk] = 0;
01606 }
01607
01608
01609 void CRouter::SendStorage( StrDiskRequest *Request )
01610 {
01611 EventStorageRequest* event = NULL;
01612
01613 #ifdef RIO_DEBUG2
01614 int bufferid = ((EventDataRequest*)(Request->event))->Request.BufferId;
01615 RioErr << "Router:SendStorage => RouterId " << Request->Id
01616 << " BufferId " << bufferid
01617 << " Disk " << Request->Disk
01618 << " pending requests " << m_nPending[Request->Disk] << endl;
01619
01620 if( m_nPending[Request->Disk] > m_MaxPending )
01621 {
01622 RioErr << "Router:SendStorage => "
01623 << " Disk " << Request->Disk
01624 << " pending requests " << m_nPending[Request->Disk]
01625 << " RT " <<m_nRT[Request->Disk] << endl;
01626 }
01627 #endif
01628
01629 if( ( m_CollectMeasures ) && ( Request->event->Request.QueueSize != -1 ) )
01630 {
01631 struct timeval now;
01632 gettimeofday( &now, 0 );
01633 double sample_msec = getInterval( Request->event->Request.ArrivalTime,
01634 now );
01635 Request->event->Request.ArrivalTime = now;
01636
01637
01638 int disk = Request->event->Request.Reps[0].disk;
01639 int queuesize = Request->event->Request.QueueSize;
01640
01641 if( queuesize < m_MaxPending + 1 )
01642 queuesize = 0;
01643 else if( queuesize > ( m_MaxPending + 300 ))
01644 queuesize = 300;
01645 else
01646 queuesize = queuesize - m_MaxPending;
01647
01648 if( m_EstDiskQueueTime[ disk ][ queuesize ] == 0 )
01649 {
01650 m_EstDiskQueueTime[ disk ][ queuesize ] = sample_msec;
01651 }
01652 else
01653 {
01654 m_EstDiskQueueTime[ disk ][ queuesize ] =
01655 (( 1 - m_EstimatedParameter) *
01656 m_EstDiskQueueTime[ disk ][ queuesize ]) +
01657 m_EstimatedParameter * sample_msec;
01658 }
01659 if( m_EstDiskQueueTimeOfDisk[ disk ] == 0 )
01660 {
01661 m_EstDiskQueueTimeOfDisk[ disk ] = sample_msec;
01662 }
01663 else
01664 {
01665 m_EstDiskQueueTimeOfDisk[ disk ] = ( 1 - m_EstimatedParameter) *
01666 m_EstDiskQueueTimeOfDisk[ disk ] +
01667 m_EstimatedParameter * sample_msec;
01668 }
01669
01670 #ifdef __QUEUE_RESP_LOG
01671 m_logSQT << " disk " << disk
01672 << " sample " << sample_msec
01673 << " est " << m_EstDiskQueueTimeOfDisk[ disk ] << endl;
01674 #endif
01675 }
01676
01677 if( Request->event->Request.Operation == RealTimePrefetchBlock )
01678 {
01679 MsgRSSnewRequest* msg;
01680
01681 Request->event->Request.RequestCounter = 1;
01682
01683 event = (EventStorageRequest*) EventManager.New(
01684 EventTypeStorageRequest );
01685 msg = &( event->StorageRequest.Request.New );
01686 msg->Type = MSG_RSS_FETCH | RSS_INTERMEDIATE;
01687 msg->Size = SizeMsgRSSnewRequest;
01688 msg->Token = RSS_TOKEN_STORAGE;
01689
01690 msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid;
01691 msg->RouterId = Request->Id;
01692
01693 msg->IPaddr =
01694 ((EventDataRequest*)(Request->event))->Request.Target.IPaddress;
01695
01696 msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port;
01697
01698
01699
01700
01701
01702
01703
01704
01705
01706
01707
01708
01709
01710
01711
01712
01713
01714 msg->Disk = Request->Disk;
01715 msg->Pos = Request->Pos;
01716 msg->DataSize = Request->Size;
01717
01718 #ifdef RIO_DEBUG2
01719 RioErr << " prefetch RouterId "<< Request->Id
01720 << " ClientId "<< msg->ClientId
01721 << " msg-Type (F+Inter)" << msg->Type << endl;
01722 #endif
01723 }
01724 else if( Request->event->Request.Operation == RealTimeCancelBlock )
01725 {
01726 if( Request->event->Request.BufferId == -1 )
01727 {
01728 #ifdef RIO_DEBUG2
01729 RioErr << "request block was not submit to storage yet, "
01730 << " so just free request" << endl;
01731 #endif
01732
01733 m_nRT[Request->Disk]--;
01734 m_nPending[Request->Disk]--;
01735
01736
01737 Request->event->Request.Status = 0;
01738 RequestCompleted( &Request->event->Request.streamobj,
01739 ( Event* ) ( Request->event ) );
01740 char name[1025];
01741 m_DiskMgr->GetDiskName( Request->Disk, name );
01742
01743 RioStorageNodeInfo sn_info;
01744 m_DiskMgr->GetDiskStorageNodeInfo( Request->Disk, &sn_info );
01745
01746 int queue_sum = m_nRT[ Request->Disk ]
01747 + m_nNRT[ Request->Disk ]
01748 + m_nPending[ Request->Disk ];
01749
01750 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
01751 sn_info.Hostname, name, queue_sum );
01752 m_SystemManager->PostITEvent( (MonitorEvent *) it_event );
01753 return;
01754 }
01755 else
01756 {
01757 MsgRSSpendRequest* msg;
01758 event = (EventStorageRequest*) EventManager.New(
01759 EventTypeStorageRequest );
01760 msg = &( event->StorageRequest.Request.Pending );
01761 msg->Type = MSG_RSS_CANCEL;
01762 msg->Size = SizeMsgRSSpendRequest;
01763 msg->Token = RSS_TOKEN_STORAGE;
01764
01765 msg->ClientId = ((EventDataRequest*)(
01766 Request->event))->Request.Reqid;
01767
01768 msg->StorageId = Request->StorageId;
01769 msg->RouterId = Request->Id;
01770
01771 msg->IPaddr = ((EventDataRequest*)(
01772 Request->event))->Request.Target.IPaddress;
01773
01774 msg->Port = ((EventDataRequest*)(
01775 Request->event))->Request.Target.Port;
01776
01777
01778
01779
01780
01781
01782
01783 NATData input( msg->IPaddr, msg->Port );
01784 NATData result = m_NAT_map.getElement( input,
01785 (Request->Disk-1)/(SNode::sn_maxdisks)+1 );
01786 if( result.nat_addr != 0 )
01787 {
01788 msg->IPaddr = result.nat_addr;
01789 msg->Port = result.nat_port;
01790 }
01791
01792 #ifdef RIO_DEBUG2
01793 RioErr << " cancel RouterId " << Request->Id
01794 << " ClientId " << msg->ClientId
01795 << " StorageId " << msg->StorageId
01796 << " msg-Type C " << msg->Type << endl;
01797 #endif
01798 }
01799 }
01800 else if( Request->event->Request.Operation == RealTimeSendBlock )
01801 {
01802 MsgRSSpendRequest* msg;
01803 event = (EventStorageRequest*) EventManager.New(
01804 EventTypeStorageRequest );
01805 msg = &( event->StorageRequest.Request.Pending );
01806 msg->Type = MSG_RSS_SEND;
01807 msg->Size = SizeMsgRSSpendRequest;
01808 msg->Token = RSS_TOKEN_STORAGE;
01809
01810 msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid;
01811
01812 msg->StorageId = Request->StorageId;
01813 msg->RouterId = Request->Id;
01814
01815 msg->IPaddr = ((EventDataRequest*)(
01816 Request->event))->Request.Target.IPaddress;
01817
01818 msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port;
01819
01820
01821
01822
01823
01824
01825 NATData input( msg->IPaddr, msg->Port );
01826 NATData result = m_NAT_map.getElement( input,
01827 (Request->Disk-1)/(SNode::sn_maxdisks)+1 );
01828 if( result.nat_addr != 0 )
01829 {
01830 msg->IPaddr = result.nat_addr;
01831 msg->Port = result.nat_port;
01832 }
01833
01834
01835 msg->StreamTraffic = ((Event*)(Request->event))->Type;
01836
01837
01838 ((EventDataRequest*)(Request->event))->Request.rioobject->GetVideoRate( &msg->VideoRate );
01839
01840 #ifdef RIO_DEBUG2
01841 RioErr << " send RouterId " << Request->Id
01842 << " ClientId " << msg->ClientId
01843 << " StorageId " << msg->StorageId
01844 << " msg-Type S " << msg->Type << endl;
01845 #endif
01846 }
01847
01848 else
01849 {
01850 MsgRSSnewRequest* msg;
01851
01852 int storageid;
01853
01854
01855 unsigned int totalStorages;
01856
01857 event = (EventStorageRequest*)EventManager.New(EventTypeStorageRequest);
01858 msg = &( event->StorageRequest.Request.New );
01859 if( ( Request->event->Request.Operation == RealTimeRead ) ||
01860 ( Request->event->Request.Operation == NonRealTimeRead))
01861 msg->Type = MSG_RSS_READ;
01862 else
01863 msg->Type = MSG_RSS_WRITE;
01864
01865 msg->Size = SizeMsgRSSnewRequest;
01866 msg->Token = RSS_TOKEN_STORAGE;
01867 msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid;
01868 msg->RouterId = Request->Id;
01869 msg->IPaddr = ((EventDataRequest*)(
01870 Request->event))->Request.Target.IPaddress;
01871 msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port;
01872
01873
01874 storageid = ( Request->Disk - 1 )/( SNode::sn_maxdisks ) + 1;
01875
01876
01877
01878
01879 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
01880
01881
01882
01883 NATData input( msg->IPaddr, msg->Port );
01884
01885
01886
01887
01888
01889
01890
01891
01892
01893
01894
01895
01896 if( ( ( Request->event->Request.Operation == NonRealTimeRead ) ||
01897 ( Request->event->Request.Operation == NonRealTimeWrite ) ) &&
01898 ( FindNATMapping( input, storageid + totalStorages + 1 ) ) )
01899 {
01900 #ifdef RIO_DEBUG2
01901 RioErr << "CRouter::SendStorage usando o indice "
01902 << storageid + totalStorages + 1 << " ao inves do indice "
01903 << storageid << "." << endl;
01904 #endif
01905
01906 storageid = storageid + totalStorages + 1;
01907 }
01908 #ifdef RIO_DEBUG2
01909 else
01910 RioErr << "Router::SendStorage o novo indice "
01911 << storageid + totalStorages + 1 << " nao foi usado. "
01912 << "Request->event->Request.Operation = "
01913 << Request->event->Request.Operation << "." << endl;
01914 #endif
01915
01916
01917 NATData result = m_NAT_map.getElement( input, storageid );
01918 if( result.nat_addr != 0 )
01919 {
01920 msg->IPaddr = result.nat_addr;
01921 msg->Port = result.nat_port;
01922 }
01923 msg->Disk = Request->Disk;
01924 msg->Pos = Request->Pos;
01925 msg->DataSize = Request->Size;
01926
01927
01928 msg->StreamTraffic = ((Event*)(Request->event))->Type;
01929
01930
01931 ((EventDataRequest*)(Request->event))->Request.rioobject->GetVideoRate( &msg->VideoRate );
01932
01933 #ifdef RIO_DEBUG2
01934 RioErr << " read and send RouterId "<< Request->Id
01935 << " ClientId "<< msg->ClientId
01936 << " msg-Type RS|RW " << msg->Type << endl;
01937 #endif
01938 }
01939
01940 m_DiskMgr->SendStorageNode( event, u16 ( Request->Disk ));
01941 return;
01942 }
01943
01944
01945 long double CRouter::GetEstimatedDiskQueueTime( int disk )
01946 {
01947 return m_EstDiskQueueTimeOfDisk[disk];
01948 }
01949
01950
01951 void CRouter::PrintQueues()
01952 {
01953 int i;
01954 RioErr << "Queues:" << endl;
01955 for( i = 0; i < m_nDisks; i++ )
01956 {
01957 RioErr << " Disk " << i << ": "
01958 << m_nRT[i] << " "
01959 << m_nNRT[i] << " "
01960 << m_nPending[i] << endl;
01961
01962 }
01963 }
01964
01965
01966
01967 int CRouter::GetMaxNumberOfDisks()
01968 {
01969 return m_DiskMgr->GetMaxNumberOfDisks();
01970 }
01971
01972
01973
01974 int CRouter::GetNumberOfActiveDisks()
01975 {
01976 unsigned int numd;
01977 m_DiskMgr->GetNumberOfActiveDisks( &numd);
01978 return (int) numd;
01979 }
01980
01981
01982
01983 int CRouter::GetDiskServiceTime( EventStorageRequest* event, u16 DiskId )
01984 {
01985 int result;
01986 result = m_DiskMgr->GetDiskServiceTime( event, DiskId );
01987 if( result == -1 )
01988 {
01989 m_EstDiskServiceTimeOfDisk[ DiskId ] = -1;
01990 }
01991 return result;
01992 }
01993
01994
01995
01996 void CRouter::SetDiskServiceTime( int Disk,
01997 double EstServiceTimeOfDisk,
01998 double EstServiceTime[301] )
01999 {
02000 for(int j = 0; j < 301; j++)
02001 {
02002 m_EstDiskServiceTime[ Disk ][ j ] = EstServiceTime[ j ];
02003 }
02004
02005 m_EstDiskServiceTimeOfDisk[ Disk ] = EstServiceTimeOfDisk;
02006 pthread_mutex_lock(&m_MutexUpdated);
02007 m_UpdatedEstimatedDiskServiceTime++;
02008 pthread_cond_broadcast(&m_ConditionUpdated);
02009 pthread_mutex_unlock(&m_MutexUpdated);
02010
02011 return;
02012 }
02013
02014
02015
02016 void CRouter::UpdateDiskServiceTime( double EstimatedServiceTimeOfDisk[100],
02017 double EstimatedServiceTime[100][301] )
02018 {
02019 int num = GetMaxNumberOfDisks();
02020 int numd = GetNumberOfActiveDisks();
02021
02022
02023
02024 while( m_UpdatedEstimatedDiskServiceTime < numd )
02025 {
02026 pthread_mutex_lock( &m_MutexUpdated );
02027 pthread_cond_wait( &m_ConditionUpdated, &m_MutexUpdated );
02028 pthread_mutex_unlock( &m_MutexUpdated );
02029 }
02030
02031 for( int i = 1; i < num; i++ )
02032 {
02033 for(int j = 0; j < 301; j++)
02034 {
02035 EstimatedServiceTime[i][j] = m_EstDiskServiceTime[i][j];
02036 }
02037 EstimatedServiceTimeOfDisk[i] = m_EstDiskServiceTimeOfDisk[i];
02038 }
02039
02040 m_UpdatedEstimatedDiskServiceTime = 0;
02041 return;
02042 }
02043
02044 void CRouter::UpdateDiskResponseTime( double EstimatedResponseTimeOfDisk[100],
02045 double EstimatedResponseTime[100][301],
02046 double DevResponseTimeOfDisk[100],
02047 double DevResponseTime[100][301] )
02048 {
02049 int num = GetMaxNumberOfDisks();
02050
02051 for( int i = 1; i < num; i++ )
02052 {
02053 for(int j = 0; j < 301; j++)
02054 {
02055 if( m_EstDiskServiceTime[i][j] > m_EstDiskResponseTime[i][j] )
02056
02057 m_EstDiskResponseTime[i][j] = m_EstDiskServiceTime[i][j];
02058
02059 EstimatedResponseTime[i][j] = m_EstDiskResponseTime[i][j];
02060 DevResponseTime[i][j] = m_DevDiskResponseTime[i][j];
02061 }
02062 EstimatedResponseTimeOfDisk[i] = m_EstDiskResponseTimeOfDisk[i];
02063 DevResponseTimeOfDisk[i] = m_DevDiskResponseTimeOfDisk[i];
02064 }
02065
02066 return;
02067 }
02068
02069
02070
02071
02072 void CRouter::UpdateDiskQueueTime( double EstimatedQueueTimeOfDisk[100],
02073 double EstimatedQueueTime[100][301])
02074 {
02075 int num = GetMaxNumberOfDisks();
02076
02077 for( int i = 1; i < num; i++ )
02078 {
02079 for(int j = 0; j < 301; j++)
02080 {
02081 EstimatedQueueTime[i][j] = m_EstDiskQueueTime[i][j];
02082 }
02083 EstimatedQueueTimeOfDisk[i] = m_EstDiskQueueTimeOfDisk[i];
02084 }
02085
02086 return;
02087 }
02088
02089
02090
02091
02092 void CRouter::Put( Event *ep )
02093 {
02094 EventDataRequest* event;
02095 event = ( EventDataRequest *) ep;
02096
02097 if( ( ( ep->Type==EventTypeRTDataRequest ) ||
02098 ( ep->Type==EventTypeNRTDataRequest )
02099 ) && ( m_CollectMeasures )
02100 )
02101 {
02102 gettimeofday( &event->Request.ArrivalTime, 0 );
02103 }
02104
02105 m_Queue.Put(ep);
02106 }
02107
02108
02109
02110
02111 void CRouter::SetNATMapping( NATData server_map, int stor_id, NATData stor_map )
02112 {
02113 m_NAT_map.insertElement( server_map, stor_id, stor_map );
02114 }
02115
02116
02117
02118 void CRouter::RemoveNATMapping( NATData server_map, int stor_id )
02119 {
02120 m_NAT_map.removeElement( server_map, stor_id );
02121 }
02122
02123
02124
02125
02126 NATData CRouter::GetNATMapping( NATData server_map, int stor_id )
02127 {
02128 return m_NAT_map.getElement( server_map, stor_id );
02129 }
02130
02131
#ifdef RIO_DEBUG2
// Debug helper (only built with RIO_DEBUG2): asks the NAT map to print its
// contents.
void CRouter::PrintNATMapping()
{
    m_NAT_map.printMapping();
    return;
}
#endif
02138
02139
02140
02141
02142 bool CRouter::FindNATMapping( NATData server_map, int stor_id )
02143 {
02144 return m_NAT_map.findElement( server_map, stor_id );
02145 }
02146
02147
02148
02149
02150
02151
02152
02153
02154
02155
02156
02157 void CRouter::StorageDown( int StorageId, bool EmptyStorageQueue )
02158 {
02159 StrDiskRequest *DiskRequest;
02160 RioStorageNodeInfo NodeInfo;
02161 RioStorageNodeInfo sn_info;
02162 char name[1025];
02163 int FirstDiskId;
02164 bool EmptyQueues;
02165 bool IsPending;
02166
02167 pthread_mutex_lock( &m_StoragesStatusMutex );
02168
02169 m_StoragesStatus = m_StoragesStatus | ( 1ull << StorageId );
02170
02171 pthread_mutex_unlock( &m_StoragesStatusMutex );
02172 if( EmptyStorageQueue )
02173 {
02174
02175 m_DiskMgr->GetStorageNodeInfo( StorageId, &NodeInfo );
02176
02177
02178
02179
02180
02181 FirstDiskId = StorageId * SNode::sn_maxdisks + 1;
02182
02183
02184 for( int dk = FirstDiskId; dk < FirstDiskId + NodeInfo.NumberOfDisks;
02185 dk++ )
02186 {
02187 EmptyQueues = true;
02188
02189
02190 while( EmptyQueues )
02191 {
02192
02193
02194
02195 if( m_nPending[ dk ] > 0 )
02196 {
02197 DiskRequest = m_Pending[ dk ].Get();
02198
02199 #ifdef RIO_DEBUG2
02200 RioErr << "CRouter::StorageDown removendo a requisicao "
02201 << DiskRequest << " de m_Pending. m_Pending[ " << dk
02202 << " ] = " << m_nPending[ dk ] << endl;
02203 #endif
02204
02205 m_nPending[ dk ]--;
02206
02207
02208
02209 DataRequest& Request = DiskRequest->event->Request;
02210 if( ( Request.Operation == NonRealTimeWrite ) ||
02211 ( Request.Operation == NonRealTimeRead ) )
02212 m_nNRT[dk]--;
02213 else
02214 m_nRT[dk]--;
02215 IsPending = true;
02216 }
02217 else if( m_nRT[ dk ] > 0 )
02218 {
02219 DiskRequest = m_RTqueue[ dk ].Get();
02220
02221 #ifdef RIO_DEBUG2
02222 RioErr << "CRouter::StorageDown removendo a requisicao "
02223 << DiskRequest << " de m_RTqueue. m_nRT[ " << dk
02224 << " ] = " << m_nRT[ dk ] << endl;
02225 #endif
02226
02227 m_nRT[ dk ]--;
02228 IsPending = false;
02229 }
02230 else if( m_nNRT[ dk ] > 0 )
02231 {
02232 DiskRequest = m_NRTqueue[ dk ].Get();
02233
02234 #ifdef RIO_DEBUG2
02235 RioErr << "CRouter::StorageDown removendo a requisicao "
02236 << DiskRequest << " de m_NRTqueue. m_nNRT[ " << dk
02237 << " ] = " << m_nNRT[ dk ] << endl;
02238 #endif
02239
02240 m_nNRT[ dk ]--;
02241 IsPending = false;
02242 }
02243 else
02244 EmptyQueues = false;
02245
02246
02247
02248 if( ( EmptyQueues ) && ( DiskRequest != NULL ) )
02249 {
02250
02251
02252 DataRequest& Request = DiskRequest->event->Request;
02253 if( ( IsPending ) ||
02254 ( ( Request.Operation != RealTimeRead ) &&
02255 ( Request.Operation != RealTimePrefetchBlock ) ) )
02256 {
02257
02258
02259 #ifdef RIO_DEBUG2
02260 RioErr << "CRouter::StorageDown removendo uma "
02261 << "solicitacao do disco " << dk << " do "
02262 << "servidor de armazenamento " << StorageId
02263 << endl;
02264 #endif
02265
02266
02267
02268
02269
02270 if( ( Request.Operation == RealTimeCancelBlock ) &&
02271 ( Request.BufferId == -1 ) )
02272 Request.Status = 0;
02273 else
02274 Request.Status = ERROR_ROUTER +
02275 ERROR_SERVICE_TEMPORARY_UNAVAILABLE;
02276 RequestCompleted( &Request.streamobj, ( Event * )
02277 DiskRequest->event );
02278
02279
02280 Request.RequestCancelled = true;
02281 }
02282 else
02283 {
02284
02285 #ifdef RIO_DEBUG2
02286 RioErr << "CRouter::StorageDown Reenviando o evento "
02287 << "com a requisicao para o Router" << endl;
02288 #endif
02289
02290 Put( ( Event *) DiskRequest->event );
02291
02292 }
02293
02294 m_Request.Free( DiskRequest );
02295 }
02296 else if( ( EmptyQueues ) && ( DiskRequest == NULL ) )
02297 RioErr << "CRouter::StorageDown aviso DiskRequest == NULL"
02298 << endl;
02299 }
02300
02301
02302
02303 m_DiskMgr->GetDiskName( dk, name );
02304 m_DiskMgr->GetDiskStorageNodeInfo( dk, &sn_info );
02305 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
02306 sn_info.Hostname, name, 0 );
02307 m_SystemManager->PostITEvent( ( MonitorEvent * ) it_event );
02308 }
02309 }
02310 }
02311
02312
02313
02314 void CRouter::StorageUp( int StorageId )
02315 {
02316
02317 pthread_mutex_lock( &m_StoragesStatusMutex );
02318
02319
02320
02321
02322 m_SystemManager->InvalidateStorageNatMappings( StorageId + 1 );
02323
02324 m_StoragesStatus = m_StoragesStatus & ( ~( 1ull << StorageId ) );
02325
02326 pthread_mutex_unlock( &m_StoragesStatusMutex );
02327 }
02328
02329
02330
02331
02332
02333 bool CRouter::CheckStorageStatus( int DiskId )
02334 {
02335 bool Status;
02336 int StorageId;
02337
02338 if( DiskId >= 0 )
02339 {
02340
02341 StorageId = ( DiskId - 1 ) / SNode::sn_maxdisks;
02342
02343 pthread_mutex_lock( &m_StoragesStatusMutex );
02344
02345
02346 Status = ( ( m_StoragesStatus & ( 1ull << StorageId ) ) == 0 );
02347
02348 pthread_mutex_unlock( &m_StoragesStatusMutex );
02349 }
02350 else
02351 Status = false;
02352 return Status;
02353 }
02354
02355
02356
02357
02358 unsigned long long int CRouter::GetInvalidStorages( NATData ClientAddr )
02359 {
02360 unsigned long long int InvalidStorages;
02361 unsigned int TotalStorages;
02362 NATData StorageAddr;
02363
02364
02365 m_DiskMgr->GetNumberOfStorageNodes( &TotalStorages );
02366
02367
02368 pthread_mutex_lock( &m_StoragesStatusMutex );
02369
02370
02371 InvalidStorages = m_StoragesStatus;
02372
02373 pthread_mutex_unlock( &m_StoragesStatusMutex );
02374
02375
02376 if( ( ClientAddr.nat_addr != 0 ) && ( ClientAddr.nat_port != 0 ) )
02377 {
02378 for( unsigned int Storage = 0; Storage < TotalStorages; Storage++ )
02379 {
02380
02381
02382
02383
02384 StorageAddr = GetNATMapping( ClientAddr, Storage + 1 );
02385 if( ( StorageAddr.nat_addr == -1 ) &&
02386 ( StorageAddr.nat_port == 0 ) )
02387 {
02388
02389 #ifdef RIO_DEBUG2
02390 RioErr << "CRouter::GetInvalidStorages o servidor de "
02391 << "armazenamento com a ID " << Storage << " nao pode "
02392 << "ser usado pelo cliente pois seu mapeamento foi "
02393 << "invalidado." << endl;
02394 #endif
02395
02396 InvalidStorages = InvalidStorages | ( 1ull << Storage );
02397 }
02398
02399
02400
02401
02402
02403
02404
02405
02406 if( FindNATMapping( ClientAddr, Storage + TotalStorages + 1 ) )
02407 {
02408 StorageAddr = GetNATMapping( ClientAddr, Storage +
02409 TotalStorages + 1 );
02410 if( ( StorageAddr.nat_addr == -1 ) &&
02411 ( StorageAddr.nat_port == 0 ) )
02412 {
02413
02414 #ifdef RIO_DEBUG2
02415 RioErr << "CRouter::GetInvalidStorages o servidor de "
02416 << "armazenamento com a ID " << Storage << " nao "
02417 << "pode ser usado pelo cliente pois seu mapeamento "
02418 << " foi invalidado (na nova implementacao)."
02419 << endl;
02420 #endif
02421
02422 InvalidStorages = InvalidStorages | ( 1ull << Storage );
02423 }
02424 }
02425 #ifdef RIO_DEBUG2
02426 else
02427 RioErr << "CRouter::GetInvalidStorages a ID "
02428 << Storage + TotalStorages + 1 << " nao existe no mapa e "
02429 << "nao foi considerada!" << endl;
02430 #endif
02431 }
02432 }
02433 #ifdef RIO_DEBUG2
02434
02435 RioErr << "CRouter::GetInvalidStorages Vetor com a identificacao dos "
02436 << "servidores de armazenamento que nao podem ser usados: ";
02437 for( int p = MAXNUMSTORAGES - 1; p >= 0 ; p-- )
02438 {
02439 unsigned int q = ( InvalidStorages >> p ) & 1ull;
02440 RioErr << q;
02441 }
02442 RioErr << endl;
02443 #endif
02444
02445 return InvalidStorages;
02446 }
02447
02448
02449
02450 void CRouter::GetStorageInfo( NATData ClientAddr, bool *CanRead,
02451 bool *CanWrite )
02452 {
02453 unsigned long long int InvalidStorages;
02454 unsigned int TotalStorages;
02455 int InactiveStorages, ActiveStorages;
02456 int NumberOfReplications;
02457
02458
02459 m_DiskMgr->GetNumberOfStorageNodes( &TotalStorages );
02460
02461 NumberOfReplications = m_DiskMgr->GetNumberOfReplications();
02462
02463
02464 InvalidStorages = GetInvalidStorages( ClientAddr );
02465
02466 InactiveStorages = 0;
02467 for( unsigned int i = 0; i < MAXNUMSTORAGES; i++ )
02468 if( ( InvalidStorages & ( 1ull << i ) ) != 0 )
02469 ( InactiveStorages )++;
02470
02471
02472 ActiveStorages = TotalStorages - InactiveStorages;
02473
02474
02475 *CanRead = ( InactiveStorages < NumberOfReplications );
02476
02477
02478
02479
02480 *CanWrite = ( ActiveStorages >= NumberOfReplications );
02481 #ifdef RIO_DEBUG2
02482 RioErr << "RioSession::GetStorageInfo estado dos servidores de "
02483 << "armazenamento ao receber um pedido de dados : ativos = "
02484 << ActiveStorages << ", inativos = " << InactiveStorages
02485 << ", totais = " << TotalStorages << ", replicacoes = "
02486 << NumberOfReplications << ", CanRead = "
02487 << ( *CanRead ? "true": "false" ) << ", CanWrite = "
02488 << ( *CanWrite ? "true": "false" ) << endl;
02489 #endif
02490 }
02491
02492
02493 int CRouter::GetNumberOfStorageNodes( unsigned int *NumberOfStorageNodes )
02494 {
02495 return m_DiskMgr->GetNumberOfStorageNodes( NumberOfStorageNodes );
02496 }
02497
02498
02499
02500
02501 void CRouter::RequestCompleted( RioStreamObj **StreamObjAddr, Event *event )
02502 {
02503
02504
02505 bool RemoveStreamObj;
02506
02507 if( ( *StreamObjAddr ) != NULL )
02508 {
02509 ( *StreamObjAddr )->RequestCompleted( event, &RemoveStreamObj );
02510 if( RemoveStreamObj )
02511 {
02512
02513
02514
02515
02516
02517
02518
02519
02520
02521
02522 delete *StreamObjAddr;
02523 *StreamObjAddr = NULL;
02524 }
02525 }
02526 else
02527 {
02528 RioErr << "[CRouter] RequestCompleted nao foi possivel executar a "
02529 << "funcao porque foi passado um ponteiro NULO!" << endl;
02530 }
02531 }
02532
02533
// Returns the time elapsed from `initial` to `final`, in milliseconds.
//
// The difference is first normalised so that the microsecond part is
// non-negative (borrowing one second when needed) and then converted to a
// double.
double getInterval( struct timeval initial, struct timeval final )
{
    time_t      elapsedSec  = final.tv_sec  - initial.tv_sec;
    suseconds_t elapsedUsec = final.tv_usec - initial.tv_usec;

    // Borrow one second when the microsecond difference went negative.
    if( elapsedUsec < 0 )
    {
        elapsedSec  -= 1;
        elapsedUsec += 1000000;
    }

    return elapsedSec * 1000.0 + elapsedUsec / 1000.0;
}
02551
02552