00001 /* 00002 * Copyright (C) 2009, Edmundo Albuquerque de Souza e Silva. 00003 * 00004 * This file may be distributed under the terms of the Q Public License 00005 * as defined by Trolltech AS of Norway and appearing in the file 00006 * LICENSE.QPL included in the packaging of this file. 00007 * 00008 * THIS FILE IS PROVIDED AS IS WITH NO WARRANTY OF ANY KIND, INCLUDING 00009 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR 00010 * PURPOSE. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, 00011 * INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING 00012 * FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 00013 * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION 00014 * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 00015 * 00016 * Thanks: Jose Renato Santos 00017 * 00018 */ 00019 00020 //////////////////////////////////////////////////////////////////////////////// 00021 // Router.h: Declaration of the CRouter 00022 //////////////////////////////////////////////////////////////////////////////// 00023 00024 #ifndef __ROUTER_H_ 00025 #define __ROUTER_H_ 00026 00027 #include "RioTypes.h" 00028 #include "ServerTypes.h" 00029 #include "RioQueue.h" 00030 #include "SystemManager.h" 00031 00032 #include "msgrouterstorage.h" 00033 #include "diskrequest.h" 00034 #include "Event.h" 00035 #include "NATMap.h" 00036 00037 #include <fstream> 00038 #include <pthread.h> 00039 00040 using namespace std; 00041 00042 //////////////////////////////////////////////////////////////////////////////// 00043 // Forward declaration of needed classes 00044 00045 class RioStreamObj; 00046 class RioObject; 00047 class DiskMgr; 00048 00049 /////////////////////////////////////////////////////////////////////////////// 00050 struct RouterConfig; 00051 00052 struct Rdisk; // router view of disk status 00053 00054 // CRouter 00055 class CRouter 00056 { 00057 private: 00058 // for each disk, estimated service time 00059 // according number of active threads 00060 // calculated at each storage 00061 double m_EstDiskServiceTime[100][301]; 00062 double m_EstDiskServiceTimeOfDisk[100]; 00063 00064 // for each disk, estimated total response time 00065 // according number of active threads 00066 double m_EstDiskResponseTime[100][301]; 00067 double m_EstDiskResponseTimeOfDisk[100]; 00068 double m_DevDiskResponseTime[100][301]; 00069 double m_DevDiskResponseTimeOfDisk[100]; 00070 00071 // for each disk, estimated queue time 00072 // according queue size at arrival 00073 // calculated at Router 00074 double m_EstDiskQueueTime[100][301]; 00075 double m_EstDiskQueueTimeOfDisk[ 100 ]; 00076 00077 int m_UpdatedEstimatedDiskServiceTime; 00078 bool m_CollectMeasures; 00079 double m_EstimatedParameter; 00080 00081 pthread_mutex_t m_MutexUpdated; 00082 pthread_cond_t m_ConditionUpdated; 00083 00084 ofstream m_logSQT, m_logRESP; 00085 00086 pthread_t m_thread; 00087 bool m_initialized; 00088 bool m_started; 00089 00090 DiskMgr *m_DiskMgr; 00091 CSystemManager *m_SystemManager; 00092 00093 // router inbound queue 00094 EventQueue m_Queue; 00095 00096 // Maximum number of disks 00097 // (some disks may not exist or may be inactive) 00098 int m_nDisks; 00099 // Block Size 00100 int m_BlockSize; 00101 // maximum number of pending requests per disk (i.e. maximum 00102 // number of requests sent to storage node for this disk) 00103 int m_MaxPending; 00104 00105 // Maximum number of elements on a disk queue 00106 // Inactive disk queues are set to have this size plus 1, such that 00107 // no request are queued there 00108 int m_MaxQueueSize; 00109 // Disk queues 00110 CDiskRequestQueue* m_RTqueue; 00111 CDiskRequestQueue* m_NRTqueue; 00112 CDiskRequestQueue* m_Pending; 00113 // Sizes of disk queues 00114 int* m_nRT; 00115 int* m_nNRT; 00116 int* m_nPending; 00117 00118 // disk request pool 00119 CDiskRequestManager m_Request; 00120 00121 // Temporary storage space for disk requests for current data request 00122 StrDiskRequest** m_TempDiskRequest; 00123 00124 // Nova variavel indicando quais servidores de armazenamento estao 00125 // inativos. Esta variavel e de 64 bits, e cada bit i indica o estado 00126 // (1 inativo e 0 ativo) do servidor de armazenamento com a ID i. 00127 unsigned long long int m_StoragesStatus; 00128 // Mutex para garantir acesso exclusivo a variavel m_StoragesStatus 00129 // (esta variavel e acessada pela thread principal do servidor (ao ver 00130 // se pode tratar dos pedidos dos clientes) a partir da funcao 00131 // GetStoragesStatus, e pelas funcoes chamadas a partir da 00132 // thread routerthread. 00133 pthread_mutex_t m_StoragesStatusMutex; 00134 00135 // Method that process messages received from storage nodes 00136 void StorageReply( MsgRSSstorage* Msg ); 00137 // Method that makes a disk active 00138 void AddDisk( int Disk ); 00139 // Method that sends disk request to storage node 00140 void SendStorage( StrDiskRequest *Request ); 00141 00142 // Free members space 00143 void CleanUp(); 00144 00145 // Get element from router input queue 00146 Event* Get() { return m_Queue.Get(); } 00147 00148 // Thread routine 00149 static void *routerthreadep( void *param ); 00150 void routerthread(); 00151 00152 // Methods that process data requests received from client 00153 void DataReq( Event *ep ); 00154 00155 void DataPrefetch( Event *event ); 00156 void DataSendToClient( Event *event ); 00157 void DataCancel( Event *event ); 00158 00159 // Temporary for debuging 00160 void PrintQueues(); 00161 00162 // Novas funcoes usada pela implementacao do tratamento da queda e do 00163 // retorno dos servidores de armazenamento. 00164 /** 00165 * Funcao para definir que um servidor de armazenamento esta 00166 * temporariamente indisponivel. 00167 * @param ServerId identificador do servidor de armazenamento. 00168 * @param EmptyStorageQueue true se a fila do servidor de armazenamento 00169 * deve ser esvaziada e false em caso contrario. 00170 * Obs: a fila nao deve ser esvaziada se a mensagem e enviada devido ao 00171 * servidor nao estiver habilitado quando o servidor de gerenciamento 00172 * iniciar a sua execucao. 00173 */ 00174 void StorageDown( int StorageId, bool EmptyStorageQueue ); 00175 /** 00176 * Funcao para definir que um servidor de armazenamento voltou a estar 00177 * disponivel. 00178 * @param ServerId identificador do servidor de armazenamento. 00179 */ 00180 void StorageUp( int StorageId ); 00181 /** 00182 * Funcao usada para verificar se o servidor de armazenamento com uma 00183 * dada ID esta ou nao parado. 00184 * @param DiskId identificador do disco do servidor de armazenamento. 00185 * @return true se o servidor esta funcionando ou false se o servidor 00186 * parou de funcionar. 00187 */ 00188 bool CheckStorageStatus( int DiskId ); 00189 // -------------------------------------------------------------------- 00190 public: 00191 // Heap that stores the NAT mappings of all clients 00192 // For all clients behind NAT, there must be one entry on the heap 00193 // for each storage and an additional entry for the server 00194 NATMap m_NAT_map; 00195 00196 CRouter(); 00197 ~CRouter(); 00198 00199 // Initializing, finalizing 00200 // Alteracao da funcao initializa da classe CRouter. Agora o parametro 00201 // DiskManager sera passado na estrutura RouterConfig. 00202 int Initialize( RouterConfig *Config ); 00203 int Start(); 00204 int Stop(); 00205 00206 // used to send event to Router thread from other threads 00207 void Put( Event *ep ); 00208 00209 int GetMaxNumberOfDisks(); 00210 int GetNumberOfActiveDisks(); 00211 int GetDiskServiceTime( EventStorageRequest* event, u16 DiskId ); 00212 void SetDiskServiceTime ( int Disk, 00213 double EstServiceTimeOfDisk, 00214 double EstServiceTime[301] ); 00215 void UpdateDiskServiceTime ( double EstServiceTimeOfDisk[100], 00216 double EstServiceTime[100][301]); 00217 void UpdateDiskResponseTime( double EstResponseTimeOfDisk[100], 00218 double EstResponseTime[100][301], 00219 double DevDiskResponseTimeOfDisk[100], 00220 double DevDiskResponseTime[100][301]); 00221 void UpdateDiskQueueTime ( double EstQueueTimeOfDisk[100], 00222 double EstQueueTime[100][301]); 00223 void UpdateMeasures( int disk, 00224 int nThreads, 00225 struct timeval initial_time, 00226 struct timeval final_time ); 00227 00228 long double GetEstimatedQueueTimeOfAllDisks(); 00229 long double GetEstimatedDiskQueueTime( int disk ); 00230 00231 void SetNATMapping( NATData server_map, int stor_id, NATData stor_map ); 00232 00233 // Funcoes usadas pela implementacao que permite clientes atras de NAT. 00234 00235 /** 00236 * Funcao para obter um mapeamento passado como parametro identificado 00237 * pelo par IP, porta do servidor de gerenciamento e o identificador do 00238 * servidor de despacho. 00239 * @param server_map estrutura com o par IP, porta do servidor 00240 * @param stor_id identificador do servidor de armazenamento. 00241 */ 00242 NATData GetNATMapping( NATData server_map, int stor_id ); 00243 /** 00244 * Funcao para remover um mapeamento passado como parametro identificado 00245 * pelo par IP, porta do servidor de gerenciamento e o identificador do 00246 * servidor de despacho. 00247 * @param server_map estrutura com o par IP, porta do servidor 00248 * @param stor_id identificador do servidor de armazenamento. 00249 * @return mapeamento para o servidor de armazenamenro. 00250 */ 00251 void RemoveNATMapping( NATData server_map, int stor_id ); 00252 #ifdef RIO_DEBUG2 00253 /** 00254 * Funcao para imprimir o vetor atual de mapeamento. 00255 */ 00256 void PrintNATMapping( void ); 00257 #endif 00258 00259 /** 00260 * Funcao para verificar se um mapeamento esta presente no mapa com os 00261 * mapeamentos. 00262 * 00263 * TODO: Remover a funcao quando a RioNeti e a NetMgr nao forem mais 00264 * usadas. 00265 * 00266 * @param server_map estrutura com o par IP, porta do servidor 00267 * @param stor_id identificador do servidor de armazenamento. 00268 * @return true se o mapeamento esta presente ou false caso o mapeamento 00269 * nao esteja presente. 00270 */ 00271 bool FindNATMapping( NATData server_map, int stor_id ); 00272 00273 // Nova funcao usada pela implementacao do tratamento da queda e do 00274 // retorno dos servidores de armazenamento. 00275 /** 00276 * Deternima o vetor de bits com os servidores de armazenamento que nao 00277 * podem ser usados pelo cliente com endereco passado como parametro, 00278 * isto e, que estao inativos ou que foram invalidados (apos se tornarem 00279 * novamente ativos). 00280 * @param ClientAddr endereco UDP do cliente. Se os campos da estrutura 00281 * NATData forem ambos 0, somente iremos considerar se os servidores 00282 * estao ou nao ativos. Em caso contrario, tambem vamos contar os 00283 * servidores que nao podem ser usados pelo cliente. 00284 * @return vetor com as informacoes dos storages inativos. Cada bit esta 00285 * associado a um servidor de armazenamento e, quando igual a 1, indica 00286 * que o servidor associado ao bit nao pode ser usado (e 0 quando pode 00287 * ser usado). 00288 */ 00289 unsigned long long int GetInvalidStorages( NATData ClientAddr ); 00290 /** 00291 * Deternima se podemos ler ou escrever blocos, no nomento, para um 00292 * dado cliente cujo endereco e passado como parametro. 00293 * @param ClientAddr endereco UDP do cliente. Se os campos da estrutura 00294 * NATData forem ambos 0, somente iremos considerar se os servidores 00295 * estao ou nao ativos. Em caso contrario, tambem vamos contar os 00296 * servidores que nao podem ser usados pelo cliente. 00297 * @param CanRead ponteiro para um valor booleano que informa se uma 00298 * leitura de um bloco pode ser feita. Consideraremos que uma leitura 00299 * nao podera ser feita se nao for possivel garantir que um bloco pode 00300 * ser lido, isto e, se o numero de servidores de armazenamento inativos 00301 * for maior ou igual ao numero de replicacoes. 00302 * @param CanWrite ponteiro para um valor booleano que informa se uma 00303 * escrita de um bloco pode ser feita. Consideraremos que uma escrita 00304 * nao podera ser feita se nao for possivel garantir um disco em um 00305 * servidor de armazenamento diferente para cada replicacao do bloco, 00306 * isto e, se o numero de servidores de armazenamento ativos for menor 00307 * do que o numero de replicacoes. 00308 */ 00309 void GetStorageInfo( NATData ClientAddr, bool *CanRead, 00310 bool *CanWrite ); 00311 /** 00312 * Funcao para obter o numero de storage nodes. 00313 * @param NumberOfStorageNodes ponteiro para um inteiro nao sinalizado 00314 * que armazenara o numero de storages. 00315 * @return S_OK se nenhum erro ocorreu ao obter o numero de storages, 00316 * ou um valor diferente de S_OK em caso contrario. 00317 */ 00318 int GetNumberOfStorageNodes( unsigned int *NumberOfStorageNodes ); 00319 00320 /** 00321 * Funcao para completar uma requisicao (chamando a RequestCompleted 00322 * do objeto RioStreamObj passado como parametro) removendo, quando 00323 * necessario, o objeto do stream e alterando o ponteiro passado como 00324 * parametro para NULL. 00325 * 00326 * Obs: Se o objeto passado for NULL, indicando que o objeto foi 00327 * incorretamente removido, um erro sera impresso no log do servidor 00328 * informando deste erro. 00329 * 00330 * @param StreamObjAddr ponteiro com o endereco com o ponteiro para o 00331 * objeto para o qual deveremos chamar a funcao RequestCompleted (isso e 00332 * necessario porque, se removermos o objeto, o ponteiro para ele devera 00333 * ser alterado para NULL). 00334 * @param event ponteiro do evento a ser passado a funcao 00335 * RequestCompleted. 00336 */ 00337 void RequestCompleted( RioStreamObj **StreamObjAddr, Event *event ); 00338 }; 00339 #endif //__ROUTER_H_