CStorageDevice Class Reference

#include <storagedevice.h>

Public Member Functions

 CStorageDevice ()
 ~CStorageDevice ()
int Initialize (char *DeviceName)
void Stop ()
void ProcessRequest (StrRequest *Request)
int SectorSize ()
u64 Size ()
int Initialize (char *DeviceName, char *ip, int nDiskThreads, double EstimatedParameter, bool CollectMeasures, char *LogsDirectory)
void GetDiskServiceTime (double averageservicetime[301])
double GetEstimatedTime ()
void LoadMeasures (char *filename)

Data Fields

ofstream m_log
char logname [MaxPathSize]
int logopen
int m_MaxActiveDiskThreads
pthread_mutex_t m_Mutex
pthread_mutex_t m_MutexThreads [300]
pthread_cond_t m_Condition
pthread_cond_t m_ConditionNew [300]
pthread_cond_t m_ConditionExitThread
int m_ActiveDiskThreads
ThreadRequestParameter m_RequestsForThreads [300]
double m_EstServiceTime
double m_DevServiceTime
double m_EstServiceTimeAccThreads [301]
double m_DevServiceTimeAccThreads [301]
double m_EstResponseTime
double m_DevResponseTime
double m_EstResponseTimeAccThreads [301]
double m_DevResponseTimeAccThreads [301]
unsigned long long int m_NRespTSamplesAccThreads [301]
double m_AvgServiceTime_Model
double m_VarServiceTime_Model
unsigned long long int m_NSamples
double m_AvgServiceTimeAccThreads [301]
double m_VarServiceTimeAccThreads [301]
unsigned long long int m_NSamplesAccThreads [301]
double m_EstimatedParameter
bool m_CollectMeasures
struct timeval m_LastServiceTime
bool m_StartingServiceInterval

Static Private Member Functions

static void * Thread (void *Param)
static void * BlockThread (void *Param)

Private Attributes

CvsiThread m_Thread
CvsiDevice m_Device
CRequestQueue m_Queue
int m_Status
char * m_Name
unsigned int m_NumberOfSaveMeasures

Detailed Description

Definition at line 58 of file storagedevice.h.


Constructor & Destructor Documentation

// Default constructor: leaves the object inactive (m_Status = DeviceInactive,
// m_Name = NULL), zeroes every statistics field and every table indexed by
// thread count, marks the 300 per-thread request slots as free
// (Request = NULL, ActiveThreads = 0, Available = true) and initializes all
// pthread mutexes and condition variables with default attributes.
// NOTE(review): m_LastServiceTime and logname are not assigned here.
CStorageDevice::CStorageDevice (  ) 

Definition at line 57 of file storagedevice.cpp.

00058 {
00059     /* Public variables */
00060     logopen = 0;
00061     m_MaxActiveDiskThreads = 1;
00062     m_ActiveDiskThreads    = 0;
00063 
00064     m_EstServiceTime       = 0.0;
00065     m_DevServiceTime       = 0.0;
00066     memset(m_EstServiceTimeAccThreads,0,sizeof(m_EstServiceTimeAccThreads));
00067     memset(m_DevServiceTimeAccThreads,0,sizeof(m_DevServiceTimeAccThreads));
00068 
00069     m_EstResponseTime = 0.0;
00070     m_DevResponseTime = 0.0;
00071     memset(m_EstResponseTimeAccThreads,0,sizeof(m_EstResponseTimeAccThreads));
00072     memset(m_DevResponseTimeAccThreads,0,sizeof(m_DevResponseTimeAccThreads));
00073     memset(m_NRespTSamplesAccThreads,0,sizeof(m_NRespTSamplesAccThreads));
00074 
00075     m_AvgServiceTime_Model = 0;
00076     m_VarServiceTime_Model = 0;
00077     m_NSamples = 0;
00078     memset(m_AvgServiceTimeAccThreads,0,sizeof(m_AvgServiceTimeAccThreads));
00079     memset(m_VarServiceTimeAccThreads,0,sizeof(m_VarServiceTimeAccThreads));
00080     memset(m_NSamplesAccThreads,0,sizeof(m_NSamplesAccThreads));
00081 
00082     m_EstimatedParameter      = 0;
00083     m_CollectMeasures         = false;
00084     m_StartingServiceInterval = false;
00085 
00086     /* Private variables */
00087     m_Status = DeviceInactive;
00088     m_Name = NULL;
00089     m_NumberOfSaveMeasures = 0;
00090 
00091     for( int i=0; i < 300; i++ )            
00092     {
00093        m_RequestsForThreads[ i ].Request = NULL;
00094        m_RequestsForThreads[ i ].ActiveThreads = 0;
00095        m_RequestsForThreads[ i ].Available = true;
00096        // Initializes the mutex and the condition variable used by thread "i".
00097        pthread_mutex_init( &m_MutexThreads[ i ], NULL );
00098        pthread_cond_init( &m_ConditionNew[ i ], NULL );
00099     }
00100 
00101     // Initializes the (main) mutexes and condition variables. 
00102     pthread_mutex_init( &m_Mutex, NULL );
00103     pthread_cond_init( &m_Condition, NULL );
00104     pthread_cond_init( &m_ConditionExitThread, NULL );
00105 }

// Destructor: destroys the 300 per-thread mutex/condition pairs and the main
// mutex and condition variables that the constructor initialized.
// NOTE(review): nothing here stops or joins the worker threads - presumably
// Stop() must have been called beforehand; confirm.
CStorageDevice::~CStorageDevice (  ) 

Definition at line 108 of file storagedevice.cpp.

00109 {
00110     // Destroys the mutexes and the condition variables used by the threads.
00111     for( int i=0; i < 300; i++ )            
00112     {
00113        pthread_mutex_destroy( &m_MutexThreads[ i ] );
00114        pthread_cond_destroy( &m_ConditionNew[ i ] );
00115     }
00116     // Destroys the (main) mutexes and condition variables. 
00117     pthread_mutex_destroy( &m_Mutex );
00118     pthread_cond_destroy( &m_Condition );
00119     pthread_cond_destroy( &m_ConditionExitThread );
00120 }


Member Function Documentation

