CRouter Class Reference

#include <Router.h>

Public Member Functions

 CRouter ()
 ~CRouter ()
int Initialize (RouterConfig *Config)
int Start ()
int Stop ()
void Put (Event *ep)
int GetMaxNumberOfDisks ()
int GetNumberOfActiveDisks ()
int GetDiskServiceTime (EventStorageRequest *event, u16 DiskId)
void SetDiskServiceTime (int Disk, double EstServiceTimeOfDisk, double EstServiceTime[301])
void UpdateDiskServiceTime (double EstServiceTimeOfDisk[100], double EstServiceTime[100][301])
void UpdateDiskResponseTime (double EstResponseTimeOfDisk[100], double EstResponseTime[100][301], double DevDiskResponseTimeOfDisk[100], double DevDiskResponseTime[100][301])
void UpdateDiskQueueTime (double EstQueueTimeOfDisk[100], double EstQueueTime[100][301])
void UpdateMeasures (int disk, int nThreads, struct timeval initial_time, struct timeval final_time)
long double GetEstimatedQueueTimeOfAllDisks ()
long double GetEstimatedDiskQueueTime (int disk)
void SetNATMapping (NATData server_map, int stor_id, NATData stor_map)
NATData GetNATMapping (NATData server_map, int stor_id)
 Funcao para obter um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho.
void RemoveNATMapping (NATData server_map, int stor_id)
 Funcao para remover um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho.
bool FindNATMapping (NATData server_map, int stor_id)
 Funcao para verificar se um mapeamento esta presente no mapa com os mapeamentos.
unsigned long long int GetInvalidStorages (NATData ClientAddr)
 Determina o vetor de bits com os servidores de armazenamento que nao podem ser usados pelo cliente com endereco passado como parametro, isto e, que estao inativos ou que foram invalidados (apos se tornarem novamente ativos).
void GetStorageInfo (NATData ClientAddr, bool *CanRead, bool *CanWrite)
 Determina se podemos ler ou escrever blocos, no momento, para um dado cliente cujo endereco e passado como parametro.
int GetNumberOfStorageNodes (unsigned int *NumberOfStorageNodes)
 Funcao para obter o numero de storage nodes.
void RequestCompleted (RioStreamObj **StreamObjAddr, Event *event)
 Funcao para completar uma requisicao (chamando a RequestCompleted do objeto RioStreamObj passado como parametro) removendo, quando necessario, o objeto do stream e alterando o ponteiro passado como parametro para NULL.

Data Fields

NATMap m_NAT_map

Private Member Functions

void StorageReply (MsgRSSstorage *Msg)
void AddDisk (int Disk)
void SendStorage (StrDiskRequest *Request)
void CleanUp ()
Event * Get ()
void routerthread ()
void DataReq (Event *ep)
void DataPrefetch (Event *event)
void DataSendToClient (Event *event)
void DataCancel (Event *event)
void PrintQueues ()
void StorageDown (int StorageId, bool EmptyStorageQueue)
 Funcao para definir que um servidor de armazenamento esta temporariamente indisponivel.
void StorageUp (int StorageId)
 Funcao para definir que um servidor de armazenamento voltou a estar disponivel.
bool CheckStorageStatus (int DiskId)
 Funcao usada para verificar se o servidor de armazenamento com uma dada ID esta ou nao parado.

Static Private Member Functions

static void * routerthreadep (void *param)

Private Attributes

double m_EstDiskServiceTime [100][301]
double m_EstDiskServiceTimeOfDisk [100]
double m_EstDiskResponseTime [100][301]
double m_EstDiskResponseTimeOfDisk [100]
double m_DevDiskResponseTime [100][301]
double m_DevDiskResponseTimeOfDisk [100]
double m_EstDiskQueueTime [100][301]
double m_EstDiskQueueTimeOfDisk [100]
int m_UpdatedEstimatedDiskServiceTime
bool m_CollectMeasures
double m_EstimatedParameter
pthread_mutex_t m_MutexUpdated
pthread_cond_t m_ConditionUpdated
ofstream m_logSQT
ofstream m_logRESP
pthread_t m_thread
bool m_initialized
bool m_started
DiskMgr * m_DiskMgr
CSystemManager * m_SystemManager
EventQueue m_Queue
int m_nDisks
int m_BlockSize
int m_MaxPending
int m_MaxQueueSize
CDiskRequestQueue * m_RTqueue
CDiskRequestQueue * m_NRTqueue
CDiskRequestQueue * m_Pending
int * m_nRT
int * m_nNRT
int * m_nPending
CDiskRequestManager m_Request
StrDiskRequest ** m_TempDiskRequest
unsigned long long int m_StoragesStatus
pthread_mutex_t m_StoragesStatusMutex

Detailed Description

Definition at line 55 of file Router.h.


Constructor & Destructor Documentation

CRouter::CRouter (  ) 

Definition at line 68 of file Router.cpp.

// Constructor. Puts the router into its "not initialized, not started" state:
// scalar members are zeroed, pointer members are set to NULL, every entry of
// the 100 x 301 estimate tables and of the 100-entry per-disk tables is
// cleared, and the two mutexes and the condition variable are created with
// default attributes. Nothing is allocated here.
00069 {
00070     m_UpdatedEstimatedDiskServiceTime = 0;
00071     pthread_mutex_init( &m_MutexUpdated,     NULL );
00072     pthread_cond_init(  &m_ConditionUpdated, NULL );
00073     // m_logSQT  does not need to be initialized
00074     // m_logRESP does not need to be initialized
00075     // m_Queue   does not need to be initialized
00076     m_thread          = 0;
00077 
00078     m_initialized     = false;
00079     m_started         = false;
00080     m_DiskMgr         = NULL;
00081     m_SystemManager   = NULL;
00082     m_nDisks          = 0;
00083     m_BlockSize       = 0;
00084     m_MaxPending      = 0;
00085     m_MaxQueueSize    = 0;
00086 
      // Per-disk queues and counters; they stay NULL until something outside
      // this constructor allocates them (CleanUp() releases them with delete[]).
00087     m_RTqueue         = NULL;
00088     m_NRTqueue        = NULL;
00089     m_Pending         = NULL;
00090     m_nRT             = NULL;
00091     m_nNRT            = NULL;
00092     m_nPending        = NULL;
00093 
00094     // m_Request does not need to be initialized
00095     m_TempDiskRequest = NULL;
00096 
00097     m_EstimatedParameter = 0;
      // The bounds 100 and 301 match the declared sizes of the estimate arrays.
00098     for( int i = 0; i < 100; i++ )
00099     {
00100         for( int j = 0; j < 301; j++ )
00101         {
00102            // according to the number of active threads
00103            m_EstDiskServiceTime [ i ][ j ] = 0;
00104            m_EstDiskResponseTime[ i ][ j ] = 0;
00105            m_DevDiskResponseTime[ i ][ j ] = 0;
00106 
00107            // according to the queue size
00108            m_EstDiskQueueTime[ i ][ j ] = 0;
00109         }
00110         // for each disk - using all requests
00111         m_EstDiskQueueTimeOfDisk   [ i ] = 0;
00112         m_EstDiskServiceTimeOfDisk [ i ] = 0;
00113         m_EstDiskResponseTimeOfDisk[ i ] = 0;
00114         m_DevDiskResponseTimeOfDisk[ i ] = 0;
00115     }
00116     m_CollectMeasures = false;
00117     // ------------------------------------------------------------------------
00118     
00119     // Initialize the variable that records the inactive storage servers
00120     // with 0 (because initially all servers will be active once the
00121     // connections have been established).
00122     m_StoragesStatus = 0;
00123     // Initialize the mutex that guarantees exclusive access to the variable
00124     // m_StoragesStatus.
00125     pthread_mutex_init( &m_StoragesStatusMutex, NULL );
00126 }

CRouter::~CRouter (  ) 

Definition at line 129 of file Router.cpp.

// Destructor. Releases the per-disk arrays through CleanUp() and destroys
// m_StoragesStatusMutex.
// NOTE(review): the constructor also initializes m_MutexUpdated and
// m_ConditionUpdated, but neither is destroyed here — confirm that they are
// destroyed somewhere else (e.g. when the router is stopped).
00130 {
00131     CleanUp();
00132 
00133     // Destroy the mutex that guarantees exclusive access to the variable
00134     // m_StoragesStatus.
00135     pthread_mutex_destroy( &m_StoragesStatusMutex );
00136 }


Member Function Documentation

void CRouter::AddDisk ( int  Disk  )  [private]

Definition at line 1579 of file Router.cpp.

// Marks disk `Disk` as active and reports the new disk to the system manager.
//
// Disk: index of the disk; must satisfy 0 <= Disk < m_nDisks.
//
// The call is rejected (with a message on RioErr) when the index is out of
// range or when m_nRT[Disk] is not m_MaxQueueSize + 1, the value this function
// treats as "disk not active".
01580 {
01581     RioErr << " Router: Adding disk: " << Disk << endl;
01582     if( ( Disk < 0 ) || ( Disk >= m_nDisks ) )
01583     {
01584         RioErr << "Router ERROR: Tried to add invalid disk " << Disk << endl;
01585         return;
01586     }
01587 
      // Any other counter value means the disk is already considered active.
01588     if( m_nRT[Disk] != ( m_MaxQueueSize + 1 ) )
01589     {
01590         RioErr << "Router ERROR: Tried to add disk already active "
01591                << Disk << endl;
01592         return;
01593     }
01594 
      // NOTE(review): assumes GetDiskName never writes more than 1025 bytes
      // into `name` — confirm against DiskMgr.
01595     char name[1025];
01596     m_DiskMgr->GetDiskName( Disk, name );
01597 
01598     RioStorageNodeInfo sn_info;
01599     m_DiskMgr->GetDiskStorageNodeInfo( Disk, &sn_info );
01600 
      // The event is heap-allocated and handed to the system manager;
      // presumably PostITEvent takes ownership — TODO confirm.
01601     AddDiskEvent *it_event = new AddDiskEvent( sn_info.Hostname, name );
01602     m_SystemManager->PostITEvent( (MonitorEvent *) it_event );
01603 
01604     // Reset disk queue size (now active)
01605     m_nRT[Disk] = 0;
01606 }

bool CRouter::CheckStorageStatus ( int  DiskId  )  [private]

Funcao usada para verificar se o servidor de armazenamento com uma dada ID esta ou nao parado.

Parameters:
DiskId identificador do disco do servidor de armazenamento.
Returns:
true se o servidor esta funcionando ou false se o servidor parou de funcionar.

Definition at line 2333 of file Router.cpp.

// Reports whether the storage server that owns disk `DiskId` is running.
//
// DiskId: identifier of a disk of the storage server.
// Returns true when the server's bit in m_StoragesStatus is 0 (server up),
// false when the bit is set or when DiskId is negative.
02334 {
02335     bool Status;
02336     int StorageId;
02337     // Check whether DiskId is valid
02338     if( DiskId >= 0 )
02339     {
02340         // Find the identifier of the storage server.
02341         StorageId = ( DiskId - 1 ) / SNode::sn_maxdisks;
02342         // Obtain exclusive access to the variable m_StoragesStatus. 
02343         pthread_mutex_lock( &m_StoragesStatusMutex );
02344         // Check whether the server is active (if so, its bit must be
02345         // equal to 0).
          // NOTE(review): assumes StorageId is smaller than the bit width of
          // m_StoragesStatus — TODO confirm an upper bound is enforced elsewhere.
02346         Status = ( ( m_StoragesStatus & ( 1ull << StorageId ) ) == 0 );
02347         // Release the exclusive access to the variable m_StoragesStatus. 
02348         pthread_mutex_unlock( &m_StoragesStatusMutex );
02349     }
02350     else
02351         Status = false;
02352     return Status;
02353 }

void CRouter::CleanUp (  )  [private]

Definition at line 139 of file Router.cpp.

// Returns the router to its unconfigured state: clears the initialized/started
// flags and the configuration values, then releases every per-disk array with
// delete[] and resets the corresponding pointer to 0.
// NOTE(review): m_SystemManager is set to NULL by the constructor but is not
// reset here — confirm whether that is intentional.
00140 {
00141     m_initialized     = false;
00142     m_started         = false;
00143 
00144     m_DiskMgr      = 0;
00145     m_nDisks       = 0;
00146     m_BlockSize    = 0;
00147     m_MaxPending   = 0;
00148     m_MaxQueueSize = 0;
00149 
00150     // For the thesis: check the ifs below. They look very strange!!!
00151     // Shouldn't they be of the form if( x != 0 ) instead of if( x == 0 )?!?!?
00152     // Note: fixed, because there were memory errors.
00153     if( m_RTqueue != 0 )
00154     {
00155         delete[] m_RTqueue;
00156         m_RTqueue = 0;
00157     }
00158 
00159     if( m_NRTqueue != 0 )
00160     {
00161         delete[] m_NRTqueue;
00162         m_NRTqueue = 0;
00163     }
00164 
00165     if( m_Pending != 0 )
00166     {
00167         delete[] m_Pending;
00168         m_Pending = 0;
00169     }
00170 
00171     if( m_nRT != 0 )
00172     {
00173         delete[] m_nRT;
00174         m_nRT = 0;
00175     }
00176 
00177     if( m_nNRT != 0 )
00178     {
00179         delete[] m_nNRT;
00180         m_nNRT = 0;
00181     }
00182 
00183     if( m_nPending != 0 )
00184     {
00185         delete[] m_nPending;
00186         m_nPending = 0;
00187     }
00188 
      // Only the array of pointers is released here, not the requests the
      // entries may point to.
00189     if( m_TempDiskRequest != 0 )
00190     {
00191         delete[] m_TempDiskRequest;
00192         m_TempDiskRequest = 0;
00193     }
00194 }

void CRouter::DataCancel ( Event *  event  )  [private]

Definition at line 739 of file Router.cpp.

