#include <Router.h>
Public Member Functions | |
CRouter () | |
~CRouter () | |
int | Initialize (RouterConfig *Config) |
int | Start () |
int | Stop () |
void | Put (Event *ep) |
int | GetMaxNumberOfDisks () |
int | GetNumberOfActiveDisks () |
int | GetDiskServiceTime (EventStorageRequest *event, u16 DiskId) |
void | SetDiskServiceTime (int Disk, double EstServiceTimeOfDisk, double EstServiceTime[301]) |
void | UpdateDiskServiceTime (double EstServiceTimeOfDisk[100], double EstServiceTime[100][301]) |
void | UpdateDiskResponseTime (double EstResponseTimeOfDisk[100], double EstResponseTime[100][301], double DevDiskResponseTimeOfDisk[100], double DevDiskResponseTime[100][301]) |
void | UpdateDiskQueueTime (double EstQueueTimeOfDisk[100], double EstQueueTime[100][301]) |
void | UpdateMeasures (int disk, int nThreads, struct timeval initial_time, struct timeval final_time) |
long double | GetEstimatedQueueTimeOfAllDisks () |
long double | GetEstimatedDiskQueueTime (int disk) |
void | SetNATMapping (NATData server_map, int stor_id, NATData stor_map) |
NATData | GetNATMapping (NATData server_map, int stor_id) |
Funcao para obter um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho. | |
void | RemoveNATMapping (NATData server_map, int stor_id) |
Funcao para remover um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho. | |
bool | FindNATMapping (NATData server_map, int stor_id) |
Funcao para verificar se um mapeamento esta presente no mapa com os mapeamentos. | |
unsigned long long int | GetInvalidStorages (NATData ClientAddr) |
Deternima o vetor de bits com os servidores de armazenamento que nao podem ser usados pelo cliente com endereco passado como parametro, isto e, que estao inativos ou que foram invalidados (apos se tornarem novamente ativos). | |
void | GetStorageInfo (NATData ClientAddr, bool *CanRead, bool *CanWrite) |
Deternima se podemos ler ou escrever blocos, no nomento, para um dado cliente cujo endereco e passado como parametro. | |
int | GetNumberOfStorageNodes (unsigned int *NumberOfStorageNodes) |
Funcao para obter o numero de storage nodes. | |
void | RequestCompleted (RioStreamObj **StreamObjAddr, Event *event) |
Funcao para completar uma requisicao (chamando a RequestCompleted do objeto RioStreamObj passado como parametro) removendo, quando necessario, o objeto do stream e alterando o ponteiro passado como parametro para NULL. | |
Data Fields | |
NATMap | m_NAT_map |
Private Member Functions | |
void | StorageReply (MsgRSSstorage *Msg) |
void | AddDisk (int Disk) |
void | SendStorage (StrDiskRequest *Request) |
void | CleanUp () |
Event * | Get () |
void | routerthread () |
void | DataReq (Event *ep) |
void | DataPrefetch (Event *event) |
void | DataSendToClient (Event *event) |
void | DataCancel (Event *event) |
void | PrintQueues () |
void | StorageDown (int StorageId, bool EmptyStorageQueue) |
Funcao para definir que um servidor de armazenamento esta temporariamente indisponivel. | |
void | StorageUp (int StorageId) |
Funcao para definir que um servidor de armazenamento voltou a estar disponivel. | |
bool | CheckStorageStatus (int DiskId) |
Funcao usada para verificar se o servidor de armazenamento com uma dada ID esta ou nao parado. | |
Static Private Member Functions | |
static void * | routerthreadep (void *param) |
Private Attributes | |
double | m_EstDiskServiceTime [100][301] |
double | m_EstDiskServiceTimeOfDisk [100] |
double | m_EstDiskResponseTime [100][301] |
double | m_EstDiskResponseTimeOfDisk [100] |
double | m_DevDiskResponseTime [100][301] |
double | m_DevDiskResponseTimeOfDisk [100] |
double | m_EstDiskQueueTime [100][301] |
double | m_EstDiskQueueTimeOfDisk [100] |
int | m_UpdatedEstimatedDiskServiceTime |
bool | m_CollectMeasures |
double | m_EstimatedParameter |
pthread_mutex_t | m_MutexUpdated |
pthread_cond_t | m_ConditionUpdated |
ofstream | m_logSQT |
ofstream | m_logRESP |
pthread_t | m_thread |
bool | m_initialized |
bool | m_started |
DiskMgr * | m_DiskMgr |
CSystemManager * | m_SystemManager |
EventQueue | m_Queue |
int | m_nDisks |
int | m_BlockSize |
int | m_MaxPending |
int | m_MaxQueueSize |
CDiskRequestQueue * | m_RTqueue |
CDiskRequestQueue * | m_NRTqueue |
CDiskRequestQueue * | m_Pending |
int * | m_nRT |
int * | m_nNRT |
int * | m_nPending |
CDiskRequestManager | m_Request |
StrDiskRequest ** | m_TempDiskRequest |
unsigned long long int | m_StoragesStatus |
pthread_mutex_t | m_StoragesStatusMutex |
Definition at line 55 of file Router.h.
CRouter::CRouter | ( | ) |
Definition at line 68 of file Router.cpp.
00069 { 00070 m_UpdatedEstimatedDiskServiceTime = 0; 00071 pthread_mutex_init( &m_MutexUpdated, NULL ); 00072 pthread_cond_init( &m_ConditionUpdated, NULL ); 00073 // m_logSQT does not need to be initialized 00074 // m_logRESP does not need to be initialized 00075 // m_Queue does not need to be initialized 00076 m_thread = 0; 00077 00078 m_initialized = false; 00079 m_started = false; 00080 m_DiskMgr = NULL; 00081 m_SystemManager = NULL; 00082 m_nDisks = 0; 00083 m_BlockSize = 0; 00084 m_MaxPending = 0; 00085 m_MaxQueueSize = 0; 00086 00087 m_RTqueue = NULL; 00088 m_NRTqueue = NULL; 00089 m_Pending = NULL; 00090 m_nRT = NULL; 00091 m_nNRT = NULL; 00092 m_nPending = NULL; 00093 00094 // m_Request does not need to be initialized 00095 m_TempDiskRequest = NULL; 00096 00097 m_EstimatedParameter = 0; 00098 for( int i = 0; i < 100; i++ ) 00099 { 00100 for( int j = 0; j < 301; j++ ) 00101 { 00102 // according number of active threads 00103 m_EstDiskServiceTime [ i ][ j ] = 0; 00104 m_EstDiskResponseTime[ i ][ j ] = 0; 00105 m_DevDiskResponseTime[ i ][ j ] = 0; 00106 00107 // according queue size 00108 m_EstDiskQueueTime[ i ][ j ] = 0; 00109 } 00110 // for each disk - using all requests 00111 m_EstDiskQueueTimeOfDisk [ i ] = 0; 00112 m_EstDiskServiceTimeOfDisk [ i ] = 0; 00113 m_EstDiskResponseTimeOfDisk[ i ] = 0; 00114 m_DevDiskResponseTimeOfDisk[ i ] = 0; 00115 } 00116 m_CollectMeasures = false; 00117 // ------------------------------------------------------------------------ 00118 00119 // Inicializa a variavel que define os servidores de armazenamento inativos 00120 // com 0 (pois inicialmente todos os servidores estarao ativos apos as 00121 // conexoes forem estabelecidas). 00122 m_StoragesStatus = 0; 00123 // Inicializa o mutex para garantir o acesso exclusivo a variavel 00124 // m_StoragesStatus. 00125 pthread_mutex_init( &m_StoragesStatusMutex, NULL ); 00126 }
CRouter::~CRouter | ( | ) |
Definition at line 129 of file Router.cpp.
00130 { 00131 CleanUp(); 00132 00133 // Remove o mutex para garantir o acesso exclusivo a variavel 00134 // m_StoragesStatus. 00135 pthread_mutex_destroy( &m_StoragesStatusMutex ); 00136 }
void CRouter::AddDisk | ( | int | Disk | ) | [private] |
Definition at line 1579 of file Router.cpp.
01580 { 01581 RioErr << " Router: Adding disk: " << Disk << endl; 01582 if( ( Disk < 0 ) || ( Disk >= m_nDisks ) ) 01583 { 01584 RioErr << "Router ERROR: Tried to add invalid disk " << Disk << endl; 01585 return; 01586 } 01587 01588 if( m_nRT[Disk] != ( m_MaxQueueSize + 1 ) ) 01589 { 01590 RioErr << "Router ERROR: Tried to add disk already active " 01591 << Disk << endl; 01592 return; 01593 } 01594 01595 char name[1025]; 01596 m_DiskMgr->GetDiskName( Disk, name ); 01597 01598 RioStorageNodeInfo sn_info; 01599 m_DiskMgr->GetDiskStorageNodeInfo( Disk, &sn_info ); 01600 01601 AddDiskEvent *it_event = new AddDiskEvent( sn_info.Hostname, name ); 01602 m_SystemManager->PostITEvent( (MonitorEvent *) it_event ); 01603 01604 // Reset disk queue size (now active) 01605 m_nRT[Disk] = 0; 01606 }
bool CRouter::CheckStorageStatus | ( | int | DiskId | ) | [private] |
Funcao usada para verificar se o servidor de armazenamento com uma dada ID esta ou nao parado.
DiskId | identificador do disco do servidor de armazenamento. |
Definition at line 2333 of file Router.cpp.
02334 { 02335 bool Status; 02336 int StorageId; 02337 // Verifica se a DiskId e valida 02338 if( DiskId >= 0 ) 02339 { 02340 // Descobre o identificador do servidor de armazenamento. 02341 StorageId = ( DiskId - 1 ) / SNode::sn_maxdisks; 02342 // Obtem acesso exclusivo a variavel m_StoragesStatus. 02343 pthread_mutex_lock( &m_StoragesStatusMutex ); 02344 // Verifica se o servidor esta ativo (se for o caso, o seu bit deve ser 02345 // igual a 0). 02346 Status = ( ( m_StoragesStatus & ( 1ull << StorageId ) ) == 0 ); 02347 // Libera o acesso exclusivo a variavel m_StoragesStatus. 02348 pthread_mutex_unlock( &m_StoragesStatusMutex ); 02349 } 02350 else 02351 Status = false; 02352 return Status; 02353 }
void CRouter::CleanUp | ( | ) | [private] |
Definition at line 139 of file Router.cpp.
00140 { 00141 m_initialized = false; 00142 m_started = false; 00143 00144 m_DiskMgr = 0; 00145 m_nDisks = 0; 00146 m_BlockSize = 0; 00147 m_MaxPending = 0; 00148 m_MaxQueueSize = 0; 00149 00150 //Pra tese: Verificar estes if's abaixo. Estao muito estranhos!!! 00151 //Nao deveriam ser do tipo if( x != 0 ) em vez de if(x == 0)?!?!? 00152 //Obs: Corrigido, pois existiam erros de memoria. 00153 if( m_RTqueue != 0 ) 00154 { 00155 delete[] m_RTqueue; 00156 m_RTqueue = 0; 00157 } 00158 00159 if( m_NRTqueue != 0 ) 00160 { 00161 delete[] m_NRTqueue; 00162 m_NRTqueue = 0; 00163 } 00164 00165 if( m_Pending != 0 ) 00166 { 00167 delete[] m_Pending; 00168 m_Pending = 0; 00169 } 00170 00171 if( m_nRT != 0 ) 00172 { 00173 delete[] m_nRT; 00174 m_nRT = 0; 00175 } 00176 00177 if( m_nNRT != 0 ) 00178 { 00179 delete[] m_nNRT; 00180 m_nNRT = 0; 00181 } 00182 00183 if( m_nPending != 0 ) 00184 { 00185 delete[] m_nPending; 00186 m_nPending = 0; 00187 } 00188 00189 if( m_TempDiskRequest != 0 ) 00190 { 00191 delete[] m_TempDiskRequest; 00192 m_TempDiskRequest = 0; 00193 } 00194 }
void CRouter::DataCancel | ( | Event * | event | ) | [private] |
Definition at line 739 of file Router.cpp.
00740 { 00741 DataRequest& Request = (( EventDataRequest* )( event ))->Request; 00742 00743 #ifdef RIO_DEBUG2 00744 RioErr << "Router:DataCancel => block "<< Request.Block 00745 << " BufferId " << Request.BufferId << endl; 00746 #endif 00747 00748 if( Request.RequestCounter == -1 ) 00749 { 00750 #ifdef RIO_DEBUG2 00751 RioErr << "Router:DataCancel => did not request this block " 00752 << "yet, so just free it" << endl; 00753 #endif 00754 00755 //the request was not submit to storage yet, so just free request 00756 Request.Status = 0; 00757 RequestCompleted( &Request.streamobj, event ); 00758 } 00759 else 00760 { 00761 StrDiskRequest* m_TempDiskRequest = m_Request.Get( Request.BufferId ); 00762 00763 if( m_TempDiskRequest ) 00764 { 00765 if( ( m_TempDiskRequest->StorageId != 0 ) && 00766 ( CheckStorageStatus( m_TempDiskRequest->Disk ) ) ) 00767 { 00768 if( m_CollectMeasures ) 00769 { 00770 // update queue size to get estimated queue time 00771 Request.QueueSize = m_nRT[m_TempDiskRequest->Disk]; 00772 } 00773 00774 if( m_nPending[m_TempDiskRequest->Disk] < m_MaxPending ) 00775 { 00776 m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest ); 00777 // Update disk queue sizes 00778 m_nPending[m_TempDiskRequest->Disk]++; 00779 m_nRT[m_TempDiskRequest->Disk]++; 00780 SendStorage(m_TempDiskRequest); 00781 } 00782 // otherwise put request on disk queue 00783 else 00784 { 00785 m_RTqueue[m_TempDiskRequest->Disk].Put( m_TempDiskRequest ); 00786 m_nRT[m_TempDiskRequest->Disk]++; 00787 } 00788 char name[1025]; 00789 m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name ); 00790 00791 RioStorageNodeInfo sn_info; 00792 m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info ); 00793 00794 int queue_sum = m_nRT[ m_TempDiskRequest->Disk ] 00795 + m_nNRT[ m_TempDiskRequest->Disk ] 00796 + m_nPending[ m_TempDiskRequest->Disk ]; 00797 00798 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( 00799 sn_info.Hostname, name, queue_sum ); 00800 m_SystemManager->PostITEvent( (MonitorEvent *)it_event ); 00801 } 00802 else 00803 { 00804 RioErr << " Router:Datacancel => StorageId == 0 " 00805 << "(RequestCompleted) or storage is down" << endl; 00806 m_Request.Free(m_TempDiskRequest); 00807 RequestCompleted( &Request.streamobj, event ); 00808 } 00809 } 00810 else if ( m_TempDiskRequest->StorageId == 0 ) 00811 { 00812 RioErr << "ERROR: Router:Datacancel => Could not find resquest to cancel" 00813 << endl; 00814 } 00815 else // Novo erro indicando que o servidor de armazenamento com o disco 00816 // esta temporariamente indisponivel. 00817 { 00818 RioErr << "ERROR: Router:Datacancel => Could not send message to " 00819 << "storage to cancel the resquest" << endl; 00820 } 00821 } 00822 }
void CRouter::DataPrefetch | ( | Event * | event | ) | [private] |
Definition at line 487 of file Router.cpp.
00488 { 00489 NATData ClientAddr, ServerAddr; 00490 int storageid; 00491 // 00492 // TODO: Remover a variavel seguir quando a RioNeti e a NetMgr nao forem 00493 // mais usadas. 00494 // 00495 unsigned totalStorages; 00496 00497 DataRequest& Request = (( EventDataRequest* )( event ))->Request; 00498 StrDiskRequest* m_TempDiskRequest = m_Request.New(); 00499 00500 // Obtem o par IP, porta do cliente 00501 ClientAddr.nat_addr = Request.Target.IPaddress; 00502 ClientAddr.nat_port = Request.Target.Port; 00503 00504 if( m_TempDiskRequest == 0 ) 00505 { 00506 RioErr << "Router ERROR: Too many disk requests (UNEXPECTED)" << endl; 00507 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED; 00508 RequestCompleted( &Request.streamobj, event ); 00509 return; 00510 } 00511 00512 if( !CheckStorageStatus( m_TempDiskRequest->Disk ) ) 00513 { 00514 RioErr << "Router ERROR (DataPrefetch): Cannot access disk: storage " 00515 << "is down" << endl; 00516 Request.Status = ERROR_ROUTER + ERROR_SERVICE_TEMPORARY_UNAVAILABLE; 00517 RequestCompleted( &Request.streamobj, event ); 00518 return; 00519 } 00520 00521 // Descobre o numero de storages. 00522 // 00523 // TODO: Remover o comando seguir quando a RioNeti e a NetMgr nao forem mais 00524 // usadas. 00525 // 00526 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages ); 00527 00528 // Descobre a id do storage. 00529 storageid = ( m_TempDiskRequest->Disk - 1 ) / SNode::sn_maxdisks + 1; 00530 // Ajusta a id para as copias em tempo nao real, caso existam pares 00531 // IP, porta associados a nova implementacao de transmissao de dados. 00532 // (estes pares comecarao a partir da posicao do ultimo storage no vetor 00533 // mais 1, e o numero de pares sera o numero de storages. 00534 // 00535 // TODO: Remover todo o codigo dentro do if a seguir, incluindo o 00536 // comando if, quando a RioNeti e a NetMgr nao forem mais usadas. 00537 // 00538 if( ( ( Request.Operation == NonRealTimeRead ) || 00539 ( Request.Operation == NonRealTimeWrite ) ) && 00540 ( FindNATMapping( ClientAddr, storageid + totalStorages + 1 ) ) ) 00541 { 00542 #ifdef RIO_DEBUG2 00543 RioErr << "Router:DataPrefetch usando o indice " 00544 << storageid + totalStorages + 1 << " ao inves do indice " 00545 << storageid << "." << endl; 00546 #endif 00547 00548 storageid = storageid + totalStorages + 1; 00549 } 00550 #ifdef RIO_DEBUG2 00551 else 00552 RioErr << "Router:DataPrefetch o novo indice " 00553 << storageid + totalStorages + 1 << " nao foi usado. " 00554 << "Request.Operation = " << Request.Operation << "." << endl; 00555 #endif 00556 00557 00558 // Verifica se o disco deste servidor de armazenamento pode ser usado pelo 00559 // cliente. 00560 ServerAddr = GetNATMapping( ClientAddr, storageid ); 00561 if( ( ServerAddr.nat_addr == 0 ) && ( ServerAddr.nat_port == 0 ) ) 00562 { 00563 RioErr << "Router ERROR (DataPrefetch): Cannot access disk: storage " 00564 << "is disabled for this client" << endl; 00565 Request.Status = ERROR_ROUTER + ERROR_SERVICE_TEMPORARY_UNAVAILABLE; 00566 RequestCompleted( &Request.streamobj, event ); 00567 return; 00568 } 00569 00570 00571 #ifdef RIO_DEBUG2 00572 RioErr << "Router:DataPrefetch => block " << Request.Block 00573 << " disk " << Request.Reps[0].disk << endl; 00574 #endif 00575 00576 m_TempDiskRequest->Disk = Request.Reps[0].disk; 00577 m_TempDiskRequest->Pos = (((u64) Request.Reps[0].block ) * m_BlockSize ); 00578 m_TempDiskRequest->StorageId = 0xffffffff; // Invalid storage id 00579 m_TempDiskRequest->Size = m_BlockSize; 00580 m_TempDiskRequest->event = ( EventDataRequest* ) event; 00581 00582 if( m_CollectMeasures ) 00583 { 00584 // update queue size to get estimated queue time 00585 Request.QueueSize = m_nRT[m_TempDiskRequest->Disk]; 00586 } 00587 if( m_nPending[m_TempDiskRequest->Disk] < m_MaxPending ) 00588 { 00589 m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest ); 00590 // Update disk queue sizes 00591 m_nPending[m_TempDiskRequest->Disk]++; 00592 m_nRT[m_TempDiskRequest->Disk]++; 00593 SendStorage(m_TempDiskRequest); 00594 } 00595 // otherwise put request on disk queue 00596 else 00597 { 00598 m_RTqueue[m_TempDiskRequest->Disk].Put( m_TempDiskRequest ); 00599 m_nRT[m_TempDiskRequest->Disk]++; 00600 } 00601 00602 char name[1025]; 00603 m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name ); 00604 00605 RioStorageNodeInfo sn_info; 00606 m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info ); 00607 00608 int queue_sum = m_nRT[ m_TempDiskRequest->Disk ] 00609 + m_nNRT[ m_TempDiskRequest->Disk ] 00610 + m_nPending[ m_TempDiskRequest->Disk ]; 00611 00612 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( sn_info.Hostname, name, 00613 queue_sum ); 00614 m_SystemManager->PostITEvent( (MonitorEvent *)it_event ); 00615 }
void CRouter::DataReq | ( | Event * | ep | ) | [private] |
Definition at line 825 of file Router.cpp.
00826 { 00827 int i, j, rc; 00828 00829 // Get Request from RequestEle 00830 DataRequest& Request = (( EventDataRequest* )( event ))->Request; 00831 00832 #ifdef RIO_DEBUG2 00833 struct in_addr clientip; 00834 clientip.s_addr = Request.Target.IPaddress; 00835 RioErr << "No DataReq do Router: " 00836 << "IP " << inet_ntoa(clientip) 00837 << " port " << ntohs( Request.Target.Port ) 00838 << " reqid " << Request.Reqid 00839 << " block " << Request.Block 00840 << endl; 00841 #endif 00842 00843 Request.streamobj->Stream()->getMutexUpdateEvent(); 00844 00845 if( Request.Operation == RealTimePrefetchBlock ) 00846 { 00847 DataPrefetch( event ); 00848 Request.streamobj->Stream()->releaseMutexUpdateEvent(); 00849 } 00850 else if( Request.Operation == RealTimeCancelBlock ) 00851 { 00852 DataCancel( event ); 00853 Request.streamobj->Stream()->releaseMutexUpdateEvent(); 00854 } 00855 else if( Request.Operation == RealTimeSendBlock ) 00856 { 00857 DataSendToClient( event ); 00858 Request.streamobj->Stream()->releaseMutexUpdateEvent(); 00859 } 00860 // ------------------------------------------------------------------------ 00861 else 00862 { 00863 Request.streamobj->Stream()->releaseMutexUpdateEvent(); 00864 /////////////////////////////////////// 00865 //RioErr << "Router:DataReq => RealTimeRead or NonRealTime[Read|Write]" 00866 // << endl; 00867 00868 // ### before request was queued to Router should have 00869 // increased pending count in streamobj so that won't be deleted 00870 // while I/O running? 00871 00872 // -- map request to absolute blocks 00873 // ### this could be a lot cleaner!! 00874 // shouldn't know about internals of RioDiskBlock... 00875 00876 rc = Request.rioobject->MapBlock( Request.Block, Request.RepBits, 00877 &Request.RepNum, Request.Reps ); 00878 00879 if( rc ) 00880 { 00881 // failed in mapping logical block to physical block 00882 // must send error status to client 00883 00884 RioErr << "Router ERROR: Failed mapping logical block." << endl; 00885 00886 Request.Status = ERROR_ROUTER + ERROR_INVALID_BLOCK; 00887 RequestCompleted( &Request.streamobj, event ); 00888 return; 00889 } 00890 00891 // Compute number of disk requests for this data request 00892 int nDiskRequests; 00893 if( Request.Operation == NonRealTimeWrite ) 00894 { 00895 // For write, need to send disk request to all replicas 00896 nDiskRequests = Request.RepNum; 00897 } 00898 else if( ( Request.Operation == RealTimeRead ) || 00899 ( Request.Operation == NonRealTimeRead )) 00900 { 00901 // For read send disk request to just one replica 00902 nDiskRequests = 1; 00903 } 00904 else 00905 { 00906 RioErr << "Router ERROR: Got data request with invalid operation: " 00907 << (int) Request.Operation << endl; 00908 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED; 00909 RequestCompleted( &Request.streamobj, event ); 00910 return; 00911 } 00912 00913 if( nDiskRequests > MaxReplications ) 00914 { 00915 RioErr << "Router ERROR: Got data request with too many replicas: " 00916 << nDiskRequests << endl; 00917 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED; 00918 RequestCompleted( &Request.streamobj, event ); 00919 return; 00920 } 00921 00922 // allocate disk requests 00923 for( i = 0; i < nDiskRequests; i++ ) 00924 { 00925 m_TempDiskRequest[i] = m_Request.New(); 00926 00927 // This should not happen if number of data requests does 00928 // not exceed maximum. 00929 if( m_TempDiskRequest[i] == 0 ) 00930 { 00931 // If there is no available disk request, free previous 00932 // allocated disk requests and send error status to client 00933 for( j = 0; j < i; j++ ) 00934 { 00935 m_Request.Free( m_TempDiskRequest[j] ); 00936 } 00937 RioErr << "Router ERROR: Too many disk requests (UNEXPECTED)" 00938 << endl; 00939 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED; 00940 RequestCompleted( &Request.streamobj, event ); 00941 return; 00942 } 00943 } 00944 00945 // Fill disk request fields: Disk and Pos 00946 if( ( Request.Operation == RealTimeRead ) || 00947 ( Request.Operation == NonRealTimeRead )) 00948 { 00949 // find shortest disk queue 00950 int Min = m_MaxQueueSize + 1; 00951 int Rep = -1; 00952 int nrep = Request.RepNum; 00953 int dk; 00954 int nQueue; 00955 int storageid; 00956 // TODO: Esta variavel devera ser removida quando a NetMgr e a 00957 // RioNeti forem definitivamente removidas do codigo. 00958 unsigned int totalStorages; 00959 // Obtem o endereco do servidor de despacho usado como base do 00960 // mapeamento 00961 NATData input( Request.Target.IPaddress, Request.Target.Port ); 00962 // TODO: Inicializa totalStorages com o numero de storages. Este 00963 // codigo devera ser removido quando a NetMgr e a RioNeti forem 00964 // definitivamente removidas do codigo. 00965 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages ); 00966 00967 for( i = 0; i < nrep ; i++ ) 00968 { 00969 // Garante que somente os discos do servidor de armazenamento 00970 // ativo sejam utilizados. 00971 dk = Request.Reps[i].disk; 00972 if( CheckStorageStatus( dk ) ) 00973 { 00974 // Descobre a ID do storage a partir do disco acessado. 00975 storageid = ( dk - 1 ) / ( SNode::sn_maxdisks ) + 1; 00976 // Se o cliente estiver atrás de NAT, o par (IP, porta) que 00977 // o servidor informar ao storage são diferentes, variam 00978 // para cada storage. 00979 // Para descobrir qual par deve ser usado, deve-se consultar 00980 // o heap m_NAT_map, que sera usado como o par IP, porta a 00981 // ser enviado ao servidor de armazenamento 00982 // Obs: Note que agora sempre deveremos ter o par IP, porta 00983 // do cliente no mapa, porque agora sempre enviamos o 00984 // mapeamento para o servidor, independentemente de o 00985 // cliente estar ou nao atras de NAT. 00986 // 00987 // TODO: Precisamos verificar agora se estamos usando a nova 00988 // ou a implementacao antiga. Todo o codigo dentro do if e 00989 // do seu else devem ser removidos quando a NetMgr e a 00990 // RioNeti forem definitivamente retiradas do codigo. 00991 if( ( Request.Operation == NonRealTimeRead ) && 00992 ( FindNATMapping( input, storageid + 00993 totalStorages + 1 ) ) ) 00994 { 00995 #ifdef RIO_DEBUG2 00996 RioErr << "CRouter::DataReq usando o indice " 00997 << storageid + totalStorages + 1 << " ao inves " 00998 << "do indice " << storageid << "." << endl; 00999 #endif 01000 01001 storageid = storageid + totalStorages + 1; 01002 } 01003 #ifdef RIO_DEBUG2 01004 else 01005 RioErr << "Router::DataReq o novo indice " 01006 << storageid + totalStorages + 1 << " nao foi " 01007 << "usado. Request.Operation = " 01008 << Request.Operation << "." << endl; 01009 #endif 01010 01011 NATData result = m_NAT_map.getElement( input, storageid ); 01012 if( ( result.nat_addr != -1 ) || ( result.nat_port != 0 ) ) 01013 { 01014 if( Request.Operation == RealTimeRead ) 01015 nQueue = m_nRT[dk]; 01016 else 01017 nQueue = m_nNRT[dk] + m_nRT[dk]; 01018 if( nQueue < Min ) 01019 { 01020 Min = nQueue; 01021 Rep = i; 01022 //ClientAddr = result; 01023 } 01024 } 01025 #ifdef RIO_DEBUG2 01026 else 01027 RioErr << "Router WARNING: storage with ID " 01028 << ( ( dk - 1 ) / SNode::sn_maxdisks ) 01029 << " cannot be used" << endl; 01030 #endif 01031 } 01032 #ifdef RIO_DEBUG2 01033 else 01034 RioErr << "Router WARNING: storage with ID " 01035 << ( ( dk - 1 ) / SNode::sn_maxdisks ) 01036 << " is down" << endl; 01037 #endif 01038 } 01039 01040 if( Rep == -1 ) 01041 { 01042 // no disk queue found -- fail request 01043 RioErr << "Router ERROR: Unavailable disk on read block " 01044 << Request.Block << " with reqid " << Request.Reqid 01045 << endl; 01046 m_Request.Free( m_TempDiskRequest[0] ); 01047 Request.Status = ERROR_ROUTER + ERROR_INVALID_DISK; 01048 RequestCompleted( &Request.streamobj, event ); 01049 return; 01050 } 01051 01052 m_TempDiskRequest[0]->Disk = Request.Reps[Rep].disk; 01053 m_TempDiskRequest[0]->Pos = 01054 ( ( (u64) Request.Reps[Rep].block ) * m_BlockSize ); 01055 } 01056 // Write case 01057 else 01058 { 01059 int nQueue; 01060 int dk; 01061 int storageid; 01062 // TODO: Esta variavel devera ser removida quando a NetMgr e a 01063 // RioNeti forem definitivamente removidas do codigo. 01064 unsigned int totalStorages; 01065 // Obtem o endereco do servidor de despacho usado como base do 01066 // mapeamento 01067 NATData input( Request.Target.IPaddress, Request.Target.Port ); 01068 // TODO: Inicializa totalStorages com o numero de storages. Este 01069 // codigo devera ser removido quando a NetMgr e a RioNeti forem 01070 // definitivamente removidas do codigo. 01071 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages ); 01072 01073 for( i = 0; i < nDiskRequests ; i++ ) 01074 { 01075 dk = Request.Reps[i].disk; 01076 if( Request.Operation == NonRealTimeWrite ) 01077 nQueue = m_nNRT[dk] + m_nRT[dk]; 01078 else 01079 nQueue = m_nRT[dk]; 01080 01081 // Descobre a ID do storage a partir do disco acessado. 01082 storageid = ( dk - 1 ) / ( SNode::sn_maxdisks ) + 1; 01083 // Se o cliente estiver atrás de NAT, o par (IP, porta) 01084 // que o servidor informar ao storage são diferentes, 01085 // variam para cada storage. 01086 // Para descobrir qual par deve ser usado, deve-se 01087 // consultar o heap m_NAT_map, que sera usado como o par 01088 // IP, porta a ser enviado ao servidor de armazenamento. 01089 // Obs: Note que agora sempre deveremos ter o par IP, 01090 // porta do cliente no mapa, porque agora sempre 01091 // enviamos o mapeamento para o servidor, 01092 // independentemente de o cliente estar ou nao atras de 01093 // NAT. 01094 // 01095 // TODO: Precisamos verificar agora se estamos usando a nova 01096 // ou a implementacao antiga. Todo o codigo dentro do if e 01097 // do seu else devem ser removidos quando a NetMgr e a 01098 // RioNeti forem definitivamente retiradas do codigo. 01099 if( ( Request.Operation == NonRealTimeWrite ) && 01100 ( FindNATMapping( input, storageid + totalStorages + 1 ) ) ) 01101 { 01102 #ifdef RIO_DEBUG2 01103 RioErr << "CRouter::DataReq usando o indice " 01104 << storageid + totalStorages + 1 << " ao inves do " 01105 << "indice " << storageid << "." << endl; 01106 #endif 01107 01108 storageid = storageid + totalStorages + 1; 01109 } 01110 #ifdef RIO_DEBUG2 01111 else 01112 RioErr << "Router::DataReq o novo indice " 01113 << storageid + totalStorages + 1 << " nao foi " 01114 << "usado. Request.Operation = " 01115 << Request.Operation << "." << endl; 01116 #endif 01117 01118 NATData result = m_NAT_map.getElement( input, storageid ); 01119 01120 if( ( nQueue > m_MaxQueueSize ) || 01121 ( !CheckStorageStatus( dk ) ) || 01122 ( ( result.nat_addr == -1 ) && ( result.nat_port == 0 ) ) ) 01123 { 01124 RioErr << "Router ERROR: Unavailable disk " << dk 01125 << " on write block " << Request.Block 01126 << " with reqid " << Request.Reqid << endl; 01127 RioErr << "nQueue " << nQueue << " m_MaxQueueSize " 01128 << m_MaxQueueSize << endl; 01129 for( j = 0; j < nDiskRequests; j++ ) 01130 { 01131 m_Request.Free( m_TempDiskRequest[j] ); 01132 } 01133 Request.Status = ERROR_ROUTER + ERROR_INVALID_DISK; 01134 RequestCompleted( &Request.streamobj, event ); 01135 return; 01136 } 01137 // Se o disco e valido, inicializamos a sua requisicao. 01138 m_TempDiskRequest[i]->Disk = dk; 01139 m_TempDiskRequest[i]->Pos = 01140 (((u64) Request.Reps[i].block ) * m_BlockSize ); 01141 } 01142 } 01143 01144 Request.RequestCounter = nDiskRequests; 01145 01146 // Informa que a funcao RequestCompleted deve ser chamada. 01147 Request.RequestCancelled = false; 01148 01149 //Filling Disk request fields 01150 int dk; 01151 // Fill other fields of disk requests and include it of disk queue 01152 for( i = 0; i < nDiskRequests ; i++ ) 01153 { 01154 01155 #ifdef RIO_DEBUG2 01156 RioErr << "CRouter::DataReq colocando a requicao para a " 01157 << "replicacao " << i+1 << " na fila do disco " 01158 << m_TempDiskRequest[ i ]->Disk << endl; 01159 #endif 01160 01161 m_TempDiskRequest[i]->StorageId = 0xffffffff;// Invalid storage id 01162 m_TempDiskRequest[i]->Size = m_BlockSize; 01163 m_TempDiskRequest[i]->event = ( EventDataRequest* ) event; 01164 dk = m_TempDiskRequest[i]->Disk; 01165 01166 // Send request to storage node if there are not many pending 01167 // requests 01168 if( m_nPending[dk] < m_MaxPending ) 01169 { 01170 m_Pending[dk].Put( m_TempDiskRequest[i] ); 01171 // update disk queue sizes 01172 m_nPending[dk]++; 01173 if( ( Request.Operation == NonRealTimeWrite ) || 01174 ( Request.Operation == NonRealTimeRead )) 01175 { 01176 m_nNRT[dk]++; 01177 } 01178 else 01179 { 01180 if( m_CollectMeasures ) 01181 { 01182 // update queue size to get estimated queue time 01183 Request.QueueSize = m_nRT[dk]; 01184 } 01185 // ---------------------------------------------------- 01186 m_nRT[dk]++; 01187 } 01188 01189 SendStorage(m_TempDiskRequest[i]); 01190 } 01191 // otherwise put request on disk queue 01192 else // else do if( m_nPending[dk] < m_MaxPending ) 01193 { 01194 // insert disk request at real time or non real time queue 01195 if( ( Request.Operation == NonRealTimeWrite ) || 01196 ( Request.Operation == NonRealTimeRead )) 01197 { 01198 m_NRTqueue[dk].Put( m_TempDiskRequest[i]); 01199 m_nNRT[dk]++; 01200 } 01201 else 01202 { 01203 m_RTqueue[dk].Put( m_TempDiskRequest[i]); 01204 if( m_CollectMeasures ) 01205 { 01206 // update queue size to get estimated queue time 01207 Request.QueueSize = m_nRT[dk]; 01208 } 01209 // -------------------------------------------------------- 01210 m_nRT[dk]++; 01211 // -------------------------------------------------------- 01212 } 01213 } // fim do else do if( m_nPending[dk] < m_MaxPending ) 01214 01215 char name[1025]; 01216 m_DiskMgr->GetDiskName( dk, name ); 01217 01218 RioStorageNodeInfo sn_info; 01219 m_DiskMgr->GetDiskStorageNodeInfo( dk, &sn_info ); 01220 01221 int queue_sum = m_nRT[ dk ] 01222 + m_nNRT[ dk ] 01223 + m_nPending[ dk ]; 01224 01225 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( 01226 sn_info.Hostname, name, queue_sum ); 01227 m_SystemManager->PostITEvent( (MonitorEvent *)it_event ); 01228 } // fim do for( i = 0; i < nDiskRequests ; i++ ) 01229 } // fim do else 01230 }
void CRouter::DataSendToClient | ( | Event * | event | ) | [private] |
Definition at line 620 of file Router.cpp.
00621 { 00622 NATData ClientAddr, ServerAddr; 00623 bool StorageUp, StorageEnabled; 00624 int storageid; 00625 // 00626 // TODO: Remover a variavel seguir quando a RioNeti e a NetMgr nao forem 00627 // mais usadas. 00628 // 00629 unsigned int totalStorages; 00630 00631 DataRequest& Request = (( EventDataRequest* )( event ))->Request; 00632 StrDiskRequest* m_TempDiskRequest = m_Request.Get( Request.BufferId ); 00633 00634 StorageUp = CheckStorageStatus( m_TempDiskRequest->Disk ); 00635 00636 // Obtem o par IP, porta do cliente 00637 ClientAddr.nat_addr = Request.Target.IPaddress; 00638 ClientAddr.nat_port = Request.Target.Port; 00639 00640 // Descobre o numero de storages. 00641 // 00642 // TODO: Remover o comando seguir quando a RioNeti e a NetMgr nao forem mais 00643 // usadas. 00644 // 00645 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages ); 00646 00647 // Descobre a id do storage. 00648 storageid = ( m_TempDiskRequest->Disk - 1 ) / SNode::sn_maxdisks + 1; 00649 // Ajusta a id para as copias em tempo nao real, caso existam pares 00650 // IP, porta associados a nova implementacao de transmissao de dados. 00651 // (estes pares comecarao a partir da posicao do ultimo storage no vetor 00652 // mais 1, e o numero de pares sera o numero de storages. 00653 // 00654 // TODO: Remover a variavel seguir quando a RioNeti e a NetMgr nao forem 00655 // mais usadas. 00656 // 00657 if( ( ( Request.Operation == NonRealTimeRead ) || 00658 ( Request.Operation == NonRealTimeWrite ) ) && 00659 ( FindNATMapping( ClientAddr, storageid + totalStorages + 1 ) ) ) 00660 { 00661 #ifdef RIO_DEBUG2 00662 RioErr << "Router:DataSendToClient usando o indice " 00663 << storageid + totalStorages + 1 << " ao inves do indice " 00664 << storageid << "." << endl; 00665 #endif 00666 00667 storageid = storageid + totalStorages + 1; 00668 } 00669 #ifdef RIO_DEBUG2 00670 else 00671 RioErr << "Router:DataSendToClient o novo indice " 00672 << storageid + totalStorages + 1 << " nao foi usado. " 00673 << "Request.Operation = " << Request.Operation << "." << endl; 00674 #endif 00675 00676 // Verifica se o disco deste servidor de armazenamento pode ser usado pelo 00677 // cliente. 00678 ServerAddr = GetNATMapping( ClientAddr, storageid ); 00679 StorageEnabled = ( ( ServerAddr.nat_addr != 0 ) || 00680 ( ServerAddr.nat_port != 0 ) ); 00681 00682 if( ( m_TempDiskRequest ) && ( StorageUp ) && ( StorageEnabled ) ) 00683 { 00684 00685 #ifdef RIO_DEBUG2 00686 RioErr << "Router:DataSendToClient => block "<< Request.Block 00687 << " bufferid " << Request.BufferId 00688 << " disk " << m_TempDiskRequest->Disk << endl; 00689 #endif 00690 00691 //update bufferstatus 00692 Request.streamobj->Stream()->decServerBufferStatus(); 00693 //update tokenstosend 00694 Request.streamobj->Stream()->decTokensToSend(); 00695 00696 m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest ); 00697 00698 if( m_CollectMeasures ) 00699 { 00700 // don't use this sample 00701 Request.QueueSize = -1; 00702 } 00703 m_nPending[m_TempDiskRequest->Disk]++; 00704 SendStorage(m_TempDiskRequest); 00705 00706 char name[1025]; 00707 m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name ); 00708 00709 RioStorageNodeInfo sn_info; 00710 m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info ); 00711 00712 int queue_sum = m_nRT[ m_TempDiskRequest->Disk ] 00713 + m_nNRT[ m_TempDiskRequest->Disk ] 00714 + m_nPending[ m_TempDiskRequest->Disk ]; 00715 00716 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( sn_info.Hostname, name, 00717 queue_sum ); 00718 m_SystemManager->PostITEvent( (MonitorEvent *) it_event); 00719 } 00720 else if( m_TempDiskRequest == 0 ) 00721 { 00722 RioErr << "ERROR: Router:DataSendToClient => Could not find request to send" 00723 << endl; 00724 } 00725 else if( StorageEnabled ) 00726 { 00727 RioErr << "Router ERROR (DataSendToClient): Cannot access disk: " 00728 << "storage is down" << endl; 00729 } 00730 else 00731 { 00732 RioErr << "Router ERROR (DataSendToClient): Cannot access disk: " 00733 << "storage is disabled for this client" << endl; 00734 } 00735 00736 }
bool CRouter::FindNATMapping | ( | NATData | server_map, | |
int | stor_id | |||
) |
Funcao para verificar se um mapeamento esta presente no mapa com os mapeamentos.
TODO: Remover a funcao quando a RioNeti e a NetMgr nao forem mais usadas.
server_map | estrutura com o par IP, porta do servidor | |
stor_id | identificador do servidor de armazenamento. |
Definition at line 2142 of file Router.cpp.
02143 { 02144 return m_NAT_map.findElement( server_map, stor_id ); 02145 }
Event* CRouter::Get | ( | void | ) | [inline, private] |
int CRouter::GetDiskServiceTime | ( | EventStorageRequest * | event, | |
u16 | DiskId | |||
) |
Definition at line 1983 of file Router.cpp.
01984 { 01985 int result; 01986 result = m_DiskMgr->GetDiskServiceTime( event, DiskId ); 01987 if( result == -1 ) 01988 { 01989 m_EstDiskServiceTimeOfDisk[ DiskId ] = -1; 01990 } 01991 return result; 01992 }
long double CRouter::GetEstimatedDiskQueueTime | ( | int | disk | ) |
Definition at line 1945 of file Router.cpp.
01946 { 01947 return m_EstDiskQueueTimeOfDisk[disk]; 01948 }
long double CRouter::GetEstimatedQueueTimeOfAllDisks | ( | ) |
unsigned long long int CRouter::GetInvalidStorages | ( | NATData | ClientAddr | ) |
Deternima o vetor de bits com os servidores de armazenamento que nao podem ser usados pelo cliente com endereco passado como parametro, isto e, que estao inativos ou que foram invalidados (apos se tornarem novamente ativos).
ClientAddr | endereco UDP do cliente. Se os campos da estrutura NATData forem ambos 0, somente iremos considerar se os servidores estao ou nao ativos. Em caso contrario, tambem vamos contar os servidores que nao podem ser usados pelo cliente. |
Definition at line 2358 of file Router.cpp.
02359 { 02360 unsigned long long int InvalidStorages; 02361 unsigned int TotalStorages; 02362 NATData StorageAddr; 02363 02364 // Determina o numero de servidores de armazenamento ativos. 02365 m_DiskMgr->GetNumberOfStorageNodes( &TotalStorages ); 02366 02367 // Obtem acesso exclusivo a variavel m_StoragesStatus. 02368 pthread_mutex_lock( &m_StoragesStatusMutex ); 02369 // Inicialmente os servidores que nao podem ser usados sao os que estao 02370 // inativos. 02371 InvalidStorages = m_StoragesStatus; 02372 // Libera o acesso exclusivo a variavel m_StoragesStatus. 02373 pthread_mutex_unlock( &m_StoragesStatusMutex ); 02374 // Adiciona os servidores de armazenamento que se tornaram ativos depois de 02375 // o cliente ter criado a conexao e que, portanto, nao podem ser usados. 02376 if( ( ClientAddr.nat_addr != 0 ) && ( ClientAddr.nat_port != 0 ) ) 02377 { 02378 for( unsigned int Storage = 0; Storage < TotalStorages; Storage++ ) 02379 { 02380 // Devemos adicionar mais 1 ao chamar GetNATMapping porque, no mapa 02381 // de mapeamentos do NAT, os servidores de armazenamento estao a 02382 // partir da posicao 1 (a posicao 0 e usada pelo servidor de 02383 // gerenciamento). 02384 StorageAddr = GetNATMapping( ClientAddr, Storage + 1 ); 02385 if( ( StorageAddr.nat_addr == -1 ) && 02386 ( StorageAddr.nat_port == 0 ) ) 02387 { 02388 02389 #ifdef RIO_DEBUG2 02390 RioErr << "CRouter::GetInvalidStorages o servidor de " 02391 << "armazenamento com a ID " << Storage << " nao pode " 02392 << "ser usado pelo cliente pois seu mapeamento foi " 02393 << "invalidado." << endl; 02394 #endif 02395 02396 InvalidStorages = InvalidStorages | ( 1ull << Storage ); 02397 } 02398 02399 // Verifica se o outro endereco para o storage para a nova classe e 02400 // valido (se existir). Este codigo devera ser removido quando a 02401 // classe NetMgr for definitivamente retirada do codigo. 02402 // 02403 // TODO: Remover todo o codigo dentro do if a seguir, incluindo o 02404 // comando if, quando a RioNeti e a NetMgr nao forem mais usadas. 02405 // 02406 if( FindNATMapping( ClientAddr, Storage + TotalStorages + 1 ) ) 02407 { 02408 StorageAddr = GetNATMapping( ClientAddr, Storage + 02409 TotalStorages + 1 ); 02410 if( ( StorageAddr.nat_addr == -1 ) && 02411 ( StorageAddr.nat_port == 0 ) ) 02412 { 02413 02414 #ifdef RIO_DEBUG2 02415 RioErr << "CRouter::GetInvalidStorages o servidor de " 02416 << "armazenamento com a ID " << Storage << " nao " 02417 << "pode ser usado pelo cliente pois seu mapeamento " 02418 << " foi invalidado (na nova implementacao)." 02419 << endl; 02420 #endif 02421 02422 InvalidStorages = InvalidStorages | ( 1ull << Storage ); 02423 } 02424 } 02425 #ifdef RIO_DEBUG2 02426 else 02427 RioErr << "CRouter::GetInvalidStorages a ID " 02428 << Storage + TotalStorages + 1 << " nao existe no mapa e " 02429 << "nao foi considerada!" << endl; 02430 #endif 02431 } 02432 } 02433 #ifdef RIO_DEBUG2 02434 // Imprime o vetor com os servidores que nao podem ser usados. 02435 RioErr << "CRouter::GetInvalidStorages Vetor com a identificacao dos " 02436 << "servidores de armazenamento que nao podem ser usados: "; 02437 for( int p = MAXNUMSTORAGES - 1; p >= 0 ; p-- ) 02438 { 02439 unsigned int q = ( InvalidStorages >> p ) & 1ull; 02440 RioErr << q; 02441 } 02442 RioErr << endl; 02443 #endif 02444 02445 return InvalidStorages; 02446 }
int CRouter::GetMaxNumberOfDisks | ( | ) |
Definition at line 1967 of file Router.cpp.
01968 { 01969 return m_DiskMgr->GetMaxNumberOfDisks(); 01970 }
Funcao para obter um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho.
server_map | estrutura com o par IP, porta do servidor | |
stor_id | identificador do servidor de armazenamento. |
Definition at line 2126 of file Router.cpp.
02127 { 02128 return m_NAT_map.getElement( server_map, stor_id ); 02129 }
int CRouter::GetNumberOfActiveDisks | ( | ) |
Definition at line 1974 of file Router.cpp.
01975 { 01976 unsigned int numd; 01977 m_DiskMgr->GetNumberOfActiveDisks( &numd); 01978 return (int) numd; 01979 }
int CRouter::GetNumberOfStorageNodes | ( | unsigned int * | NumberOfStorageNodes | ) |
Funcao para obter o numero de storage nodes.
NumberOfStorageNodes | ponteiro para um inteiro nao sinalizado que armazenara o numero de storages. |
Definition at line 2493 of file Router.cpp.
02494 { 02495 return m_DiskMgr->GetNumberOfStorageNodes( NumberOfStorageNodes ); 02496 }
void CRouter::GetStorageInfo | ( | NATData | ClientAddr, | |
bool * | CanRead, | |||
bool * | CanWrite | |||
) |
Deternima se podemos ler ou escrever blocos, no nomento, para um dado cliente cujo endereco e passado como parametro.
ClientAddr | endereco UDP do cliente. Se os campos da estrutura NATData forem ambos 0, somente iremos considerar se os servidores estao ou nao ativos. Em caso contrario, tambem vamos contar os servidores que nao podem ser usados pelo cliente. | |
CanRead | ponteiro para um valor booleano que informa se uma leitura de um bloco pode ser feita. Consideraremos que uma leitura nao podera ser feita se nao for possivel garantir que um bloco pode ser lido, isto e, se o numero de servidores de armazenamento inativos for maior ou igual ao numero de replicacoes. | |
CanWrite | ponteiro para um valor booleano que informa se uma escrita de um bloco pode ser feita. Consideraremos que uma escrita nao podera ser feita se nao for possivel garantir um disco em um servidor de armazenamento diferente para cada replicacao do bloco, isto e, se o numero de servidores de armazenamento ativos for menor do que o numero de replicacoes. |
Definition at line 2450 of file Router.cpp.
02452 { 02453 unsigned long long int InvalidStorages; 02454 unsigned int TotalStorages; 02455 int InactiveStorages, ActiveStorages; 02456 int NumberOfReplications; 02457 02458 // Determina o numero de servidores de armazenamento ativos. 02459 m_DiskMgr->GetNumberOfStorageNodes( &TotalStorages ); 02460 // Determina o numero de replicacoes. 02461 NumberOfReplications = m_DiskMgr->GetNumberOfReplications(); 02462 // Determina quais servidores de armazenamento nao podem ser usados pelo 02463 // cliente 02464 InvalidStorages = GetInvalidStorages( ClientAddr ); 02465 // Conta o numero de servidores de armazenamento nao ativos. 02466 InactiveStorages = 0; 02467 for( unsigned int i = 0; i < MAXNUMSTORAGES; i++ ) 02468 if( ( InvalidStorages & ( 1ull << i ) ) != 0 ) 02469 ( InactiveStorages )++; 02470 // Define o numero de servidores ativos (igual ao numero total de servidores 02471 // menos os servidores inativos). 02472 ActiveStorages = TotalStorages - InactiveStorages; 02473 // A leitura somente podera ser feita, com garantia de sucesso, se o numero 02474 // de servidores inativos for menor do que o numero de replicacoes. 02475 *CanRead = ( InactiveStorages < NumberOfReplications ); 02476 // A escrita somente podera ser feita se o numero de servidores ativos for 02477 // maior ou igual ao numero de replicacoes pois, em caso contrario, nao 02478 // existirao servidores suficientes para salvarmos todas as replicacoes de 02479 // um bloco. 02480 *CanWrite = ( ActiveStorages >= NumberOfReplications ); 02481 #ifdef RIO_DEBUG2 02482 RioErr << "RioSession::GetStorageInfo estado dos servidores de " 02483 << "armazenamento ao receber um pedido de dados : ativos = " 02484 << ActiveStorages << ", inativos = " << InactiveStorages 02485 << ", totais = " << TotalStorages << ", replicacoes = " 02486 << NumberOfReplications << ", CanRead = " 02487 << ( *CanRead ? "true": "false" ) << ", CanWrite = " 02488 << ( *CanWrite ? "true": "false" ) << endl; 02489 #endif 02490 }
int CRouter::Initialize | ( | RouterConfig * | Config | ) |
Definition at line 199 of file Router.cpp.
00200 { 00201 int i; 00202 00203 if( m_initialized ) 00204 { 00205 RioErr << "Router.Initialize: already initialized" << endl; 00206 return ERROR_ROUTER + ERROR_INITIALIZED; 00207 } 00208 00209 // Update number of disks 00210 m_nDisks = Config->nDisks; 00211 00212 // Update block size 00213 m_BlockSize = Config->BlockSize; 00214 00215 // Update maximum number of pending requests per disk 00216 m_MaxPending = Config->MaxPending; 00217 00218 // Update flag to collect measures 00219 m_CollectMeasures = Config->CollectMeasures; 00220 // Update Parameter to collect measures 00221 m_EstimatedParameter = Config->EstimatedTimeParameter; 00222 m_UpdatedEstimatedDiskServiceTime = 0; 00223 00224 #ifdef __QUEUE_RESP_LOG 00225 if( m_CollectMeasures ) 00226 { 00227 // open files to save measures 00228 // Variavel usada para compor os nomes dos arquivos com os logs. 00229 char LogFileName[ MaxPathSize ]; 00230 // Compoe o nome do arquivo com os logs. 00231 strcpy( LogFileName, Config->LogsDirectory ); 00232 strcat( LogFileName, LOGRESP ); 00233 m_logRESP.open( LogFileName ); 00234 strcpy( LogFileName, Config->LogsDirectory ); 00235 strcat( LogFileName, LOGSQT ); 00236 m_logSQT.open ( LogFileName ); 00237 m_logRESP << "#Measures in msec" << endl; 00238 m_logSQT << "#Measures in msec" << endl; 00239 } 00240 #endif 00241 // ------------------------------------------------------------------------ 00242 00243 m_DiskMgr = Config->DiskManager; 00244 m_SystemManager = Config->SystemManager; 00245 00246 // allocate disk queues. 3 queues: real-time, non real-time and 00247 // pending requests (i.e. submited to storage node ) 00248 00249 m_RTqueue = new CDiskRequestQueue[m_nDisks]; 00250 if( m_RTqueue == 0 ) 00251 { 00252 RioErr << "Router.Initialize: Out of memory" << endl; 00253 CleanUp(); 00254 return ERROR_ROUTER + ERROR_MEMORY; 00255 } 00256 00257 m_NRTqueue = new CDiskRequestQueue[m_nDisks]; 00258 if( m_NRTqueue == 0 ) 00259 { 00260 RioErr << "Router.Initialize: Out of memory" << endl; 00261 CleanUp(); 00262 return ERROR_ROUTER + ERROR_MEMORY; 00263 } 00264 00265 m_Pending = new CDiskRequestQueue[m_nDisks]; 00266 if( m_Pending == 0 ) 00267 { 00268 RioErr << "Router.Initialize: Out of memory" << endl; 00269 CleanUp(); 00270 return ERROR_ROUTER + ERROR_MEMORY; 00271 } 00272 m_nRT = new int [m_nDisks]; 00273 if( m_nRT == 0 ) 00274 { 00275 RioErr << "Router.Initialize: Out of memory" << endl; 00276 CleanUp(); 00277 return ERROR_ROUTER + ERROR_MEMORY; 00278 } 00279 00280 m_nNRT = new int [m_nDisks]; 00281 if( m_nNRT == 0 ) 00282 { 00283 RioErr << "Router.Initialize: Out of memory" << endl; 00284 CleanUp(); 00285 return ERROR_ROUTER + ERROR_MEMORY; 00286 } 00287 00288 m_nPending = new int [m_nDisks]; 00289 if( m_nPending == 0 ) 00290 { 00291 RioErr << "Router.Initialize: Out of memory" << endl; 00292 CleanUp(); 00293 return ERROR_ROUTER + ERROR_MEMORY; 00294 } 00295 00296 m_MaxQueueSize = ( Config->MaxDataRequests * MaxReplications ); 00297 00298 for( i = 0; i < m_nDisks; i++ ) 00299 { 00300 // Set size of real time queues to large value 00301 // This indicates that the disk is not available 00302 m_nRT[i] = m_MaxQueueSize + 1; 00303 m_nNRT[i] = 0; 00304 m_nPending[i] = 0; 00305 } 00306 00307 m_TempDiskRequest = new StrDiskRequest* [MaxReplications]; 00308 if( m_TempDiskRequest == 0 ) 00309 { 00310 RioErr << "Router.Initialize: Out of memory" << endl; 00311 CleanUp(); 00312 return ERROR_ROUTER + ERROR_MEMORY; 00313 } 00314 00315 // number of requests must be >= maxbuffer * maxclients so set 00316 // maxrequests to max value. 00317 unsigned int maxrequests = 0xffff; 00318 00319 #ifdef RIO_DEBUG2 00320 RioErr << "Router: Max number of Requests " << maxrequests << endl; 00321 #endif 00322 // ------------------------------------------------------------------------ 00323 00324 int hResult; 00325 hResult = m_Request.Initialize( maxrequests ); 00326 if( hResult ) 00327 { 00328 RioErr << "Could not initialize m_Requests at Router" 00329 << ". (" << maxrequests << ")" << endl; 00330 return hResult; 00331 } 00332 00333 m_initialized = true; 00334 00335 return S_OK; 00336 }
void CRouter::PrintQueues | ( | ) | [private] |
Definition at line 1951 of file Router.cpp.
void CRouter::Put | ( | Event * | ep | ) |
Definition at line 2092 of file Router.cpp.
02093 { 02094 EventDataRequest* event; 02095 event = ( EventDataRequest *) ep; 02096 02097 if( ( ( ep->Type==EventTypeRTDataRequest ) || 02098 ( ep->Type==EventTypeNRTDataRequest ) 02099 ) && ( m_CollectMeasures ) 02100 ) 02101 { 02102 gettimeofday( &event->Request.ArrivalTime, 0 ); 02103 } 02104 02105 m_Queue.Put(ep); 02106 }
void CRouter::RemoveNATMapping | ( | NATData | server_map, | |
int | stor_id | |||
) |
Funcao para remover um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho.
server_map | estrutura com o par IP, porta do servidor | |
stor_id | identificador do servidor de armazenamento. |
Definition at line 2118 of file Router.cpp.
02119 { 02120 m_NAT_map.removeElement( server_map, stor_id ); 02121 }
void CRouter::RequestCompleted | ( | RioStreamObj ** | StreamObjAddr, | |
Event * | event | |||
) |
Funcao para completar uma requisicao (chamando a RequestCompleted do objeto RioStreamObj passado como parametro) removendo, quando necessario, o objeto do stream e alterando o ponteiro passado como parametro para NULL.
Obs: Se o objeto passado for NULL, indicando que o objeto foi incorretamente removido, um erro sera impresso no log do servidor informando deste erro.
StreamObjAddr | ponteiro com o endereco com o ponteiro para o objeto para o qual deveremos chamar a funcao RequestCompleted (isso e necessario porque, se removermos o objeto, o ponteiro para ele devera ser alterado para NULL). | |
event | ponteiro do evento a ser passado a funcao RequestCompleted. |
Definition at line 2501 of file Router.cpp.
02502 { 02503 // Variavel booleana para armazenar a informacao de se o objeto de stream 02504 // deve ou nao ser removido nesta funcao. 02505 bool RemoveStreamObj; 02506 02507 if( ( *StreamObjAddr ) != NULL ) 02508 { 02509 ( *StreamObjAddr )->RequestCompleted( event, &RemoveStreamObj ); 02510 if( RemoveStreamObj ) 02511 { 02512 // Neste caso, tratamos da ultima requisicao recebida e o stream ja 02513 // foi fechado, mas nao foi removido por causa das requisicoes que 02514 // ainda estavam pendentes quando ele foi fechado. Entao, devemos 02515 // remover o stream e alterar o ponteiro passado como parametro 02516 // para NULL, para podermos detectar eventuais erros futuros. Note 02517 // que o ponteiro nao deveria ser usado em outras partes do Router 02518 // porque, nestas partes, o stream ainda deveria estar aberto. 02519 //RioErr << "CRouter::RequestCompleted deletando o objeto " 02520 // << "RioStreamObj cujo ponteiro e igual a 0x " << hex 02521 // << ( unsigned long ) *StreamObjAddr << dec << "." << endl; 02522 delete *StreamObjAddr; 02523 *StreamObjAddr = NULL; 02524 } 02525 } 02526 else 02527 { 02528 RioErr << "[CRouter] RequestCompleted nao foi possivel executar a " 02529 << "funcao porque foi passado um ponteiro NULO!" << endl; 02530 } 02531 }
void CRouter::routerthread | ( | ) | [private] |
Definition at line 398 of file Router.cpp.
00399 { 00400 Event* event; 00401 00402 RioErr << "ROUTERTHREADID " << syscall( SYS_gettid ) << endl; 00403 00404 while( 1 ) 00405 { 00406 event = Get(); 00407 00408 #ifdef RIO_DEBUG2 00409 RioErr << "CRouter::routerthread processando o evento com o tipo " 00410 << event->Type << endl; 00411 #endif 00412 00413 switch( event->Type ) 00414 { 00415 case EventTypeRTDataRequest: 00416 case EventTypeNRTDataRequest: 00417 DataReq( event ); 00418 break; 00419 00420 case EventTypeStorageReply: 00421 // -- free specific request for that disk 00422 StorageReply(& (((EventStorageReply*)(event))->StorageReply )); 00423 EventManager.Free(event); 00424 break; 00425 00426 case EventTypeAddDisk: 00427 AddDisk((( EventAddDisk* )( event ))->Disk ); 00428 EventManager.Free(event); 00429 break; 00430 00431 // Novos eventos usados para tratar do caso em que um servidor de 00432 // armazenamento cai e depois volta a estar disponivel. 00433 // Evento para tratar a queda de um servidor de armazenamento. 00434 case EventTypeStorageDown: 00435 00436 #ifdef RIO_DEBUG2 00437 RioErr << "CRouter::routerthread recebida a mensagem " 00438 << "EventTypeStorageDown da classe DiskMgr informando " 00439 << "que o servidor de armazanamento com a ID " 00440 << ( ( EventStorageDown* ) ( event ) )->StorageId 00441 << " parou de funcionar" << endl; 00442 #endif 00443 00444 StorageDown( ( ( EventStorageDown* ) ( event ) )->StorageId, 00445 ( ( EventStorageDown* ) ( event ) )-> 00446 EmptyStorageQueue ); 00447 EventManager.Free( event ); 00448 break; 00449 // Evento para tratar a volta de um servidor de armazenamento. 00450 case EventTypeStorageUp: 00451 00452 #ifdef RIO_DEBUG2 00453 RioErr << "CRouter::routerthread recebida a mensagem " 00454 << "EventTypeStorageUp da classe DiskMgr informando " 00455 << "que o servidor de armazanamento com a ID " 00456 << ( ( EventStorageDown* ) ( event ) )->StorageId 00457 << " voltou a funcionar" << endl; 00458 #endif 00459 00460 StorageUp( ( ( EventStorageUp* )( event ) )->StorageId ); 00461 EventManager.Free( event ); 00462 break; 00463 00464 // Novo evento usado para parar a thread do Router. 00465 case EventTypeFinalizeThread: 00466 #ifdef RIO_DEBUG2 00467 RioErr << "CRouter::routerthread recebida a mensagem " 00468 << "EventTypeFinalizeThread informando que a thread " 00469 << "deve ser finalizada" << endl; 00470 #endif 00471 EventManager.Free( event ); 00472 //pthread_exit( NULL ); 00473 return; 00474 break; 00475 00476 default: 00477 RioErr << "Router.Routerthread unknown element type " 00478 << (int *) event->Type << endl; 00479 EventManager.Free(event); 00480 } 00481 } 00482 }
void * CRouter::routerthreadep | ( | void * | param | ) | [static, private] |
Definition at line 391 of file Router.cpp.
00392 { 00393 ( (CRouter *) parm)->routerthread(); 00394 return 0; 00395 }
void CRouter::SendStorage | ( | StrDiskRequest * | Request | ) | [private] |
Definition at line 1609 of file Router.cpp.
01610 { 01611 EventStorageRequest* event = NULL; 01612 01613 #ifdef RIO_DEBUG2 01614 int bufferid = ((EventDataRequest*)(Request->event))->Request.BufferId; 01615 RioErr << "Router:SendStorage => RouterId " << Request->Id 01616 << " BufferId " << bufferid 01617 << " Disk " << Request->Disk 01618 << " pending requests " << m_nPending[Request->Disk] << endl; 01619 01620 if( m_nPending[Request->Disk] > m_MaxPending ) 01621 { 01622 RioErr << "Router:SendStorage => " 01623 << " Disk " << Request->Disk 01624 << " pending requests " << m_nPending[Request->Disk] 01625 << " RT " <<m_nRT[Request->Disk] << endl; 01626 } 01627 #endif 01628 01629 if( ( m_CollectMeasures ) && ( Request->event->Request.QueueSize != -1 ) ) 01630 { 01631 struct timeval now; 01632 gettimeofday( &now, 0 ); 01633 double sample_msec = getInterval( Request->event->Request.ArrivalTime, 01634 now ); 01635 Request->event->Request.ArrivalTime = now; 01636 01637 // updates m_EstimatedQueueTime of the disk 01638 int disk = Request->event->Request.Reps[0].disk; 01639 int queuesize = Request->event->Request.QueueSize; 01640 01641 if( queuesize < m_MaxPending + 1 ) 01642 queuesize = 0; 01643 else if( queuesize > ( m_MaxPending + 300 )) 01644 queuesize = 300; 01645 else 01646 queuesize = queuesize - m_MaxPending; 01647 01648 if( m_EstDiskQueueTime[ disk ][ queuesize ] == 0 ) 01649 { 01650 m_EstDiskQueueTime[ disk ][ queuesize ] = sample_msec; 01651 } 01652 else 01653 { 01654 m_EstDiskQueueTime[ disk ][ queuesize ] = 01655 (( 1 - m_EstimatedParameter) * 01656 m_EstDiskQueueTime[ disk ][ queuesize ]) + 01657 m_EstimatedParameter * sample_msec; 01658 } 01659 if( m_EstDiskQueueTimeOfDisk[ disk ] == 0 ) 01660 { 01661 m_EstDiskQueueTimeOfDisk[ disk ] = sample_msec; 01662 } 01663 else 01664 { 01665 m_EstDiskQueueTimeOfDisk[ disk ] = ( 1 - m_EstimatedParameter) * 01666 m_EstDiskQueueTimeOfDisk[ disk ] + 01667 m_EstimatedParameter * sample_msec; 01668 } 01669 01670 #ifdef __QUEUE_RESP_LOG 01671 m_logSQT << " disk " << disk 01672 << " sample " << sample_msec 01673 << " est " << m_EstDiskQueueTimeOfDisk[ disk ] << endl; 01674 #endif 01675 } 01676 01677 if( Request->event->Request.Operation == RealTimePrefetchBlock ) 01678 { 01679 MsgRSSnewRequest* msg; 01680 01681 Request->event->Request.RequestCounter = 1; 01682 01683 event = (EventStorageRequest*) EventManager.New( 01684 EventTypeStorageRequest ); 01685 msg = &( event->StorageRequest.Request.New ); 01686 msg->Type = MSG_RSS_FETCH | RSS_INTERMEDIATE; 01687 msg->Size = SizeMsgRSSnewRequest; 01688 msg->Token = RSS_TOKEN_STORAGE; 01689 // Doesn't know yet ClientId 01690 msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid; 01691 msg->RouterId = Request->Id; 01692 // Doesn't know yet IP 01693 msg->IPaddr = 01694 ((EventDataRequest*)(Request->event))->Request.Target.IPaddress; 01695 // Doesn't know yet Port 01696 msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port; 01697 // Se o cliente estiver atrás de NAT, o par (IP, porta) que o servidor 01698 // deve informar ao storage são diferentes, variam para cada storage 01699 // Para descobrir qual par deve ser usado, deve-se consultar o heap 01700 // m_NAT_map. Caso a resposta seja o par default (0, 0), o cliente não 01701 // está atrás de NAT e o par atual deve ser mantido; caso contrário, 01702 // utiliza-se o resultado da consulta 01703 // Aparentemente este trecho de código não é necessário, visto que o IP 01704 // e porta não são conhecidos neste momento. Se preciso, descomentar 01705 // o código abaixo e testar. 01706 //NATData input( msg->IPaddr, msg->Port ); 01707 //NATData result = m_NAT_map.getElement( input, 01708 // (Request->Disk-1)/(SNode::sn_maxdisks)+1 ); 01709 //if( result.nat_addr != 0 ) 01710 //{ 01711 // msg->IPaddr = result.nat_addr; 01712 // msg->Port = result.nat_port; 01713 //} 01714 msg->Disk = Request->Disk; 01715 msg->Pos = Request->Pos; 01716 msg->DataSize = Request->Size; 01717 01718 #ifdef RIO_DEBUG2 01719 RioErr << " prefetch RouterId "<< Request->Id 01720 << " ClientId "<< msg->ClientId 01721 << " msg-Type (F+Inter)" << msg->Type << endl; 01722 #endif 01723 } 01724 else if( Request->event->Request.Operation == RealTimeCancelBlock ) 01725 { 01726 if( Request->event->Request.BufferId == -1 ) 01727 { 01728 #ifdef RIO_DEBUG2 01729 RioErr << "request block was not submit to storage yet, " 01730 << " so just free request" << endl; 01731 #endif 01732 01733 m_nRT[Request->Disk]--; 01734 m_nPending[Request->Disk]--; 01735 01736 //the request was not submit to storage yet, so just free request 01737 Request->event->Request.Status = 0; 01738 RequestCompleted( &Request->event->Request.streamobj, 01739 ( Event* ) ( Request->event ) ); 01740 char name[1025]; 01741 m_DiskMgr->GetDiskName( Request->Disk, name ); 01742 01743 RioStorageNodeInfo sn_info; 01744 m_DiskMgr->GetDiskStorageNodeInfo( Request->Disk, &sn_info ); 01745 01746 int queue_sum = m_nRT[ Request->Disk ] 01747 + m_nNRT[ Request->Disk ] 01748 + m_nPending[ Request->Disk ]; 01749 01750 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( 01751 sn_info.Hostname, name, queue_sum ); 01752 m_SystemManager->PostITEvent( (MonitorEvent *) it_event ); 01753 return; 01754 } 01755 else 01756 { 01757 MsgRSSpendRequest* msg; 01758 event = (EventStorageRequest*) EventManager.New( 01759 EventTypeStorageRequest ); 01760 msg = &( event->StorageRequest.Request.Pending ); 01761 msg->Type = MSG_RSS_CANCEL; 01762 msg->Size = SizeMsgRSSpendRequest; 01763 msg->Token = RSS_TOKEN_STORAGE; 01764 //Update ClientId 01765 msg->ClientId = ((EventDataRequest*)( 01766 Request->event))->Request.Reqid; 01767 //Update StorageId 01768 msg->StorageId = Request->StorageId; 01769 msg->RouterId = Request->Id; 01770 //Update IP 01771 msg->IPaddr = ((EventDataRequest*)( 01772 Request->event))->Request.Target.IPaddress; 01773 //Update Port 01774 msg->Port = ((EventDataRequest*)( 01775 Request->event))->Request.Target.Port; 01776 // Se o cliente estiver atrás de NAT, o par (IP, porta) que o 01777 // servidor deve informar ao storage são diferentes, variam para 01778 // cada storage 01779 // Para descobrir qual par deve ser usado, deve-se consultar o heap 01780 // m_NAT_map. Caso a resposta seja o par default (0, 0), o cliente 01781 // não está atrás de NAT e o par atual deve ser mantido; caso 01782 // contrário, utiliza-se o resultado da consulta 01783 NATData input( msg->IPaddr, msg->Port ); 01784 NATData result = m_NAT_map.getElement( input, 01785 (Request->Disk-1)/(SNode::sn_maxdisks)+1 ); 01786 if( result.nat_addr != 0 ) 01787 { 01788 msg->IPaddr = result.nat_addr; 01789 msg->Port = result.nat_port; 01790 } 01791 01792 #ifdef RIO_DEBUG2 01793 RioErr << " cancel RouterId " << Request->Id 01794 << " ClientId " << msg->ClientId 01795 << " StorageId " << msg->StorageId 01796 << " msg-Type C " << msg->Type << endl; 01797 #endif 01798 } 01799 } 01800 else if( Request->event->Request.Operation == RealTimeSendBlock ) 01801 { 01802 MsgRSSpendRequest* msg; 01803 event = (EventStorageRequest*) EventManager.New( 01804 EventTypeStorageRequest ); 01805 msg = &( event->StorageRequest.Request.Pending ); 01806 msg->Type = MSG_RSS_SEND; 01807 msg->Size = SizeMsgRSSpendRequest; 01808 msg->Token = RSS_TOKEN_STORAGE; 01809 //Update ClientId 01810 msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid; 01811 //Update StorageId 01812 msg->StorageId = Request->StorageId; 01813 msg->RouterId = Request->Id; 01814 //Update IP 01815 msg->IPaddr = ((EventDataRequest*)( 01816 Request->event))->Request.Target.IPaddress; 01817 //Update Port 01818 msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port; 01819 // Se o cliente estiver atrás de NAT, o par (IP, porta) que o servidor 01820 // deve informar ao storage são diferentes, variam para cada storage 01821 // Para descobrir qual par deve ser usado, deve-se consultar o heap 01822 // m_NAT_map. Caso a resposta seja o par default (0, 0), o cliente não 01823 // está atrás de NAT e o par atual deve ser mantido; caso contrário, 01824 // utiliza-se o resultado da consulta 01825 NATData input( msg->IPaddr, msg->Port ); 01826 NATData result = m_NAT_map.getElement( input, 01827 (Request->Disk-1)/(SNode::sn_maxdisks)+1 ); 01828 if( result.nat_addr != 0 ) 01829 { 01830 msg->IPaddr = result.nat_addr; 01831 msg->Port = result.nat_port; 01832 } 01833 01834 /*not send ack*/ 01835 msg->StreamTraffic = ((Event*)(Request->event))->Type; 01836 01837 // Adiciona a mensagem a taxa de transferencia do Video. 01838 ((EventDataRequest*)(Request->event))->Request.rioobject->GetVideoRate( &msg->VideoRate ); 01839 01840 #ifdef RIO_DEBUG2 01841 RioErr << " send RouterId " << Request->Id 01842 << " ClientId " << msg->ClientId 01843 << " StorageId " << msg->StorageId 01844 << " msg-Type S " << msg->Type << endl; 01845 #endif 01846 } 01847 // ------------------------------------------------------------------------ 01848 else 01849 { 01850 MsgRSSnewRequest* msg; 01851 // Armazena o identificador do storage. 01852 int storageid; 01853 // TODO: Esta variavel devera ser removida quando a NetMgr e a 01854 // RioNeti forem definitivamente removidas do codigo. 01855 unsigned int totalStorages; 01856 01857 event = (EventStorageRequest*)EventManager.New(EventTypeStorageRequest); 01858 msg = &( event->StorageRequest.Request.New ); 01859 if( ( Request->event->Request.Operation == RealTimeRead ) || 01860 ( Request->event->Request.Operation == NonRealTimeRead)) 01861 msg->Type = MSG_RSS_READ; 01862 else 01863 msg->Type = MSG_RSS_WRITE; 01864 01865 msg->Size = SizeMsgRSSnewRequest; 01866 msg->Token = RSS_TOKEN_STORAGE; 01867 msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid; 01868 msg->RouterId = Request->Id; 01869 msg->IPaddr = ((EventDataRequest*)( 01870 Request->event))->Request.Target.IPaddress; 01871 msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port; 01872 01873 // Descobre o identificador do storage. 01874 storageid = ( Request->Disk - 1 )/( SNode::sn_maxdisks ) + 1; 01875 01876 // TODO: Inicializa totalStorages com o numero de storages. Este codigo 01877 // devera ser removido quando a NetMgr e a RioNeti forem 01878 // definitivamente removidas do codigo. 01879 m_DiskMgr->GetNumberOfStorageNodes( &totalStorages ); 01880 01881 // Obtem o endereco do servidor de despacho usado como base do 01882 // mapeamento. 01883 NATData input( msg->IPaddr, msg->Port ); 01884 01885 // Se o cliente estiver atrás de NAT, o par (IP, porta) que o servidor 01886 // deve informar ao storage são diferentes, variam para cada storage 01887 // Para descobrir qual par deve ser usado, deve-se consultar o heap 01888 // m_NAT_map. Caso a resposta seja o par default (0, 0), o cliente não 01889 // está atrás de NAT e o par atual deve ser mantido; caso contrário, 01890 // utiliza-se o resultado da consulta 01891 // 01892 // TODO: Precisamos verificar agora se estamos usando a nova ou a 01893 // implementacao antiga. Todo o codigo dentro do if e do seu else devem 01894 // ser removidos quando a NetMgr e a RioNeti forem definitivamente 01895 // retiradas do codigo. 01896 if( ( ( Request->event->Request.Operation == NonRealTimeRead ) || 01897 ( Request->event->Request.Operation == NonRealTimeWrite ) ) && 01898 ( FindNATMapping( input, storageid + totalStorages + 1 ) ) ) 01899 { 01900 #ifdef RIO_DEBUG2 01901 RioErr << "CRouter::SendStorage usando o indice " 01902 << storageid + totalStorages + 1 << " ao inves do indice " 01903 << storageid << "." << endl; 01904 #endif 01905 01906 storageid = storageid + totalStorages + 1; 01907 } 01908 #ifdef RIO_DEBUG2 01909 else 01910 RioErr << "Router::SendStorage o novo indice " 01911 << storageid + totalStorages + 1 << " nao foi usado. " 01912 << "Request->event->Request.Operation = " 01913 << Request->event->Request.Operation << "." << endl; 01914 #endif 01915 01916 // Obtem o mapeamento a ser usado pelo storage. 01917 NATData result = m_NAT_map.getElement( input, storageid ); 01918 if( result.nat_addr != 0 ) 01919 { 01920 msg->IPaddr = result.nat_addr; 01921 msg->Port = result.nat_port; 01922 } 01923 msg->Disk = Request->Disk; 01924 msg->Pos = Request->Pos; 01925 msg->DataSize = Request->Size; 01926 01927 /*not send ack*/ 01928 msg->StreamTraffic = ((Event*)(Request->event))->Type; 01929 01930 // Adiciona a mensagem a taxa de transferencia do Video. 01931 ((EventDataRequest*)(Request->event))->Request.rioobject->GetVideoRate( &msg->VideoRate ); 01932 01933 #ifdef RIO_DEBUG2 01934 RioErr << " read and send RouterId "<< Request->Id 01935 << " ClientId "<< msg->ClientId 01936 << " msg-Type RS|RW " << msg->Type << endl; 01937 #endif 01938 } 01939 01940 m_DiskMgr->SendStorageNode( event, u16 ( Request->Disk )); 01941 return; 01942 }
void CRouter::SetDiskServiceTime | ( | int | Disk, | |
double | EstServiceTimeOfDisk, | |||
double | EstServiceTime[301] | |||
) |
Definition at line 1996 of file Router.cpp.
01999 { 02000 for(int j = 0; j < 301; j++) 02001 { 02002 m_EstDiskServiceTime[ Disk ][ j ] = EstServiceTime[ j ]; 02003 } 02004 02005 m_EstDiskServiceTimeOfDisk[ Disk ] = EstServiceTimeOfDisk; 02006 pthread_mutex_lock(&m_MutexUpdated); 02007 m_UpdatedEstimatedDiskServiceTime++; 02008 pthread_cond_broadcast(&m_ConditionUpdated); 02009 pthread_mutex_unlock(&m_MutexUpdated); 02010 02011 return; 02012 }
Definition at line 2111 of file Router.cpp.
02112 { 02113 m_NAT_map.insertElement( server_map, stor_id, stor_map ); 02114 }
int CRouter::Start | ( | void | ) |
Definition at line 338 of file Router.cpp.
00339 { 00340 pthread_attr_t attrib; 00341 00342 if( !m_initialized ) 00343 { 00344 RioErr << "Router.Start: not initiallized" << endl; 00345 return ERROR_ROUTER + ERROR_NOT_INITIALIZED; 00346 } 00347 if( m_started ) 00348 { 00349 RioErr << "Router.Start: already started" << endl; 00350 return ERROR_ROUTER + ERROR_STARTED; 00351 } 00352 00353 pthread_attr_init( &attrib ); 00354 pthread_attr_setstacksize( &attrib, 3*PTHREAD_STACK_MIN ); 00355 00356 if( pthread_create( &m_thread, &attrib, &routerthreadep, (void *)this ) ) 00357 { 00358 RioErr << "Router.Start pthread_create failed " 00359 << strerror(errno) << endl; 00360 return (ERROR_ROUTER + ERROR_CREATE_THREAD); 00361 } 00362 00363 m_started = true; 00364 return S_OK; 00365 }
int CRouter::Stop | ( | void | ) |
Definition at line 367 of file Router.cpp.
00368 { 00369 Event *event; 00370 00371 if( !m_started ) 00372 { 00373 RioErr << "Router.Stop: not started" << endl; 00374 return( ERROR_ROUTER + ERROR_NOT_STARTED ); 00375 } 00376 00377 // Coloca o evento do termino da thread na fila de eventos. 00378 event = EventManager.New( EventTypeFinalizeThread ); 00379 Put( event ); 00380 00381 // ### int term should have timer event which will wake router 00382 // soon so don't need to do anything other than set flag... 00383 00384 pthread_join( m_thread, NULL ); 00385 00386 m_started = false; 00387 return S_OK; 00388 }
void CRouter::StorageDown | ( | int | StorageId, | |
bool | EmptyStorageQueue | |||
) | [private] |
Funcao para definir que um servidor de armazenamento esta temporariamente indisponivel.
ServerId | identificador do servidor de armazenamento. | |
EmptyStorageQueue | true se a fila do servidor de armazenamento deve ser esvaziada e false em caso contrario. Obs: a fila nao deve ser esvaziada se a mensagem e enviada devido ao servidor nao estiver habilitado quando o servidor de gerenciamento iniciar a sua execucao. |
Definition at line 2157 of file Router.cpp.
02158 { 02159 StrDiskRequest *DiskRequest; 02160 RioStorageNodeInfo NodeInfo; 02161 RioStorageNodeInfo sn_info; 02162 char name[1025]; 02163 int FirstDiskId; 02164 bool EmptyQueues; 02165 bool IsPending; 02166 // Obtem acesso exclusivo a variavel m_StoragesStatus. 02167 pthread_mutex_lock( &m_StoragesStatusMutex ); 02168 // Define o servidor StorageId como inativo. 02169 m_StoragesStatus = m_StoragesStatus | ( 1ull << StorageId ); 02170 // Libera o acesso exclusivo a variavel m_StoragesStatus. 02171 pthread_mutex_unlock( &m_StoragesStatusMutex ); 02172 if( EmptyStorageQueue ) 02173 { 02174 // Determina o numero de discos do servidor de armazenamento. 02175 m_DiskMgr->GetStorageNodeInfo( StorageId, &NodeInfo ); 02176 // Calcula o identificador do primeiro disco do servidor de 02177 // armazenamento. Ele sera igual ao Id do servidor multiplado pelo 02178 // numero maximo de discos em cada servidor de armazanemento, dado por 02179 // SNode::sn_maxdisks. Devemos somar mais 1 porque o identificador 0 e 02180 // reservado. 02181 FirstDiskId = StorageId * SNode::sn_maxdisks + 1; 02182 // Remove as solicitacoes das filas de todos os discos do servidor de 02183 // armazenamento, e gera os codigos de erro esperados. 02184 for( int dk = FirstDiskId; dk < FirstDiskId + NodeInfo.NumberOfDisks; 02185 dk++ ) 02186 { 02187 EmptyQueues = true; 02188 // Remove as socilitacoes de todas as filas deste disco, gerando um 02189 // erro para o cliente 02190 while( EmptyQueues ) 02191 { 02192 // Primeiramente tentamos remover todas as solicitacoes de disco 02193 // da fila m_Pending, depois da fila m_RTqueue, e finalmente as 02194 // da fila m_NRTqueue. 02195 if( m_nPending[ dk ] > 0 ) 02196 { 02197 DiskRequest = m_Pending[ dk ].Get(); 02198 02199 #ifdef RIO_DEBUG2 02200 RioErr << "CRouter::StorageDown removendo a requisicao " 02201 << DiskRequest << " de m_Pending. m_Pending[ " << dk 02202 << " ] = " << m_nPending[ dk ] << endl; 02203 #endif 02204 02205 m_nPending[ dk ]--; 02206 // Verifica qual fila deve ser decrementada (pelo que notei, 02207 // sempre que algo e colocado em m_Pending, ou m_nRT[ dk ] 02208 // e incrementado, ou m_nNRT[ dk ] e incrementado. 02209 DataRequest& Request = DiskRequest->event->Request; 02210 if( ( Request.Operation == NonRealTimeWrite ) || 02211 ( Request.Operation == NonRealTimeRead ) ) 02212 m_nNRT[dk]--; 02213 else 02214 m_nRT[dk]--; 02215 IsPending = true; 02216 } 02217 else if( m_nRT[ dk ] > 0 ) 02218 { 02219 DiskRequest = m_RTqueue[ dk ].Get(); 02220 02221 #ifdef RIO_DEBUG2 02222 RioErr << "CRouter::StorageDown removendo a requisicao " 02223 << DiskRequest << " de m_RTqueue. m_nRT[ " << dk 02224 << " ] = " << m_nRT[ dk ] << endl; 02225 #endif 02226 02227 m_nRT[ dk ]--; 02228 IsPending = false; 02229 } 02230 else if( m_nNRT[ dk ] > 0 ) 02231 { 02232 DiskRequest = m_NRTqueue[ dk ].Get(); 02233 02234 #ifdef RIO_DEBUG2 02235 RioErr << "CRouter::StorageDown removendo a requisicao " 02236 << DiskRequest << " de m_NRTqueue. m_nNRT[ " << dk 02237 << " ] = " << m_nNRT[ dk ] << endl; 02238 #endif 02239 02240 m_nNRT[ dk ]--; 02241 IsPending = false; 02242 } 02243 else 02244 EmptyQueues = false; 02245 // Se ainda tivermos uma solicitacao de disco, retornamos o erro 02246 // ao cliente (informando que o servidor de armazanamento do 02247 // disco se tornou indisponivel). 02248 if( ( EmptyQueues ) && ( DiskRequest != NULL ) ) 02249 { 02250 // Obtem a solicitacao do cliente associada a solicitacao do 02251 // disco, a partir do evento armazenado em DiskRequest. 02252 DataRequest& Request = DiskRequest->event->Request; 02253 if( ( IsPending ) || 02254 ( ( Request.Operation != RealTimeRead ) && 02255 ( Request.Operation != RealTimePrefetchBlock ) ) ) 02256 { 02257 // Cancela a requisicao (ou o pedido). 02258 02259 #ifdef RIO_DEBUG2 02260 RioErr << "CRouter::StorageDown removendo uma " 02261 << "solicitacao do disco " << dk << " do " 02262 << "servidor de armazenamento " << StorageId 02263 << endl; 02264 #endif 02265 // Define o codigo de erro para a requisicao. No caso da 02266 // operacao ser um cancelamento e BufferId for igual a 02267 // 1, devemos enviar uma resposta positiva (sem erros) 02268 // ao cliente. Em caso contrario, a resposta deve ser o 02269 // erro indicando que o servidor deixou de funcionar. 02270 if( ( Request.Operation == RealTimeCancelBlock ) && 02271 ( Request.BufferId == -1 ) ) 02272 Request.Status = 0; 02273 else 02274 Request.Status = ERROR_ROUTER + 02275 ERROR_SERVICE_TEMPORARY_UNAVAILABLE; 02276 RequestCompleted( &Request.streamobj, ( Event * ) 02277 DiskRequest->event ); 02278 02279 // Isso e para evitar o envio de termino duplicado. 02280 Request.RequestCancelled = true; 02281 } 02282 else 02283 { 02284 // Reenvia o evento para o Router. 02285 #ifdef RIO_DEBUG2 02286 RioErr << "CRouter::StorageDown Reenviando o evento " 02287 << "com a requisicao para o Router" << endl; 02288 #endif 02289 02290 Put( ( Event *) DiskRequest->event ); 02291 02292 } 02293 // Libera a requisicao do disco. 02294 m_Request.Free( DiskRequest ); 02295 } 02296 else if( ( EmptyQueues ) && ( DiskRequest == NULL ) ) 02297 RioErr << "CRouter::StorageDown aviso DiskRequest == NULL" 02298 << endl; 02299 } 02300 02301 // Gera um evento para a classe de monitoramento, informando que as 02302 // filas do disco ficaram vazias. 02303 m_DiskMgr->GetDiskName( dk, name ); 02304 m_DiskMgr->GetDiskStorageNodeInfo( dk, &sn_info ); 02305 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( 02306 sn_info.Hostname, name, 0 ); 02307 m_SystemManager->PostITEvent( ( MonitorEvent * ) it_event ); 02308 } 02309 } 02310 }
void CRouter::StorageReply | ( | MsgRSSstorage * | Msg | ) | [private] |
Definition at line 1293 of file Router.cpp.
01294 { 01295 StrDiskRequest* Request; 01296 EventDataRequest* event; 01297 DataRequest* DataRequest; 01298 01299 StrDiskRequest* NewRequest; 01300 int Disk; 01301 u16 Type; 01302 u32 Id; 01303 s32 Error; 01304 int ActiveThreads; 01305 // ------------------------------------------------------------------------ 01306 01307 switch( Message->Header.Type ) 01308 { 01309 case MSG_RSS_READCOMPLETE: 01310 case MSG_RSS_WRITECOMPLETE: 01311 case MSG_RSS_RECEIVECOMPLETE: 01312 case MSG_RSS_SENDCOMPLETE: 01313 case MSG_RSS_READY: 01314 case MSG_RSS_CANCELCOMPLETE: 01315 { 01316 Type = Message->Status.Type; 01317 Id = Message->Status.RouterId; 01318 Error = Message->Status.Error; 01319 ActiveThreads = Message->Status.ActiveThreads; 01320 // ---------------------------------------------------------------- 01321 01322 Request = m_Request.Get( Id ); 01323 if( Request == 0 ) 01324 { 01325 RioErr << "Router Error: Message from storage node" 01326 " has invalid Router Id " << Id << endl; 01327 return; 01328 } 01329 01330 // Get data request event associated with disk request 01331 event = ( EventDataRequest* ) Request->event; 01332 DataRequest = & ( event->Request ); 01333 01334 // Status contains the first error detected 01335 // If more errors are detected they are ignored 01336 if( DataRequest->Status == 0 ) 01337 { 01338 DataRequest->Status = Error; 01339 } 01340 01341 #ifdef RIO_DEBUG2 01342 struct in_addr clientip; 01343 clientip.s_addr = DataRequest->Target.IPaddress; 01344 RioErr << "Router:StorageReply => RouterId " << Id 01345 << " StorageId "<< Message->Status.StorageId 01346 << " Status " << DataRequest->Status 01347 << " client IP " << inet_ntoa(clientip) 01348 << " client Port " << ntohs( DataRequest->Target.Port ) 01349 << " block " << DataRequest->Block 01350 << " BufferId " << DataRequest->BufferId 01351 << " Msg->Error " << Error 01352 << " RequestCounter " << DataRequest->RequestCounter 01353 << endl; 01354 #endif 01355 01356 Disk = Request->Disk; 01357 01358 // Verifica se o disco perterce a um servidor de armazenamento que 01359 // acabou de parar de funcionar. 01360 if( !CheckStorageStatus( Disk ) ) 01361 { 01362 RioErr << "Router Error: Message from storage node that is " 01363 "down " << Id << endl; 01364 // Decrementa o contador de solicitacoes, removendo a 01365 // solicitacao do servidor de armazenamento que caiu. 01366 DataRequest->RequestCounter--; 01367 return; 01368 01369 } 01370 01371 if( ( Type == MSG_RSS_READCOMPLETE ) && ( m_CollectMeasures ) ) 01372 { 01373 struct timeval now; 01374 gettimeofday( &now, 0 ); 01375 UpdateMeasures( Disk, ActiveThreads, 01376 Request->event->Request.ArrivalTime, now ); 01377 } 01378 // ---------------------------------------------------------------- 01379 01380 // Remove request from pending queue 01381 m_Pending[Disk].Remove(Request); 01382 01383 // Check if there are queued requests not sent to storage node 01384 // and send one of them 01385 if( (( m_nRT[Disk] + m_nNRT[Disk] ) > m_nPending[Disk] ) && 01386 ( m_nPending[Disk] < m_MaxPending )) 01387 { 01388 // Try to send real time request first 01389 NewRequest = m_RTqueue[Disk].Get(); 01390 // If no RT request available, send a non RT one 01391 // There should be at least one (if condition above) 01392 if( NewRequest == 0 ) 01393 { 01394 RioErr <<" Could not get next request from RT queue" 01395 << endl; 01396 NewRequest = m_NRTqueue[Disk].Get(); 01397 } 01398 m_Pending[Disk].Put( NewRequest ); 01399 SendStorage( NewRequest ); 01400 } 01401 // If no new disk request sent decrement number of pending 01402 // requests 01403 else 01404 { 01405 m_nPending[Disk]--; 01406 } 01407 01408 // Decrement size of queue 01409 if( ( DataRequest->Operation == NonRealTimeRead ) || 01410 ( DataRequest->Operation == NonRealTimeWrite) ) 01411 { 01412 m_nNRT[Disk]--; 01413 } 01414 else 01415 { 01416 if( DataRequest->Operation != RealTimeSendBlock ) 01417 { 01418 m_nRT[Disk]--; 01419 } 01420 } 01421 01422 char name[1025]; 01423 m_DiskMgr->GetDiskName( Disk, name ); 01424 01425 RioStorageNodeInfo sn_info; 01426 m_DiskMgr->GetDiskStorageNodeInfo( Disk, &sn_info ); 01427 01428 int queue_sum = m_nRT[ Disk ] 01429 + m_nNRT[ Disk ] 01430 + m_nPending[ Disk ]; 01431 01432 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( 01433 sn_info.Hostname, name, queue_sum ); 01434 m_SystemManager->PostITEvent( (MonitorEvent *)it_event ); 01435 01436 // Check if this is the last message storage node will send for 01437 // this disk request ( any message with an error code is the last) 01438 // ( success I/O is ended with 01439 // SENDCOMPLETE(read) or WRITECOMPLETE(write) ) 01440 if( ( Type == MSG_RSS_SENDCOMPLETE ) || 01441 ( Type == MSG_RSS_WRITECOMPLETE ) || 01442 ( Type == MSG_RSS_CANCELCOMPLETE )|| 01443 ( Error != 0) ) 01444 { 01445 01446 // if error != 0 and bufferid == -1 so prefetch failed 01447 if( ( DataRequest->BufferId == -1 ) && 01448 ( Type == MSG_RSS_READCOMPLETE ) ) 01449 { 01450 RioErr << "error at prefetch!! Error: "<< Error << endl; 01451 01452 DataRequest->streamobj->Stream()->getMutexUpdateEvent(); 01453 // updates StorageId and BufferId 01454 Request->StorageId = Message->Status.StorageId; 01455 DataRequest->BufferId = Id; 01456 #ifdef RIO_DEBUG2 01457 RioErr << "Router:StorageReply => new BufferId " << Id 01458 << " and StorageId " << Request->StorageId << endl; 01459 #endif 01460 // verifies if the block was canceled 01461 if( DataRequest->Operation == RealTimeCancelBlock ) 01462 { 01463 #ifdef RIO_DEBUG2 01464 RioErr << "Block was canceled." << endl; 01465 #endif 01466 DataCancel( (Event *) event ); 01467 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent(); 01468 } 01469 else 01470 { 01471 // tells a new block arrived 01472 DataRequest->streamobj->Stream()->NewBlockArrival(); 01473 // verifies if the client requested the block 01474 if( ( DataRequest->Target.IPaddress != 0 ) && 01475 ( DataRequest->Target.Port != 0 )) 01476 { 01477 #ifdef RIO_DEBUG2 01478 RioErr << "Block was requested by client (cancel it)." 01479 << endl; 01480 #endif 01481 DataRequest->Operation = RealTimeCancelBlock; 01482 DataCancel( (Event *) event ); 01483 01484 //update tokenstosend 01485 DataRequest->streamobj->Stream()->decTokensToSend(); 01486 //update buffer status 01487 DataRequest->streamobj->Stream()->decServerBufferStatus(); 01488 } 01489 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent(); 01490 } 01491 } 01492 // ------------------------------------------------------------ 01493 else 01494 { 01495 m_Request.Free(Request); 01496 01497 #ifdef RIO_DEBUG2 01498 RioErr << "CRouter::StorageReply requisicao " 01499 << DataRequest->RequestCounter << " recebida!" 01500 << endl; 01501 #endif 01502 01503 DataRequest->RequestCounter--; 01504 // Check if we are done (all disk requests for this data 01505 // request completed) 01506 // Obs: RequestCompleted somente deve ser chamada se a 01507 // requisicao nao foi cancelada (porque um dos discos 01508 // associados a ela pertence a um servidor de armazenamento 01509 // que ficou indisponivel) pois neste caso a funcao ja 01510 // foi chamada. 01511 if( ( DataRequest->RequestCounter == 0 ) && 01512 ( !DataRequest->RequestCancelled ) ) 01513 { 01514 01515 #ifdef RIO_DEBUG2 01516 RioErr << "CRouter::StorageReply operation " 01517 << DataRequest->Operation << " completed " 01518 << "(DataRequest->RequestCounter == 0)" << endl; 01519 #endif 01520 01521 RequestCompleted( &DataRequest->streamobj, 01522 ( Event* ) event ); 01523 } 01524 } 01525 } 01526 // Not last message from storage node for this disk request 01527 else 01528 { 01529 if( DataRequest->BufferId == -1 ) 01530 { 01531 DataRequest->streamobj->Stream()->getMutexUpdateEvent(); 01532 // updates StorageId and BufferId 01533 Request->StorageId = Message->Status.StorageId; 01534 DataRequest->BufferId = Id; 01535 #ifdef RIO_DEBUG2 01536 RioErr << "Router:StorageReply => new BufferId " << Id 01537 << " and StorageId " << Request->StorageId << endl; 01538 #endif 01539 01540 // verifies if the block was canceled 01541 if( DataRequest->Operation == RealTimeCancelBlock ) 01542 { 01543 #ifdef RIO_DEBUG2 01544 RioErr << "Block was canceled." << endl; 01545 #endif 01546 01547 DataCancel( (Event *) event ); 01548 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent(); 01549 } 01550 else 01551 { 01552 // tells a new block arrived 01553 DataRequest->streamobj->Stream()->NewBlockArrival(); 01554 01555 // verifies if the client requested the block 01556 if( ( DataRequest->Target.IPaddress != 0 )&& 01557 ( DataRequest->Target.Port != 0)) 01558 { 01559 #ifdef RIO_DEBUG2 01560 RioErr << "Block was requested by client." << endl; 01561 #endif 01562 DataRequest->Operation = RealTimeSendBlock; 01563 DataSendToClient( (Event *) event ); 01564 } 01565 DataRequest->streamobj->Stream()->releaseMutexUpdateEvent(); 01566 } 01567 } 01568 // ------------------------------------------------------------ 01569 } 01570 break; 01571 } 01572 default: 01573 RioErr << "Router ERROR: Received invalid message from storage node." 01574 << endl; 01575 } 01576 }
void CRouter::StorageUp | ( | int | StorageId | ) | [private] |
Funcao para definir que um servidor de armazenamento voltou a estar disponivel.
ServerId | identificador do servidor de armazenamento. |
Definition at line 2314 of file Router.cpp.
02315 { 02316 // Obtem acesso exclusivo a variavel m_StoragesStatus. 02317 pthread_mutex_lock( &m_StoragesStatusMutex ); 02318 // Invalida todos os pares IP, porta dos clientes conectados associados a 02319 // este servidor de armazenamento. Precisamos somar 1 ao identificador 02320 // porque o identificador 0 e usado pelo mapeamento do servidor de 02321 // gerenciamento. 02322 m_SystemManager->InvalidateStorageNatMappings( StorageId + 1 ); 02323 // Define o servidor StorageId como ativo. 02324 m_StoragesStatus = m_StoragesStatus & ( ~( 1ull << StorageId ) ); 02325 // Libera o acesso exclusivo a variavel m_StoragesStatus. 02326 pthread_mutex_unlock( &m_StoragesStatusMutex ); 02327 }
void CRouter::UpdateDiskQueueTime | ( | double | EstQueueTimeOfDisk[100], | |
double | EstQueueTime[100][301] | |||
) |
Definition at line 2072 of file Router.cpp.
02074 { 02075 int num = GetMaxNumberOfDisks(); 02076 02077 for( int i = 1; i < num; i++ ) 02078 { 02079 for(int j = 0; j < 301; j++) 02080 { 02081 EstimatedQueueTime[i][j] = m_EstDiskQueueTime[i][j]; 02082 } 02083 EstimatedQueueTimeOfDisk[i] = m_EstDiskQueueTimeOfDisk[i]; 02084 } 02085 02086 return; 02087 }
void CRouter::UpdateDiskResponseTime | ( | double | EstResponseTimeOfDisk[100], | |
double | EstResponseTime[100][301], | |||
double | DevDiskResponseTimeOfDisk[100], | |||
double | DevDiskResponseTime[100][301] | |||
) |
Definition at line 2044 of file Router.cpp.
02048 { 02049 int num = GetMaxNumberOfDisks(); 02050 02051 for( int i = 1; i < num; i++ ) 02052 { 02053 for(int j = 0; j < 301; j++) 02054 { 02055 if( m_EstDiskServiceTime[i][j] > m_EstDiskResponseTime[i][j] ) 02056 // server was initialized now 02057 m_EstDiskResponseTime[i][j] = m_EstDiskServiceTime[i][j]; 02058 02059 EstimatedResponseTime[i][j] = m_EstDiskResponseTime[i][j]; 02060 DevResponseTime[i][j] = m_DevDiskResponseTime[i][j]; 02061 } 02062 EstimatedResponseTimeOfDisk[i] = m_EstDiskResponseTimeOfDisk[i]; 02063 DevResponseTimeOfDisk[i] = m_DevDiskResponseTimeOfDisk[i]; 02064 } 02065 02066 return; 02067 }
void CRouter::UpdateDiskServiceTime | ( | double | EstServiceTimeOfDisk[100], | |
double | EstServiceTime[100][301] | |||
) |
Definition at line 2016 of file Router.cpp.
02018 { 02019 int num = GetMaxNumberOfDisks(); 02020 int numd = GetNumberOfActiveDisks(); 02021 02022 //RioErr << "Router:UpdateDiskServiceTime " << numd << " of " << num <<endl; 02023 02024 while( m_UpdatedEstimatedDiskServiceTime < numd ) 02025 { 02026 pthread_mutex_lock( &m_MutexUpdated ); 02027 pthread_cond_wait( &m_ConditionUpdated, &m_MutexUpdated ); 02028 pthread_mutex_unlock( &m_MutexUpdated ); 02029 } 02030 02031 for( int i = 1; i < num; i++ ) 02032 { 02033 for(int j = 0; j < 301; j++) 02034 { 02035 EstimatedServiceTime[i][j] = m_EstDiskServiceTime[i][j]; 02036 } 02037 EstimatedServiceTimeOfDisk[i] = m_EstDiskServiceTimeOfDisk[i]; 02038 } 02039 02040 m_UpdatedEstimatedDiskServiceTime = 0; 02041 return; 02042 }
void CRouter::UpdateMeasures | ( | int | disk, | |
int | nThreads, | |||
struct timeval | initial_time, | |||
struct timeval | final_time | |||
) |
Definition at line 1233 of file Router.cpp.
01237 { 01238 double sample_msec = getInterval( initial_time, final_time ); 01239 01240 if( nThreads > m_MaxPending ) 01241 { 01242 nThreads = m_MaxPending; 01243 } 01244 else if( nThreads < 1 ) 01245 { 01246 RioErr << " ActiveThreads " << nThreads << endl; 01247 nThreads = 1; 01248 } 01249 01250 if( m_EstDiskResponseTime[ disk ][ nThreads ] == 0 ) 01251 { 01252 m_EstDiskResponseTime[ disk ][ nThreads ] = sample_msec; 01253 } 01254 else 01255 { 01256 m_EstDiskResponseTime[ disk ][ nThreads ] = 01257 (( 1 - m_EstimatedParameter) * 01258 m_EstDiskResponseTime[ disk ][ nThreads ] ) + 01259 m_EstimatedParameter * sample_msec; 01260 } 01261 01262 //updates Dev of estimated response time of disk 01263 m_DevDiskResponseTime[ disk ][ nThreads ] = ( 1 - m_EstimatedParameter ) * 01264 m_DevDiskResponseTime[ disk ][ nThreads ] + 01265 m_EstimatedParameter * 01266 fabs( sample_msec - m_EstDiskResponseTime[ disk ][ nThreads ] ); 01267 01268 if( m_EstDiskResponseTimeOfDisk[ disk ] == 0 ) 01269 { 01270 m_EstDiskResponseTimeOfDisk[ disk ] = sample_msec; 01271 } 01272 else 01273 { 01274 m_EstDiskResponseTimeOfDisk[ disk ] = ( 1 - m_EstimatedParameter) * 01275 m_EstDiskResponseTimeOfDisk[ disk ] + 01276 m_EstimatedParameter * sample_msec; 01277 } 01278 01279 m_DevDiskResponseTimeOfDisk[ disk ] = ( 1 - m_EstimatedParameter ) * 01280 m_DevDiskResponseTimeOfDisk[ disk ] + 01281 m_EstimatedParameter * 01282 fabs( sample_msec - m_EstDiskResponseTimeOfDisk[ disk ]); 01283 01284 #ifdef __QUEUE_RESP_LOG 01285 m_logRESP << " disk " << disk 01286 << " sample " << sample_msec 01287 << " est " << m_EstDiskResponseTimeOfDisk[ disk ] 01288 << endl; 01289 #endif 01290 }
int CRouter::m_BlockSize [private] |
bool CRouter::m_CollectMeasures [private] |
pthread_cond_t CRouter::m_ConditionUpdated [private] |
double CRouter::m_DevDiskResponseTime[100][301] [private] |
double CRouter::m_DevDiskResponseTimeOfDisk[100] [private] |
DiskMgr* CRouter::m_DiskMgr [private] |
double CRouter::m_EstDiskQueueTime[100][301] [private] |
double CRouter::m_EstDiskQueueTimeOfDisk[100] [private] |
double CRouter::m_EstDiskResponseTime[100][301] [private] |
double CRouter::m_EstDiskResponseTimeOfDisk[100] [private] |
double CRouter::m_EstDiskServiceTime[100][301] [private] |
double CRouter::m_EstDiskServiceTimeOfDisk[100] [private] |
double CRouter::m_EstimatedParameter [private] |
bool CRouter::m_initialized [private] |
ofstream CRouter::m_logRESP [private] |
ofstream CRouter::m_logSQT [private] |
int CRouter::m_MaxPending [private] |
int CRouter::m_MaxQueueSize [private] |
pthread_mutex_t CRouter::m_MutexUpdated [private] |
int CRouter::m_nDisks [private] |
int* CRouter::m_nNRT [private] |
int* CRouter::m_nPending [private] |
int* CRouter::m_nRT [private] |
CDiskRequestQueue* CRouter::m_NRTqueue [private] |
CDiskRequestQueue* CRouter::m_Pending [private] |
EventQueue CRouter::m_Queue [private] |
CDiskRequestManager CRouter::m_Request [private] |
CDiskRequestQueue* CRouter::m_RTqueue [private] |
bool CRouter::m_started [private] |
unsigned long long int CRouter::m_StoragesStatus [private] |
pthread_mutex_t CRouter::m_StoragesStatusMutex [private] |
CSystemManager* CRouter::m_SystemManager [private] |
StrDiskRequest** CRouter::m_TempDiskRequest [private] |
pthread_t CRouter::m_thread [private] |
int CRouter::m_UpdatedEstimatedDiskServiceTime [private] |