// Entry point of one disk worker thread (static, handed to pthread_create).
//
// Param is a ThreadParameter* carrying DeviceName, DeviceSize, ThreadNumber
// and the owning CStorageDevice. The thread opens its own CvsiDevice handle
// with OpenDeviceAgain() and then loops:
//   1. Under m_MutexThreads[ThreadNumber], takes the request stored in
//      m_RequestsForThreads[ThreadNumber] if the slot is marked in use
//      (Available == false); otherwise waits once on
//      m_ConditionNew[ThreadNumber] and checks the slot again.
//   2. Leaves the loop if no request was obtained (Request == NULL).
//   3. Executes the I/O: Device.Write() for WRITE/FLUSH, Device.Read() for
//      READ/FETCH, using Request->Command with its flag bits masked off.
//   4. Under m_Mutex, decrements m_ActiveDiskThreads, updates the statistics
//      and broadcasts m_Condition.
//   5. Builds and sends a status message to the router when FlagStatus is set
//      or the I/O failed, frees the request for WRITE/FLUSH, and for a READ
//      with stat >= 0 sends the request to the client after the status.
//
// Two time samples (both in milliseconds) feed the statistics:
//   sample             - time from just before the I/O call until m_Mutex
//                        has been acquired afterwards.
//   sample_servicetime - time since the previous completion recorded in
//                        m_LastServiceTime; skipped (and the flag cleared)
//                        when m_StartingServiceInterval is true.
// The "Est"/"Dev" fields are exponentially weighted updates with weight
// ETParameter (m_EstimatedParameter, read once at thread start); the
// "Avg"/"Var" fields are incremental mean/variance updates. Tables suffixed
// AccThreads are indexed by the ActiveThreads value copied from the slot.
//
// Returns (void *)ERROR_SS_THREAD_CREATE if the device cannot be opened and
// 0 on an invalid command code; otherwise ends through pthread_exit( NULL ).
//
// NOTE(review): the wait is a single pthread_cond_wait() with no retry loop,
// so any wakeup that finds the slot still Available leaves Request NULL and
// terminates the thread - confirm this is only the intended shutdown path.
// NOTE(review): BParam is never deleted here, and the invalid-command
// "return 0" paths skip Device.Close(); the first of them also skips the
// m_ActiveDiskThreads decrement and leaves Request unreleased.
// NOTE(review): the default branch of the second switch appears unreachable,
// since the first switch already returned for any other Command value.
// NOTE(review): Slot from MsgManager.New() is used without a NULL check, and
// a failed READ is neither sent to the client nor freed in this function -
// presumably handled elsewhere; confirm.
void * CStorageDevice::BlockThread ( void *  Param  )  [static, private]

Definition at line 487 of file storagedevice.cpp.