// Handles a cancel request for a block.
//
// event: event carrying the DataRequest (it is cast to EventDataRequest*).
//
// Behaviour:
//  - RequestCounter == -1: the block was never requested from storage, so the
//    request is completed immediately with Status 0.
//  - Otherwise the disk request is looked up by Request.BufferId:
//      * found, StorageId != 0 and storage up: the request is put on the
//        pending list and sent (or put on the real-time queue when the pending
//        list is full), and a queue-size event is posted to the system manager;
//      * found, but StorageId == 0 or storage down: the disk request is freed
//        and the data request is completed;
//      * not found: an error is logged.
//
// Fix: the "not found" branch used to evaluate m_TempDiskRequest->StorageId
// although the pointer is null on that path. The test was removed, together
// with the second error branch that could only be reached through that
// null dereference.
00740 {
00741     DataRequest& Request = (( EventDataRequest* )( event ))->Request;
00742 
00743     #ifdef RIO_DEBUG2
00744     RioErr << "Router:DataCancel => block "<<  Request.Block
00745            << " BufferId " << Request.BufferId << endl;
00746     #endif
00747 
00748     if( Request.RequestCounter == -1 )
00749     {
00750         #ifdef RIO_DEBUG2
00751         RioErr << "Router:DataCancel => did not request this block "
00752                << "yet, so just free it" << endl;
00753         #endif
00754 
00755         //the request was not submit to storage yet, so just free request
00756         Request.Status = 0;
00757         RequestCompleted( &Request.streamobj, event );
00758     }
00759     else
00760     {
          // Local pointer; it hides the member array of the same name inside
          // this function.
00761         StrDiskRequest* m_TempDiskRequest = m_Request.Get( Request.BufferId );
00762 
00763         if( m_TempDiskRequest )
00764         {
00765             if( ( m_TempDiskRequest->StorageId != 0 ) &&
00766                 ( CheckStorageStatus( m_TempDiskRequest->Disk ) ) )  
00767             {
00768                 if( m_CollectMeasures )
00769                 {
00770                     // update queue size to get estimated queue time
00771                     Request.QueueSize = m_nRT[m_TempDiskRequest->Disk];
00772                 }
00773 
                  // Send right away while the disk has room for more
                  // outstanding requests
00774                 if( m_nPending[m_TempDiskRequest->Disk] < m_MaxPending )
00775                 {
00776                     m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00777                     // Update disk queue sizes
00778                     m_nPending[m_TempDiskRequest->Disk]++;
00779                     m_nRT[m_TempDiskRequest->Disk]++;
00780                     SendStorage(m_TempDiskRequest);
00781                 }
00782                 // otherwise put request on disk queue
00783                 else
00784                 {
00785                     m_RTqueue[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00786                     m_nRT[m_TempDiskRequest->Disk]++;
00787                 }
00788                 char name[1025];
00789                 m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name );
00790 
00791                 RioStorageNodeInfo sn_info;
00792                 m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info );
00793 
                  // Total number of requests currently accounted to this disk
00794                 int queue_sum = m_nRT[ m_TempDiskRequest->Disk ]
00795                               + m_nNRT[ m_TempDiskRequest->Disk ]
00796                               + m_nPending[ m_TempDiskRequest->Disk ];
00797 
00798                 UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
00799                     sn_info.Hostname, name, queue_sum );
00800                 m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
00801             }
00802             else
00803             {
00804                 RioErr << " Router:Datacancel => StorageId == 0 "
00805                        << "(RequestCompleted) or storage is down" << endl;
00806                 m_Request.Free(m_TempDiskRequest);
00807                 RequestCompleted( &Request.streamobj, event );
00808             }
00809         }
          // No disk request is registered for this buffer: there is nothing to
          // dereference here, so only report the failed lookup.
00810         else
00811         {
00812             RioErr << "ERROR: Router:Datacancel => Could not find resquest to cancel"
00813                    << endl;
00814         } 
00821     }
00822 }

void CRouter::DataPrefetch ( Event *  event  )  [private]

Definition at line 487 of file Router.cpp.

// Handles a prefetch request: allocates a disk request for the first replica
// of the block (Request.Reps[0]), checks that the owning storage server is up
// and usable by the client, then sends the request to storage or queues it.
//
// event: event carrying the DataRequest (it is cast to EventDataRequest*).
//
// On every failure the data request is completed with an error status:
//  - ERROR_ROUTER + ERROR_UNEXPECTED when no disk request can be allocated;
//  - ERROR_ROUTER + ERROR_SERVICE_TEMPORARY_UNAVAILABLE when the storage
//    server is down or has no NAT mapping for this client.
//
// Fixes:
//  - The target disk is now stored in the disk request before it is read by
//    the storage-status check and the storage-id computation; previously it
//    was only assigned after both had already used it.
//  - The two error returns taken after the allocation now give the disk
//    request back with m_Request.Free(), as the other request handlers do.
00488 {
00489     NATData ClientAddr, ServerAddr;
00490     int storageid;
00491     //
00492     // TODO: Remove the variable below once RioNeti and NetMgr are no longer
00493     // used.
00494     //
00495     unsigned totalStorages;
00496 
00497     DataRequest& Request = (( EventDataRequest* )( event ))->Request;
      // Local pointer; it hides the member array of the same name inside this
      // function.
00498     StrDiskRequest* m_TempDiskRequest = m_Request.New();
00499     
00500     // Get the client's (IP, port) pair
00501     ClientAddr.nat_addr = Request.Target.IPaddress;
00502     ClientAddr.nat_port = Request.Target.Port;
00503 
00504     if( m_TempDiskRequest == 0 )
00505     {
00506         RioErr << "Router ERROR: Too many disk requests (UNEXPECTED)" << endl;
00507         Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00508         RequestCompleted( &Request.streamobj, event );
00509         return;
00510     }
00511     
      // Select the target disk first: the checks below read it.
      m_TempDiskRequest->Disk  = Request.Reps[0].disk;

00512     if( !CheckStorageStatus( m_TempDiskRequest->Disk ) )
00513     {
00514         RioErr << "Router ERROR (DataPrefetch): Cannot access disk: storage " 
00515                << "is down" << endl;
          // Give the unused disk request back before failing
          m_Request.Free( m_TempDiskRequest );
00516         Request.Status = ERROR_ROUTER + ERROR_SERVICE_TEMPORARY_UNAVAILABLE;
00517         RequestCompleted( &Request.streamobj, event );
00518         return;
00519     }
00520     
00521     // Find the number of storages.
00522     //
00523     // TODO: Remove the statement below once RioNeti and NetMgr are no longer
00524     // used.
00525     //
00526     m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
00527     
00528     // Find the id of the storage.
00529     storageid = ( m_TempDiskRequest->Disk - 1 ) / SNode::sn_maxdisks + 1;
00530     // Adjust the id for non-real-time copies, in case there are (IP, port)
00531     // pairs associated with the new data-transmission implementation
00532     // (these pairs start at the position of the last storage in the vector
00533     // plus 1, and the number of pairs is the number of storages).
00534     //
00535     // TODO: Remove all the code inside the if below, including the if
00536     // statement itself, once RioNeti and NetMgr are no longer used.
00537     //
00538     if( ( ( Request.Operation == NonRealTimeRead ) ||
00539           ( Request.Operation == NonRealTimeWrite ) ) && 
00540         ( FindNATMapping( ClientAddr, storageid + totalStorages + 1 ) ) )
00541     {    
00542          #ifdef RIO_DEBUG2
00543          RioErr << "Router:DataPrefetch usando o indice " 
00544                 << storageid + totalStorages + 1 << " ao inves do indice "
00545                 << storageid << "." << endl;
00546          #endif
00547          
00548          storageid = storageid + totalStorages + 1;
00549     }
00550     #ifdef RIO_DEBUG2
00551     else
00552         RioErr << "Router:DataPrefetch o novo indice " 
00553                << storageid + totalStorages + 1 << " nao foi usado. "
00554                << "Request.Operation = " << Request.Operation << "." << endl;
00555     #endif
00556     
00557     
00558     // Check whether the disk of this storage server can be used by the
00559     // client.
00560     ServerAddr = GetNATMapping( ClientAddr, storageid );
      // An all-zero (address, port) pair is treated as "no mapping"
00561     if( ( ServerAddr.nat_addr == 0 ) && ( ServerAddr.nat_port == 0 ) )
00562     {
00563         RioErr << "Router ERROR (DataPrefetch): Cannot access disk: storage " 
00564                << "is disabled for this client" << endl;
          // Give the unused disk request back before failing
          m_Request.Free( m_TempDiskRequest );
00565         Request.Status = ERROR_ROUTER + ERROR_SERVICE_TEMPORARY_UNAVAILABLE;
00566         RequestCompleted( &Request.streamobj, event );
00567         return;
00568     }
00569     
00570 
00571     #ifdef RIO_DEBUG2
00572     RioErr << "Router:DataPrefetch => block "  <<  Request.Block
00573            << " disk " << Request.Reps[0].disk << endl;
00574     #endif
00575 
      // Disk was already set above; fill in the remaining fields
00577     m_TempDiskRequest->Pos   = (((u64) Request.Reps[0].block ) * m_BlockSize );
00578     m_TempDiskRequest->StorageId = 0xffffffff;  // Invalid storage id
00579     m_TempDiskRequest->Size  = m_BlockSize;
00580     m_TempDiskRequest->event = ( EventDataRequest* ) event;
00581 
00582     if( m_CollectMeasures )
00583     {
00584         // update queue size to get estimated queue time
00585         Request.QueueSize = m_nRT[m_TempDiskRequest->Disk];
00586     }
      // Send right away while the disk has room for more outstanding requests
00587     if( m_nPending[m_TempDiskRequest->Disk] < m_MaxPending )
00588     {
00589         m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00590         // Update disk queue sizes
00591         m_nPending[m_TempDiskRequest->Disk]++;
00592         m_nRT[m_TempDiskRequest->Disk]++;
00593         SendStorage(m_TempDiskRequest);
00594     }
00595     // otherwise put request on disk queue
00596     else
00597     {
00598         m_RTqueue[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00599         m_nRT[m_TempDiskRequest->Disk]++;
00600     }
00601 
00602     char name[1025];
00603     m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name );
00604 
00605     RioStorageNodeInfo sn_info;
00606     m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info );
00607 
      // Total number of requests currently accounted to this disk
00608     int queue_sum = m_nRT[ m_TempDiskRequest->Disk ]
00609                   + m_nNRT[ m_TempDiskRequest->Disk ]
00610                   + m_nPending[ m_TempDiskRequest->Disk ];
00611 
00612     UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( sn_info.Hostname, name,
00613                                                             queue_sum );
00614     m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
00615 }

void CRouter::DataReq ( Event *  ep  )  [private]

Definition at line 825 of file Router.cpp.

