00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
// Numeric event codes carried by MonitorEvent::getType().
//
// Client-list codes are absolute values built on CLIENT_LIST_EVENT.
// Every arithmetic macro is parenthesized so that it expands as a single
// value inside larger expressions (e.g. 2 * CONNECT_EVENT).
//
// NOTE(review): the ClientListEvent constructor adds CLIENT_LIST_EVENT to the
// code it receives and MonitorTable::Monitor() adds it again in its case
// labels; this is only consistent while CLIENT_LIST_EVENT is 0.
#define CLIENT_LIST_EVENT 0
#define CONNECT_EVENT ( CLIENT_LIST_EVENT + 1 )
#define UPDATE_QUEUE_EVENT ( CLIENT_LIST_EVENT + 2 )
#define DISCONNECT_EVENT ( CLIENT_LIST_EVENT + 3 )

// Storage-list codes: STORAGE_LIST_EVENT is the base, the remaining three are
// offsets that get added to that base.
#define STORAGE_LIST_EVENT ( CLIENT_LIST_EVENT + 4 )
#define ADD_NODE_EVENT 1
#define ADD_DISK_EVENT 2
#define UPDATE_DISK_QUEUE_EVENT 3

// Disk name stored in the storage-table row created for a newly added node.
#define ALL_DISKS "All disks"
00031
00032 #include "SessionManager.h"
00033 #include <sstream>
00034
00035
00036 #include <unistd.h>
00037 #include <sys/syscall.h>
00038
00039
00040 MonitorEvent::MonitorEvent( int type )
00041 {
00042 this->type = type;
00043 }
00044
00045 MonitorEvent::~MonitorEvent()
00046 {
00047 }
00048
00049 int MonitorEvent::getType()
00050 {
00051 return type;
00052 }
00053
00054
00055 ClientListEvent::ClientListEvent( char *ip, unsigned short port, int value )
00056 : MonitorEvent( CLIENT_LIST_EVENT + value )
00057 {
00058 this->ip = strdup( ip );
00059 this->port = port;
00060 }
00061
00062 ClientListEvent::~ClientListEvent()
00063 {
00064 free( ip );
00065 }
00066
00067 char *ClientListEvent::getIP()
00068 {
00069 return ip;
00070 }
00071
00072 int ClientListEvent::getPort()
00073 {
00074 return port;
00075 }
00076
00077
00078 ConnectEvent::ConnectEvent( char *ip, unsigned short port, char *video )
00079 : ClientListEvent( ip, port, CONNECT_EVENT )
00080 {
00081 this->video = strdup( video );
00082 }
00083
00084 ConnectEvent::~ConnectEvent()
00085 {
00086 free( video );
00087 }
00088
00089 char *ConnectEvent::getVideo()
00090 {
00091 return video;
00092 }
00093
00094
00095 UpdateQueueEvent::UpdateQueueEvent( char *ip, unsigned short port, int queue )
00096 : ClientListEvent( ip, port, UPDATE_QUEUE_EVENT )
00097 {
00098 this->queue = queue;
00099 }
00100
00101 int UpdateQueueEvent::getQueue()
00102 {
00103 return queue;
00104 }
00105
00106
00107 DisconnectEvent::DisconnectEvent( char *ip, unsigned short port, char* video )
00108 : ClientListEvent( ip, port, DISCONNECT_EVENT )
00109 {
00110 this->video = strdup( video );
00111 }
00112
00113 DisconnectEvent::~DisconnectEvent()
00114 {
00115 free( video );
00116 }
00117
00118 char *DisconnectEvent::getVideo()
00119 {
00120 return video;
00121 }
00122
00123
00124 StorageListEvent::StorageListEvent( char *hostname, int value )
00125 : MonitorEvent( STORAGE_LIST_EVENT + value )
00126 {
00127 this->hostname = strdup( hostname );
00128 }
00129
00130 StorageListEvent::~StorageListEvent()
00131 {
00132 free( hostname );
00133 }
00134
00135 char *StorageListEvent::getHostName()
00136 {
00137 return hostname;
00138 }
00139
00140
00141 AddNodeEvent::AddNodeEvent( char *hostname, char *ip )
00142 : StorageListEvent( hostname, ADD_NODE_EVENT )
00143 {
00144 this->ip = strdup( ip );
00145 }
00146
00147 AddNodeEvent::~AddNodeEvent()
00148 {
00149 free( ip );
00150 }
00151
00152 char *AddNodeEvent::getIP()
00153 {
00154 return ip;
00155 }
00156
00157
00158 AddDiskEvent::AddDiskEvent( char *hostname, char *diskname )
00159 : StorageListEvent( hostname, ADD_DISK_EVENT )
00160 {
00161 this->diskname = strdup( diskname );
00162 }
00163
00164 AddDiskEvent::~AddDiskEvent()
00165 {
00166 free( diskname );
00167 }
00168
00169 char *AddDiskEvent::getDiskName()
00170 {
00171 return diskname;
00172 }
00173
00174
00175 UpdateDiskQueueEvent::UpdateDiskQueueEvent( char *hostname, char *diskname, int queue )
00176 : StorageListEvent( hostname, UPDATE_DISK_QUEUE_EVENT )
00177 {
00178
00179
00180
00181
00182
00183
00184
00185 this->diskname = strdup( diskname );
00186 this->queue = queue;
00187 }
00188
00189 UpdateDiskQueueEvent::~UpdateDiskQueueEvent()
00190 {
00191 free( diskname );
00192 }
00193
00194 char *UpdateDiskQueueEvent::getDiskName()
00195 {
00196 return diskname;
00197 }
00198
00199 int UpdateDiskQueueEvent::getQueue()
00200 {
00201 return queue;
00202 }
00203
00204 MonitorTable::MonitorTable()
00205 {
00206 pthread_mutex_init( &table_access, NULL );
00207 sem_init( &event_queue_semaphore, 0, 0 );
00208
00209
00210 stopThread = false;
00211 }
00212
00213 void MonitorTable::getData( vector<ClientData> &clientTable,
00214 vector<StorageData> &storageTable )
00215 {
00216 pthread_mutex_lock( &table_access );
00217 clientTable.insert( clientTable.begin(), client_data.begin(), client_data.end() );
00218 storageTable.insert( storageTable.begin(), storage_data.begin(), storage_data.end() );
00219 pthread_mutex_unlock( &table_access );
00220 }
00221
00222 void MonitorTable::putMonitorEvent( MonitorEvent *event )
00223 {
00224 pthread_mutex_lock( &table_access );
00225 event_queue.push( event );
00226 sem_post( &event_queue_semaphore );
00227 pthread_mutex_unlock( &table_access );
00228 }
00229
00230 void MonitorTable::Monitor()
00231 {
00232
00233 RioErr << "MONITORTHREADID " << syscall( SYS_gettid ) << endl;
00234
00235 while( true )
00236 {
00237 sem_wait( &event_queue_semaphore );
00238 pthread_mutex_lock( &table_access );
00239
00240
00241 if( stopThread )
00242 {
00243 RioErr << "MonitorTable::Monitor thread terminada!" << endl;
00244 pthread_mutex_unlock( &table_access );
00245 pthread_exit( NULL );
00246
00247 }
00248 if( event_queue.empty() )
00249 {
00250 pthread_mutex_unlock( &table_access );
00251 continue;
00252 }
00253
00254 MonitorEvent *event = event_queue.front();
00255 switch( event->getType() )
00256 {
00257 case CLIENT_LIST_EVENT + CONNECT_EVENT:
00258 {
00259 ConnectEvent *c_event = (ConnectEvent *)event;
00260 ClientData data;
00261 data.m_ip = c_event->getIP();
00262 data.m_port = c_event->getPort();
00263 data.m_video = c_event->getVideo();
00264 time( &(data.m_time) );
00265 data.m_queue = 0;
00266 client_data.push_back( data );
00267 break;
00268 }
00269 case CLIENT_LIST_EVENT + UPDATE_QUEUE_EVENT:
00270 {
00271 UpdateQueueEvent *uq_event = (UpdateQueueEvent *)event;
00272 vector<ClientData>::iterator it;
00273 for( it = client_data.begin(); it != client_data.end(); it++ )
00274 {
00275 if( (*it).m_ip != uq_event->getIP() )
00276 continue;
00277 if( (*it).m_port != uq_event->getPort() )
00278 continue;
00279 (*it).m_queue = uq_event->getQueue();
00280 }
00281 break;
00282 }
00283 case CLIENT_LIST_EVENT + DISCONNECT_EVENT:
00284 {
00285 DisconnectEvent *d_event = (DisconnectEvent *)event;
00286 vector<ClientData>::iterator it;
00287 for( it = client_data.begin(); it != client_data.end(); it++ )
00288 {
00289 if( (*it).m_ip != d_event->getIP() )
00290 continue;
00291 if( (*it).m_port != d_event->getPort() )
00292 continue;
00293 if( (*it).m_video != d_event->getVideo() )
00294 continue;
00295 client_data.erase( it );
00296 it--;
00297 break;
00298 }
00299 break;
00300 }
00301 case STORAGE_LIST_EVENT + ADD_NODE_EVENT:
00302 {
00303 AddNodeEvent *an_event = (AddNodeEvent *)event;
00304 StorageData data;
00305 data.m_hostname = an_event->getHostName();
00306 data.m_ip = an_event->getIP();
00307 data.m_diskname = ALL_DISKS;
00308 data.m_queue = 0;
00309 storage_data.push_back( data );
00310 break;
00311 }
00312 case STORAGE_LIST_EVENT + ADD_DISK_EVENT:
00313 {
00314 AddDiskEvent *ad_event = (AddDiskEvent *)event;
00315 vector<StorageData>::iterator it;
00316 for( it = storage_data.begin(); it != storage_data.end(); it++ )
00317 {
00318 if( (*it).m_hostname == ad_event->getHostName() )
00319 {
00320 StorageData data;
00321 data.m_ip = "";
00322 data.m_hostname = ad_event->getHostName();
00323 data.m_diskname = ad_event->getDiskName();
00324 data.m_queue = 0;
00325
00326 it++;
00327 storage_data.insert( it, data );
00328 break;
00329 }
00330 }
00331 break;
00332 }
00333 case STORAGE_LIST_EVENT + UPDATE_DISK_QUEUE_EVENT:
00334 {
00335 UpdateDiskQueueEvent *udq_event = (UpdateDiskQueueEvent *)event;
00336 vector<StorageData>::iterator it, disk_it;
00337 for( it = storage_data.begin(); it != storage_data.end(); it++ )
00338 {
00339 if( (*it).m_hostname == udq_event->getHostName() )
00340 break;
00341 }
00342 (*it).m_queue = 0;
00343 for( disk_it = it + 1; disk_it != storage_data.end(); disk_it++ )
00344 {
00345 if( (*disk_it).m_hostname != udq_event->getHostName() )
00346 break;
00347
00348 if( (*disk_it).m_diskname == udq_event->getDiskName() )
00349 (*disk_it).m_queue = udq_event->getQueue();
00350
00351 (*it).m_queue += (*disk_it).m_queue;
00352 }
00353 break;
00354 }
00355 }
00356 event_queue.pop();
00357 delete event;
00358 pthread_mutex_unlock( &table_access );
00359 }
00360 }
00361
00362
00363 void MonitorTable::Stop()
00364 {
00365 pthread_mutex_lock( &table_access );
00366
00367 stopThread = true;
00368
00369 sem_post( &event_queue_semaphore );
00370 pthread_mutex_unlock( &table_access );
00371 }