00488 {
00489     ThreadParameter *BParam = ( ThreadParameter * ) Param;
00490 
00491     char *devicename        = BParam->DeviceName;
00492     int ThreadNumber        = BParam->ThreadNumber;
00493     u64 size                = BParam->DeviceSize;
00494     CStorageDevice* SDevice = BParam->Device;
00495     double ETParameter      = BParam->Device->m_EstimatedParameter;
00496     int activeThreads       = 0;
00497     int FlagStatus;
00498     double average, sample, sample_servicetime;
00499     u16 Command;
00500     struct timeval initialtime, finaltime, interval;
00501 
00502     CvsiDevice Device;
00503     StrRequest* Request;
00504 
00505     int stat = Device.OpenDeviceAgain( devicename, size );
00506     if( stat < 0 )
00507     {
00508         RioErr << "BlockThread: Could not open device "<< devicename << endl;
00509         return( (void *)ERROR_SS_THREAD_CREATE );
00510     }
00511     
00512     RioErr << "BLOCKTHREADID " << syscall( SYS_gettid ) << endl;
00513 
00514     SDevice->m_log << "Thread N " << ThreadNumber << endl;
00515     while( 1 )
00516     {
00517         Request = NULL;
00518         pthread_mutex_lock( &SDevice->m_MutexThreads[ThreadNumber] );
00519         //SDevice->m_log << "Thread " << ThreadNumber  << " esperando "<<endl ;
00520         if( SDevice->m_RequestsForThreads[ ThreadNumber ].Available == false )
00521         {
00522             Request = SDevice->m_RequestsForThreads[ ThreadNumber ].Request;
00523             SDevice->m_RequestsForThreads[ ThreadNumber  ].Available = true;
00524             activeThreads   = SDevice->m_RequestsForThreads[ ThreadNumber ].ActiveThreads;
00525             SDevice->m_log << "Thread " << ThreadNumber  << "(" << pthread_self() << ") entrou "<<endl ;
00526 
00527             #ifdef RIO_DEBUG2
00528             {
00529                 struct in_addr client_IP;
00530                 client_IP.s_addr = Request->IPaddress;
00531                 struct sockaddr_in client_PORT;
00532                 client_PORT.sin_port = Request->Port;
00533 
00534                 RioErr << " ----Thread " << pthread_self()
00535                        << " atendendo requisicao abaixo----" << endl;
00536                 RioErr << " ClientId: " << Request->ClientId << endl;
00537                 RioErr << " IPaddress " << inet_ntoa( client_IP ) << endl;
00538                 RioErr << " Port " << htons(client_PORT.sin_port) << endl;
00539                 RioErr << " Request->Disk " << Request->Disk << endl;
00540             }
00541             #endif
00542         }
00543         else
00544         {
00545 
00546            #ifdef RIO_DEBUG2
00547            RioErr << " ----Esperando em m_ConditionNew[" << ThreadNumber << "]----" 
00548                   << endl;
00549            #endif
00550 
00551            pthread_cond_wait( &SDevice->m_ConditionNew[ThreadNumber],
00552                               &SDevice->m_MutexThreads[ThreadNumber] );
00553            if( SDevice->m_RequestsForThreads[ ThreadNumber ].Available ==
00554                false
00555              )
00556            {
00557                 Request = SDevice->m_RequestsForThreads[ ThreadNumber ].Request;
00558                 SDevice->m_RequestsForThreads[ ThreadNumber  ].Available = true;
00559                 activeThreads   = SDevice->m_RequestsForThreads[ ThreadNumber ].ActiveThreads;
00560                 SDevice->m_log << "Thread " << ThreadNumber  << "(" << pthread_self() << ") entrou "<<endl ;
00561            }
00562         }
00563         pthread_mutex_unlock(&SDevice->m_MutexThreads[ThreadNumber]);
00564 
00565         if( Request == NULL )
00566         {
00567             break;
00568         }
00569 
00570         Command = Request->Command;
00571 
00572         // Test if this I/O will require a status message to be sent to router.
00573         if(( Command & 0xf ) == MSG_RSS_READ )
00574         {
00575             // if command is read this is the first of two steps
00576             // Send status message only if bit RSS_INTERMEDIATE is set
00577             if (Command & RSS_INTERMEDIATE)
00578                 FlagStatus = 1;
00579             else
00580                 FlagStatus = 0;
00581         }
00582         else
00583         {
00584             // If this something else (not "read") this is the last step
00585             // Send status message if bit RSS_NOCOMPLETE is not set
00586             if( Command & RSS_NOCOMPLETE )
00587                 FlagStatus = 0;
00588             else
00589                 FlagStatus = 1;
00590         }
00591 
00592         // Reset flag bits on command
00593         Command = Command & 0xf;
00594 
00595         switch( Command )
00596         {
00597             case MSG_RSS_WRITE:
00598             case MSG_RSS_FLUSH:
00599                 Request->Status = RequestStatusWrite;
00600                 gettimeofday( &initialtime,0 );
00601                 stat = Device.Write(Request->Pos,Request->Size,Request->Data);
00602                 
00603                 // This part was commented out because it is not yet used by the 
00604                 // emulation, but in the future we intend to create a script to 
00605                 // use this information.
00606                 //#ifdef RIO_DEBUG_EMUL
00607                 //RioErr << "DEVICEWRITE " << ( int ) time( NULL ) << " " 
00608                 //       << Request->Size << endl;
00609                 //#endif       
00610                        
00611                 Request->Status = RequestStatusWriteDone;
00612                 break;
00613             case MSG_RSS_READ:
00614             case MSG_RSS_FETCH:
00615                 Request->Status = RequestStatusRead;
00616                 gettimeofday( &initialtime,0 );
00617                 stat = Device.Read(Request->Pos,Request->Size,Request->Data);
00618                 
00619                 // This print was commented out because these logs are not yet 
00620                 // needed. If these logs become necessary, this line of the 
00621                 // code can be uncommented (a script to process these logs
00622                 // already exists).
00623                 //#ifdef RIO_DEBUG_EMUL
00624                 //RioErr << "DEVICEREAD " << ( int ) time( NULL ) << " " 
00625                 //       << Request->Size << endl;
00626                 //#endif       
00627                        
00628                 Request->Status = RequestStatusReadDone;
00629                 break;
00630             default:
00631                 RioErr << "Invalid Command code on Request: " << Command 
00632                        << endl;
00633                 RioErr << "Unexpected error on storage device thread. "
00634                        << "Aborting thread " << endl;
00635                 return 0;
00636         }
00637 
00638         //updates measures
00639         pthread_mutex_lock(&SDevice->m_Mutex);
00640 
00641         gettimeofday( &finaltime,0 );
00642         interval.tv_sec  = finaltime.tv_sec  - initialtime.tv_sec;
00643         interval.tv_usec = finaltime.tv_usec - initialtime.tv_usec;
00644         if( interval.tv_usec < 0)
00645         {
00646             interval.tv_sec -= 1;
00647             interval.tv_usec += 1000000;
00648         }
00649 
00650         sample = interval.tv_sec * 1000.0 + interval.tv_usec / 1000.0;
00651 
00652         //decrements number of active threads
00653         SDevice->m_ActiveDiskThreads--;
00654 
00655         if( SDevice->m_StartingServiceInterval == true )
00656         {
00657             SDevice->m_StartingServiceInterval = false;
00658         }
00659         else
00660         {
00661             interval.tv_sec  = finaltime.tv_sec  - SDevice->m_LastServiceTime.tv_sec;
00662             interval.tv_usec = finaltime.tv_usec - SDevice->m_LastServiceTime.tv_usec;
00663             if( interval.tv_usec < 0)
00664             {
00665                 interval.tv_sec -= 1;
00666                 interval.tv_usec += 1000000;
00667             }
00668 
00669             sample_servicetime = interval.tv_sec * 1000.0 + interval.tv_usec / 1000.0;
00670 
00671             //updates estimated service time of disk
00672             if( SDevice->m_EstServiceTime == 0)
00673             {
00674                 SDevice->m_EstServiceTime =  sample_servicetime;
00675             }
00676             else
00677             {
00678                 SDevice->m_EstServiceTime = ( 1 - ETParameter ) * SDevice->m_EstServiceTime
00679                                             + ETParameter * sample_servicetime;
00680             }
00681 
00682             //updates Deviation of estimated service time of disk
00683             SDevice->m_DevServiceTime = ( 1 - ETParameter ) * SDevice->m_DevServiceTime +
00684             ETParameter * fabs( sample_servicetime - SDevice->m_EstServiceTime );
00685 
00686             // updates estimated service time according threads
00687             if( SDevice->m_EstServiceTimeAccThreads[activeThreads] == 0)
00688             {
00689                 SDevice->m_EstServiceTimeAccThreads[activeThreads] = sample_servicetime;
00690             }
00691             else
00692             {
00693                 SDevice->m_EstServiceTimeAccThreads[activeThreads] =
00694                 ( 1 - ETParameter ) * SDevice->m_EstServiceTimeAccThreads[activeThreads]
00695                                                     + ETParameter * sample_servicetime;
00696             }
00697 
00698             //updates Deviation of estimated service time according number of threads
00699             SDevice->m_DevServiceTimeAccThreads[activeThreads] =
00700                 ( 1 - ETParameter ) * SDevice->m_DevServiceTimeAccThreads[activeThreads] +
00701             ETParameter * fabs( sample_servicetime - SDevice->m_EstServiceTimeAccThreads[activeThreads] );
00702 
00703             //updates average service time of disk
00704             average  = SDevice->m_AvgServiceTime_Model;
00705             SDevice->m_AvgServiceTime_Model = SDevice->m_AvgServiceTime_Model +
00706                 (( sample_servicetime - SDevice->m_AvgServiceTime_Model )/( SDevice->m_NSamples + 1 ));
00707 
00708             //updates variance of average service time of disk
00709             if( SDevice->m_NSamples >= 1 )
00710             {
00711                 SDevice->m_VarServiceTime_Model = (1 - (float) 1/SDevice->m_NSamples ) * SDevice->m_VarServiceTime_Model
00712                                 + (SDevice->m_NSamples + 1) * pow((SDevice->m_AvgServiceTime_Model - average), 2);
00713             }
00714             SDevice->m_NSamples++;
00715 
00716             //updates average service time according number of threads
00717             average = SDevice->m_AvgServiceTimeAccThreads[activeThreads];
00718             SDevice->m_AvgServiceTimeAccThreads[activeThreads] = SDevice->m_AvgServiceTimeAccThreads[activeThreads] +
00719             (( sample_servicetime - SDevice->m_AvgServiceTimeAccThreads[activeThreads])/( SDevice->m_NSamplesAccThreads[activeThreads] + 1 ));
00720 
00721             //updates variance of average service time according number of threads
00722             if( SDevice->m_NSamplesAccThreads[activeThreads] >= 1 )
00723             {
00724                 SDevice->m_VarServiceTimeAccThreads[activeThreads] = (1 - (float) 1/SDevice->m_NSamplesAccThreads[activeThreads] ) *
00725                   SDevice->m_VarServiceTimeAccThreads[activeThreads]
00726                                 + (SDevice->m_NSamplesAccThreads[activeThreads] + 1) * pow((SDevice->m_AvgServiceTimeAccThreads[activeThreads] - average), 2);
00727             }
00728             SDevice->m_NSamplesAccThreads[activeThreads]++;
00729 
00730             //SDevice->m_log <<"Threads " << activeThreads << "\t" 
00731             //               << sample_servicetime << "\t"<< sample 
00732             //               << " finaltime " << finaltime.tv_sec  << " " <<finaltime.tv_usec
00733             //               << " m_LastServiceTime " << SDevice->m_LastServiceTime.tv_sec << " " 
00734             //               << SDevice->m_LastServiceTime.tv_usec 
00735             //               << endl;
00736         } // end of the else of if( SDevice->m_StartingServiceInterval == true )
00737 
00738         SDevice->m_LastServiceTime = finaltime;
00739 
00740         //updates estimated response time according number of threads
00741         if( SDevice->m_EstResponseTimeAccThreads[activeThreads] == 0)
00742         {
00743             SDevice->m_EstResponseTimeAccThreads[activeThreads] = sample;
00744         }
00745         else
00746         {
00747             SDevice->m_EstResponseTimeAccThreads[activeThreads] =
00748         ( 1 - ETParameter ) * SDevice->m_EstResponseTimeAccThreads[activeThreads]
00749                                             + ETParameter * sample;
00750         }
00751 
00752         //updates Deviation of estimated response time according number of threads
00753         SDevice->m_DevResponseTimeAccThreads[activeThreads] =
00754         ( 1 - ETParameter ) *  SDevice->m_DevResponseTimeAccThreads[activeThreads] +
00755         ETParameter * fabs( sample - SDevice->m_EstResponseTimeAccThreads[activeThreads] );
00756 
00757         SDevice->m_NRespTSamplesAccThreads[activeThreads]++;
00758 
00759         //updates estimated response time of disk
00760         if( SDevice->m_EstResponseTime == 0)
00761         {
00762             SDevice->m_EstResponseTime =  sample;
00763         }
00764         else
00765         {
00766             SDevice->m_EstResponseTime = ( 1 - ETParameter ) * SDevice->m_EstResponseTime +
00767                                         ETParameter * sample;
00768         }
00769 
00770         //updates Dev of estimated response time of disk
00771         SDevice->m_DevResponseTime = ( 1 - ETParameter ) * SDevice->m_DevResponseTime +
00772             ETParameter * fabs( sample - SDevice->m_EstResponseTime );
00773 
00774         SDevice->m_log<< "Thread "<< ThreadNumber << "("<< pthread_self() << ") saiu " <<endl;
00775 
00776         #ifdef RIO_DEBUG2
00777         RioErr << " ----Desbloqueando a thread principal do disco----" << endl;
00778         #endif
00779         pthread_cond_broadcast(&SDevice->m_Condition);
00780         pthread_mutex_unlock(&SDevice->m_Mutex);
00781 
00782         // Prepare status message to be sent to router if required
00783         StrMsg* Slot = NULL;
00784         if(( FlagStatus ) || ( stat < 0 ))
00785         {
00786             Slot = MsgManager.New();
00787             if( stat < 0 )
00788             {
00789                 RioErr << "Storage: IO Failed (";
00790                 switch( stat )
00791                 {
00792                     case VSI_ERROR_DEVICE_POSITION:
00793                         RioErr <<"VSI_ERROR_DEVICE_POSITION) ";
00794                         break;
00795                     case VSI_ERROR_DEVICE_SEEK:
00796                         RioErr <<"VSI_ERROR_DEVICE_SEEK) ";
00797                         break;
00798                     case VSI_ERROR_DEVICE_READ:
00799                         RioErr <<"VSI_ERROR_DEVICE_READ) ";
00800                 }
00801                 RioErr << "RouterId " << Request->RouterId << endl;
00802                 stat = ERROR_RSS_IO_FAILED;
00803             }
00804             if(( Command == MSG_RSS_WRITE ) || ( Command == MSG_RSS_FLUSH ))
00805             {
00806                 Slot->Msg.Storage.Status.Type  = MSG_RSS_WRITECOMPLETE;
00807             }
00808             else
00809             {
00810                 Slot->Msg.Storage.Status.Type  = MSG_RSS_READCOMPLETE;
00811             }
00812             Slot->Msg.Storage.Status.Size      = SizeMsgRSSstatus;
00813             Slot->Msg.Storage.Status.RouterId  = Request->RouterId;
00814             Slot->Msg.Storage.Status.StorageId = Request->Id;
00815             Slot->Msg.Storage.Status.Error     = stat;
00816             Slot->Msg.Storage.Status.ActiveThreads  = activeThreads;
00817         }
00818 
00819         // I/O done. should send request to client, wait for router command
00820         // (i.e. do nothing) or free request.
00821         switch( Command )
00822         {
00823             case MSG_RSS_WRITE:
00824                 // If write, we are done. Release request
00825                 RequestManager.Free(Request);
00826                 break;
00827             case MSG_RSS_FLUSH:
00828                 // If flush, we are done. Release request
00829                 RequestManager.Free(Request);
00830                 break;
00831             case MSG_RSS_READ:
00832                 // if read, do nothing now. Send data to client after status
00833                 // message is sent to router
00834                 break;
00835             case MSG_RSS_FETCH:
00836                 // For fetch need to update Request status before sending message
00837                 // to router to avoid race condition if the router reply arrives
00838                 // before setting the state
00839                 // Fetch. Keep request around
00840                 //RioErr << "Sending status (FETCH COMPLETE) to router " << endl;
00841                 Request->Status = RequestStatusWaitSendCommand;
00842                 break;
00843             default:
00844                 RioErr << "Invalid Command code on Request: " << Command 
00845                        << endl;
00846                 RioErr << "Unexpected error on storage device thread. "
00847                        << "Aborting thread" << endl;
00848                 return 0;
00849         }
00850 
00851         // Send message to router if required
00852         if(( FlagStatus) || (stat < 0) )
00853         {
00854             #ifdef RIO_DEBUG2
00855             RioErr <<"Sending status (sotoragedevice.blockthread) to router." 
00856                    << endl;
00857             #endif
00858 
00859             Router.Send(Slot);
00860         };
00861 
00862         // Wait to send read to client just after status is sent to
00863         // router. This avoids router receiving status out of order
00864         // (assuming network guarantees ordered delivery)
00865         if( Command == MSG_RSS_READ )
00866         {
00867             // Successfull Read. Send data to client
00868             if( stat >= 0 )
00869             {
00870                 Client.Send( Request );
00871             }
00872         }
00873     } //end of while (1)
00874 
00875     Device.Close();
00876     pthread_exit( NULL );
00877 }