00826 {
00827     int  i, j, rc;
00828 
00829     // Get Request from RequestEle
00830     DataRequest& Request = (( EventDataRequest* )( event ))->Request;
00831 
00832     #ifdef RIO_DEBUG2
00833     struct in_addr clientip;
00834     clientip.s_addr = Request.Target.IPaddress;
00835     RioErr << "No DataReq do Router: "
00836            << "IP " << inet_ntoa(clientip)
00837            << " port "  << ntohs( Request.Target.Port )
00838            << " reqid " << Request.Reqid
00839            << " block " << Request.Block
00840            << endl;
00841     #endif
00842 
00843     Request.streamobj->Stream()->getMutexUpdateEvent();
00844 
00845     if( Request.Operation == RealTimePrefetchBlock )
00846     {
00847         DataPrefetch( event );
00848         Request.streamobj->Stream()->releaseMutexUpdateEvent();
00849     }
00850     else if( Request.Operation == RealTimeCancelBlock )
00851     {
00852         DataCancel( event );
00853         Request.streamobj->Stream()->releaseMutexUpdateEvent();
00854     }
00855     else if( Request.Operation == RealTimeSendBlock )
00856     {
00857         DataSendToClient( event );
00858         Request.streamobj->Stream()->releaseMutexUpdateEvent();
00859     }
00860     // ------------------------------------------------------------------------
00861     else
00862     {
00863         Request.streamobj->Stream()->releaseMutexUpdateEvent();
00864         ///////////////////////////////////////
00865         //RioErr << "Router:DataReq => RealTimeRead or NonRealTime[Read|Write]"
00866         //       << endl;
00867 
00868         // ### before request was queued to Router should have
00869         // increased pending count in streamobj so that won't be deleted
00870         // while I/O running?
00871 
00872         // -- map request to absolute blocks
00873         // ### this could be a lot cleaner!!
00874         //  shouldn't know about internals of RioDiskBlock...
00875 
00876         rc = Request.rioobject->MapBlock( Request.Block, Request.RepBits,
00877                                           &Request.RepNum, Request.Reps );
00878 
00879         if( rc )
00880         {
00881             // failed in mapping logical block to physical block
00882             //  must send error status to client
00883 
00884             RioErr << "Router ERROR: Failed mapping logical block." << endl;
00885 
00886             Request.Status = ERROR_ROUTER + ERROR_INVALID_BLOCK;
00887             RequestCompleted( &Request.streamobj, event );
00888             return;
00889         }
00890 
00891         // Compute number of disk requests for this data request
00892         int nDiskRequests;
00893         if( Request.Operation == NonRealTimeWrite )
00894         {
00895             // For write, need to send disk request to all replicas
00896             nDiskRequests = Request.RepNum;
00897         }
00898         else if( ( Request.Operation == RealTimeRead ) ||
00899                 ( Request.Operation == NonRealTimeRead ))
00900         {
00901             // For read send disk request to just one replica
00902             nDiskRequests = 1;
00903         }
00904         else
00905         {
00906             RioErr << "Router ERROR: Got data request with invalid operation: "
00907                    << (int) Request.Operation << endl;
00908             Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00909             RequestCompleted( &Request.streamobj, event );
00910             return;
00911         }
00912 
00913         if( nDiskRequests > MaxReplications )
00914         {
00915             RioErr << "Router ERROR: Got data request with too many replicas: "
00916                    << nDiskRequests << endl;
00917             Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00918             RequestCompleted( &Request.streamobj, event );
00919             return;
00920         }
00921        
00922         // allocate disk requests
00923         for( i = 0; i < nDiskRequests; i++ )
00924         {
00925             m_TempDiskRequest[i] = m_Request.New();
00926 
00927             // This should not happen if number of data requests does
00928             // not exceed maximum.
00929             if( m_TempDiskRequest[i] == 0 )
00930             {
00931                 // If there is no available disk request, free previous
00932                 // allocated disk requests and send error status to client
00933                 for( j = 0; j < i; j++ )
00934                 {
00935                     m_Request.Free( m_TempDiskRequest[j] );
00936                 }
00937                 RioErr << "Router ERROR: Too many disk requests (UNEXPECTED)"
00938                        << endl;
00939                 Request.Status = ERROR_ROUTER + ERROR_UNEXPECTED;
00940                 RequestCompleted( &Request.streamobj, event );
00941                 return;
00942             } 
00943         }
00944 
00945         // Fill disk request fields: Disk and Pos
00946         if( ( Request.Operation == RealTimeRead ) ||
00947             ( Request.Operation == NonRealTimeRead ))
00948         {
00949             // find shortest disk queue
00950             int Min  = m_MaxQueueSize + 1;
00951             int Rep  = -1;
00952             int nrep = Request.RepNum;
00953             int dk;
00954             int nQueue;
00955             int storageid;
00956             // TODO: Esta variavel devera ser removida quando a NetMgr e a 
00957             // RioNeti forem definitivamente removidas do codigo.
00958             unsigned int totalStorages;
00959             // Obtem o endereco do servidor de despacho usado como base do 
00960             // mapeamento
00961             NATData input( Request.Target.IPaddress, Request.Target.Port );
00962             // TODO: Inicializa totalStorages com o numero de storages. Este
00963             // codigo devera ser removido quando a NetMgr e a RioNeti forem 
00964             // definitivamente removidas do codigo.
00965             m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
00966             
00967             for( i = 0; i < nrep ; i++ )
00968             {
00969                 // Garante que somente os discos do servidor de armazenamento
00970                 // ativo sejam utilizados.
00971                 dk = Request.Reps[i].disk;
00972                 if( CheckStorageStatus( dk ) )
00973                 {
00974                     // Descobre a ID do storage a partir do disco acessado.
00975                     storageid = ( dk - 1 ) / ( SNode::sn_maxdisks ) + 1;
00976                     // Se o cliente estiver atrás de NAT, o par (IP, porta) que 
00977                     // o servidor informar ao storage são diferentes, variam 
00978                     // para cada storage.
00979                     // Para descobrir qual par deve ser usado, deve-se consultar 
00980                     // o heap m_NAT_map, que sera usado como o par IP, porta a 
00981                     // ser enviado ao servidor de armazenamento
00982                     // Obs: Note que agora sempre deveremos ter o par IP, porta 
00983                     // do cliente no mapa, porque agora sempre enviamos o 
00984                     // mapeamento para o servidor, independentemente de o 
00985                     // cliente estar ou nao atras de NAT.
00986                     //
00987                     // TODO: Precisamos verificar agora se estamos usando a nova
00988                     // ou a implementacao antiga. Todo o codigo dentro do if e 
00989                     // do seu else devem ser removidos quando a NetMgr e a 
00990                     // RioNeti forem definitivamente retiradas do codigo.
00991                     if( ( Request.Operation == NonRealTimeRead )  && 
00992                         ( FindNATMapping( input, storageid + 
00993                                           totalStorages + 1 ) ) )
00994                     {
00995                         #ifdef RIO_DEBUG2
00996                         RioErr << "CRouter::DataReq usando o indice " 
00997                                << storageid + totalStorages + 1 << " ao inves "
00998                                << "do indice " << storageid << "." << endl;
00999                         #endif
01000                 
01001                         storageid = storageid + totalStorages + 1;
01002                     }
01003                     #ifdef RIO_DEBUG2
01004                     else
01005                         RioErr << "Router::DataReq o novo indice " 
01006                                << storageid + totalStorages + 1 << " nao foi "
01007                                << "usado. Request.Operation = " 
01008                                << Request.Operation << "." << endl;
01009                     #endif
01010                     
01011                     NATData result = m_NAT_map.getElement( input, storageid );
01012                     if( ( result.nat_addr != -1 ) || ( result.nat_port != 0 ) )
01013                     { 
01014                         if( Request.Operation == RealTimeRead )
01015                             nQueue = m_nRT[dk];
01016                         else
01017                             nQueue = m_nNRT[dk] + m_nRT[dk];
01018                         if( nQueue < Min )
01019                         {
01020                             Min = nQueue;
01021                             Rep = i;
01022                             //ClientAddr = result;
01023                         }
01024                     } 
01025                     #ifdef RIO_DEBUG2
01026                     else
01027                         RioErr << "Router WARNING: storage with ID " 
01028                                << ( ( dk - 1 ) / SNode::sn_maxdisks ) 
01029                                << " cannot be used" << endl; 
01030                     #endif           
01031                 }
01032                 #ifdef RIO_DEBUG2
01033                 else
01034                     RioErr << "Router WARNING: storage with ID " 
01035                            << ( ( dk - 1 ) / SNode::sn_maxdisks ) 
01036                            << " is down" << endl;
01037                 #endif            
01038             }
01039 
01040             if( Rep == -1 )
01041             {
01042                 // no disk queue found -- fail request
01043                 RioErr << "Router ERROR: Unavailable disk on read block " 
01044                        << Request.Block << " with reqid " << Request.Reqid 
01045                        << endl;
01046                 m_Request.Free( m_TempDiskRequest[0] );
01047                 Request.Status = ERROR_ROUTER + ERROR_INVALID_DISK;
01048                 RequestCompleted( &Request.streamobj, event );
01049                 return;
01050             }
01051 
01052            m_TempDiskRequest[0]->Disk = Request.Reps[Rep].disk;
01053            m_TempDiskRequest[0]->Pos  =
01054            ( ( (u64) Request.Reps[Rep].block ) * m_BlockSize );
01055         }
01056         // Write case
01057         else
01058         {
01059             int nQueue;
01060             int dk;
01061             int storageid;
01062             // TODO: Esta variavel devera ser removida quando a NetMgr e a 
01063             // RioNeti forem definitivamente removidas do codigo.
01064             unsigned int totalStorages;
01065             // Obtem o endereco do servidor de despacho usado como base do 
01066             // mapeamento
01067             NATData input( Request.Target.IPaddress, Request.Target.Port );
01068             // TODO: Inicializa totalStorages com o numero de storages. Este
01069             // codigo devera ser removido quando a NetMgr e a RioNeti forem 
01070             // definitivamente removidas do codigo.
01071             m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
01072 
01073             for( i = 0; i < nDiskRequests ; i++ )
01074             {
01075                 dk = Request.Reps[i].disk;
01076                 if( Request.Operation == NonRealTimeWrite )
01077                     nQueue = m_nNRT[dk] + m_nRT[dk];
01078                 else
01079                     nQueue = m_nRT[dk];
01080 
01081                 // Descobre a ID do storage a partir do disco acessado.
01082                 storageid = ( dk - 1 ) / ( SNode::sn_maxdisks ) + 1;
01083                 // Se o cliente estiver atrás de NAT, o par (IP, porta) 
01084                 // que o servidor informar ao storage são diferentes, 
01085                 // variam para cada storage.
01086                 // Para descobrir qual par deve ser usado, deve-se 
01087                 // consultar o heap m_NAT_map, que sera usado como o par 
01088                 // IP, porta a ser enviado ao servidor de armazenamento.
01089                 // Obs: Note que agora sempre deveremos ter o par IP, 
01090                 // porta do cliente no mapa, porque agora sempre 
01091                 // enviamos o mapeamento para o servidor, 
01092                 // independentemente de o cliente estar ou nao atras de 
01093                 // NAT.
01094                 //
01095                 // TODO: Precisamos verificar agora se estamos usando a nova
01096                 // ou a implementacao antiga. Todo o codigo dentro do if e 
01097                 // do seu else devem ser removidos quando a NetMgr e a 
01098                 // RioNeti forem definitivamente retiradas do codigo.
01099                 if( ( Request.Operation == NonRealTimeWrite )  &&  
01100                     ( FindNATMapping( input, storageid + totalStorages + 1 ) ) )
01101                 {
01102                     #ifdef RIO_DEBUG2
01103                     RioErr << "CRouter::DataReq usando o indice " 
01104                            << storageid + totalStorages + 1 << " ao inves do "
01105                            << "indice " << storageid << "." << endl;
01106                     #endif
01107               
01108                     storageid = storageid + totalStorages + 1;
01109                 }
01110                 #ifdef RIO_DEBUG2
01111                 else
01112                     RioErr << "Router::DataReq o novo indice " 
01113                            << storageid + totalStorages + 1 << " nao foi "
01114                            << "usado. Request.Operation = " 
01115                            << Request.Operation << "." << endl;
01116                 #endif
01117                 
01118                 NATData result = m_NAT_map.getElement( input, storageid );
01119 
01120                 if( ( nQueue > m_MaxQueueSize ) || 
01121                     ( !CheckStorageStatus( dk ) ) || 
01122                     ( ( result.nat_addr == -1 ) && ( result.nat_port == 0 ) ) )
01123                 {
01124                     RioErr << "Router ERROR: Unavailable disk " << dk 
01125                            << " on write block " << Request.Block 
01126                            << " with reqid " << Request.Reqid << endl;
01127                     RioErr << "nQueue " << nQueue << " m_MaxQueueSize "
01128                            << m_MaxQueueSize << endl;
01129                     for( j = 0; j < nDiskRequests; j++ )
01130                     {
01131                         m_Request.Free( m_TempDiskRequest[j] );
01132                     }
01133                     Request.Status = ERROR_ROUTER + ERROR_INVALID_DISK;
01134                     RequestCompleted( &Request.streamobj, event );
01135                     return;
01136                 }
01137                 // Se o disco e valido, inicializamos a sua requisicao.
01138                 m_TempDiskRequest[i]->Disk = dk;
01139                 m_TempDiskRequest[i]->Pos  =
01140                                 (((u64) Request.Reps[i].block ) * m_BlockSize );
01141             }
01142         }
01143 
01144         Request.RequestCounter = nDiskRequests;
01145 
01146         // Informa que a funcao RequestCompleted deve ser chamada.
01147         Request.RequestCancelled = false;        
01148 
01149         //Filling Disk request fields
01150         int dk;
01151         // Fill other fields of disk requests and include it of disk queue
01152         for( i = 0; i < nDiskRequests ; i++ )
01153         {
01154             
01155             #ifdef RIO_DEBUG2
01156             RioErr << "CRouter::DataReq colocando a requicao para a "
01157                    << "replicacao " << i+1 << " na fila do disco " 
01158                    << m_TempDiskRequest[ i ]->Disk << endl;
01159             #endif
01160                    
01161             m_TempDiskRequest[i]->StorageId = 0xffffffff;// Invalid storage id
01162             m_TempDiskRequest[i]->Size      = m_BlockSize;
01163             m_TempDiskRequest[i]->event     = ( EventDataRequest* ) event;
01164             dk = m_TempDiskRequest[i]->Disk;
01165 
01166             // Send request to storage node if there are not many pending
01167             // requests
01168             if( m_nPending[dk] < m_MaxPending )
01169             {
01170                 m_Pending[dk].Put( m_TempDiskRequest[i] );
01171                 // update disk queue sizes
01172                 m_nPending[dk]++;
01173                 if( ( Request.Operation == NonRealTimeWrite ) ||
01174                     ( Request.Operation == NonRealTimeRead ))
01175                 {
01176                     m_nNRT[dk]++;
01177                 }
01178                 else
01179                 {
01180                     if( m_CollectMeasures )
01181                     {
01182                         // update queue size to get estimated queue time
01183                         Request.QueueSize = m_nRT[dk];
01184                     }
01185                     // ----------------------------------------------------
01186                     m_nRT[dk]++;
01187                 }
01188                 
01189                 SendStorage(m_TempDiskRequest[i]);
01190             }
01191             // otherwise put request on disk queue
01192             else // else do if( m_nPending[dk] < m_MaxPending )
01193             {
01194                 // insert disk request at real time or non real time queue
01195                 if( ( Request.Operation == NonRealTimeWrite ) ||
01196                     ( Request.Operation == NonRealTimeRead ))
01197                 {
01198                     m_NRTqueue[dk].Put( m_TempDiskRequest[i]);
01199                     m_nNRT[dk]++;
01200                 }
01201                 else
01202                 {
01203                     m_RTqueue[dk].Put( m_TempDiskRequest[i]);
01204                     if( m_CollectMeasures )
01205                     {
01206                         // update queue size to get estimated queue time
01207                         Request.QueueSize = m_nRT[dk];
01208                     }
01209                     // --------------------------------------------------------
01210                     m_nRT[dk]++;
01211                     // --------------------------------------------------------
01212                 }
01213             } // fim do else do if( m_nPending[dk] < m_MaxPending )
01214 
01215             char name[1025];
01216             m_DiskMgr->GetDiskName( dk, name );
01217 
01218             RioStorageNodeInfo sn_info;
01219             m_DiskMgr->GetDiskStorageNodeInfo( dk, &sn_info );
01220 
01221             int queue_sum = m_nRT[ dk ]
01222                             + m_nNRT[ dk ]
01223                             + m_nPending[ dk ];
01224 
01225             UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
01226                                             sn_info.Hostname, name, queue_sum );
01227             m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
01228         } // fim do for( i = 0; i < nDiskRequests ; i++ )
01229     } // fim do else
01230 }

void CRouter::DataSendToClient ( Event *  event  )  [private]

Definition at line 620 of file Router.cpp.

00621 { // Queues the disk request found for Request.BufferId as pending and passes it to SendStorage.
00622     NATData ClientAddr, ServerAddr;
00623     bool StorageUp, StorageEnabled;
00624     int storageid;
00625     //
00626     // TODO: Remove the following variable once RioNeti and NetMgr are no
00627     // longer used.
00628     //
00629     unsigned int totalStorages;
00630 
00631     DataRequest& Request = (( EventDataRequest* )( event ))->Request;
00632     StrDiskRequest* m_TempDiskRequest = m_Request.Get( Request.BufferId );
00633     // The branches below test m_TempDiskRequest for null, so never dereference it unguarded.
00634     StorageUp = ( m_TempDiskRequest != 0 ) && CheckStorageStatus( m_TempDiskRequest->Disk );
00635 
00636     // Get the client's ( IP, port ) pair
00637     ClientAddr.nat_addr = Request.Target.IPaddress;
00638     ClientAddr.nat_port = Request.Target.Port;
00639 
00640     // Find out the number of storages.
00641     //
00642     // TODO: Remove the following statement once RioNeti and NetMgr are no
00643     // longer used.
00644     //
00645     m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
00646     
00647     // Derive the storage id from the disk; 0 when no request was found (the lookups below then do not change which branch runs).
00648     storageid = ( m_TempDiskRequest != 0 ) ? ( ( m_TempDiskRequest->Disk - 1 ) / SNode::sn_maxdisks + 1 ) : 0;
00649     // Adjust the id for non-real-time copies when there are ( IP, port )
00650     // pairs associated with the new data-transmission implementation
00651     // (these pairs start at the position of the last storage in the vector
00652     // plus 1, and the number of pairs equals the number of storages).
00653     //
00654     // TODO: Remove the following variable once RioNeti and NetMgr are no
00655     // longer used.
00656     //
00657     if( ( ( Request.Operation == NonRealTimeRead ) ||
00658           ( Request.Operation == NonRealTimeWrite ) ) && 
00659         ( FindNATMapping( ClientAddr, storageid + totalStorages + 1 ) ) )
00660     {
00661          #ifdef RIO_DEBUG2
00662          RioErr << "Router:DataSendToClient usando o indice " 
00663                 << storageid + totalStorages + 1 << " ao inves do indice "
00664                 << storageid << "." << endl;
00665          #endif
00666                 
00667          storageid = storageid + totalStorages + 1;
00668     }
00669     #ifdef RIO_DEBUG2
00670     else
00671         RioErr << "Router:DataSendToClient o novo indice " 
00672                << storageid + totalStorages + 1 << " nao foi usado. "
00673                << "Request.Operation = " << Request.Operation << "." << endl;
00674     #endif
00675     
00676     // Check whether the disk of this storage server can be used by the
00677     // client.
00678     ServerAddr = GetNATMapping( ClientAddr, storageid );
00679     StorageEnabled = ( ( ServerAddr.nat_addr != 0 ) || 
00680                        ( ServerAddr.nat_port != 0 ) ); // NOTE(review): other code in this file tests an invalidated mapping as ( -1, 0 ) - confirm ( 0, 0 ) is the right sentinel here
00681 
00682     if( ( m_TempDiskRequest ) && ( StorageUp ) && ( StorageEnabled ) )   
00683     {
00684 
00685         #ifdef RIO_DEBUG2
00686         RioErr << "Router:DataSendToClient => block "<<  Request.Block
00687                << " bufferid " << Request.BufferId
00688                << " disk " << m_TempDiskRequest->Disk << endl;
00689         #endif
00690 
00691         //update bufferstatus
00692         Request.streamobj->Stream()->decServerBufferStatus();
00693         //update tokenstosend
00694         Request.streamobj->Stream()->decTokensToSend();
00695 
00696         m_Pending[m_TempDiskRequest->Disk].Put( m_TempDiskRequest );
00697 
00698         if( m_CollectMeasures )
00699         {
00700             // don't use this sample
00701             Request.QueueSize = -1;
00702         }
00703         m_nPending[m_TempDiskRequest->Disk]++;
00704         SendStorage(m_TempDiskRequest);
00705         
00706         char name[1025];
00707         m_DiskMgr->GetDiskName( m_TempDiskRequest->Disk, name );
00708 
00709         RioStorageNodeInfo sn_info;
00710         m_DiskMgr->GetDiskStorageNodeInfo( m_TempDiskRequest->Disk, &sn_info );
00711 
00712         int queue_sum = m_nRT[ m_TempDiskRequest->Disk ]
00713                       + m_nNRT[ m_TempDiskRequest->Disk ]
00714                       + m_nPending[ m_TempDiskRequest->Disk ];
00715 
00716         UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( sn_info.Hostname, name,
00717                                                                 queue_sum );
00718         m_SystemManager->PostITEvent( (MonitorEvent *) it_event); // report the new total queue length of this disk
00719     }
00720     else if( m_TempDiskRequest == 0 )
00721     {
00722         RioErr << "ERROR: Router:DataSendToClient => Could not find request to send"
00723                << endl;
00724     }
00725     else if( StorageEnabled ) 
00726     {
00727         RioErr << "Router ERROR (DataSendToClient): Cannot access disk: " 
00728                << "storage is down" << endl;
00729     }
00730     else  
00731     {
00732         RioErr << "Router ERROR (DataSendToClient): Cannot access disk: " 
00733                << "storage is disabled for this client" << endl;
00734     }
00735 
00736 }

