00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <storagedevice.h>
00026 #include <msgqueue.h>
00027 #include <sserror.h>
00028 #include <extern.h>
00029 #include <iostream>
00030
00031 #include <vsierror.h>
00032 #include <stdio.h>
00033 #include <string.h>
00034 #include <errno.h>
00035 #include <math.h>
00036
00037 #include <netinet/in.h>
00038 #include <arpa/inet.h>
00039 #include <sys/socket.h>
00040
00041
00042 #include <unistd.h>
00043 #include <sys/syscall.h>
00044
00045 #include "../interface/RioError.h"
00046
00047 const int DeviceInactive = 0;
00048 const int DeviceActive = 1;
00049
00050 struct timeval hoje;
00051
00052
00053
00054
00055
00056
00057 CStorageDevice::CStorageDevice()
00058 {
00059
00060 logopen = 0;
00061 m_MaxActiveDiskThreads = 1;
00062 m_ActiveDiskThreads = 0;
00063
00064 m_EstServiceTime = 0.0;
00065 m_DevServiceTime = 0.0;
00066 memset(m_EstServiceTimeAccThreads,0,sizeof(m_EstServiceTimeAccThreads));
00067 memset(m_DevServiceTimeAccThreads,0,sizeof(m_DevServiceTimeAccThreads));
00068
00069 m_EstResponseTime = 0.0;
00070 m_DevResponseTime = 0.0;
00071 memset(m_EstResponseTimeAccThreads,0,sizeof(m_EstResponseTimeAccThreads));
00072 memset(m_DevResponseTimeAccThreads,0,sizeof(m_DevResponseTimeAccThreads));
00073 memset(m_NRespTSamplesAccThreads,0,sizeof(m_NRespTSamplesAccThreads));
00074
00075 m_AvgServiceTime_Model = 0;
00076 m_VarServiceTime_Model = 0;
00077 m_NSamples = 0;
00078 memset(m_AvgServiceTimeAccThreads,0,sizeof(m_AvgServiceTimeAccThreads));
00079 memset(m_VarServiceTimeAccThreads,0,sizeof(m_VarServiceTimeAccThreads));
00080 memset(m_NSamplesAccThreads,0,sizeof(m_NSamplesAccThreads));
00081
00082 m_EstimatedParameter = 0;
00083 m_CollectMeasures = false;
00084 m_StartingServiceInterval = false;
00085
00086
00087 m_Status = DeviceInactive;
00088 m_Name = NULL;
00089 m_NumberOfSaveMeasures = 0;
00090
00091 for( int i=0; i < 300; i++ )
00092 {
00093 m_RequestsForThreads[ i ].Request = NULL;
00094 m_RequestsForThreads[ i ].ActiveThreads = 0;
00095 m_RequestsForThreads[ i ].Available = true;
00096
00097 pthread_mutex_init( &m_MutexThreads[ i ], NULL );
00098 pthread_cond_init( &m_ConditionNew[ i ], NULL );
00099 }
00100
00101
00102 pthread_mutex_init( &m_Mutex, NULL );
00103 pthread_cond_init( &m_Condition, NULL );
00104 pthread_cond_init( &m_ConditionExitThread, NULL );
00105 }
00106
00107
00108 CStorageDevice::~CStorageDevice()
00109 {
00110
00111 for( int i=0; i < 300; i++ )
00112 {
00113 pthread_mutex_destroy( &m_MutexThreads[ i ] );
00114 pthread_cond_destroy( &m_ConditionNew[ i ] );
00115 }
00116
00117 pthread_mutex_destroy( &m_Mutex );
00118 pthread_cond_destroy( &m_Condition );
00119 pthread_cond_destroy( &m_ConditionExitThread );
00120 }
00121
00122
00123
00124 int CStorageDevice::Initialize(char* DeviceName)
00125 {
00126 if( m_Status == DeviceActive )
00127 {
00128 return ERROR_SS_PROGRAM_ERROR;
00129 }
00130
00131 m_Status = DeviceActive;
00132
00133 int stat;
00134 stat = m_Device.Open( DeviceName );
00135 if(stat < 0)
00136 {
00137 Stop();
00138 return ERROR_SS_INVALID_DEVICE;
00139 }
00140
00141 stat = m_Thread.Create(Thread,(void*) this);
00142 if( stat < 0 )
00143 {
00144 Stop();
00145 return ERROR_SS_THREAD_CREATE;
00146 }
00147
00148
00149 m_Name = DeviceName;
00150
00151 return 0;
00152 }
00153
00154
00155
00156
00157
00158
00159
00160 int CStorageDevice::Initialize( char* DeviceName,
00161 char* ip,
00162 int nDiskThreads,
00163 double EstimatedParameter,
00164 bool CollectMeasures,
00165 char* LogsDirectory )
00166 {
00167 char device_name[200] = "" , log[ MaxPathSize ] = "";
00168
00169
00170 int i = 0;
00171 while( DeviceName[i] != '\0' )
00172 {
00173 if( DeviceName[i] == '/' )
00174 device_name[i] = '_';
00175 else
00176 device_name[i] = DeviceName[i];
00177 i++;
00178 if( i == 190 ) break;
00179 }
00180
00181 RioErr << "[StorageDevice] CollectMeasures: \t" << CollectMeasures
00182 << endl;
00183 if( CollectMeasures )
00184 {
00185
00186
00187 if( LogsDirectory == NULL )
00188
00189
00190 sprintf( log, "RIOdisk.%s.%s.log", ip, device_name );
00191 else if( LogsDirectory[ strlen( LogsDirectory ) - 1 ] != '/' )
00192
00193 sprintf( log, "%s/RIOdisk.%s.%s.log", LogsDirectory, ip, device_name );
00194 else
00195
00196 sprintf( log, "%sRIOdisk.%s.%s.log", LogsDirectory, ip, device_name );
00197 logopen = 1;
00198 }
00199
00200 if( m_Status == DeviceActive )
00201 {
00202 return ERROR_SS_PROGRAM_ERROR;
00203 }
00204
00205 m_Status = DeviceActive;
00206
00207 int stat;
00208
00209 stat = m_Device.Open( DeviceName );
00210 if(stat < 0)
00211 {
00212 Stop();
00213 return ERROR_SS_INVALID_DEVICE;
00214 }
00215
00216
00217 m_Name = DeviceName;
00218 m_MaxActiveDiskThreads = nDiskThreads;
00219
00220 if( CollectMeasures )
00221 {
00222 m_log.open( log );
00223 m_log << "Device Name : " << m_Name <<endl;
00224 m_log << "Max Active Threads : " << m_MaxActiveDiskThreads <<endl;
00225 }
00226
00227
00228
00229 if( LogsDirectory == NULL )
00230
00231 sprintf( logname, "RIOdisk.%s.%s.log.times", ip, device_name );
00232 else if( LogsDirectory[ strlen( LogsDirectory ) - 1 ] != '/' )
00233
00234 sprintf( logname, "%s/RIOdisk.%s.%s.log.times", LogsDirectory, ip,
00235 device_name );
00236 else
00237
00238 sprintf( logname, "%sRIOdisk.%s.%s.log.times", LogsDirectory, ip,
00239 device_name );
00240
00241 stat = m_Thread.Create(Thread,(void*) this);
00242 if( stat < 0 )
00243 {
00244 Stop();
00245 return ERROR_SS_THREAD_CREATE;
00246 }
00247
00248 m_CollectMeasures = CollectMeasures;
00249 m_EstimatedParameter = EstimatedParameter;
00250
00251 if( CollectMeasures )
00252 {
00253 m_log << "CollectMeasures \t" << m_CollectMeasures <<endl;
00254 m_log << "EstimatedTimeParameter \t" << m_EstimatedParameter <<endl;
00255 }
00256
00257 for( int i = 0; i < m_MaxActiveDiskThreads; i++ )
00258 {
00259 pthread_t thread_id;
00260 pthread_attr_t attr;
00261 ThreadParameter *Param = new ThreadParameter;
00262 Param->DeviceName = m_Name;
00263 Param->DeviceSize = m_Device.Size();
00264 Param->Device = this;
00265 Param->ThreadNumber = i;
00266
00267 pthread_attr_init( &attr );
00268 pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
00269 pthread_attr_setstacksize( &attr, 2*PTHREAD_STACK_MIN );
00270 if((pthread_create( &thread_id, &attr, &BlockThread,(void*) Param )) != 0 )
00271 {
00272 RioErr << "Storage: Could not start BlockThread " << i << "("
00273 << strerror(errno) << ")!"<<endl;
00274
00275 return ERROR_SS_THREAD_CREATE;
00276 }
00277 }
00278 LoadMeasures( logname );
00279 return 0;
00280 }
00281
00282
00283
00284 void CStorageDevice::Stop()
00285 {
00286
00287 m_Status = DeviceInactive;
00288
00289
00290 m_Queue.Signal();
00291
00292 m_Thread.Join(0);
00293
00294 if( logopen == 1 ) m_log.close();
00295 }
00296
00297
00298
00299
00300
00301 void CStorageDevice::ProcessRequest(StrRequest* Request)
00302 {
00303 #ifdef RIO_DEBUG2
00304 struct in_addr client_IP;
00305 struct sockaddr_in client_PORT;
00306 if( Request != NULL )
00307 {
00308 client_IP.s_addr = Request->IPaddress;
00309 client_PORT.sin_port = Request->Port;
00310 RioErr << " ----A requisicao abaixo sera colocada da fila do disco----"
00311 << endl;
00312 RioErr << " ClientId: " << Request->ClientId
00313 << endl;
00314 RioErr << " IPaddress " << inet_ntoa( client_IP )
00315 << endl;
00316 RioErr << " Port " << htons(client_PORT.sin_port)
00317 << endl;
00318 RioErr << " Request->Disk " << Request->Disk << endl;
00319 }
00320 else
00321 RioErr << " ----A requisicao sera nula colocada da fila do disco----"
00322 << endl;
00323 #endif
00324
00325 m_Queue.Put(Request);
00326
00327 #ifdef RIO_DEBUG2
00328 if( Request != NULL )
00329 {
00330 client_IP.s_addr = Request->IPaddress;
00331 client_PORT.sin_port = Request->Port;
00332 RioErr << " ----A requisicao abaixo foi colocada com sucesso na fila "
00333 << "do disco----" << endl;
00334 RioErr << " ClientId: " << Request->ClientId
00335 << endl;
00336 RioErr << " IPaddress " << inet_ntoa( client_IP )
00337 << endl;
00338 RioErr << " Port " << htons(client_PORT.sin_port)
00339 << endl;
00340 RioErr << " Request->Disk " << Request->Disk << endl;
00341 }
00342 else
00343 RioErr << " ----A requisicao nula foi colocada da fila do disco----"
00344 << endl;
00345 #endif
00346 }
00347
00348
00349
00350
00351 void* CStorageDevice::Thread(void* Param)
00352 {
00353 int i;
00354
00355
00356 CStorageDevice* Device = (CStorageDevice*) Param;
00357
00358
00359 CRequestQueue &m_Queue = Device->m_Queue;
00360 int &m_Status = Device->m_Status;
00361
00362 if( Device->m_log.is_open() )
00363 {
00364 ofstream &m_log = Device->m_log;
00365
00366 m_log << "Device Thread running ..." << endl;
00367 }
00368
00369 RioErr << "DEVICETHREADID " << syscall( SYS_gettid ) << endl;
00370
00371
00372 while( 1 )
00373 {
00374 pthread_mutex_lock(&Device->m_Mutex);
00375 if( Device->m_ActiveDiskThreads < Device->m_MaxActiveDiskThreads )
00376 {
00377
00378 pthread_mutex_unlock(&Device->m_Mutex);
00379
00380 #ifdef RIO_DEBUG2
00381
00382
00383 bool isServed = false;
00384 #endif
00385
00386
00387
00388 #ifdef RIO_DEBUG2
00389 RioErr << " ----Tentando obter e esperando, se necessario, uma "
00390 << "requisicao da fila----" << endl;
00391 #endif
00392 StrRequest* Request = m_Queue.Get();
00393
00394 #ifdef RIO_DEBUG2
00395 struct in_addr client_IP;
00396 struct sockaddr_in client_PORT;
00397 if( Request != NULL )
00398 {
00399 client_IP.s_addr = Request->IPaddress;
00400 client_PORT.sin_port = Request->Port;
00401 RioErr << " ----A requisicao abaixo foi removida da fila----"
00402 << endl;
00403 RioErr << " ClientId: " << Request->ClientId
00404 << endl;
00405 RioErr << " IPaddress " << inet_ntoa( client_IP )
00406 << endl;
00407 RioErr << " Port " << htons(client_PORT.sin_port)
00408 << endl;
00409 RioErr << " Request->Disk " << Request->Disk << endl;
00410 }
00411 else
00412 RioErr << " ----A requisicao nula foi removida da fila----"
00413 << endl;
00414 #endif
00415
00416 if(( m_Status != DeviceActive ) || ( Request == 0 ))
00417 {
00418 if(m_Status != DeviceActive)
00419 RioErr << "Storage: loop: Interface is not active!" << endl;
00420 else
00421 RioErr << "Storage: loop: Request = 0!" << endl;
00422 return( 0 );
00423 }
00424
00425 pthread_mutex_lock(&Device->m_Mutex);
00426
00427 Device->m_ActiveDiskThreads++;
00428 if( Device->m_ActiveDiskThreads == 1 )
00429 Device->m_StartingServiceInterval = true;
00430
00431
00432 pthread_mutex_unlock( &Device->m_Mutex );
00433
00434 for( i = 0; i < Device->m_MaxActiveDiskThreads; i++ )
00435 {
00436 if( Device->m_RequestsForThreads[ i ].Available == true )
00437 {
00438 pthread_mutex_lock( &Device->m_MutexThreads[i] );
00439 Device->m_RequestsForThreads[ i ].Request = Request;
00440 Device->m_RequestsForThreads[ i ].Available = false;
00441 Device->m_RequestsForThreads[ i ].ActiveThreads =
00442 Device->m_ActiveDiskThreads;
00443
00444 #ifdef RIO_DEBUG2
00445 RioErr << " ----Liberando thread no storagedevice para "
00446 << " a requisicao abaixo----" << endl;
00447 RioErr << " ClientId: " << Request->ClientId
00448 << endl;
00449 RioErr << " IPaddress " << inet_ntoa( client_IP )
00450 << endl;
00451 RioErr << " Port " << htons(client_PORT.sin_port)
00452 << endl;
00453 RioErr << " Request->Disk " << Request->Disk << endl;
00454
00455 isServed = true;
00456 #endif
00457
00458 pthread_cond_broadcast(&Device->m_ConditionNew[i]);
00459 pthread_mutex_unlock( &Device->m_MutexThreads[i] );
00460 break;
00461 }
00462 }
00463 #ifdef RIO_DEBUG2
00464 if( !isServed )
00465 {
00466 RioErr << " ----Aviso: a requisicao abaixo foi ignorada, pois "
00467 << "incorretamente achamos que existiam de disco "
00468 << "disponiveis! threads ativas = "
00469 << Device->m_ActiveDiskThreads << " e threads totais = "
00470 << Device->m_MaxActiveDiskThreads << " ----" << endl;
00471 RioErr << " ClientId: " << Request->ClientId << endl;
00472 RioErr << " IPaddress " << inet_ntoa( client_IP ) << endl;
00473 RioErr << " Port " << htons( client_PORT.sin_port ) << endl;
00474 RioErr << " Request->Disk " << Request->Disk << endl;
00475 }
00476 #endif
00477 }
00478 else
00479 {
00480 RioErr << "Storage: Waiting Available Thread!" << endl;
00481 pthread_cond_wait( &Device->m_Condition,&Device->m_Mutex );
00482 pthread_mutex_unlock( &Device->m_Mutex );
00483 }
00484 }
00485 }
00486
00487 void * CStorageDevice::BlockThread( void * Param)
00488 {
00489 ThreadParameter *BParam = ( ThreadParameter * ) Param;
00490
00491 char *devicename = BParam->DeviceName;
00492 int ThreadNumber = BParam->ThreadNumber;
00493 u64 size = BParam->DeviceSize;
00494 CStorageDevice* SDevice = BParam->Device;
00495 double ETParameter = BParam->Device->m_EstimatedParameter;
00496 int activeThreads = 0;
00497 int FlagStatus;
00498 double average, sample, sample_servicetime;
00499 u16 Command;
00500 struct timeval initialtime, finaltime, interval;
00501
00502 CvsiDevice Device;
00503 StrRequest* Request;
00504
00505 int stat = Device.OpenDeviceAgain( devicename, size );
00506 if( stat < 0 )
00507 {
00508 RioErr << "BlockThread: Could not open device "<< devicename << endl;
00509 return( (void *)ERROR_SS_THREAD_CREATE );
00510 }
00511
00512 RioErr << "BLOCKTHREADID " << syscall( SYS_gettid ) << endl;
00513
00514 SDevice->m_log << "Thread N " << ThreadNumber << endl;
00515 while( 1 )
00516 {
00517 Request = NULL;
00518 pthread_mutex_lock( &SDevice->m_MutexThreads[ThreadNumber] );
00519
00520 if( SDevice->m_RequestsForThreads[ ThreadNumber ].Available == false )
00521 {
00522 Request = SDevice->m_RequestsForThreads[ ThreadNumber ].Request;
00523 SDevice->m_RequestsForThreads[ ThreadNumber ].Available = true;
00524 activeThreads = SDevice->m_RequestsForThreads[ ThreadNumber ].ActiveThreads;
00525 SDevice->m_log << "Thread " << ThreadNumber << "(" << pthread_self() << ") entrou "<<endl ;
00526
00527 #ifdef RIO_DEBUG2
00528 {
00529 struct in_addr client_IP;
00530 client_IP.s_addr = Request->IPaddress;
00531 struct sockaddr_in client_PORT;
00532 client_PORT.sin_port = Request->Port;
00533
00534 RioErr << " ----Thread " << pthread_self()
00535 << " atendendo requisicao abaixo----" << endl;
00536 RioErr << " ClientId: " << Request->ClientId << endl;
00537 RioErr << " IPaddress " << inet_ntoa( client_IP ) << endl;
00538 RioErr << " Port " << htons(client_PORT.sin_port) << endl;
00539 RioErr << " Request->Disk " << Request->Disk << endl;
00540 }
00541 #endif
00542 }
00543 else
00544 {
00545
00546 #ifdef RIO_DEBUG2
00547 RioErr << " ----Esperando em m_ConditionNew[" << ThreadNumber << "]----"
00548 << endl;
00549 #endif
00550
00551 pthread_cond_wait( &SDevice->m_ConditionNew[ThreadNumber],
00552 &SDevice->m_MutexThreads[ThreadNumber] );
00553 if( SDevice->m_RequestsForThreads[ ThreadNumber ].Available ==
00554 false
00555 )
00556 {
00557 Request = SDevice->m_RequestsForThreads[ ThreadNumber ].Request;
00558 SDevice->m_RequestsForThreads[ ThreadNumber ].Available = true;
00559 activeThreads = SDevice->m_RequestsForThreads[ ThreadNumber ].ActiveThreads;
00560 SDevice->m_log << "Thread " << ThreadNumber << "(" << pthread_self() << ") entrou "<<endl ;
00561 }
00562 }
00563 pthread_mutex_unlock(&SDevice->m_MutexThreads[ThreadNumber]);
00564
00565 if( Request == NULL )
00566 {
00567 break;
00568 }
00569
00570 Command = Request->Command;
00571
00572
00573 if(( Command & 0xf ) == MSG_RSS_READ )
00574 {
00575
00576
00577 if (Command & RSS_INTERMEDIATE)
00578 FlagStatus = 1;
00579 else
00580 FlagStatus = 0;
00581 }
00582 else
00583 {
00584
00585
00586 if( Command & RSS_NOCOMPLETE )
00587 FlagStatus = 0;
00588 else
00589 FlagStatus = 1;
00590 }
00591
00592
00593 Command = Command & 0xf;
00594
00595 switch( Command )
00596 {
00597 case MSG_RSS_WRITE:
00598 case MSG_RSS_FLUSH:
00599 Request->Status = RequestStatusWrite;
00600 gettimeofday( &initialtime,0 );
00601 stat = Device.Write(Request->Pos,Request->Size,Request->Data);
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611 Request->Status = RequestStatusWriteDone;
00612 break;
00613 case MSG_RSS_READ:
00614 case MSG_RSS_FETCH:
00615 Request->Status = RequestStatusRead;
00616 gettimeofday( &initialtime,0 );
00617 stat = Device.Read(Request->Pos,Request->Size,Request->Data);
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628 Request->Status = RequestStatusReadDone;
00629 break;
00630 default:
00631 RioErr << "Invalid Command code on Request: " << Command
00632 << endl;
00633 RioErr << "Unexpected error on storage device thread. "
00634 << "Aborting thread " << endl;
00635 return 0;
00636 }
00637
00638
00639 pthread_mutex_lock(&SDevice->m_Mutex);
00640
00641 gettimeofday( &finaltime,0 );
00642 interval.tv_sec = finaltime.tv_sec - initialtime.tv_sec;
00643 interval.tv_usec = finaltime.tv_usec - initialtime.tv_usec;
00644 if( interval.tv_usec < 0)
00645 {
00646 interval.tv_sec -= 1;
00647 interval.tv_usec += 1000000;
00648 }
00649
00650 sample = interval.tv_sec * 1000.0 + interval.tv_usec / 1000.0;
00651
00652
00653 SDevice->m_ActiveDiskThreads--;
00654
00655 if( SDevice->m_StartingServiceInterval == true )
00656 {
00657 SDevice->m_StartingServiceInterval = false;
00658 }
00659 else
00660 {
00661 interval.tv_sec = finaltime.tv_sec - SDevice->m_LastServiceTime.tv_sec;
00662 interval.tv_usec = finaltime.tv_usec - SDevice->m_LastServiceTime.tv_usec;
00663 if( interval.tv_usec < 0)
00664 {
00665 interval.tv_sec -= 1;
00666 interval.tv_usec += 1000000;
00667 }
00668
00669 sample_servicetime = interval.tv_sec * 1000.0 + interval.tv_usec / 1000.0;
00670
00671
00672 if( SDevice->m_EstServiceTime == 0)
00673 {
00674 SDevice->m_EstServiceTime = sample_servicetime;
00675 }
00676 else
00677 {
00678 SDevice->m_EstServiceTime = ( 1 - ETParameter ) * SDevice->m_EstServiceTime
00679 + ETParameter * sample_servicetime;
00680 }
00681
00682
00683 SDevice->m_DevServiceTime = ( 1 - ETParameter ) * SDevice->m_DevServiceTime +
00684 ETParameter * fabs( sample_servicetime - SDevice->m_EstServiceTime );
00685
00686
00687 if( SDevice->m_EstServiceTimeAccThreads[activeThreads] == 0)
00688 {
00689 SDevice->m_EstServiceTimeAccThreads[activeThreads] = sample_servicetime;
00690 }
00691 else
00692 {
00693 SDevice->m_EstServiceTimeAccThreads[activeThreads] =
00694 ( 1 - ETParameter ) * SDevice->m_EstServiceTimeAccThreads[activeThreads]
00695 + ETParameter * sample_servicetime;
00696 }
00697
00698
00699 SDevice->m_DevServiceTimeAccThreads[activeThreads] =
00700 ( 1 - ETParameter ) * SDevice->m_DevServiceTimeAccThreads[activeThreads] +
00701 ETParameter * fabs( sample_servicetime - SDevice->m_EstServiceTimeAccThreads[activeThreads] );
00702
00703
00704 average = SDevice->m_AvgServiceTime_Model;
00705 SDevice->m_AvgServiceTime_Model = SDevice->m_AvgServiceTime_Model +
00706 (( sample_servicetime - SDevice->m_AvgServiceTime_Model )/( SDevice->m_NSamples + 1 ));
00707
00708
00709 if( SDevice->m_NSamples >= 1 )
00710 {
00711 SDevice->m_VarServiceTime_Model = (1 - (float) 1/SDevice->m_NSamples ) * SDevice->m_VarServiceTime_Model
00712 + (SDevice->m_NSamples + 1) * pow((SDevice->m_AvgServiceTime_Model - average), 2);
00713 }
00714 SDevice->m_NSamples++;
00715
00716
00717 average = SDevice->m_AvgServiceTimeAccThreads[activeThreads];
00718 SDevice->m_AvgServiceTimeAccThreads[activeThreads] = SDevice->m_AvgServiceTimeAccThreads[activeThreads] +
00719 (( sample_servicetime - SDevice->m_AvgServiceTimeAccThreads[activeThreads])/( SDevice->m_NSamplesAccThreads[activeThreads] + 1 ));
00720
00721
00722 if( SDevice->m_NSamplesAccThreads[activeThreads] >= 1 )
00723 {
00724 SDevice->m_VarServiceTimeAccThreads[activeThreads] = (1 - (float) 1/SDevice->m_NSamplesAccThreads[activeThreads] ) *
00725 SDevice->m_VarServiceTimeAccThreads[activeThreads]
00726 + (SDevice->m_NSamplesAccThreads[activeThreads] + 1) * pow((SDevice->m_AvgServiceTimeAccThreads[activeThreads] - average), 2);
00727 }
00728 SDevice->m_NSamplesAccThreads[activeThreads]++;
00729
00730
00731
00732
00733
00734
00735
00736 }
00737
00738 SDevice->m_LastServiceTime = finaltime;
00739
00740
00741 if( SDevice->m_EstResponseTimeAccThreads[activeThreads] == 0)
00742 {
00743 SDevice->m_EstResponseTimeAccThreads[activeThreads] = sample;
00744 }
00745 else
00746 {
00747 SDevice->m_EstResponseTimeAccThreads[activeThreads] =
00748 ( 1 - ETParameter ) * SDevice->m_EstResponseTimeAccThreads[activeThreads]
00749 + ETParameter * sample;
00750 }
00751
00752
00753 SDevice->m_DevResponseTimeAccThreads[activeThreads] =
00754 ( 1 - ETParameter ) * SDevice->m_DevResponseTimeAccThreads[activeThreads] +
00755 ETParameter * fabs( sample - SDevice->m_EstResponseTimeAccThreads[activeThreads] );
00756
00757 SDevice->m_NRespTSamplesAccThreads[activeThreads]++;
00758
00759
00760 if( SDevice->m_EstResponseTime == 0)
00761 {
00762 SDevice->m_EstResponseTime = sample;
00763 }
00764 else
00765 {
00766 SDevice->m_EstResponseTime = ( 1 - ETParameter ) * SDevice->m_EstResponseTime +
00767 ETParameter * sample;
00768 }
00769
00770
00771 SDevice->m_DevResponseTime = ( 1 - ETParameter ) * SDevice->m_DevResponseTime +
00772 ETParameter * fabs( sample - SDevice->m_EstResponseTime );
00773
00774 SDevice->m_log<< "Thread "<< ThreadNumber << "("<< pthread_self() << ") saiu " <<endl;
00775
00776 #ifdef RIO_DEBUG2
00777 RioErr << " ----Desbloqueando a thread principal do disco----" << endl;
00778 #endif
00779 pthread_cond_broadcast(&SDevice->m_Condition);
00780 pthread_mutex_unlock(&SDevice->m_Mutex);
00781
00782
00783 StrMsg* Slot = NULL;
00784 if(( FlagStatus ) || ( stat < 0 ))
00785 {
00786 Slot = MsgManager.New();
00787 if( stat < 0 )
00788 {
00789 RioErr << "Storage: IO Failed (";
00790 switch( stat )
00791 {
00792 case VSI_ERROR_DEVICE_POSITION:
00793 RioErr <<"VSI_ERROR_DEVICE_POSITION) ";
00794 break;
00795 case VSI_ERROR_DEVICE_SEEK:
00796 RioErr <<"VSI_ERROR_DEVICE_SEEK) ";
00797 break;
00798 case VSI_ERROR_DEVICE_READ:
00799 RioErr <<"VSI_ERROR_DEVICE_READ) ";
00800 }
00801 RioErr << "RouterId " << Request->RouterId << endl;
00802 stat = ERROR_RSS_IO_FAILED;
00803 }
00804 if(( Command == MSG_RSS_WRITE ) || ( Command == MSG_RSS_FLUSH ))
00805 {
00806 Slot->Msg.Storage.Status.Type = MSG_RSS_WRITECOMPLETE;
00807 }
00808 else
00809 {
00810 Slot->Msg.Storage.Status.Type = MSG_RSS_READCOMPLETE;
00811 }
00812 Slot->Msg.Storage.Status.Size = SizeMsgRSSstatus;
00813 Slot->Msg.Storage.Status.RouterId = Request->RouterId;
00814 Slot->Msg.Storage.Status.StorageId = Request->Id;
00815 Slot->Msg.Storage.Status.Error = stat;
00816 Slot->Msg.Storage.Status.ActiveThreads = activeThreads;
00817 }
00818
00819
00820
00821 switch( Command )
00822 {
00823 case MSG_RSS_WRITE:
00824
00825 RequestManager.Free(Request);
00826 break;
00827 case MSG_RSS_FLUSH:
00828
00829 RequestManager.Free(Request);
00830 break;
00831 case MSG_RSS_READ:
00832
00833
00834 break;
00835 case MSG_RSS_FETCH:
00836
00837
00838
00839
00840
00841 Request->Status = RequestStatusWaitSendCommand;
00842 break;
00843 default:
00844 RioErr << "Invalid Command code on Request: " << Command
00845 << endl;
00846 RioErr << "Unexpected error on storage device thread. "
00847 << "Aborting thread" << endl;
00848 return 0;
00849 }
00850
00851
00852 if(( FlagStatus) || (stat < 0) )
00853 {
00854 #ifdef RIO_DEBUG2
00855 RioErr <<"Sending status (sotoragedevice.blockthread) to router."
00856 << endl;
00857 #endif
00858
00859 Router.Send(Slot);
00860 };
00861
00862
00863
00864
00865 if( Command == MSG_RSS_READ )
00866 {
00867
00868 if( stat >= 0 )
00869 {
00870 Client.Send( Request );
00871 }
00872 }
00873 }
00874
00875 Device.Close();
00876 pthread_exit( NULL );
00877 }
00878
00879
00880 int CStorageDevice::SectorSize()
00881 {
00882 return m_Device.SectorSize();
00883 }
00884
00885
00886 u64 CStorageDevice::Size()
00887 {
00888 return m_Device.Size();
00889 }
00890
00891
00892 void CStorageDevice::GetDiskServiceTime( double averageservicetime[301])
00893 {
00894 for( int aux = 0; aux < 301; aux++ )
00895 {
00896 averageservicetime[aux] = m_EstServiceTimeAccThreads[aux];
00897
00898
00899 }
00900 m_NumberOfSaveMeasures++;
00901
00902
00903 if( m_NumberOfSaveMeasures % 5 == 0 && m_CollectMeasures )
00904 {
00905 char filename[350];
00906 FILE *handle;
00907 sprintf( filename, "%s.%d", logname, m_NumberOfSaveMeasures );
00908
00909 if( ( handle = fopen( filename, "w")) == NULL )
00910 {
00911 if( m_log.is_open() )
00912 m_log << " Could not create logtimes!" << strerror ( errno )
00913 << endl;
00914 }
00915 else
00916 {
00917 fprintf( handle, "# EstimatedServiceTime of Disk %10.10f msec\n", m_EstServiceTime );
00918 fprintf( handle, "# DeviationServiceTime %f\n", m_DevServiceTime );
00919 fprintf( handle, "\n# AverageServiceTime of Disk %10.10f msec\n", m_AvgServiceTime_Model );
00920 fprintf( handle, "# VarianceServiceTime %f \n",m_VarServiceTime_Model );
00921 fprintf( handle, "# Number of Samples %lld \n",m_NSamples );
00922 fprintf( handle, "\n# EstimatedResponseTime of Disk %10.10f msec\n", m_EstResponseTime );
00923 fprintf( handle, "# DeviationResponseTime %f\n", m_DevResponseTime );
00924 fprintf( handle, "\n# Measures in msec.\n");
00925 fprintf( handle, "#\tEstServT\tDevServT\tAvgServT\tVarServT\tSamples\t\tEstRespT\tDevRespT\tNRespTSamples\n");
00926 for( int aux = 1; aux < 301; aux++ )
00927 {
00928 fprintf( handle, "%3d\t%6.6f\t%6.6f\t%6.6f\t%6.6f\t%lld\t\t%6.6f\t%6.6f\t%lld\n",
00929 aux,
00930 m_EstServiceTimeAccThreads[aux],
00931 m_DevServiceTimeAccThreads[aux],
00932 m_AvgServiceTimeAccThreads[aux],
00933 m_VarServiceTimeAccThreads[aux],
00934 m_NSamplesAccThreads[aux],
00935 m_EstResponseTimeAccThreads[aux],
00936 m_DevResponseTimeAccThreads[aux],
00937 m_NRespTSamplesAccThreads[aux]);
00938 }
00939 fclose( handle );
00940 }
00941 }
00942 return;
00943 }
00944
00945
00946 double CStorageDevice::GetEstimatedTime()
00947 {
00948 return m_EstServiceTime;
00949 }
00950
00951 void CStorageDevice::LoadMeasures( char* filename )
00952 {
00953 FILE *handle;
00954 int i;
00955
00956 if( ( handle = fopen( filename, "r")) == NULL )
00957 {
00958 if( m_log.is_open() )
00959 m_log << " Could not read logtimes(" << filename << ")!"
00960 << strerror ( errno )<< endl;
00961 }
00962 else
00963 {
00964 if( fscanf( handle, "# EstimatedServiceTime of Disk %lf msec\n",
00965 &m_EstServiceTime ) != 1 )
00966 {
00967 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00968 << "ler o arquivo " << filename << endl;
00969 }
00970 if( fscanf( handle, "# DeviationServiceTime %lf\n",
00971 &m_DevServiceTime ) != 1 )
00972 {
00973 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00974 << "ler o arquivo " << filename << endl;
00975 }
00976 if( fscanf( handle, "\n# AverageServiceTime of Disk %lf msec\n",
00977 &m_AvgServiceTime_Model ) != 1 )
00978 {
00979 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00980 << "ler o arquivo " << filename << endl;
00981 }
00982 if( fscanf( handle, "# VarianceServiceTime %lf \n",
00983 &m_VarServiceTime_Model ) != 1 )
00984 {
00985 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00986 << "ler o arquivo " << filename << endl;
00987 }
00988 if( fscanf( handle, "# Number of Samples %lld \n",
00989 &m_NSamples ) != 1 )
00990 {
00991 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00992 << "ler o arquivo " << filename << endl;
00993 }
00994 if( fscanf( handle, "\n# EstimatedResponseTime of Disk %lf msec\n",
00995 &m_EstResponseTime ) != 1 )
00996 {
00997 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00998 << "ler o arquivo " << filename << endl;
00999 }
01000 if( fscanf( handle, "# DeviationResponseTime %lf\n",
01001 &m_DevResponseTime ) != 1 )
01002 {
01003 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01004 << "ler o arquivo " << filename << endl;
01005 }
01006 if( fscanf( handle, "\n# Measures in msec.\n") != 0 )
01007 {
01008 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01009 << "ler o arquivo " << filename << endl;
01010 }
01011 if( fscanf( handle, "#\tEstServT\tDevServT\tAvgServT\tVarServT\tSamples\t\tEstRespT\tDevRespT\tNRespTSamples\n")
01012 != 0 )
01013 {
01014 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01015 << "ler o arquivo " << filename << endl;
01016 }
01017 for( int aux = 1; (aux < 301) && (!feof( handle )); aux++ )
01018 {
01019 if( fscanf( handle, "%3d\t%lf\t%lf\t%lf\t%lf\t%lld\t\t%lf\t%lf\t%lld\n",
01020 &i,
01021 &m_EstServiceTimeAccThreads[aux],
01022 &m_DevServiceTimeAccThreads[aux],
01023 &m_AvgServiceTimeAccThreads[aux],
01024 &m_VarServiceTimeAccThreads[aux],
01025 &m_NSamplesAccThreads[aux],
01026 &m_EstResponseTimeAccThreads[aux],
01027 &m_DevResponseTimeAccThreads[aux],
01028 &m_NRespTSamplesAccThreads[aux] ) != 9 )
01029 {
01030 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01031 << "ler o arquivo " << filename << endl;
01032 }
01033
01034 }
01035 fclose( handle );
01036 }
01037
01038 return;
01039 }
01040
01041