// Copies the 301 entries of m_EstServiceTimeAccThreads into
// averageservicetime and increments m_NumberOfSaveMeasures. On every fifth
// call, when m_CollectMeasures is true, it also writes the global statistics
// and rows 1..300 of the per-thread-count tables to a file named
// "<logname>.<m_NumberOfSaveMeasures>". If that file cannot be created, a
// message is written to m_log when it is open.
// NOTE(review): m_Mutex is not taken here although BlockThread updates these
// fields while holding it - confirm callers tolerate inconsistent snapshots.
// NOTE(review): filename holds 350 bytes while logname is MaxPathSize bytes
// long and sprintf() is unbounded; "%d" and "%lld" are used with unsigned
// arguments.
void CStorageDevice::GetDiskServiceTime ( double  averageservicetime[301]  ) 

Definition at line 892 of file storagedevice.cpp.

00893 {
00894     for( int aux = 0; aux < 301; aux++ )
00895     {
00896         averageservicetime[aux] = m_EstServiceTimeAccThreads[aux];
00897         //averageservicetime[aux] = m_AvgServiceTimeAccThreads[aux];
00898 
00899     }
00900     m_NumberOfSaveMeasures++;
00901 
00902 
00903     if( m_NumberOfSaveMeasures % 5 == 0 && m_CollectMeasures )
00904     {
00905         char filename[350];
00906         FILE *handle;
00907         sprintf( filename, "%s.%d", logname, m_NumberOfSaveMeasures );
00908 
00909         if( ( handle = fopen( filename, "w")) == NULL )
00910         {
00911             if( m_log.is_open() )
00912                 m_log << " Could not create logtimes!" << strerror ( errno )
00913                       << endl;
00914         }
00915         else
00916         {
00917             fprintf( handle, "# EstimatedServiceTime of Disk %10.10f msec\n", m_EstServiceTime );
00918             fprintf( handle, "# DeviationServiceTime %f\n", m_DevServiceTime );
00919             fprintf( handle, "\n# AverageServiceTime of Disk %10.10f msec\n", m_AvgServiceTime_Model );
00920             fprintf( handle, "# VarianceServiceTime %f \n",m_VarServiceTime_Model  );
00921             fprintf( handle, "# Number of Samples %lld \n",m_NSamples );
00922             fprintf( handle, "\n# EstimatedResponseTime of Disk %10.10f msec\n", m_EstResponseTime );
00923             fprintf( handle, "# DeviationResponseTime %f\n", m_DevResponseTime );
00924             fprintf( handle, "\n# Measures in msec.\n");
00925             fprintf( handle, "#\tEstServT\tDevServT\tAvgServT\tVarServT\tSamples\t\tEstRespT\tDevRespT\tNRespTSamples\n");
00926             for( int aux = 1; aux < 301; aux++ )
00927             {
00928                 fprintf( handle, "%3d\t%6.6f\t%6.6f\t%6.6f\t%6.6f\t%lld\t\t%6.6f\t%6.6f\t%lld\n",
00929                     aux,
00930                     m_EstServiceTimeAccThreads[aux],
00931                     m_DevServiceTimeAccThreads[aux],
00932                     m_AvgServiceTimeAccThreads[aux],
00933                     m_VarServiceTimeAccThreads[aux],
00934                     m_NSamplesAccThreads[aux],
00935                     m_EstResponseTimeAccThreads[aux],
00936                     m_DevResponseTimeAccThreads[aux],
00937                     m_NRespTSamplesAccThreads[aux]);
00938             }
00939             fclose( handle );
00940         }
00941     }
00942     return;
00943 }