bool CRouter::FindNATMapping ( NATData  server_map,
int  stor_id 
)

Funcao para verificar se um mapeamento esta presente no mapa com os mapeamentos.

TODO: Remover a funcao quando a RioNeti e a NetMgr nao forem mais usadas.

Parameters:
server_map estrutura com o par IP, porta do servidor
stor_id identificador do servidor de armazenamento.
Returns:
true se o mapeamento esta presente ou false caso o mapeamento nao esteja presente.

Definition at line 2142 of file Router.cpp.

02143 { // Reports whether m_NAT_map holds a mapping for ( server_map, stor_id ).
02144     return m_NAT_map.findElement( server_map, stor_id ); // result of the map lookup is returned unchanged
02145 }

Event* CRouter::Get ( void   )  [inline, private]

Definition at line 146 of file Router.h.

00146 { return m_Queue.Get(); } // Returns whatever m_Queue.Get() yields; routerthread uses it to fetch the next event.

int CRouter::GetDiskServiceTime ( EventStorageRequest *  event,
u16  DiskId 
)

Definition at line 1983 of file Router.cpp.

01984 { // Forwards the query to the disk manager and returns its result.
01985     int result;
01986     result = m_DiskMgr->GetDiskServiceTime( event, DiskId );
01987     if( result == -1 ) // -1 from the disk manager: also reset the router's own estimate for this disk
01988     {
01989         m_EstDiskServiceTimeOfDisk[ DiskId ] = -1; // NOTE(review): DiskId is used as an index without a range check
01990     }
01991     return result;
01992 }

long double CRouter::GetEstimatedDiskQueueTime ( int  disk  ) 

Definition at line 1945 of file Router.cpp.

01946 { // Returns the stored queue-time estimate for the given disk.
01947     return m_EstDiskQueueTimeOfDisk[disk]; // NOTE(review): disk is not range-checked here
01948 }

long double CRouter::GetEstimatedQueueTimeOfAllDisks (  ) 
unsigned long long int CRouter::GetInvalidStorages ( NATData  ClientAddr  ) 

Determina o vetor de bits com os servidores de armazenamento que nao podem ser usados pelo cliente com endereco passado como parametro, isto e, que estao inativos ou que foram invalidados (apos se tornarem novamente ativos).

Parameters:
ClientAddr endereco UDP do cliente. Se os campos da estrutura NATData forem ambos 0, somente iremos considerar se os servidores estao ou nao ativos. Em caso contrario, tambem vamos contar os servidores que nao podem ser usados pelo cliente.
Returns:
vetor com as informacoes dos storages inativos. Cada bit esta associado a um servidor de armazenamento e, quando igual a 1, indica que o servidor associado ao bit nao pode ser usado (e 0 quando pode ser usado).

Definition at line 2358 of file Router.cpp.

02359 { // Builds the bit vector of storage servers that the given client cannot use.
02360     unsigned long long int InvalidStorages;
02361     unsigned int TotalStorages;
02362     NATData StorageAddr;
02363 
02364     // Get the number of storage servers (stored in TotalStorages).
02365     m_DiskMgr->GetNumberOfStorageNodes( &TotalStorages );
02366     
02367     // Acquire exclusive access to the m_StoragesStatus variable.
02368     pthread_mutex_lock( &m_StoragesStatusMutex );
02369     // Initially, the servers that cannot be used are the ones that are
02370     // inactive.
02371     InvalidStorages = m_StoragesStatus;
02372     // Release the exclusive access to the m_StoragesStatus variable.
02373     pthread_mutex_unlock( &m_StoragesStatusMutex );
02374     // Add the storage servers that became active after the client created
02375     // its connection and that, therefore, cannot be used.
02376     if( ( ClientAddr.nat_addr != 0 ) && ( ClientAddr.nat_port != 0 ) ) // NOTE(review): runs only when BOTH fields are non-zero, while the description says "unless both are 0" - confirm
02377     {
02378         for( unsigned int Storage = 0; Storage < TotalStorages; Storage++ )
02379         {
02380             // We must add 1 when calling GetNATMapping because, in the NAT
02381             // mapping table, the storage servers start at position 1
02382             // (position 0 is used by the management
02383             // server).
02384             StorageAddr = GetNATMapping( ClientAddr, Storage + 1 );
02385             if( ( StorageAddr.nat_addr == -1 ) && 
02386                 ( StorageAddr.nat_port == 0 ) )
02387             {
02388                 
02389                 #ifdef RIO_DEBUG2
02390                 RioErr << "CRouter::GetInvalidStorages o servidor de "
02391                        << "armazenamento com a ID " << Storage << " nao pode "
02392                        << "ser usado pelo cliente pois seu mapeamento foi "
02393                        << "invalidado." << endl;
02394                 #endif
02395                        
02396                 InvalidStorages = InvalidStorages | ( 1ull << Storage ); // NOTE(review): presumably Storage stays below 64 - verify
02397             } 
02398             
02399             // Check whether the other address of the storage, used by the new
02400             // class, is valid (if it exists). This code must be removed when
02401             // the NetMgr class is definitively taken out of the code.
02402             //
02403             // TODO: Remove all the code inside the following if, including
02404             // the if statement itself, once RioNeti and NetMgr are no longer used.
02405             //
02406             if( FindNATMapping( ClientAddr, Storage + TotalStorages + 1 ) )
02407             {
02408                 StorageAddr = GetNATMapping( ClientAddr, Storage + 
02409                                              TotalStorages + 1 );
02410                 if( ( StorageAddr.nat_addr == -1 ) && 
02411                     ( StorageAddr.nat_port == 0 ) )
02412                 {
02413                 
02414                     #ifdef RIO_DEBUG2
02415                     RioErr << "CRouter::GetInvalidStorages o servidor de "
02416                            << "armazenamento com a ID " << Storage << " nao "
02417                            << "pode ser usado pelo cliente pois seu mapeamento "
02418                            << " foi invalidado (na nova implementacao)." 
02419                            << endl;
02420                     #endif
02421                        
02422                     InvalidStorages = InvalidStorages | ( 1ull << Storage );
02423                 }
02424             }
02425             #ifdef RIO_DEBUG2
02426             else
02427             RioErr << "CRouter::GetInvalidStorages a ID "  
02428                    << Storage + TotalStorages + 1 << " nao existe no mapa e "
02429                    << "nao foi considerada!" << endl;
02430             #endif
02431         }
02432     } 
02433     #ifdef RIO_DEBUG2
02434     // Print the bit vector of the servers that cannot be used.
02435     RioErr << "CRouter::GetInvalidStorages Vetor com a identificacao dos "
02436            << "servidores de armazenamento que nao podem ser usados: ";
02437     for( int p = MAXNUMSTORAGES - 1; p >= 0 ; p-- )
02438     {
02439         unsigned int q = ( InvalidStorages >> p ) & 1ull; 
02440         RioErr << q;
02441     }
02442     RioErr << endl;
02443     #endif
02444            
02445     return InvalidStorages;
02446 }

int CRouter::GetMaxNumberOfDisks (  ) 

Definition at line 1967 of file Router.cpp.

01968 { // Returns the value reported by the disk manager.
01969     return m_DiskMgr->GetMaxNumberOfDisks();
01970 }

NATData CRouter::GetNATMapping ( NATData  server_map,
int  stor_id 
)

Funcao para obter um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho.

Parameters:
server_map estrutura com o par IP, porta do servidor
stor_id identificador do servidor de armazenamento.

Definition at line 2126 of file Router.cpp.

02127 { // Returns the m_NAT_map entry stored for ( server_map, stor_id ).
02128     return m_NAT_map.getElement( server_map, stor_id ); // NOTE(review): what getElement yields for a missing entry is not visible here
02129 }

int CRouter::GetNumberOfActiveDisks (  ) 

Definition at line 1974 of file Router.cpp.

01975 { // Returns the active-disk count obtained from the disk manager.
01976     unsigned int numd;
01977     m_DiskMgr->GetNumberOfActiveDisks( &numd); // the disk manager writes the count into numd; its return value is ignored
01978     return (int) numd;
01979 }

int CRouter::GetNumberOfStorageNodes ( unsigned int *  NumberOfStorageNodes  ) 

Funcao para obter o numero de storage nodes.

Parameters:
NumberOfStorageNodes ponteiro para um inteiro nao sinalizado que armazenara o numero de storages.
Returns:
S_OK se nenhum erro ocorreu ao obter o numero de storages, ou um valor diferente de S_OK em caso contrario.

Definition at line 2493 of file Router.cpp.

02494 { // Forwards to the disk manager, which fills *NumberOfStorageNodes.
02495     return m_DiskMgr->GetNumberOfStorageNodes( NumberOfStorageNodes ); // status code is returned unchanged
02496 }

void CRouter::GetStorageInfo ( NATData  ClientAddr,
bool *  CanRead,
bool *  CanWrite 
)

Determina se podemos ler ou escrever blocos, no momento, para um dado cliente cujo endereco e passado como parametro.

Parameters:
ClientAddr endereco UDP do cliente. Se os campos da estrutura NATData forem ambos 0, somente iremos considerar se os servidores estao ou nao ativos. Em caso contrario, tambem vamos contar os servidores que nao podem ser usados pelo cliente.
CanRead ponteiro para um valor booleano que informa se uma leitura de um bloco pode ser feita. Consideraremos que uma leitura nao podera ser feita se nao for possivel garantir que um bloco pode ser lido, isto e, se o numero de servidores de armazenamento inativos for maior ou igual ao numero de replicacoes.
CanWrite ponteiro para um valor booleano que informa se uma escrita de um bloco pode ser feita. Consideraremos que uma escrita nao podera ser feita se nao for possivel garantir um disco em um servidor de armazenamento diferente para cada replicacao do bloco, isto e, se o numero de servidores de armazenamento ativos for menor do que o numero de replicacoes.

Definition at line 2450 of file Router.cpp.

02452 { // Sets *CanRead and *CanWrite from the number of usable storage servers and the replication count.
02453     unsigned long long int InvalidStorages;
02454     unsigned int TotalStorages;
02455     int InactiveStorages, ActiveStorages;
02456     int NumberOfReplications;
02457 
02458     // Get the total number of storage servers.
02459     m_DiskMgr->GetNumberOfStorageNodes( &TotalStorages );
02460     // Get the number of replications.
02461     NumberOfReplications = m_DiskMgr->GetNumberOfReplications();
02462     // Determine which storage servers cannot be used by the
02463     // client
02464     InvalidStorages = GetInvalidStorages( ClientAddr );     
02465     // Count the storage servers flagged in the bit vector.
02466     InactiveStorages = 0;
02467     for( unsigned int i = 0; i < MAXNUMSTORAGES; i++ )
02468         if( ( InvalidStorages & ( 1ull << i ) ) != 0 )
02469             ( InactiveStorages )++;
02470     // Compute the number of active servers (the total number of servers
02471     // minus the inactive servers).        
02472     ActiveStorages = TotalStorages - InactiveStorages;
02473     // A read can only be done, with guaranteed success, if the number of
02474     // inactive servers is smaller than the number of replications.
02475     *CanRead = ( InactiveStorages < NumberOfReplications );
02476     // A write can only be done if the number of active servers is greater
02477     // than or equal to the number of replications; otherwise there would
02478     // not be enough servers to store all the replications of
02479     // a block.
02480     *CanWrite = ( ActiveStorages >= NumberOfReplications );
02481     #ifdef RIO_DEBUG2         
02482     RioErr << "RioSession::GetStorageInfo estado dos servidores de "
02483            << "armazenamento ao receber um pedido de dados : ativos = " 
02484            << ActiveStorages << ", inativos = " << InactiveStorages 
02485            << ", totais = " << TotalStorages << ", replicacoes = "
02486            << NumberOfReplications << ", CanRead = " 
02487            << ( *CanRead ? "true": "false" ) << ", CanWrite = " 
02488            << ( *CanWrite ? "true": "false" ) << endl;
02489     #endif        
02490 }

int CRouter::Initialize ( RouterConfig *  Config  ) 

Definition at line 199 of file Router.cpp.

00200 { // One-time setup: copies settings from Config, allocates the per-disk queues and counters, initializes m_Request.
00201     int i;
00202 
00203     if( m_initialized )
00204     {
00205         RioErr << "Router.Initialize: already initialized" << endl;
00206         return ERROR_ROUTER + ERROR_INITIALIZED;
00207     }
00208 
00209     // Update number of disks (NOTE(review): Config is dereferenced without a null check)
00210     m_nDisks = Config->nDisks;
00211 
00212     // Update block size
00213     m_BlockSize = Config->BlockSize;
00214 
00215     // Update maximum number of pending requests per disk
00216     m_MaxPending = Config->MaxPending;
00217 
00218     // Update flag to collect measures
00219     m_CollectMeasures = Config->CollectMeasures;
00220     // Update Parameter to collect measures
00221     m_EstimatedParameter = Config->EstimatedTimeParameter;
00222     m_UpdatedEstimatedDiskServiceTime = 0;
00223 
00224     #ifdef __QUEUE_RESP_LOG
00225     if( m_CollectMeasures )
00226     {
00227         // open files to save measures
00228         // Buffer used to build the names of the log files.
00229         char LogFileName[ MaxPathSize ];
00230         // Build the log file name. NOTE(review): strcpy/strcat below do not check the length against MaxPathSize.
00231         strcpy( LogFileName, Config->LogsDirectory );
00232         strcat( LogFileName, LOGRESP );
00233         m_logRESP.open( LogFileName );
00234         strcpy( LogFileName, Config->LogsDirectory );
00235         strcat( LogFileName, LOGSQT );
00236         m_logSQT.open ( LogFileName );
00237         m_logRESP << "#Measures in msec" << endl;
00238         m_logSQT  << "#Measures in msec" << endl;
00239     }
00240     #endif
00241     // ------------------------------------------------------------------------
00242 
00243     m_DiskMgr       = Config->DiskManager;
00244     m_SystemManager = Config->SystemManager;
00245 
00246     // allocate disk queues. 3 queues: real-time, non real-time and
00247     // pending requests (i.e. submitted to storage node )
00248 
00249     m_RTqueue = new CDiskRequestQueue[m_nDisks];
00250     if( m_RTqueue == 0 ) // NOTE(review): a plain new-expression normally throws instead of returning 0 - confirm these checks are reachable in this build
00251     {
00252         RioErr << "Router.Initialize: Out of memory" << endl;
00253         CleanUp();
00254         return ERROR_ROUTER + ERROR_MEMORY;
00255     }
00256 
00257     m_NRTqueue = new CDiskRequestQueue[m_nDisks];
00258     if( m_NRTqueue == 0 )
00259     {
00260         RioErr << "Router.Initialize: Out of memory" << endl;
00261         CleanUp();
00262         return ERROR_ROUTER + ERROR_MEMORY;
00263     }
00264 
00265     m_Pending = new CDiskRequestQueue[m_nDisks];
00266     if( m_Pending == 0 )
00267     {
00268         RioErr << "Router.Initialize: Out of memory" << endl;
00269         CleanUp();
00270         return ERROR_ROUTER + ERROR_MEMORY;
00271     }
00272     m_nRT = new int [m_nDisks];
00273     if( m_nRT == 0 )
00274     {
00275         RioErr << "Router.Initialize: Out of memory" << endl;
00276         CleanUp();
00277         return ERROR_ROUTER + ERROR_MEMORY;
00278     }
00279 
00280     m_nNRT = new int [m_nDisks];
00281     if( m_nNRT == 0 )
00282     {
00283         RioErr << "Router.Initialize: Out of memory" << endl;
00284         CleanUp();
00285         return ERROR_ROUTER + ERROR_MEMORY;
00286     }
00287 
00288     m_nPending = new int [m_nDisks];
00289     if( m_nPending == 0 )
00290     {
00291         RioErr << "Router.Initialize: Out of memory" << endl;
00292         CleanUp();
00293         return ERROR_ROUTER + ERROR_MEMORY;
00294     }
00295 
00296     m_MaxQueueSize = ( Config->MaxDataRequests * MaxReplications );
00297 
00298     for( i = 0; i < m_nDisks; i++ )
00299     {
00300         // Set size of real time queues to large value
00301         // This indicates that the disk is not available
00302         m_nRT[i] = m_MaxQueueSize + 1;
00303         m_nNRT[i] = 0;
00304         m_nPending[i] = 0;
00305     }
00306 
00307     m_TempDiskRequest = new StrDiskRequest* [MaxReplications];
00308     if( m_TempDiskRequest == 0 )
00309     {
00310         RioErr << "Router.Initialize: Out of memory" << endl;
00311         CleanUp();
00312         return ERROR_ROUTER + ERROR_MEMORY;
00313     }
00314 
00315     // number of requests must be >= maxbuffer * maxclients so set
00316     // maxrequests to max value.
00317     unsigned int maxrequests = 0xffff;
00318 
00319     #ifdef RIO_DEBUG2
00320     RioErr << "Router: Max number of Requests " << maxrequests << endl;
00321     #endif
00322     // ------------------------------------------------------------------------
00323 
00324     int hResult;
00325     hResult = m_Request.Initialize( maxrequests );
00326     if( hResult )
00327     {
00328         RioErr << "Could not initialize m_Requests at Router"
00329                << ". (" << maxrequests << ")" << endl;
00330         return hResult; // NOTE(review): unlike the failure paths above, CleanUp() is not called here - confirm this is intended
00331     }
00332 
00333     m_initialized = true;
00334 
00335     return S_OK;
00336 }

void CRouter::PrintQueues (  )  [private]

Definition at line 1951 of file Router.cpp.

01952 { // Writes the three per-disk counters of every disk to RioErr.
01953     int i;
01954     RioErr << "Queues:" << endl;
01955     for( i = 0; i < m_nDisks; i++ )
01956     {
01957         RioErr << "  Disk " << i << ": "
01958                << m_nRT[i] << " "
01959                << m_nNRT[i] << " "
01960                << m_nPending[i] << endl; // column order: m_nRT, m_nNRT, m_nPending
01961 
01962     }
01963 }

void CRouter::Put ( Event *  ep  ) 

Definition at line 2092 of file Router.cpp.

02093 { // Enqueues ep on m_Queue, stamping the arrival time of data requests when measures are collected.
02094     EventDataRequest* event;
02095     event = ( EventDataRequest *) ep; // only dereferenced below, after the type check
02096 
02097     if( ( ( ep->Type==EventTypeRTDataRequest  ) ||
02098           ( ep->Type==EventTypeNRTDataRequest ) 
02099         ) && ( m_CollectMeasures )
02100       )
02101     {
02102         gettimeofday( &event->Request.ArrivalTime, 0 ); // record the current time in the request
02103     }
02104 
02105     m_Queue.Put(ep);
02106 }

void CRouter::RemoveNATMapping ( NATData  server_map,
int  stor_id 
)

Funcao para remover um mapeamento passado como parametro identificado pelo par IP, porta do servidor de gerenciamento e o identificador do servidor de despacho.

Parameters:
server_map estrutura com o par IP, porta do servidor
stor_id identificador do servidor de armazenamento.
Returns:
mapeamento para o servidor de armazenamento.

Definition at line 2118 of file Router.cpp.

02119 { // Removes the m_NAT_map entry for ( server_map, stor_id ); nothing is returned.
02120     m_NAT_map.removeElement( server_map, stor_id );
02121 }

void CRouter::RequestCompleted ( RioStreamObj **  StreamObjAddr,
Event *  event 
)

Funcao para completar uma requisicao (chamando a RequestCompleted do objeto RioStreamObj passado como parametro) removendo, quando necessario, o objeto do stream e alterando o ponteiro passado como parametro para NULL.

Obs: Se o objeto passado for NULL, indicando que o objeto foi incorretamente removido, um erro sera impresso no log do servidor informando deste erro.

Parameters:
StreamObjAddr ponteiro com o endereco com o ponteiro para o objeto para o qual deveremos chamar a funcao RequestCompleted (isso e necessario porque, se removermos o objeto, o ponteiro para ele devera ser alterado para NULL).
event ponteiro do evento a ser passado a funcao RequestCompleted.

Definition at line 2501 of file Router.cpp.

02502 { // Calls RequestCompleted on *StreamObjAddr and deletes the object (nulling the pointer) when it asks for removal.
02503     // Boolean that records whether the stream object
02504     // must be removed in this function.
02505     bool RemoveStreamObj; // NOTE(review): not initialized here; presumably always set by the call below - verify
02506 
02507     if( ( *StreamObjAddr ) != NULL )
02508     {
02509         ( *StreamObjAddr )->RequestCompleted( event, &RemoveStreamObj );
02510         if( RemoveStreamObj )
02511         {
02512              // In this case we handled the last request received and the stream was
02513              // already closed, but it was not removed because of the requests that
02514              // were still pending when it was closed. So we must remove the stream
02515              // and set the pointer passed as parameter to NULL, so that possible
02516              // future errors can be detected. Note that the pointer should not be
02517              // used in other parts of the Router because, in those parts, the
02518              // stream should still be open.
02519              //RioErr << "CRouter::RequestCompleted deletando o objeto "
02520              //        << "RioStreamObj cujo ponteiro e igual a 0x " << hex
02521              //    << ( unsigned long ) *StreamObjAddr << dec << "." << endl;
02522              delete *StreamObjAddr;
02523              *StreamObjAddr = NULL;
02524         }
02525     }
02526     else
02527     {
02528          RioErr << "[CRouter] RequestCompleted nao foi possivel executar a "
02529                 << "funcao porque foi passado um ponteiro NULO!" << endl;
02530     }
02531 }

void CRouter::routerthread (  )  [private]

Definition at line 398 of file Router.cpp.

00399 { // Router thread body: loops forever, taking one event per iteration via Get() and dispatching on event->Type.
00400     Event* event;
00401     // Logs the kernel thread id (SYS_gettid) of this thread.
00402     RioErr << "ROUTERTHREADID " << syscall( SYS_gettid ) << endl;
00403 
00404     while( 1 )
00405     {
00406         event = Get(); // presumably blocks until an event is available -- TODO confirm in Get()
00407 
00408         #ifdef RIO_DEBUG2    
00409         RioErr << "CRouter::routerthread processando o evento com o tipo "
00410                << event->Type << endl;
00411         #endif       
00412 
00413         switch( event->Type )
00414         {
00415             case EventTypeRTDataRequest:
00416             case EventTypeNRTDataRequest:
00417                 DataReq( event ); // unlike the other cases, the event is not freed here; presumably DataReq keeps it -- TODO confirm
00418                 break;
00419 
00420             case EventTypeStorageReply:
00421                 // -- free specific request for that disk
00422                 StorageReply(& (((EventStorageReply*)(event))->StorageReply ));
00423                 EventManager.Free(event);
00424                 break;
00425 
00426             case EventTypeAddDisk:
00427                 AddDisk((( EventAddDisk* )( event ))->Disk );
00428                 EventManager.Free(event);
00429                 break;
00430                 
00431             // New events used to handle the case where a storage server
00432             // goes down and later becomes available again.
00433             // Event that handles a storage server going down.
00434             case EventTypeStorageDown:
00435             
00436                 #ifdef RIO_DEBUG2
00437                 RioErr << "CRouter::routerthread recebida a mensagem "
00438                        << "EventTypeStorageDown da classe DiskMgr informando "
00439                        << "que o servidor de armazanamento com a ID "
00440                        << ( ( EventStorageDown* ) ( event ) )->StorageId 
00441                        << " parou de funcionar" << endl;
00442                 #endif       
00443                 
00444                 StorageDown( ( ( EventStorageDown* ) ( event ) )->StorageId,
00445                              ( ( EventStorageDown* ) ( event ) )->
00446                                 EmptyStorageQueue );
00447                 EventManager.Free( event );
00448                 break;
00449             // Event that handles a storage server coming back up.
00450             case EventTypeStorageUp:
00451             // NOTE(review): the debug print below casts to EventStorageDown*, not EventStorageUp* -- confirm.
00452                 #ifdef RIO_DEBUG2
00453                 RioErr << "CRouter::routerthread recebida a mensagem "
00454                        << "EventTypeStorageUp da classe DiskMgr informando "
00455                        << "que o servidor de armazanamento com a ID "
00456                        << ( ( EventStorageDown* ) ( event ) )->StorageId 
00457                        << " voltou a funcionar" << endl;
00458                 #endif       
00459                        
00460                 StorageUp( ( ( EventStorageUp* )( event ) )->StorageId );
00461                 EventManager.Free( event );
00462                 break;
00463                 
00464             // New event used to stop the Router thread.
00465             case EventTypeFinalizeThread:
00466                 #ifdef RIO_DEBUG2
00467                 RioErr << "CRouter::routerthread recebida a mensagem "
00468                        << "EventTypeFinalizeThread informando que a thread "
00469                        << "deve ser finalizada" << endl;
00470                 #endif
00471                 EventManager.Free( event );
00472                 //pthread_exit( NULL );
00473                 return;       
00474                 break;    
00475             // NOTE(review): the (int *) cast below makes the stream print Type as a pointer value -- presumably intentional; confirm.
00476             default:
00477                 RioErr << "Router.Routerthread unknown element type "
00478                        << (int *) event->Type << endl;
00479                 EventManager.Free(event);
00480         }
00481     }
00482 }

void * CRouter::routerthreadep ( void *  param  )  [static, private]

Definition at line 391 of file Router.cpp.

00392 { // Thread entry point passed to pthread_create: casts its argument to CRouter* and runs routerthread() on it.
00393     ( (CRouter *) parm)->routerthread(); // NOTE(review): the rendered signature names the argument 'param', the body uses 'parm' -- confirm against Router.cpp
00394     return 0; // always returns a null pointer once routerthread() returns
00395 }

void CRouter::SendStorage ( StrDiskRequest Request  )  [private]

Definition at line 1609 of file Router.cpp.