// Returns the current value of m_EstServiceTime, the estimated disk service
// time that BlockThread updates from samples computed in milliseconds.
// NOTE(review): the value is read without taking m_Mutex.
double CStorageDevice::GetEstimatedTime (  ) 

Definition at line 946 of file storagedevice.cpp.

00947 {
00948     return m_EstServiceTime;
00949 }

// Activates the device with measurement support: opens DeviceName, starts the
// main device thread (Thread) and nDiskThreads detached BlockThread workers,
// then calls LoadMeasures() on the ".times" file name stored in logname.
//
// DeviceName         - device path; the pointer itself is stored in m_Name
//                      (no copy is made) and handed to every worker.
// ip                 - text embedded in the log file names.
// nDiskThreads       - number of BlockThread workers to create; stored in
//                      m_MaxActiveDiskThreads.
// EstimatedParameter - stored in m_EstimatedParameter.
// CollectMeasures    - when true, m_log is opened on
//                      "RIOdisk.<ip>.<device>.log" and logopen is set to 1.
// LogsDirectory      - directory prefix for the log files, or NULL for none; a
//                      '/' is inserted when it does not already end in one.
//
// Returns 0 on success, ERROR_SS_PROGRAM_ERROR if the device is already
// active, ERROR_SS_INVALID_DEVICE if the device cannot be opened (Stop() is
// called first), or ERROR_SS_THREAD_CREATE if a thread cannot be created.
//
// NOTE(review): at most 190 characters of DeviceName are copied into
// device_name (with '/' replaced by '_'), but the sprintf() calls into log
// and logname are not bounded against MaxPathSize.
// NOTE(review): an empty LogsDirectory would index position strlen - 1 of an
// empty string.
// NOTE(review): the m_log file name is built and logopen is set before the
// "already active" check.
// NOTE(review): Thread is created before m_CollectMeasures and
// m_EstimatedParameter are stored - confirm it does not read them at once.
// NOTE(review): pthread_create() reports failure through its return value,
// while the message prints strerror(errno); on that path Param is not freed,
// m_Status stays DeviceActive and already-started threads keep running.
// attr is never passed to pthread_attr_destroy().
int CStorageDevice::Initialize ( char *  DeviceName,
char *  ip,
int  nDiskThreads,
double  EstimatedParameter,
bool  CollectMeasures,
char *  LogsDirectory 
)

Definition at line 160 of file storagedevice.cpp.

00166 {
00167     char device_name[200] = "" , log[ MaxPathSize ] = ""; // 200
00168 
00169     // changes '/' by '_'
00170     int i = 0;
00171     while( DeviceName[i] != '\0' )
00172     {
00173         if( DeviceName[i] == '/' )
00174             device_name[i] = '_';
00175         else
00176             device_name[i] = DeviceName[i];
00177         i++;
00178         if( i == 190 ) break;
00179     }
00180 
00181     RioErr << "[StorageDevice] CollectMeasures: \t" << CollectMeasures
00182            << endl;
00183     if( CollectMeasures )
00184     {
00185         // Change made to store the logs in a directory different from the one
00186         // holding the storage server code.
00187         if( LogsDirectory == NULL )
00188             // File stored in the directory holding the storage
00189             // server code
00190             sprintf( log, "RIOdisk.%s.%s.log", ip, device_name );
00191         else if( LogsDirectory[ strlen( LogsDirectory ) - 1 ] != '/' ) 
00192             // We must put the "/" between the file name and the directory name.
00193             sprintf( log, "%s/RIOdisk.%s.%s.log", LogsDirectory, ip, device_name );
00194         else
00195             // We must not put the "/" between the file name and the directory name.
00196             sprintf( log, "%sRIOdisk.%s.%s.log", LogsDirectory, ip, device_name );
00197         logopen = 1;
00198     }
00199 
00200     if( m_Status == DeviceActive )
00201     {
00202         return ERROR_SS_PROGRAM_ERROR;
00203     }
00204 
00205     m_Status = DeviceActive;
00206 
00207     int stat;
00208 
00209     stat = m_Device.Open( DeviceName );
00210     if(stat < 0)
00211     {
00212         Stop();
00213         return ERROR_SS_INVALID_DEVICE;
00214     }
00215 
00216     // Save device name for message trace purposes
00217     m_Name = DeviceName;
00218     m_MaxActiveDiskThreads = nDiskThreads;
00219 
00220     if( CollectMeasures )
00221     {
00222         m_log.open( log );
00223         m_log << "Device Name : " << m_Name <<endl;
00224         m_log << "Max Active Threads : " <<  m_MaxActiveDiskThreads  <<endl;
00225     }
00226 
00227     // Change made to store the logs in a directory different from the one
00228     // holding the storage server code.
00229     if( LogsDirectory == NULL )
00230         // File stored in the directory holding the storage server code
00231         sprintf( logname, "RIOdisk.%s.%s.log.times", ip, device_name );
00232     else if( LogsDirectory[ strlen( LogsDirectory ) - 1 ] != '/' ) 
00233         // We must put the "/" between the file name and the directory name.
00234         sprintf( logname, "%s/RIOdisk.%s.%s.log.times", LogsDirectory, ip, 
00235                  device_name );
00236     else
00237         // We must not put the "/" between the file name and the directory name.
00238         sprintf( logname, "%sRIOdisk.%s.%s.log.times", LogsDirectory, ip, 
00239                  device_name );
00240 
00241     stat = m_Thread.Create(Thread,(void*) this);
00242     if( stat < 0 )
00243     {
00244         Stop();
00245         return ERROR_SS_THREAD_CREATE;
00246     }
00247 
00248     m_CollectMeasures    = CollectMeasures;
00249     m_EstimatedParameter = EstimatedParameter;
00250 
00251     if( CollectMeasures )
00252     {
00253         m_log << "CollectMeasures \t"        << m_CollectMeasures    <<endl;
00254         m_log << "EstimatedTimeParameter \t" << m_EstimatedParameter <<endl;
00255     }
00256 
00257     for( int i = 0; i < m_MaxActiveDiskThreads; i++ )
00258     {
00259         pthread_t thread_id;
00260         pthread_attr_t attr;
00261         ThreadParameter *Param = new ThreadParameter;
00262         Param->DeviceName     = m_Name;
00263         Param->DeviceSize     = m_Device.Size();
00264         Param->Device         = this;
00265         Param->ThreadNumber   = i;
00266 
00267         pthread_attr_init( &attr );
00268         pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
00269         pthread_attr_setstacksize( &attr, 2*PTHREAD_STACK_MIN );
00270         if((pthread_create( &thread_id, &attr, &BlockThread,(void*) Param )) != 0 )
00271         {
00272             RioErr << "Storage: Could not start BlockThread " << i << "("
00273                    << strerror(errno) << ")!"<<endl;
00274 
00275             return ERROR_SS_THREAD_CREATE;
00276         }                                
00277     }
00278     LoadMeasures( logname );
00279     return 0;
00280 }

// Activates the device without measurement support: opens DeviceName and
// starts the main device thread (Thread).
// Returns 0 on success, ERROR_SS_PROGRAM_ERROR if the device is already
// active, ERROR_SS_INVALID_DEVICE if the device cannot be opened, or
// ERROR_SS_THREAD_CREATE if the thread cannot be created; Stop() is called
// before returning either of the last two errors.
// NOTE(review): m_Name is assigned only after the thread has been created -
// confirm Thread does not use it earlier. The pointer is stored, not copied.
int CStorageDevice::Initialize ( char *  DeviceName  ) 

Definition at line 124 of file storagedevice.cpp.

00125 {
00126     if( m_Status == DeviceActive )
00127     {
00128         return ERROR_SS_PROGRAM_ERROR;
00129     }
00130 
00131     m_Status = DeviceActive;
00132 
00133     int stat;
00134     stat = m_Device.Open( DeviceName );
00135     if(stat < 0)
00136     {
00137         Stop();
00138         return ERROR_SS_INVALID_DEVICE;
00139     }
00140 
00141     stat = m_Thread.Create(Thread,(void*) this);
00142     if( stat < 0 )
00143     {
00144         Stop();
00145         return ERROR_SS_THREAD_CREATE;
00146     }
00147 
00148     // Save device name for message trace purposes
00149     m_Name = DeviceName;
00150 
00151     return 0;
00152 }

void CStorageDevice::LoadMeasures ( char *  filename  ) 

Definition at line 951 of file storagedevice.cpp.

00952 {
              // Reads previously saved time measures from the text file
              // `filename` into the member statistics. The file is parsed with
              // fscanf line by line: seven "# ..." summary values, two literal
              // header lines, then up to 300 tab-separated rows that fill
              // entries 1..300 of the *AccThreads arrays (entry 0 is not
              // written here). A line that does not parse only produces a
              // warning on RioErr ("aviso - erro ao ler o arquivo" = "warning -
              // error reading the file"); parsing then continues.
00953         FILE *handle;
00954         int i; // receives the row number read from each table row; not used otherwise
00955 
00956         if( ( handle = fopen( filename, "r")) == NULL )
00957         {
                  // Missing or unreadable file: report it on m_log when that
                  // stream is open, then fall through to the return.
00958             if( m_log.is_open() )
00959                 m_log << " Could not read logtimes(" << filename << ")!"
00960                       << strerror ( errno )<< endl;
00961         }
00962         else
00963         {
00964             if( fscanf( handle, "# EstimatedServiceTime of Disk %lf msec\n", 
00965                 &m_EstServiceTime ) != 1 )
00966             {
00967                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00968                        << "ler o arquivo " << filename << endl;
00969             }
00970             if( fscanf( handle, "# DeviationServiceTime %lf\n", 
00971                 &m_DevServiceTime ) != 1 )
00972             {
00973                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00974                        << "ler o arquivo " << filename << endl;
00975             }
00976             if( fscanf( handle, "\n# AverageServiceTime of Disk %lf msec\n", 
00977                 &m_AvgServiceTime_Model ) != 1 )
00978             {
00979                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00980                        << "ler o arquivo " << filename << endl;
00981             }
00982             if( fscanf( handle, "# VarianceServiceTime %lf \n",
00983                 &m_VarServiceTime_Model  ) != 1 )
00984             {
00985                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00986                        << "ler o arquivo " << filename << endl;
00987             }
                  // NOTE(review): m_NSamples is declared unsigned long long, but
                  // %lld is the signed conversion; %llu would match the type.
00988             if( fscanf( handle, "# Number of Samples %lld \n",
00989                 &m_NSamples ) != 1 )
00990             {
00991                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00992                        << "ler o arquivo " << filename << endl;
00993             }
00994             if( fscanf( handle, "\n# EstimatedResponseTime of Disk %lf msec\n", 
00995                 &m_EstResponseTime ) != 1 )
00996             {
00997                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
00998                        << "ler o arquivo " << filename << endl;
00999             }
01000             if( fscanf( handle, "# DeviationResponseTime %lf\n", 
01001                 &m_DevResponseTime ) != 1 )
01002             {
01003                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01004                        << "ler o arquivo " << filename << endl;
01005             }
                  // The next two formats contain no conversion, so 0 is the
                  // expected fscanf result. NOTE(review): a literal mismatch
                  // also yields 0, so these checks appear to catch only EOF.
01006             if( fscanf( handle, "\n# Measures in msec.\n") != 0 )
01007             {
01008                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01009                        << "ler o arquivo " << filename << endl;
01010             }
01011             if( fscanf( handle, "#\tEstServT\tDevServT\tAvgServT\tVarServT\tSamples\t\tEstRespT\tDevRespT\tNRespTSamples\n") 
01012                 != 0 )
01013             {
01014                 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01015                        << "ler o arquivo " << filename << endl;
01016             }
                  // One table row per iteration, stored at index aux (1..300);
                  // the loop ends at end-of-file or after 300 rows.
01017             for( int aux = 1; (aux < 301) && (!feof( handle )); aux++ )
01018             {
                      // NOTE(review): the two %lld targets (m_NSamplesAccThreads,
                      // m_NRespTSamplesAccThreads) are unsigned long long as well.
01019                 if( fscanf( handle, "%3d\t%lf\t%lf\t%lf\t%lf\t%lld\t\t%lf\t%lf\t%lld\n",
01020                     &i,
01021                     &m_EstServiceTimeAccThreads[aux],
01022                     &m_DevServiceTimeAccThreads[aux],
01023                     &m_AvgServiceTimeAccThreads[aux],
01024                     &m_VarServiceTimeAccThreads[aux],
01025                     &m_NSamplesAccThreads[aux],
01026                     &m_EstResponseTimeAccThreads[aux],
01027                     &m_DevResponseTimeAccThreads[aux],
01028                     &m_NRespTSamplesAccThreads[aux] ) != 9 )
01029                 {
                          // The row is only reported; the loop is not left.
                          // NOTE(review): a matching failure leaves the input
                          // position unchanged, so this warning may repeat for
                          // every remaining index - confirm that is acceptable.
01030                     RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao "
01031                            << "ler o arquivo " << filename << endl;
01032                 }
01033                         
01034             }
01035             fclose( handle );
01036         }
01037 
01038     return;
01039 }

void CStorageDevice::ProcessRequest ( StrRequest *  Request  ) 