01610 { // Builds the storage-node message for a disk request and hands it to m_DiskMgr->SendStorageNode().
01611     EventStorageRequest* event = NULL;
01612     // The message layout is chosen by Request->event->Request.Operation (see the branches below).
01613     #ifdef RIO_DEBUG2
01614     int bufferid = ((EventDataRequest*)(Request->event))->Request.BufferId;
01615     RioErr << "Router:SendStorage => RouterId " << Request->Id
01616            << " BufferId " << bufferid
01617            << " Disk " << Request->Disk
01618            << " pending requests " <<  m_nPending[Request->Disk] << endl;
01619 
01620     if( m_nPending[Request->Disk] > m_MaxPending )
01621     {
01622         RioErr << "Router:SendStorage => "
01623                << " Disk " << Request->Disk
01624                << " pending requests " <<  m_nPending[Request->Disk]
01625                << " RT " <<m_nRT[Request->Disk] << endl;
01626     }
01627     #endif
01628     // Optional measurement: fold the time elapsed since ArrivalTime into the queue-time estimates.
01629     if( ( m_CollectMeasures ) && ( Request->event->Request.QueueSize != -1 ) )
01630     {
01631         struct timeval now;
01632         gettimeofday( &now, 0 );
01633         double sample_msec = getInterval( Request->event->Request.ArrivalTime,
01634                                           now );
01635         Request->event->Request.ArrivalTime = now;
01636 
01637         // updates m_EstimatedQueueTime of the disk
01638         int disk = Request->event->Request.Reps[0].disk;
01639         int queuesize = Request->event->Request.QueueSize;
01640         // Maps the queue size onto an index in [0, 300]: sizes up to m_MaxPending give 0, larger ones are offset by m_MaxPending and capped at 300.
01641         if( queuesize <  m_MaxPending + 1 )
01642             queuesize = 0;
01643         else if( queuesize > ( m_MaxPending + 300 ))
01644             queuesize = 300;
01645         else
01646             queuesize = queuesize - m_MaxPending;
01647         // Both estimates below are seeded with the first sample, then updated as (1 - m_EstimatedParameter) * old + m_EstimatedParameter * sample.
01648         if( m_EstDiskQueueTime[ disk ][ queuesize ] == 0 )
01649         {
01650             m_EstDiskQueueTime[ disk ][ queuesize ] = sample_msec;
01651         }
01652         else
01653         {
01654             m_EstDiskQueueTime[ disk ][ queuesize ] =
01655                                                  (( 1 - m_EstimatedParameter) *
01656                                      m_EstDiskQueueTime[ disk ][ queuesize ]) +
01657                                             m_EstimatedParameter * sample_msec;
01658         }
01659         if( m_EstDiskQueueTimeOfDisk[ disk ] == 0 )
01660         {
01661             m_EstDiskQueueTimeOfDisk[ disk ] = sample_msec;
01662         }
01663         else
01664         {
01665             m_EstDiskQueueTimeOfDisk[ disk ] =    ( 1 - m_EstimatedParameter) *
01666                                              m_EstDiskQueueTimeOfDisk[ disk ] +
01667                                             m_EstimatedParameter * sample_msec;
01668         }
01669 
01670         #ifdef __QUEUE_RESP_LOG
01671         m_logSQT << " disk "   << disk
01672                  << " sample " << sample_msec
01673                  << " est "    << m_EstDiskQueueTimeOfDisk[ disk ] << endl;
01674         #endif
01675     }
01676     // Prefetch: a "new request" message of type MSG_RSS_FETCH | RSS_INTERMEDIATE; no NAT lookup is done here.
01677     if( Request->event->Request.Operation == RealTimePrefetchBlock )
01678     {
01679         MsgRSSnewRequest* msg;
01680 
01681         Request->event->Request.RequestCounter = 1;
01682 
01683         event = (EventStorageRequest*) EventManager.New(
01684                                                     EventTypeStorageRequest );
01685         msg = &( event->StorageRequest.Request.New );
01686         msg->Type = MSG_RSS_FETCH | RSS_INTERMEDIATE;
01687         msg->Size = SizeMsgRSSnewRequest;
01688         msg->Token = RSS_TOKEN_STORAGE;
01689         // Doesn't know yet ClientId
01690         msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid;
01691         msg->RouterId = Request->Id;
01692         // Doesn't know yet IP
01693         msg->IPaddr =
01694           ((EventDataRequest*)(Request->event))->Request.Target.IPaddress;
01695         // Doesn't know yet Port
01696         msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port;
01697         // If the client is behind NAT, the (IP, port) pair the server must
01698         // report to the storage is different and varies per storage.
01699         // To find out which pair to use, the heap m_NAT_map must be
01700         // consulted. If the answer is the default pair (0, 0), the client is
01701         // not behind NAT and the current pair must be kept; otherwise the
01702         // result of the lookup is used.
01703         // Apparently this piece of code is not needed, since the IP and
01704         // port are not known at this point. If necessary, uncomment the
01705         // code below and test it.
01706         //NATData input( msg->IPaddr, msg->Port );
01707         //NATData result = m_NAT_map.getElement( input,
01708         //                           (Request->Disk-1)/(SNode::sn_maxdisks)+1 );
01709         //if( result.nat_addr != 0 )
01710         //{
01711         //    msg->IPaddr = result.nat_addr;
01712         //    msg->Port = result.nat_port;
01713         //}
01714         msg->Disk = Request->Disk;
01715         msg->Pos = Request->Pos;
01716         msg->DataSize = Request->Size;
01717 
01718         #ifdef RIO_DEBUG2
01719         RioErr << " prefetch RouterId "<< Request->Id
01720                << " ClientId "<< msg->ClientId
01721                << " msg-Type (F+Inter)" << msg->Type << endl;
01722         #endif
01723     }
01724     else if( Request->event->Request.Operation == RealTimeCancelBlock )
01725     {
01726         if( Request->event->Request.BufferId == -1 )
01727         {
01728             #ifdef RIO_DEBUG2
01729             RioErr << "request block was not submit to storage yet, "
01730                    << " so just free request" << endl;  
01731             #endif
01732 
01733             m_nRT[Request->Disk]--;
01734             m_nPending[Request->Disk]--;
01735 
01736             //the request was not submit to storage yet, so just free request
01737             Request->event->Request.Status = 0;
01738             RequestCompleted( &Request->event->Request.streamobj,
01739                               ( Event* ) ( Request->event ) );
01740             char name[1025];
01741             m_DiskMgr->GetDiskName( Request->Disk, name );
01742 
01743             RioStorageNodeInfo sn_info;
01744             m_DiskMgr->GetDiskStorageNodeInfo( Request->Disk, &sn_info );
01745             // Posts the disk's updated queue total (RT + NRT + pending counters) through m_SystemManager.
01746             int queue_sum = m_nRT[ Request->Disk ]
01747                           + m_nNRT[ Request->Disk ]
01748                           + m_nPending[ Request->Disk ];
01749             // it_event is allocated with new and not deleted here; presumably PostITEvent takes ownership -- TODO confirm
01750             UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
01751                 sn_info.Hostname, name, queue_sum );
01752             m_SystemManager->PostITEvent( (MonitorEvent *) it_event );
01753             return;
01754         }
01755         else
01756         {
01757             MsgRSSpendRequest* msg;
01758             event = (EventStorageRequest*) EventManager.New(
01759                                                     EventTypeStorageRequest );
01760             msg = &( event->StorageRequest.Request.Pending );
01761             msg->Type = MSG_RSS_CANCEL;
01762             msg->Size = SizeMsgRSSpendRequest;
01763             msg->Token = RSS_TOKEN_STORAGE;
01764             //Update ClientId
01765             msg->ClientId = ((EventDataRequest*)(
01766                                             Request->event))->Request.Reqid;
01767             //Update StorageId
01768             msg->StorageId = Request->StorageId;
01769             msg->RouterId = Request->Id;
01770             //Update IP
01771             msg->IPaddr = ((EventDataRequest*)(
01772                                     Request->event))->Request.Target.IPaddress;
01773             //Update Port
01774             msg->Port = ((EventDataRequest*)(
01775                                     Request->event))->Request.Target.Port;
01776             // If the client is behind NAT, the (IP, port) pair that the
01777             // server must report to the storage is different and varies
01778             // for each storage.
01779             // To find out which pair to use, the heap m_NAT_map must be
01780             // consulted. If the answer is the default pair (0, 0), the
01781             // client is not behind NAT and the current pair must be kept;
01782             // otherwise, the result of the lookup is used.
01783             NATData input( msg->IPaddr, msg->Port );
01784             NATData result = m_NAT_map.getElement( input,
01785                                    (Request->Disk-1)/(SNode::sn_maxdisks)+1 );
01786             if( result.nat_addr != 0 )
01787             {
01788                 msg->IPaddr = result.nat_addr;
01789                 msg->Port = result.nat_port;
01790             }
01791 
01792             #ifdef RIO_DEBUG2
01793             RioErr << " cancel RouterId " << Request->Id
01794                    << " ClientId "        << msg->ClientId
01795                    << " StorageId "       << msg->StorageId
01796                    << " msg-Type C "      << msg->Type << endl;
01797             #endif
01798         }
01799     }
01800     else if( Request->event->Request.Operation == RealTimeSendBlock )
01801     {
01802         MsgRSSpendRequest* msg;
01803         event = (EventStorageRequest*) EventManager.New(
01804                                                     EventTypeStorageRequest );
01805         msg = &( event->StorageRequest.Request.Pending );
01806         msg->Type = MSG_RSS_SEND;
01807         msg->Size = SizeMsgRSSpendRequest;
01808         msg->Token = RSS_TOKEN_STORAGE;
01809         //Update ClientId
01810         msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid;
01811         //Update StorageId
01812         msg->StorageId = Request->StorageId;
01813         msg->RouterId = Request->Id;
01814         //Update IP
01815         msg->IPaddr = ((EventDataRequest*)(
01816                                     Request->event))->Request.Target.IPaddress;
01817         //Update Port
01818         msg->Port = ((EventDataRequest*)(Request->event))->Request.Target.Port;
01819         // If the client is behind NAT, the (IP, port) pair that the server
01820         // must report to the storage is different and varies per storage.
01821         // To find out which pair to use, the heap m_NAT_map must be
01822         // consulted. If the answer is the default pair (0, 0), the client is
01823         // not behind NAT and the current pair must be kept; otherwise, the
01824         // result of the lookup is used.
01825         NATData input( msg->IPaddr, msg->Port );
01826         NATData result = m_NAT_map.getElement( input,
01827                                    (Request->Disk-1)/(SNode::sn_maxdisks)+1 );
01828         if( result.nat_addr != 0 )
01829         {
01830             msg->IPaddr = result.nat_addr;
01831             msg->Port = result.nat_port;
01832         }
01833 
01834         /*not send ack*/
01835         msg->StreamTraffic = ((Event*)(Request->event))->Type;
01836         
01837         // Adds the video transfer rate to the message.
01838         ((EventDataRequest*)(Request->event))->Request.rioobject->GetVideoRate( &msg->VideoRate );
01839 
01840         #ifdef RIO_DEBUG2
01841         RioErr << " send RouterId " << Request->Id
01842                << " ClientId "      << msg->ClientId
01843                << " StorageId "     << msg->StorageId
01844                << " msg-Type S "    << msg->Type << endl;
01845         #endif
01846     }
01847     // ------------------------------------------------------------------------
01848     else
01849     {
01850         MsgRSSnewRequest* msg;
01851         // Holds the storage identifier.
01852         int storageid;
01853         // TODO: This variable must be removed once NetMgr and
01854         // RioNeti are permanently removed from the code.
01855         unsigned int totalStorages;
01856         
01857         event = (EventStorageRequest*)EventManager.New(EventTypeStorageRequest);
01858         msg = &( event->StorageRequest.Request.New );
01859         if( (  Request->event->Request.Operation == RealTimeRead ) ||
01860            (  Request->event->Request.Operation == NonRealTimeRead))
01861             msg->Type = MSG_RSS_READ;
01862         else
01863             msg->Type = MSG_RSS_WRITE;
01864 
01865         msg->Size     = SizeMsgRSSnewRequest;
01866         msg->Token    = RSS_TOKEN_STORAGE;
01867         msg->ClientId = ((EventDataRequest*)(Request->event))->Request.Reqid;
01868         msg->RouterId = Request->Id;
01869         msg->IPaddr   = ((EventDataRequest*)(
01870                                     Request->event))->Request.Target.IPaddress;
01871         msg->Port     = ((EventDataRequest*)(Request->event))->Request.Target.Port;
01872         
01873         // Determines the storage identifier.
01874         storageid = ( Request->Disk - 1 )/( SNode::sn_maxdisks ) + 1; 
01875 
01876         // TODO: Initializes totalStorages with the number of storages. This
01877         // code must be removed once NetMgr and RioNeti are permanently
01878         // removed from the code.
01879         m_DiskMgr->GetNumberOfStorageNodes( &totalStorages );
01880         
01881         // Gets the dispatch server address used as the base of the
01882         // mapping.
01883         NATData input( msg->IPaddr, msg->Port );
01884 
01885         // If the client is behind NAT, the (IP, port) pair that the server
01886         // must report to the storage is different and varies per storage.
01887         // To find out which pair to use, the heap m_NAT_map must be
01888         // consulted. If the answer is the default pair (0, 0), the client is
01889         // not behind NAT and the current pair must be kept; otherwise, the
01890         // result of the lookup is used.
01891         //
01892         // TODO: We now need to check whether we are using the new or the
01893         // old implementation. All the code inside the if and its else must
01894         // be removed once NetMgr and RioNeti are permanently taken out of
01895         // the code.
01896         if( ( ( Request->event->Request.Operation == NonRealTimeRead ) ||
01897               ( Request->event->Request.Operation == NonRealTimeWrite ) ) && 
01898               ( FindNATMapping( input, storageid + totalStorages + 1 ) ) )
01899         {
01900             #ifdef RIO_DEBUG2
01901             RioErr << "CRouter::SendStorage usando o indice "  
01902                    << storageid + totalStorages + 1 << " ao inves do indice " 
01903                    << storageid << "." << endl;
01904             #endif
01905               
01906             storageid = storageid + totalStorages + 1;
01907         }
01908         #ifdef RIO_DEBUG2
01909         else
01910             RioErr << "Router::SendStorage o novo indice " 
01911                    << storageid + totalStorages + 1 << " nao foi usado. "
01912                    << "Request->event->Request.Operation = " 
01913                    << Request->event->Request.Operation << "." << endl;
01914         #endif
01915         
01916         // Gets the mapping to be used by the storage.
01917         NATData result = m_NAT_map.getElement( input, storageid );
01918         if( result.nat_addr != 0 )
01919         {
01920             msg->IPaddr = result.nat_addr;
01921             msg->Port = result.nat_port;
01922         }
01923         msg->Disk     = Request->Disk;
01924         msg->Pos      = Request->Pos;
01925         msg->DataSize = Request->Size;
01926 
01927         /*not send ack*/
01928         msg->StreamTraffic = ((Event*)(Request->event))->Type;
01929 
01930         // Adds the video transfer rate to the message.
01931         ((EventDataRequest*)(Request->event))->Request.rioobject->GetVideoRate( &msg->VideoRate );
01932 
01933         #ifdef RIO_DEBUG2
01934         RioErr << " read and send RouterId "<< Request->Id
01935                << " ClientId "<< msg->ClientId
01936                << " msg-Type RS|RW " << msg->Type << endl;
01937         #endif
01938     }
01939     // Common tail: every branch that did not return has filled 'event'; forward it to the storage node of Request->Disk.
01940     m_DiskMgr->SendStorageNode( event, u16 ( Request->Disk ));
01941     return;
01942 }

void CRouter::SetDiskServiceTime ( int  Disk,
double  EstServiceTimeOfDisk,
double  EstServiceTime[301] 
)

Definition at line 1996 of file Router.cpp.

01999 { // Stores new service-time estimates for one disk, then bumps the update counter and signals m_ConditionUpdated.
02000     for(int j = 0; j < 301; j++) // copies all 301 entries of EstServiceTime into the row of disk 'Disk'
02001     {
02002         m_EstDiskServiceTime[ Disk ][ j ] = EstServiceTime[ j ];
02003     }
02004     // Disk is used as an index without a range check; the array writes happen before m_MutexUpdated is taken.
02005     m_EstDiskServiceTimeOfDisk[ Disk ] = EstServiceTimeOfDisk;
02006     pthread_mutex_lock(&m_MutexUpdated);
02007     m_UpdatedEstimatedDiskServiceTime++;
02008     pthread_cond_broadcast(&m_ConditionUpdated);
02009     pthread_mutex_unlock(&m_MutexUpdated);
02010 
02011     return;
02012 }

void CRouter::SetNATMapping ( NATData  server_map,
int  stor_id,
NATData  stor_map 
)

Definition at line 2111 of file Router.cpp.

02112 { // Thin wrapper: forwards the three arguments unchanged to m_NAT_map.insertElement().
02113     m_NAT_map.insertElement( server_map, stor_id, stor_map ); // no locking is done here; presumably insertElement synchronizes if needed -- TODO confirm
02114 }

int CRouter::Start ( void   ) 

Definition at line 338 of file Router.cpp.

00339 { // Starts the router thread; refuses to run when the router is not initialized or is already started.
00340     pthread_attr_t attrib;
00341     // Returns S_OK on success, or ERROR_ROUTER plus ERROR_NOT_INITIALIZED / ERROR_STARTED / ERROR_CREATE_THREAD.
00342     if( !m_initialized )
00343     {
00344         RioErr << "Router.Start: not initiallized" << endl;
00345         return ERROR_ROUTER + ERROR_NOT_INITIALIZED;
00346     }
00347     if( m_started )
00348     {
00349         RioErr << "Router.Start: already started" << endl;
00350         return ERROR_ROUTER + ERROR_STARTED;
00351     }
00352     // The thread runs routerthreadep( this ) with a stack size of 3*PTHREAD_STACK_MIN.
00353     pthread_attr_init( &attrib );
00354     pthread_attr_setstacksize( &attrib, 3*PTHREAD_STACK_MIN );
00355     // NOTE(review): pthread_create reports failure through its return value, not errno, so strerror(errno) below may be misleading; attrib is also never passed to pthread_attr_destroy.
00356     if( pthread_create( &m_thread, &attrib, &routerthreadep, (void *)this ) )
00357     {
00358         RioErr << "Router.Start pthread_create failed "
00359                << strerror(errno) << endl;
00360         return (ERROR_ROUTER + ERROR_CREATE_THREAD);
00361     }
00362 
00363     m_started = true;
00364     return S_OK;
00365 }

int CRouter::Stop ( void   ) 

Definition at line 367 of file Router.cpp.