Definition at line 301 of file storagedevice.cpp.

00302 {
          // Hands Request to the device by putting it on m_Queue. When
          // RIO_DEBUG2 is defined, the client id, IP address, port and disk of
          // the request are printed to RioErr before and after the Put; a NULL
          // Request gets its own message and is still put on the queue.
00303     #ifdef RIO_DEBUG2
00304     struct in_addr client_IP;
00305     struct sockaddr_in client_PORT;
00306     if( Request != NULL )
00307     {
00308         client_IP.s_addr = Request->IPaddress;
00309         client_PORT.sin_port = Request->Port;
00310         RioErr << " ----A requisicao abaixo sera colocada da fila do disco----"
00311                << endl;
00312         RioErr << " ClientId: " << Request->ClientId
00313                << endl;
00314         RioErr << " IPaddress " << inet_ntoa( client_IP )
00315                << endl;
00316         RioErr << " Port " << htons(client_PORT.sin_port) // NOTE(review): presumably Port is in network byte order, where ntohs would be the conventional call - confirm
00317                << endl;
00318         RioErr << " Request->Disk " << Request->Disk << endl;
00319     }
00320     else
00321         RioErr << " ----A requisicao sera nula colocada da fila do disco----" 
00322                << endl;
00323     #endif
00324 
00325     m_Queue.Put(Request); // the only statement compiled when RIO_DEBUG2 is not defined
00326 
00327     #ifdef RIO_DEBUG2
          // Same dump as above, printed again after the Put has returned.
00328     if( Request != NULL )
00329     {
00330         client_IP.s_addr = Request->IPaddress;
00331         client_PORT.sin_port = Request->Port;
00332         RioErr << " ----A requisicao abaixo foi colocada com sucesso na fila "
00333                << "do disco----" << endl;
00334         RioErr << " ClientId: " << Request->ClientId
00335                << endl;
00336         RioErr << " IPaddress " << inet_ntoa( client_IP )
00337                << endl;
00338         RioErr << " Port " << htons(client_PORT.sin_port)
00339                << endl;
00340         RioErr << " Request->Disk " << Request->Disk << endl;
00341     }
00342     else
00343         RioErr << " ----A requisicao nula foi colocada da fila do disco----" 
00344                << endl;
00345     #endif
00346 }

int CStorageDevice::SectorSize (  ) 

Definition at line 880 of file storagedevice.cpp.

00881 {
          // Returns whatever m_Device.SectorSize() reports; no local state is used.
00882     return m_Device.SectorSize();
00883 }

u64 CStorageDevice::Size (  ) 

Definition at line 886 of file storagedevice.cpp.

00887 {
          // Returns whatever m_Device.Size() reports; no local state is used.
00888     return m_Device.Size();
00889 }

void CStorageDevice::Stop ( void   ) 

Definition at line 284 of file storagedevice.cpp.

00285 {
          // Shuts the device down: marks it inactive, signals the request
          // queue, joins m_Thread and closes m_log when logopen is 1.
          // NOTE(review): only m_Thread is joined here; any other threads the
          // device may have started are not waited for in this function.
00286     // change status to cause threads to exit
00287     m_Status = DeviceInactive;
00288 
00289     // Make sure thread return from Get() on request queue.
00290     m_Queue.Signal();
00291 
00292     m_Thread.Join(0); // presumably waits for the Thread routine to finish - confirm the meaning of the 0 argument
00293 
00294     if( logopen == 1 ) m_log.close();
00295 }

void * CStorageDevice::Thread ( void *  Param  )  [static, private]

Definition at line 351 of file storagedevice.cpp.

00352 {
          // Static thread routine; Param is the CStorageDevice it serves.
          // Loops forever: while fewer than m_MaxActiveDiskThreads requests are
          // in service it takes the next request from the queue and hands it
          // to the first entry of m_RequestsForThreads whose Available flag is
          // true, signalling that entry's m_ConditionNew. Otherwise it waits
          // on m_Condition. Returns 0 when the device is no longer active or
          // the queue delivers a null request.
00353     int i; // index of the m_RequestsForThreads entry being examined
00354 
00355     // Get this class pointer
00356     CStorageDevice* Device = (CStorageDevice*) Param;
00357 
00358     // Get members of this class
          // (local references named like the members, since a static function
          // has no `this`)
00359     CRequestQueue &m_Queue = Device->m_Queue;
00360     int &m_Status          = Device->m_Status;
00361 
00362     if( Device->m_log.is_open() )
00363     {
00364         ofstream &m_log        = Device->m_log;
00365 
00366         m_log << "Device Thread running ..." << endl;
00367     }
00368     
00369     RioErr << "DEVICETHREADID " << syscall( SYS_gettid ) << endl; // prints the id returned by the gettid system call
00370 
00371     // Loop forever
00372     while( 1 )
00373     {
00374         pthread_mutex_lock(&Device->m_Mutex); // guards the read of m_ActiveDiskThreads below
00375         if( Device->m_ActiveDiskThreads < Device->m_MaxActiveDiskThreads )
00376         {
00377 
00378             pthread_mutex_unlock(&Device->m_Mutex); // released before the possibly blocking Get()
00379 
00380             #ifdef RIO_DEBUG2
00381             // Boolean variable used to indicate that the request was
00382             // assigned to some block thread.
00383             bool isServed = false;
00384             #endif
00385 
00386             // Wait and get next request
00387 
00388             #ifdef RIO_DEBUG2
00389             RioErr << " ----Tentando obter e esperando, se necessario, uma "
00390                    << "requisicao da fila----" << endl;
00391             #endif
00392             StrRequest* Request = m_Queue.Get();
00393 
00394             #ifdef RIO_DEBUG2
00395             struct in_addr client_IP;
00396             struct sockaddr_in client_PORT;
00397             if( Request != NULL )
00398             {
00399                 client_IP.s_addr = Request->IPaddress;
00400                 client_PORT.sin_port = Request->Port;
00401                 RioErr << " ----A requisicao abaixo foi removida da fila----"
00402                        << endl;
00403                 RioErr << " ClientId: " << Request->ClientId
00404                        << endl;
00405                 RioErr << " IPaddress " << inet_ntoa( client_IP )
00406                        << endl;
00407                 RioErr << " Port " << htons(client_PORT.sin_port)
00408                        << endl;
00409                 RioErr << " Request->Disk " << Request->Disk << endl;
00410             }
00411             else
00412                 RioErr << " ----A requisicao nula foi removida da fila----"
00413                        << endl;
00414             #endif
00415 
                  // Exit path of the routine: device not active any more, or
                  // Get() came back without a request.
00416             if(( m_Status != DeviceActive ) || ( Request == 0 ))
00417             {
00418                 if(m_Status != DeviceActive)
00419                     RioErr << "Storage: loop: Interface is not active!" << endl;
00420                 else
00421                     RioErr << "Storage: loop: Request = 0!" << endl;
00422                 return( 0 );
00423             }
00424 
00425             pthread_mutex_lock(&Device->m_Mutex);
00426             //increments number of active threads
00427             Device->m_ActiveDiskThreads++;
00428             if(  Device->m_ActiveDiskThreads == 1 ) // going from idle to one request in service
00429                 Device->m_StartingServiceInterval = true;
00430 
00431             //Device->m_log << "vai pegar mutex "<<endl ;
00432             pthread_mutex_unlock( &Device->m_Mutex );
00433 
                  // Look for the first free entry and give it the request.
                  // NOTE(review): Available is tested before m_MutexThreads[i]
                  // is taken, and m_ActiveDiskThreads is copied below without
                  // m_Mutex held - confirm both reads are safe.
00434             for( i = 0; i < Device->m_MaxActiveDiskThreads; i++ )
00435             {
00436                 if( Device->m_RequestsForThreads[ i ].Available == true )
00437                 {
00438                     pthread_mutex_lock( &Device->m_MutexThreads[i] );
00439                     Device->m_RequestsForThreads[ i ].Request = Request;
00440                     Device->m_RequestsForThreads[ i ].Available = false;
00441                     Device->m_RequestsForThreads[ i ].ActiveThreads =
00442                         Device->m_ActiveDiskThreads;
00443 
00444                     #ifdef RIO_DEBUG2
00445                     RioErr << " ----Liberando thread no storagedevice para "
00446                            << " a requisicao abaixo----" << endl;
00447                     RioErr << " ClientId: " << Request->ClientId
00448                            << endl;
00449                     RioErr << " IPaddress " << inet_ntoa( client_IP )
00450                            << endl;
00451                     RioErr << " Port " << htons(client_PORT.sin_port)
00452                            << endl;
00453                     RioErr << " Request->Disk " << Request->Disk << endl;
00454 
00455                     isServed = true;
00456                     #endif
00457 
00458                     pthread_cond_broadcast(&Device->m_ConditionNew[i]); // presumably wakes the block thread that owns entry i - confirm against BlockThread
00459                     pthread_mutex_unlock( &Device->m_MutexThreads[i] );
00460                     break; // the request goes to exactly one entry
00461                 }
00462             }
                  // NOTE(review): if the loop finds no free entry, the request
                  // is not handed to anyone and m_ActiveDiskThreads stays
                  // incremented; only the RIO_DEBUG2 build reports it.
00463             #ifdef RIO_DEBUG2
00464             if( !isServed )
00465             {
00466                 RioErr << " ----Aviso: a requisicao abaixo foi ignorada, pois "
00467                        << "incorretamente achamos que existiam de disco "
00468                        << "disponiveis! threads ativas = "
00469                        << Device->m_ActiveDiskThreads << " e threads totais = "
00470                        << Device->m_MaxActiveDiskThreads << " ----" << endl;
00471                 RioErr << " ClientId: " << Request->ClientId << endl;
00472                 RioErr << " IPaddress " << inet_ntoa( client_IP ) << endl;
00473                 RioErr << " Port " << htons( client_PORT.sin_port ) << endl;
00474                 RioErr << " Request->Disk " << Request->Disk << endl;
00475             }
00476             #endif
00477         }
00478         else // else of if( Device->m_ActiveDiskThreads < Device->m_MaxActiv...
00479         {
00480             RioErr << "Storage: Waiting Available Thread!" << endl;
00481             pthread_cond_wait( &Device->m_Condition,&Device->m_Mutex ); // m_Mutex is still held from the lock at the top of the loop
00482             pthread_mutex_unlock( &Device->m_Mutex ); // the while loop then re-tests the thread count
00483         }
00484     } // end of while( 1 )*/
00485 }