00368 { // Stops the router thread: queues an EventTypeFinalizeThread event and joins m_thread.
00369     Event *event;
00370     // Returns S_OK, or ERROR_ROUTER + ERROR_NOT_STARTED when m_started is false.
00371     if( !m_started )
00372     {
00373         RioErr << "Router.Stop: not started" << endl;
00374         return( ERROR_ROUTER + ERROR_NOT_STARTED );
00375     }
00376 
00377     // Puts the thread-termination event on the event queue.
00378     event = EventManager.New( EventTypeFinalizeThread );
00379     Put( event );                                                  
00380 
00381     // ### int term should have timer event which will wake router
00382     //     soon so don't need to do anything other than set flag...
00383     // Waits for routerthread to return; the join result is not checked.
00384     pthread_join( m_thread, NULL );
00385 
00386     m_started = false;
00387     return S_OK;
00388 }

void CRouter::StorageDown ( int  StorageId,
bool  EmptyStorageQueue 
) [private]

Funcao para definir que um servidor de armazenamento esta temporariamente indisponivel.

Parameters:
StorageId identificador do servidor de armazenamento.
EmptyStorageQueue true se a fila do servidor de armazenamento deve ser esvaziada e false em caso contrario. Obs: a fila nao deve ser esvaziada se a mensagem for enviada porque o servidor nao estava habilitado quando o servidor de gerenciamento iniciou a sua execucao.

Definition at line 2157 of file Router.cpp.

02158 { // Sets the StorageId bit in m_StoragesStatus and, when EmptyStorageQueue is true, drains the queues of every disk of that server.
02159     StrDiskRequest *DiskRequest;
02160     RioStorageNodeInfo NodeInfo;
02161     RioStorageNodeInfo sn_info;
02162     char name[1025];
02163     int FirstDiskId;
02164     bool EmptyQueues;
02165     bool IsPending;
02166     // Acquires exclusive access to the m_StoragesStatus variable.
02167     pthread_mutex_lock( &m_StoragesStatusMutex );
02168     // Marks server StorageId as inactive.
02169     m_StoragesStatus = m_StoragesStatus | ( 1ull << StorageId );
02170     // Releases exclusive access to the m_StoragesStatus variable.
02171     pthread_mutex_unlock( &m_StoragesStatusMutex );
02172     if( EmptyStorageQueue )
02173     {
02174         // Determines the number of disks of the storage server.
02175         m_DiskMgr->GetStorageNodeInfo( StorageId, &NodeInfo );
02176         // Computes the identifier of the first disk of the storage
02177         // server. It equals the server Id multiplied by the maximum
02178         // number of disks per storage server, given by
02179         // SNode::sn_maxdisks. We must add 1 because identifier 0 is
02180         // reserved.
02181         FirstDiskId = StorageId * SNode::sn_maxdisks + 1;
02182         // Removes the requests from the queues of every disk of the storage
02183         // server, and generates the expected error codes.
02184         for( int dk = FirstDiskId; dk < FirstDiskId + NodeInfo.NumberOfDisks; 
02185              dk++ )
02186         {
02187             EmptyQueues = true;
02188             // Removes the requests from all queues of this disk, generating an
02189             // error for the client
02190             while( EmptyQueues )
02191             {
02192                 // First we try to remove all disk requests from the m_Pending
02193                 // queue, then from the m_RTqueue queue, and finally those from
02194                 // the m_NRTqueue queue.
02195                 if( m_nPending[ dk ] > 0 )
02196                 {
02197                     DiskRequest = m_Pending[ dk ].Get();
02198                     
02199                     #ifdef RIO_DEBUG2
02200                     RioErr << "CRouter::StorageDown removendo a requisicao "
02201                            << DiskRequest << " de m_Pending. m_Pending[ " << dk 
02202                            << " ] = " << m_nPending[ dk ] << endl;
02203                     #endif       
02204                            
02205                     m_nPending[ dk ]--;
02206                     // Checks which counter must be decremented (from what I
02207                     // noticed, whenever something is put in m_Pending, either
02208                     // m_nRT[ dk ] or m_nNRT[ dk ] is incremented).
02209                     DataRequest& Request = DiskRequest->event->Request;
02210                     if( ( Request.Operation == NonRealTimeWrite ) ||
02211                         ( Request.Operation == NonRealTimeRead ) )
02212                         m_nNRT[dk]--;
02213                     else    
02214                         m_nRT[dk]--;
02215                     IsPending = true;
02216                 } 
02217                 else if( m_nRT[ dk ] > 0 )
02218                 {
02219                     DiskRequest = m_RTqueue[ dk ].Get();
02220                     
02221                     #ifdef RIO_DEBUG2
02222                     RioErr << "CRouter::StorageDown removendo a requisicao "
02223                            << DiskRequest << " de m_RTqueue. m_nRT[ " << dk 
02224                            << " ] = " << m_nRT[ dk ] << endl;
02225                     #endif       
02226                            
02227                     m_nRT[ dk ]--;
02228                     IsPending = false;
02229                 }
02230                 else if( m_nNRT[ dk ] > 0 )
02231                 {
02232                     DiskRequest = m_NRTqueue[ dk ].Get();
02233                     
02234                     #ifdef RIO_DEBUG2
02235                     RioErr << "CRouter::StorageDown removendo a requisicao "
02236                            << DiskRequest << " de m_NRTqueue. m_nNRT[ " << dk 
02237                            << " ] = " << m_nNRT[ dk ] << endl;
02238                     #endif       
02239                            
02240                     m_nNRT[ dk ]--;
02241                     IsPending = false;
02242                 }
02243                 else 
02244                    EmptyQueues = false; 
02245                 // If we still have a disk request, we return the error
02246                 // to the client (reporting that the disk's storage server
02247                 // became unavailable).
02248                 if( ( EmptyQueues ) && ( DiskRequest != NULL ) )
02249                 {
02250                     // Gets the client request associated with the disk
02251                     // request, from the event stored in DiskRequest.
02252                     DataRequest& Request = DiskRequest->event->Request;
02253                     if( ( IsPending ) || 
02254                         ( ( Request.Operation != RealTimeRead ) &&
02255                           ( Request.Operation != RealTimePrefetchBlock ) ) )
02256                     {
02257                         // Cancels the request.
02258                         
02259                         #ifdef RIO_DEBUG2
02260                         RioErr << "CRouter::StorageDown removendo uma "
02261                                << "solicitacao  do disco " << dk << " do "
02262                                << "servidor de armazenamento " << StorageId 
02263                                << endl;
02264                         #endif       
02265                         // Sets the error code for the request. If the operation
02266                         // is a cancel and BufferId equals -1 (the original
02267                         // comment said 1, but the check below tests -1), we must
02268                         // send a positive (error-free) reply to the client.
02269                         // Otherwise the reply is the server-unavailable error.
02270                         if( ( Request.Operation == RealTimeCancelBlock ) && 
02271                             ( Request.BufferId == -1 ) )
02272                             Request.Status = 0;
02273                         else
02274                             Request.Status = ERROR_ROUTER + 
02275                                         ERROR_SERVICE_TEMPORARY_UNAVAILABLE;
02276                         RequestCompleted( &Request.streamobj, ( Event * )
02277                                           DiskRequest->event );
02278                         // NOTE(review): Request lives inside DiskRequest->event; confirm the event still exists after RequestCompleted.
02279                         // This avoids sending a duplicate completion.
02280                         Request.RequestCancelled = true;        
02281                     } 
02282                     else
02283                     {
02284                         // Resends the event to the Router.
02285                         #ifdef RIO_DEBUG2
02286                         RioErr << "CRouter::StorageDown Reenviando o evento "
02287                                << "com a requisicao para o Router" << endl;
02288                         #endif
02289                                
02290                         Put( ( Event *) DiskRequest->event );
02291                         
02292                     }
02293                     // Frees the disk request.
02294                     m_Request.Free( DiskRequest );
02295                 } 
02296                 else if( ( EmptyQueues ) && ( DiskRequest == NULL ) )
02297                     RioErr << "CRouter::StorageDown aviso DiskRequest == NULL"
02298                            << endl;
02299             }
02300         
02301             // Generates an event for the monitoring class, reporting that the
02302             // disk's queues became empty.
02303             m_DiskMgr->GetDiskName( dk, name );
02304             m_DiskMgr->GetDiskStorageNodeInfo( dk, &sn_info );
02305             UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent( 
02306                                                  sn_info.Hostname, name, 0 );
02307             m_SystemManager->PostITEvent( ( MonitorEvent * ) it_event );
02308         }
02309     }
02310 }

void CRouter::StorageReply ( MsgRSSstorage Msg  )  [private]

Definition at line 1293 of file Router.cpp.

01294 {
          // Handles a status message received from a storage node for a disk
          // request previously issued by the router: looks the request up by its
          // router id, records the first error seen, updates the per-disk queue
          // accounting (possibly dispatching the next queued request), posts a
          // queue-size monitoring event and, when this is the last message for
          // the disk request, releases it and completes the data request once
          // its RequestCounter reaches zero and it was not cancelled. Messages
          // of any other type are only logged.
          // NOTE(review): the body reads the message through "Message->", while
          // the generated signature names the parameter "Msg" -- presumably the
          // definition in Router.cpp takes a pointer named Message; confirm.
01295     StrDiskRequest*    Request;
01296     EventDataRequest*  event;
01297     DataRequest*       DataRequest;
01298 
01299     StrDiskRequest* NewRequest;
01300     int Disk;
01301     u16 Type;
01302     u32 Id;
01303     s32 Error;
01304     int ActiveThreads;
01305     // ------------------------------------------------------------------------
01306 
01307     switch( Message->Header.Type )
01308     {
01309         case MSG_RSS_READCOMPLETE:
01310         case MSG_RSS_WRITECOMPLETE:
01311         case MSG_RSS_RECEIVECOMPLETE:
01312         case MSG_RSS_SENDCOMPLETE:
01313         case MSG_RSS_READY:
01314         case MSG_RSS_CANCELCOMPLETE:
01315         {
01316             Type  = Message->Status.Type;
01317             Id    = Message->Status.RouterId;
01318             Error = Message->Status.Error;
01319             ActiveThreads = Message->Status.ActiveThreads;
01320             // ----------------------------------------------------------------
01321 
                  // Look up the disk request this message refers to; an unknown
                  // id is logged and the message is dropped.
01322             Request = m_Request.Get( Id );
01323             if( Request == 0 )
01324             {
01325                 RioErr << "Router Error: Message from storage node"
01326                         " has invalid Router Id " << Id << endl;
01327                 return;
01328             }
01329 
01330             // Get data request event associated with disk request
01331             event = ( EventDataRequest* ) Request->event;
01332             DataRequest = & ( event->Request );
01333 
01334             // Status contains the first error detected
01335             // If more errors are detected they are ignored
01336             if( DataRequest->Status == 0 )
01337             {
01338                 DataRequest->Status = Error;
01339             }
01340 
01341             #ifdef RIO_DEBUG2
01342             struct in_addr clientip;
01343             clientip.s_addr = DataRequest->Target.IPaddress;
01344             RioErr << "Router:StorageReply => RouterId " << Id
01345                    << " StorageId "<< Message->Status.StorageId
01346                    << " Status " << DataRequest->Status
01347                    << " client IP " << inet_ntoa(clientip)
01348                    << " client Port " << ntohs( DataRequest->Target.Port )
01349                    << " block " << DataRequest->Block
01350                    << " BufferId " << DataRequest->BufferId
01351                    << " Msg->Error " << Error
01352                    << " RequestCounter " << DataRequest->RequestCounter
01353                    << endl;
01354             #endif
01355 
01356             Disk = Request->Disk;
01357             
01358             // Check whether the disk belongs to a storage server that has just
01359             // stopped working.
01360             if( !CheckStorageStatus( Disk ) )
01361             {
01362                 RioErr << "Router Error: Message from storage node that is "
01363                           "down " << Id << endl;
01364                 // Decrement the request counter, removing the request that
01365                 // belonged to the storage server that went down.
                      // NOTE(review): this path returns without touching the
                      // pending queue or freeing Request; presumably the
                      // storage-down handling takes care of that -- confirm.
01366                 DataRequest->RequestCounter--;       
01367                 return;
01368                         
01369             }
01370 
                  // For read completions, when measurement collection is enabled,
                  // feed the response-time estimators with the time elapsed since
                  // the request's ArrivalTime.
01371             if( ( Type == MSG_RSS_READCOMPLETE ) && ( m_CollectMeasures ) )
01372             {
01373                 struct timeval now;
01374                 gettimeofday( &now, 0 );
01375                 UpdateMeasures( Disk, ActiveThreads,
01376                                 Request->event->Request.ArrivalTime, now );
01377             }
01378             // ----------------------------------------------------------------
01379 
01380             // Remove request from pending queue
01381             m_Pending[Disk].Remove(Request);
01382 
01383             // Check if there are queued requests not sent to storage node
01384             // and send one of them
01385             if( (( m_nRT[Disk] + m_nNRT[Disk] ) > m_nPending[Disk] ) &&
01386                 ( m_nPending[Disk] < m_MaxPending ))
01387             {
01388                 // Try to send real time request first
01389                 NewRequest = m_RTqueue[Disk].Get();
01390                 // If no RT request available, send a non RT one
01391                 // There should be at least one (if condition above)
01392                 if( NewRequest == 0 )
01393                 {
01394                     RioErr <<" Could not get next request from RT queue"
01395                            << endl;
01396                     NewRequest = m_NRTqueue[Disk].Get();
01397                 }
                      // NOTE(review): NewRequest is not re-checked after the NRT
                      // fallback; if both Get() calls return 0 a null pointer is
                      // handed to Put() and SendStorage() -- confirm that the
                      // condition above really rules this out.
01398                 m_Pending[Disk].Put( NewRequest );
01399                 SendStorage( NewRequest );
01400             }
01401             // If no new disk request sent decrement number of pending
01402             // requests
01403             else
01404             {
01405                 m_nPending[Disk]--;
01406             }
01407 
01408             // Decrement size of queue
01409             if( ( DataRequest->Operation == NonRealTimeRead ) ||
01410                 ( DataRequest->Operation == NonRealTimeWrite)  )
01411             {
01412                 m_nNRT[Disk]--;
01413             }
01414             else
01415             {
                      // RealTimeSendBlock operations leave m_nRT untouched.
01416                 if( DataRequest->Operation != RealTimeSendBlock )
01417                 {
01418                       m_nRT[Disk]--;
01419                 }
01420             }
01421 
                  // Post a monitoring event carrying the disk's host name, disk
                  // name and the sum of its RT, NRT and pending counters.
01422             char name[1025];
01423             m_DiskMgr->GetDiskName( Disk, name );
01424 
01425             RioStorageNodeInfo sn_info;
01426             m_DiskMgr->GetDiskStorageNodeInfo( Disk, &sn_info );
01427 
01428             int queue_sum = m_nRT[ Disk ]
01429                           + m_nNRT[ Disk ]
01430                           + m_nPending[ Disk ];
01431 
                  // NOTE(review): it_event is allocated here and never deleted in
                  // this function; presumably PostITEvent takes ownership --
                  // confirm.
01432             UpdateDiskQueueEvent *it_event = new UpdateDiskQueueEvent(
01433                 sn_info.Hostname, name, queue_sum );
01434             m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
01435 
01436             // Check if this is the last message storage node will send for
01437             // this disk request ( any message with an error code is the last)
01438             // ( success I/O is ended with
01439             // SENDCOMPLETE(read) or WRITECOMPLETE(write) )
01440             if( ( Type == MSG_RSS_SENDCOMPLETE )  ||
01441                 ( Type == MSG_RSS_WRITECOMPLETE ) ||
01442                 ( Type == MSG_RSS_CANCELCOMPLETE )||
01443                 ( Error != 0) )
01444             {
01445 
01446                 // if error != 0 and bufferid == -1  so prefetch failed
01447                 if( ( DataRequest->BufferId == -1 ) &&
01448                     ( Type == MSG_RSS_READCOMPLETE ) )
01449                 {
01450                     RioErr << "error at prefetch!! Error: "<< Error << endl;
01451 
                          // The update-event mutex taken here is released at the
                          // end of each of the two branches below.
01452                     DataRequest->streamobj->Stream()->getMutexUpdateEvent();
01453                     // updates StorageId and BufferId
01454                     Request->StorageId = Message->Status.StorageId;
01455                     DataRequest->BufferId = Id;
01456                     #ifdef RIO_DEBUG2
01457                     RioErr << "Router:StorageReply => new BufferId "  << Id
01458                            << " and StorageId " << Request->StorageId << endl;
01459                     #endif
01460                     // verifies if the block was canceled
01461                     if( DataRequest->Operation == RealTimeCancelBlock )
01462                     {
01463                         #ifdef RIO_DEBUG2
01464                         RioErr << "Block was canceled." << endl;
01465                         #endif
01466                         DataCancel( (Event *) event );
01467                         DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01468                     }
01469                     else
01470                     {
01471                         // tells a new block arrived
01472                         DataRequest->streamobj->Stream()->NewBlockArrival();
01473                         // verifies if the client requested the block
01474                         if( ( DataRequest->Target.IPaddress != 0 ) &&
01475                            ( DataRequest->Target.Port != 0 ))
01476                         {
01477                             #ifdef RIO_DEBUG2
01478                             RioErr << "Block was requested by client (cancel it)."
01479                                    << endl;
01480                             #endif
01481                             DataRequest->Operation = RealTimeCancelBlock;
01482                             DataCancel( (Event *) event );
01483 
01484                             //update tokenstosend
01485                             DataRequest->streamobj->Stream()->decTokensToSend();
01486                             //update buffer status
01487                             DataRequest->streamobj->Stream()->decServerBufferStatus();
01488                         }
01489                         DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01490                     }
01491                 }
01492                 // ------------------------------------------------------------
01493                 else
01494                 {
                          // NOTE(review): DataRequest points into Request->event
                          // and is still dereferenced after this Free();
                          // presumably Free() does not release the event --
                          // confirm.
01495                     m_Request.Free(Request);
01496                     
01497                     #ifdef RIO_DEBUG2
01498                     RioErr << "CRouter::StorageReply requisicao " 
01499                            << DataRequest->RequestCounter << " recebida!" 
01500                            << endl;
01501                     #endif       
01502 
01503                     DataRequest->RequestCounter--;
01504                     // Check if we are done (all disk requests for this data
01505                     // request completed)
01506                     // Note: RequestCompleted must only be called if the
01507                     // request was not cancelled (a request is cancelled when
01508                     // one of its disks belongs to a storage server that became
01509                     // unavailable), because in that case the function has
01510                     // already been called.
01511                     if( ( DataRequest->RequestCounter == 0 ) &&
01512                         ( !DataRequest->RequestCancelled ) )
01513                     {
01514                         
01515                         #ifdef RIO_DEBUG2
01516                         RioErr << "CRouter::StorageReply operation " 
01517                                << DataRequest->Operation << " completed "
01518                                << "(DataRequest->RequestCounter == 0)" << endl;
01519                         #endif
01520                                 
01521                         RequestCompleted( &DataRequest->streamobj,
01522                                           ( Event* ) event );
01523                     }
01524                 }
01525             }
01526             // Not last message from storage node for this disk request
01527             else
01528             {
                      // Only requests whose BufferId is still -1 are handled
                      // here; any other intermediate message needs no action.
01529                 if( DataRequest->BufferId == -1 )
01530                 {
                          // The update-event mutex taken here is released at the
                          // end of each of the two branches below.
01531                     DataRequest->streamobj->Stream()->getMutexUpdateEvent();
01532                     // updates StorageId and BufferId
01533                     Request->StorageId = Message->Status.StorageId;
01534                     DataRequest->BufferId = Id;
01535                     #ifdef RIO_DEBUG2
01536                     RioErr << "Router:StorageReply => new BufferId "  << Id
01537                            << " and StorageId " << Request->StorageId << endl;
01538                     #endif
01539 
01540                     // verifies if the block was canceled
01541                     if( DataRequest->Operation == RealTimeCancelBlock )
01542                     {
01543                         #ifdef RIO_DEBUG2
01544                         RioErr << "Block was canceled." << endl;
01545                         #endif
01546 
01547                         DataCancel( (Event *) event );
01548                         DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01549                     }
01550                     else
01551                     {
01552                         // tells a new block arrived
01553                         DataRequest->streamobj->Stream()->NewBlockArrival();
01554 
01555                         // verifies if the client requested the block
01556                         if( ( DataRequest->Target.IPaddress != 0 )&&
01557                            ( DataRequest->Target.Port != 0))
01558                         {
01559                             #ifdef RIO_DEBUG2
01560                             RioErr << "Block was requested by client." << endl;
01561                             #endif
01562                             DataRequest->Operation = RealTimeSendBlock;
01563                             DataSendToClient( (Event *) event );
01564                         }
01565                         DataRequest->streamobj->Stream()->releaseMutexUpdateEvent();
01566                     }
01567                 }
01568                 // ------------------------------------------------------------
01569             }
01570             break;
01571         }
01572         default:
01573             RioErr << "Router ERROR: Received invalid message from storage node."
01574                    << endl;
01575     }
01576 }

void CRouter::StorageUp ( int  StorageId  )  [private]

Funcao para definir que um servidor de armazenamento voltou a estar disponivel.

Parameters:
StorageId identificador do servidor de armazenamento.

Definition at line 2314 of file Router.cpp.

02315 {
02316     // Obtem acesso exclusivo a variavel m_StoragesStatus. 
02317     pthread_mutex_lock( &m_StoragesStatusMutex );
02318     // Invalida todos os pares IP, porta dos clientes conectados associados a
02319     // este servidor de armazenamento. Precisamos somar 1 ao identificador 
02320     // porque o identificador 0 e usado pelo mapeamento do servidor de 
02321     // gerenciamento.
02322     m_SystemManager->InvalidateStorageNatMappings( StorageId + 1 );
02323     // Define o servidor StorageId como ativo.
02324     m_StoragesStatus = m_StoragesStatus & ( ~( 1ull << StorageId ) );
02325     // Libera o acesso exclusivo a variavel m_StoragesStatus. 
02326     pthread_mutex_unlock( &m_StoragesStatusMutex );
02327 }

void CRouter::UpdateDiskQueueTime ( double  EstQueueTimeOfDisk[100],
double  EstQueueTime[100][301] 
)

Definition at line 2072 of file Router.cpp.

02074 {
02075     int num = GetMaxNumberOfDisks();
02076 
02077     for( int i = 1; i < num; i++ )
02078     {
02079         for(int j = 0; j < 301; j++)
02080         {
02081             EstimatedQueueTime[i][j] = m_EstDiskQueueTime[i][j];
02082         }
02083         EstimatedQueueTimeOfDisk[i] = m_EstDiskQueueTimeOfDisk[i];
02084     }
02085 
02086     return;
02087 }

void CRouter::UpdateDiskResponseTime ( double  EstResponseTimeOfDisk[100],
double  EstResponseTime[100][301],
double  DevDiskResponseTimeOfDisk[100],
double  DevDiskResponseTime[100][301] 
)

Definition at line 2044 of file Router.cpp.

02048 {
02049     int num = GetMaxNumberOfDisks();
02050 
02051     for( int i = 1; i < num; i++ )
02052     {
02053         for(int j = 0; j < 301; j++)
02054         {
02055             if( m_EstDiskServiceTime[i][j] > m_EstDiskResponseTime[i][j] )
02056                 // server was initialized now
02057                 m_EstDiskResponseTime[i][j] = m_EstDiskServiceTime[i][j];
02058 
02059             EstimatedResponseTime[i][j] = m_EstDiskResponseTime[i][j];
02060             DevResponseTime[i][j] = m_DevDiskResponseTime[i][j];
02061         }
02062         EstimatedResponseTimeOfDisk[i] = m_EstDiskResponseTimeOfDisk[i];
02063         DevResponseTimeOfDisk[i] = m_DevDiskResponseTimeOfDisk[i];
02064     }
02065 
02066     return;
02067 }

void CRouter::UpdateDiskServiceTime ( double  EstServiceTimeOfDisk[100],
double  EstServiceTime[100][301] 
)

Definition at line 2016 of file Router.cpp.

02018 {
02019     int num  = GetMaxNumberOfDisks();
02020     int numd = GetNumberOfActiveDisks();
02021 
02022     //RioErr << "Router:UpdateDiskServiceTime " << numd << " of " << num <<endl;
02023 
02024     while( m_UpdatedEstimatedDiskServiceTime < numd )
02025     {
02026         pthread_mutex_lock( &m_MutexUpdated );
02027         pthread_cond_wait( &m_ConditionUpdated, &m_MutexUpdated );
02028         pthread_mutex_unlock( &m_MutexUpdated );
02029     }
02030 
02031     for( int i = 1; i < num; i++ )
02032     {
02033         for(int j = 0; j < 301; j++)
02034         {
02035             EstimatedServiceTime[i][j] = m_EstDiskServiceTime[i][j];
02036         }
02037         EstimatedServiceTimeOfDisk[i] = m_EstDiskServiceTimeOfDisk[i];
02038     }
02039 
02040     m_UpdatedEstimatedDiskServiceTime = 0;
02041     return;
02042 }

void CRouter::UpdateMeasures ( int  disk,
int  nThreads,
struct timeval  initial_time,
struct timeval  final_time 
)

Definition at line 1233 of file Router.cpp.

01237 {
01238     double sample_msec = getInterval( initial_time, final_time );
01239 
01240     if( nThreads > m_MaxPending )
01241     {
01242         nThreads = m_MaxPending;
01243     }
01244     else if( nThreads < 1  )
01245     {
01246         RioErr << " ActiveThreads " << nThreads << endl;
01247         nThreads = 1;
01248     }
01249 
01250     if( m_EstDiskResponseTime[ disk ][ nThreads ] == 0 )
01251     {
01252         m_EstDiskResponseTime[ disk ][ nThreads ] = sample_msec;
01253     }
01254     else
01255     {
01256         m_EstDiskResponseTime[ disk ][ nThreads ] =
01257                                                   (( 1 - m_EstimatedParameter) *
01258                                    m_EstDiskResponseTime[ disk ][ nThreads ] ) +
01259                                            m_EstimatedParameter * sample_msec;
01260     }
01261 
01262     //updates Dev of estimated response time of disk
01263     m_DevDiskResponseTime[ disk ][ nThreads ] =  ( 1 - m_EstimatedParameter ) *
01264                                     m_DevDiskResponseTime[ disk ][ nThreads ] +
01265                                                          m_EstimatedParameter *
01266                fabs( sample_msec - m_EstDiskResponseTime[ disk ][ nThreads ] );
01267 
01268     if( m_EstDiskResponseTimeOfDisk[ disk ] == 0 )
01269     {
01270         m_EstDiskResponseTimeOfDisk[ disk ] = sample_msec;
01271     }
01272     else
01273     {
01274         m_EstDiskResponseTimeOfDisk[ disk ] =     ( 1 - m_EstimatedParameter) *
01275                                           m_EstDiskResponseTimeOfDisk[ disk ] +
01276                                           m_EstimatedParameter * sample_msec;
01277     }
01278 
01279     m_DevDiskResponseTimeOfDisk[ disk ] =        ( 1 - m_EstimatedParameter ) *
01280                                           m_DevDiskResponseTimeOfDisk[ disk ] +
01281                                                          m_EstimatedParameter *
01282                       fabs( sample_msec - m_EstDiskResponseTimeOfDisk[ disk ]);
01283 
01284     #ifdef __QUEUE_RESP_LOG
01285     m_logRESP << " disk "   << disk
01286               << " sample " << sample_msec
01287               << " est "    << m_EstDiskResponseTimeOfDisk[ disk ]
01288               << endl;
01289     #endif
01290 }


Field Documentation

int CRouter::m_BlockSize [private]

Definition at line 100 of file Router.h.

Definition at line 78 of file Router.h.

pthread_cond_t CRouter::m_ConditionUpdated [private]

Definition at line 82 of file Router.h.

double CRouter::m_DevDiskResponseTime[100][301] [private]

Definition at line 68 of file Router.h.

double CRouter::m_DevDiskResponseTimeOfDisk[100] [private]

Definition at line 69 of file Router.h.

Definition at line 90 of file Router.h.

double CRouter::m_EstDiskQueueTime[100][301] [private]

Definition at line 74 of file Router.h.

double CRouter::m_EstDiskQueueTimeOfDisk[100] [private]

Definition at line 75 of file Router.h.

double CRouter::m_EstDiskResponseTime[100][301] [private]

Definition at line 66 of file Router.h.

double CRouter::m_EstDiskResponseTimeOfDisk[100] [private]

Definition at line 67 of file Router.h.

double CRouter::m_EstDiskServiceTime[100][301] [private]

Definition at line 61 of file Router.h.

double CRouter::m_EstDiskServiceTimeOfDisk[100] [private]

Definition at line 62 of file Router.h.

Definition at line 79 of file Router.h.

bool CRouter::m_initialized [private]

Definition at line 87 of file Router.h.

ofstream CRouter::m_logRESP [private]

Definition at line 84 of file Router.h.

ofstream CRouter::m_logSQT [private]

Definition at line 84 of file Router.h.

int CRouter::m_MaxPending [private]

Definition at line 103 of file Router.h.

int CRouter::m_MaxQueueSize [private]

Definition at line 108 of file Router.h.

pthread_mutex_t CRouter::m_MutexUpdated [private]

Definition at line 81 of file Router.h.

Definition at line 194 of file Router.h.

int CRouter::m_nDisks [private]

Definition at line 98 of file Router.h.

int* CRouter::m_nNRT [private]

Definition at line 115 of file Router.h.

int* CRouter::m_nPending [private]

Definition at line 116 of file Router.h.

int* CRouter::m_nRT [private]

Definition at line 114 of file Router.h.

Definition at line 111 of file Router.h.

Definition at line 112 of file Router.h.

Definition at line 94 of file Router.h.

Definition at line 119 of file Router.h.

Definition at line 110 of file Router.h.

bool CRouter::m_started [private]

Definition at line 88 of file Router.h.

unsigned long long int CRouter::m_StoragesStatus [private]

Definition at line 127 of file Router.h.

pthread_mutex_t CRouter::m_StoragesStatusMutex [private]

Definition at line 133 of file Router.h.

Definition at line 91 of file Router.h.

Definition at line 122 of file Router.h.

pthread_t CRouter::m_thread [private]

Definition at line 86 of file Router.h.

Definition at line 77 of file Router.h.


The documentation for this class was generated from the following files:
Generated on Wed Jul 4 16:03:33 2012 for RIO by  doxygen 1.6.3