Field Documentation

char CStorageDevice::logname[MaxPathSize]

Definition at line 87 of file storagedevice.h.

int CStorageDevice::logopen

Definition at line 88 of file storagedevice.h.

int CStorageDevice::m_ActiveDiskThreads

Definition at line 94 of file storagedevice.h.

double CStorageDevice::m_AvgServiceTime_Model

Definition at line 108 of file storagedevice.h.

double CStorageDevice::m_AvgServiceTimeAccThreads[301]

Definition at line 111 of file storagedevice.h.

bool CStorageDevice::m_CollectMeasures

Definition at line 116 of file storagedevice.h.

pthread_cond_t CStorageDevice::m_Condition

Definition at line 91 of file storagedevice.h.

pthread_cond_t CStorageDevice::m_ConditionExitThread

Definition at line 93 of file storagedevice.h.

pthread_cond_t CStorageDevice::m_ConditionNew[300]

Definition at line 92 of file storagedevice.h.

CvsiDevice CStorageDevice::m_Device [private]

Definition at line 124 of file storagedevice.h.

double CStorageDevice::m_DevResponseTime

Definition at line 103 of file storagedevice.h.

double CStorageDevice::m_DevResponseTimeAccThreads[301]

Definition at line 105 of file storagedevice.h.

double CStorageDevice::m_DevServiceTime

Definition at line 98 of file storagedevice.h.

double CStorageDevice::m_DevServiceTimeAccThreads[301]

Definition at line 100 of file storagedevice.h.

double CStorageDevice::m_EstimatedParameter

Definition at line 115 of file storagedevice.h.

double CStorageDevice::m_EstResponseTime

Definition at line 102 of file storagedevice.h.

double CStorageDevice::m_EstResponseTimeAccThreads[301]

Definition at line 104 of file storagedevice.h.

double CStorageDevice::m_EstServiceTime

Definition at line 97 of file storagedevice.h.

double CStorageDevice::m_EstServiceTimeAccThreads[301]

Definition at line 99 of file storagedevice.h.

struct timeval CStorageDevice::m_LastServiceTime

Definition at line 117 of file storagedevice.h.

ofstream CStorageDevice::m_log

Definition at line 86 of file storagedevice.h.

int CStorageDevice::m_MaxActiveDiskThreads

Definition at line 89 of file storagedevice.h.

pthread_mutex_t CStorageDevice::m_Mutex

Definition at line 90 of file storagedevice.h.

pthread_mutex_t CStorageDevice::m_MutexThreads[300]

Definition at line 90 of file storagedevice.h.

char* CStorageDevice::m_Name [private]

Definition at line 127 of file storagedevice.h.

unsigned long long int CStorageDevice::m_NRespTSamplesAccThreads[301]

Definition at line 106 of file storagedevice.h.

unsigned long long int CStorageDevice::m_NSamples

Definition at line 110 of file storagedevice.h.

unsigned long long int CStorageDevice::m_NSamplesAccThreads[301]

Definition at line 113 of file storagedevice.h.

unsigned int CStorageDevice::m_NumberOfSaveMeasures [private]

Definition at line 128 of file storagedevice.h.

CRequestQueue CStorageDevice::m_Queue [private]

Definition at line 125 of file storagedevice.h.

ThreadRequestParameter CStorageDevice::m_RequestsForThreads[300]

Definition at line 95 of file storagedevice.h.

bool CStorageDevice::m_StartingServiceInterval

Definition at line 118 of file storagedevice.h.

int CStorageDevice::m_Status [private]

Definition at line 126 of file storagedevice.h.

CvsiThread CStorageDevice::m_Thread [private]

Definition at line 123 of file storagedevice.h.

double CStorageDevice::m_VarServiceTime_Model

Definition at line 109 of file storagedevice.h.

double CStorageDevice::m_VarServiceTimeAccThreads[301]

Definition at line 112 of file storagedevice.h.


The documentation for this class was generated from the following files:

    storagedevice.h
    storagedevice.cpp

Generated on Wed Jul 4 16:03:34 2012 for RIO by  doxygen 1.6.3