#include <storagedevice.h>
Public Member Functions | |
CStorageDevice () | |
~CStorageDevice () | |
int | Initialize (char *DeviceName) |
void | Stop () |
void | ProcessRequest (StrRequest *Request) |
int | SectorSize () |
u64 | Size () |
int | Initialize (char *DeviceName, char *ip, int nDiskThreads, double EstimatedParameter, bool CollectMeasures, char *LogsDirectory) |
void | GetDiskServiceTime (double averageservicetime[301]) |
double | GetEstimatedTime () |
void | LoadMeasures (char *filename) |
Data Fields | |
ofstream | m_log |
char | logname [MaxPathSize] |
int | logopen |
int | m_MaxActiveDiskThreads |
pthread_mutex_t | m_Mutex |
pthread_mutex_t | m_MutexThreads [300] |
pthread_cond_t | m_Condition |
pthread_cond_t | m_ConditionNew [300] |
pthread_cond_t | m_ConditionExitThread |
int | m_ActiveDiskThreads |
ThreadRequestParameter | m_RequestsForThreads [300] |
double | m_EstServiceTime |
double | m_DevServiceTime |
double | m_EstServiceTimeAccThreads [301] |
double | m_DevServiceTimeAccThreads [301] |
double | m_EstResponseTime |
double | m_DevResponseTime |
double | m_EstResponseTimeAccThreads [301] |
double | m_DevResponseTimeAccThreads [301] |
unsigned long long int | m_NRespTSamplesAccThreads [301] |
double | m_AvgServiceTime_Model |
double | m_VarServiceTime_Model |
unsigned long long int | m_NSamples |
double | m_AvgServiceTimeAccThreads [301] |
double | m_VarServiceTimeAccThreads [301] |
unsigned long long int | m_NSamplesAccThreads [301] |
double | m_EstimatedParameter |
bool | m_CollectMeasures |
struct timeval | m_LastServiceTime |
bool | m_StartingServiceInterval |
Static Private Member Functions | |
static void * | Thread (void *Param) |
static void * | BlockThread (void *Param) |
Private Attributes | |
CvsiThread | m_Thread |
CvsiDevice | m_Device |
CRequestQueue | m_Queue |
int | m_Status |
char * | m_Name |
unsigned int | m_NumberOfSaveMeasures |
Definition at line 58 of file storagedevice.h.
CStorageDevice::CStorageDevice | ( | ) |
Definition at line 57 of file storagedevice.cpp.
00058 { 00059 /* Public variables */ 00060 logopen = 0; 00061 m_MaxActiveDiskThreads = 1; 00062 m_ActiveDiskThreads = 0; 00063 00064 m_EstServiceTime = 0.0; 00065 m_DevServiceTime = 0.0; 00066 memset(m_EstServiceTimeAccThreads,0,sizeof(m_EstServiceTimeAccThreads)); 00067 memset(m_DevServiceTimeAccThreads,0,sizeof(m_DevServiceTimeAccThreads)); 00068 00069 m_EstResponseTime = 0.0; 00070 m_DevResponseTime = 0.0; 00071 memset(m_EstResponseTimeAccThreads,0,sizeof(m_EstResponseTimeAccThreads)); 00072 memset(m_DevResponseTimeAccThreads,0,sizeof(m_DevResponseTimeAccThreads)); 00073 memset(m_NRespTSamplesAccThreads,0,sizeof(m_NRespTSamplesAccThreads)); 00074 00075 m_AvgServiceTime_Model = 0; 00076 m_VarServiceTime_Model = 0; 00077 m_NSamples = 0; 00078 memset(m_AvgServiceTimeAccThreads,0,sizeof(m_AvgServiceTimeAccThreads)); 00079 memset(m_VarServiceTimeAccThreads,0,sizeof(m_VarServiceTimeAccThreads)); 00080 memset(m_NSamplesAccThreads,0,sizeof(m_NSamplesAccThreads)); 00081 00082 m_EstimatedParameter = 0; 00083 m_CollectMeasures = false; 00084 m_StartingServiceInterval = false; 00085 00086 /* Private variables */ 00087 m_Status = DeviceInactive; 00088 m_Name = NULL; 00089 m_NumberOfSaveMeasures = 0; 00090 00091 for( int i=0; i < 300; i++ ) 00092 { 00093 m_RequestsForThreads[ i ].Request = NULL; 00094 m_RequestsForThreads[ i ].ActiveThreads = 0; 00095 m_RequestsForThreads[ i ].Available = true; 00096 // Inicializa o mutex e a variavel de condicao usada pela thread "i". 00097 pthread_mutex_init( &m_MutexThreads[ i ], NULL ); 00098 pthread_cond_init( &m_ConditionNew[ i ], NULL ); 00099 } 00100 00101 // Inicializa os mutexes e as variaveis de condicao (principais). 00102 pthread_mutex_init( &m_Mutex, NULL ); 00103 pthread_cond_init( &m_Condition, NULL ); 00104 pthread_cond_init( &m_ConditionExitThread, NULL ); 00105 }
// Definition at line 108 of file storagedevice.cpp.
/**
 * Releases the pthread objects created by the constructor: first the 300
 * per-thread mutex / condition pairs, then the main mutex and the two main
 * condition variables.
 */
CStorageDevice::~CStorageDevice()
{
    // Per-thread synchronisation objects.
    int slot = 0;
    while( slot < 300 )
    {
        pthread_mutex_destroy( &m_MutexThreads[ slot ] );
        pthread_cond_destroy( &m_ConditionNew[ slot ] );
        ++slot;
    }

    // Main synchronisation objects.
    pthread_mutex_destroy( &m_Mutex );
    pthread_cond_destroy( &m_Condition );
    pthread_cond_destroy( &m_ConditionExitThread );
}
void * CStorageDevice::BlockThread | ( | void * | Param | ) | [static, private] |
Definition at line 487 of file storagedevice.cpp.
00488 { 00489 ThreadParameter *BParam = ( ThreadParameter * ) Param; 00490 00491 char *devicename = BParam->DeviceName; 00492 int ThreadNumber = BParam->ThreadNumber; 00493 u64 size = BParam->DeviceSize; 00494 CStorageDevice* SDevice = BParam->Device; 00495 double ETParameter = BParam->Device->m_EstimatedParameter; 00496 int activeThreads = 0; 00497 int FlagStatus; 00498 double average, sample, sample_servicetime; 00499 u16 Command; 00500 struct timeval initialtime, finaltime, interval; 00501 00502 CvsiDevice Device; 00503 StrRequest* Request; 00504 00505 int stat = Device.OpenDeviceAgain( devicename, size ); 00506 if( stat < 0 ) 00507 { 00508 RioErr << "BlockThread: Could not open device "<< devicename << endl; 00509 return( (void *)ERROR_SS_THREAD_CREATE ); 00510 } 00511 00512 RioErr << "BLOCKTHREADID " << syscall( SYS_gettid ) << endl; 00513 00514 SDevice->m_log << "Thread N " << ThreadNumber << endl; 00515 while( 1 ) 00516 { 00517 Request = NULL; 00518 pthread_mutex_lock( &SDevice->m_MutexThreads[ThreadNumber] ); 00519 //SDevice->m_log << "Thread " << ThreadNumber << " esperando "<<endl ; 00520 if( SDevice->m_RequestsForThreads[ ThreadNumber ].Available == false ) 00521 { 00522 Request = SDevice->m_RequestsForThreads[ ThreadNumber ].Request; 00523 SDevice->m_RequestsForThreads[ ThreadNumber ].Available = true; 00524 activeThreads = SDevice->m_RequestsForThreads[ ThreadNumber ].ActiveThreads; 00525 SDevice->m_log << "Thread " << ThreadNumber << "(" << pthread_self() << ") entrou "<<endl ; 00526 00527 #ifdef RIO_DEBUG2 00528 { 00529 struct in_addr client_IP; 00530 client_IP.s_addr = Request->IPaddress; 00531 struct sockaddr_in client_PORT; 00532 client_PORT.sin_port = Request->Port; 00533 00534 RioErr << " ----Thread " << pthread_self() 00535 << " atendendo requisicao abaixo----" << endl; 00536 RioErr << " ClientId: " << Request->ClientId << endl; 00537 RioErr << " IPaddress " << inet_ntoa( client_IP ) << endl; 00538 RioErr << " Port " << 
htons(client_PORT.sin_port) << endl; 00539 RioErr << " Request->Disk " << Request->Disk << endl; 00540 } 00541 #endif 00542 } 00543 else 00544 { 00545 00546 #ifdef RIO_DEBUG2 00547 RioErr << " ----Esperando em m_ConditionNew[" << ThreadNumber << "]----" 00548 << endl; 00549 #endif 00550 00551 pthread_cond_wait( &SDevice->m_ConditionNew[ThreadNumber], 00552 &SDevice->m_MutexThreads[ThreadNumber] ); 00553 if( SDevice->m_RequestsForThreads[ ThreadNumber ].Available == 00554 false 00555 ) 00556 { 00557 Request = SDevice->m_RequestsForThreads[ ThreadNumber ].Request; 00558 SDevice->m_RequestsForThreads[ ThreadNumber ].Available = true; 00559 activeThreads = SDevice->m_RequestsForThreads[ ThreadNumber ].ActiveThreads; 00560 SDevice->m_log << "Thread " << ThreadNumber << "(" << pthread_self() << ") entrou "<<endl ; 00561 } 00562 } 00563 pthread_mutex_unlock(&SDevice->m_MutexThreads[ThreadNumber]); 00564 00565 if( Request == NULL ) 00566 { 00567 break; 00568 } 00569 00570 Command = Request->Command; 00571 00572 // Test if this I/O will require a status message to be sent to router. 
00573 if(( Command & 0xf ) == MSG_RSS_READ ) 00574 { 00575 // if command is read this is the first of two steps 00576 // Send status message only if bit RSS_INTERMEDIATE is set 00577 if (Command & RSS_INTERMEDIATE) 00578 FlagStatus = 1; 00579 else 00580 FlagStatus = 0; 00581 } 00582 else 00583 { 00584 // If this something else (not "read") this is the last step 00585 // Send status message if bit RSS_NOCOMPLETE is not set 00586 if( Command & RSS_NOCOMPLETE ) 00587 FlagStatus = 0; 00588 else 00589 FlagStatus = 1; 00590 } 00591 00592 // Reset flag bits on command 00593 Command = Command & 0xf; 00594 00595 switch( Command ) 00596 { 00597 case MSG_RSS_WRITE: 00598 case MSG_RSS_FLUSH: 00599 Request->Status = RequestStatusWrite; 00600 gettimeofday( &initialtime,0 ); 00601 stat = Device.Write(Request->Pos,Request->Size,Request->Data); 00602 00603 // Esta parte foi comentada pois ainda nao esta sendo usada pela 00604 // emulacao mas futuramente pretendemos criar um script para 00605 // usar esta informacao. 00606 //#ifdef RIO_DEBUG_EMUL 00607 //RioErr << "DEVICEWRITE " << ( int ) time( NULL ) << " " 00608 // << Request->Size << endl; 00609 //#endif 00610 00611 Request->Status = RequestStatusWriteDone; 00612 break; 00613 case MSG_RSS_READ: 00614 case MSG_RSS_FETCH: 00615 Request->Status = RequestStatusRead; 00616 gettimeofday( &initialtime,0 ); 00617 stat = Device.Read(Request->Pos,Request->Size,Request->Data); 00618 00619 // Esta impressao foi comentada porque estes logs ainda nao sao 00620 // necessarios. Se for necessario usar estes logs, esta linha do 00621 // codigo pode ser descomentada (ja existe um script para 00622 // processar estes logs). 
00623 //#ifdef RIO_DEBUG_EMUL 00624 //RioErr << "DEVICEREAD " << ( int ) time( NULL ) << " " 00625 // << Request->Size << endl; 00626 //#endif 00627 00628 Request->Status = RequestStatusReadDone; 00629 break; 00630 default: 00631 RioErr << "Invalid Command code on Request: " << Command 00632 << endl; 00633 RioErr << "Unexpected error on storage device thread. " 00634 << "Aborting thread " << endl; 00635 return 0; 00636 } 00637 00638 //updates measures 00639 pthread_mutex_lock(&SDevice->m_Mutex); 00640 00641 gettimeofday( &finaltime,0 ); 00642 interval.tv_sec = finaltime.tv_sec - initialtime.tv_sec; 00643 interval.tv_usec = finaltime.tv_usec - initialtime.tv_usec; 00644 if( interval.tv_usec < 0) 00645 { 00646 interval.tv_sec -= 1; 00647 interval.tv_usec += 1000000; 00648 } 00649 00650 sample = interval.tv_sec * 1000.0 + interval.tv_usec / 1000.0; 00651 00652 //decrements number of active threads 00653 SDevice->m_ActiveDiskThreads--; 00654 00655 if( SDevice->m_StartingServiceInterval == true ) 00656 { 00657 SDevice->m_StartingServiceInterval = false; 00658 } 00659 else 00660 { 00661 interval.tv_sec = finaltime.tv_sec - SDevice->m_LastServiceTime.tv_sec; 00662 interval.tv_usec = finaltime.tv_usec - SDevice->m_LastServiceTime.tv_usec; 00663 if( interval.tv_usec < 0) 00664 { 00665 interval.tv_sec -= 1; 00666 interval.tv_usec += 1000000; 00667 } 00668 00669 sample_servicetime = interval.tv_sec * 1000.0 + interval.tv_usec / 1000.0; 00670 00671 //updates estimated service time of disk 00672 if( SDevice->m_EstServiceTime == 0) 00673 { 00674 SDevice->m_EstServiceTime = sample_servicetime; 00675 } 00676 else 00677 { 00678 SDevice->m_EstServiceTime = ( 1 - ETParameter ) * SDevice->m_EstServiceTime 00679 + ETParameter * sample_servicetime; 00680 } 00681 00682 //updates Deviation of estimated service time of disk 00683 SDevice->m_DevServiceTime = ( 1 - ETParameter ) * SDevice->m_DevServiceTime + 00684 ETParameter * fabs( sample_servicetime - SDevice->m_EstServiceTime ); 00685 
00686 // updates estimated service time according threads 00687 if( SDevice->m_EstServiceTimeAccThreads[activeThreads] == 0) 00688 { 00689 SDevice->m_EstServiceTimeAccThreads[activeThreads] = sample_servicetime; 00690 } 00691 else 00692 { 00693 SDevice->m_EstServiceTimeAccThreads[activeThreads] = 00694 ( 1 - ETParameter ) * SDevice->m_EstServiceTimeAccThreads[activeThreads] 00695 + ETParameter * sample_servicetime; 00696 } 00697 00698 //updates Deviation of estimated service time according number of threads 00699 SDevice->m_DevServiceTimeAccThreads[activeThreads] = 00700 ( 1 - ETParameter ) * SDevice->m_DevServiceTimeAccThreads[activeThreads] + 00701 ETParameter * fabs( sample_servicetime - SDevice->m_EstServiceTimeAccThreads[activeThreads] ); 00702 00703 //updates average service time of disk 00704 average = SDevice->m_AvgServiceTime_Model; 00705 SDevice->m_AvgServiceTime_Model = SDevice->m_AvgServiceTime_Model + 00706 (( sample_servicetime - SDevice->m_AvgServiceTime_Model )/( SDevice->m_NSamples + 1 )); 00707 00708 //updates variance of average service time of disk 00709 if( SDevice->m_NSamples >= 1 ) 00710 { 00711 SDevice->m_VarServiceTime_Model = (1 - (float) 1/SDevice->m_NSamples ) * SDevice->m_VarServiceTime_Model 00712 + (SDevice->m_NSamples + 1) * pow((SDevice->m_AvgServiceTime_Model - average), 2); 00713 } 00714 SDevice->m_NSamples++; 00715 00716 //updates average service time according number of threads 00717 average = SDevice->m_AvgServiceTimeAccThreads[activeThreads]; 00718 SDevice->m_AvgServiceTimeAccThreads[activeThreads] = SDevice->m_AvgServiceTimeAccThreads[activeThreads] + 00719 (( sample_servicetime - SDevice->m_AvgServiceTimeAccThreads[activeThreads])/( SDevice->m_NSamplesAccThreads[activeThreads] + 1 )); 00720 00721 //updates variance of average service time according according number of threads 00722 if( SDevice->m_NSamplesAccThreads[activeThreads] >= 1 ) 00723 { 00724 SDevice->m_VarServiceTimeAccThreads[activeThreads] = (1 - (float) 
1/SDevice->m_NSamplesAccThreads[activeThreads] ) * 00725 SDevice->m_VarServiceTimeAccThreads[activeThreads] 00726 + (SDevice->m_NSamplesAccThreads[activeThreads] + 1) * pow((SDevice->m_AvgServiceTimeAccThreads[activeThreads] - average), 2); 00727 } 00728 SDevice->m_NSamplesAccThreads[activeThreads]++; 00729 00730 //SDevice->m_log <<"Threads " << activeThreads << "\t" 00731 // << sample_servicetime << "\t"<< sample 00732 // << " finaltime " << finaltime.tv_sec << " " <<finaltime.tv_usec 00733 // << " m_LastServiceTime " << SDevice->m_LastServiceTime.tv_sec << " " 00734 // << SDevice->m_LastServiceTime.tv_usec 00735 // << endl; 00736 } // fim do else do if( SDevice->m_StartingServiceInterval == true ) 00737 00738 SDevice->m_LastServiceTime = finaltime; 00739 00740 //updates estimated service time according number of threads 00741 if( SDevice->m_EstResponseTimeAccThreads[activeThreads] == 0) 00742 { 00743 SDevice->m_EstResponseTimeAccThreads[activeThreads] = sample; 00744 } 00745 else 00746 { 00747 SDevice->m_EstResponseTimeAccThreads[activeThreads] = 00748 ( 1 - ETParameter ) * SDevice->m_EstResponseTimeAccThreads[activeThreads] 00749 + ETParameter * sample; 00750 } 00751 00752 //updates Deviation of estimated service time according number of threads 00753 SDevice->m_DevResponseTimeAccThreads[activeThreads] = 00754 ( 1 - ETParameter ) * SDevice->m_DevResponseTimeAccThreads[activeThreads] + 00755 ETParameter * fabs( sample - SDevice->m_EstResponseTimeAccThreads[activeThreads] ); 00756 00757 SDevice->m_NRespTSamplesAccThreads[activeThreads]++; 00758 00759 //updates estimated service time of disk 00760 if( SDevice->m_EstResponseTime == 0) 00761 { 00762 SDevice->m_EstResponseTime = sample; 00763 } 00764 else 00765 { 00766 SDevice->m_EstResponseTime = ( 1 - ETParameter ) * SDevice->m_EstResponseTime + 00767 ETParameter * sample; 00768 } 00769 00770 //updates Dev of estimated service time of disk 00771 SDevice->m_DevResponseTime = ( 1 - ETParameter ) * 
SDevice->m_DevResponseTime + 00772 ETParameter * fabs( sample - SDevice->m_EstResponseTime ); 00773 00774 SDevice->m_log<< "Thread "<< ThreadNumber << "("<< pthread_self() << ") saiu " <<endl; 00775 00776 #ifdef RIO_DEBUG2 00777 RioErr << " ----Desbloqueando a thread principal do disco----" << endl; 00778 #endif 00779 pthread_cond_broadcast(&SDevice->m_Condition); 00780 pthread_mutex_unlock(&SDevice->m_Mutex); 00781 00782 // Prepare status message to be sent to router if required 00783 StrMsg* Slot = NULL; 00784 if(( FlagStatus ) || ( stat < 0 )) 00785 { 00786 Slot = MsgManager.New(); 00787 if( stat < 0 ) 00788 { 00789 RioErr << "Storage: IO Failed ("; 00790 switch( stat ) 00791 { 00792 case VSI_ERROR_DEVICE_POSITION: 00793 RioErr <<"VSI_ERROR_DEVICE_POSITION) "; 00794 break; 00795 case VSI_ERROR_DEVICE_SEEK: 00796 RioErr <<"VSI_ERROR_DEVICE_SEEK) "; 00797 break; 00798 case VSI_ERROR_DEVICE_READ: 00799 RioErr <<"VSI_ERROR_DEVICE_READ) "; 00800 } 00801 RioErr << "RouterId " << Request->RouterId << endl; 00802 stat = ERROR_RSS_IO_FAILED; 00803 } 00804 if(( Command == MSG_RSS_WRITE ) || ( Command == MSG_RSS_FLUSH )) 00805 { 00806 Slot->Msg.Storage.Status.Type = MSG_RSS_WRITECOMPLETE; 00807 } 00808 else 00809 { 00810 Slot->Msg.Storage.Status.Type = MSG_RSS_READCOMPLETE; 00811 } 00812 Slot->Msg.Storage.Status.Size = SizeMsgRSSstatus; 00813 Slot->Msg.Storage.Status.RouterId = Request->RouterId; 00814 Slot->Msg.Storage.Status.StorageId = Request->Id; 00815 Slot->Msg.Storage.Status.Error = stat; 00816 Slot->Msg.Storage.Status.ActiveThreads = activeThreads; 00817 } 00818 00819 // I/O done. should send request to client, wait for router command 00820 // (i.e. do nothing) or free request. 00821 switch( Command ) 00822 { 00823 case MSG_RSS_WRITE: 00824 // If write, we are done. Release request 00825 RequestManager.Free(Request); 00826 break; 00827 case MSG_RSS_FLUSH: 00828 // If flush, we are done. 
Release request 00829 RequestManager.Free(Request); 00830 break; 00831 case MSG_RSS_READ: 00832 // if read, do nothing now. Send data to client after status 00833 // message is sent to router 00834 break; 00835 case MSG_RSS_FETCH: 00836 // For fetch need to update Request status before sending message 00837 // to router to avoid race condition if the router reply arrives 00838 // before setting the state 00839 // Fetch. Keep request around 00840 //RioErr << "Sending status (FETCH COMPLETE) to router " << endl; 00841 Request->Status = RequestStatusWaitSendCommand; 00842 break; 00843 default: 00844 RioErr << "Invalid Command code on Request: " << Command 00845 << endl; 00846 RioErr << "Unexpected error on storage device thread. " 00847 << "Aborting thread" << endl; 00848 return 0; 00849 } 00850 00851 // Send message to router if required 00852 if(( FlagStatus) || (stat < 0) ) 00853 { 00854 #ifdef RIO_DEBUG2 00855 RioErr <<"Sending status (sotoragedevice.blockthread) to router." 00856 << endl; 00857 #endif 00858 00859 Router.Send(Slot); 00860 }; 00861 00862 // Wait to send read to client just after status is sent to 00863 // router. This avoids router receiving status out of order 00864 // (assuming network guarantees ordered delivery) 00865 if( Command == MSG_RSS_READ ) 00866 { 00867 // Successfull Read. Send data to client 00868 if( stat >= 0 ) 00869 { 00870 Client.Send( Request ); 00871 } 00872 } 00873 } //fim do while (1) 00874 00875 Device.Close(); 00876 pthread_exit( NULL ); 00877 }
void CStorageDevice::GetDiskServiceTime | ( | double | averageservicetime[301] | ) |
Definition at line 892 of file storagedevice.cpp.
00893 { 00894 for( int aux = 0; aux < 301; aux++ ) 00895 { 00896 averageservicetime[aux] = m_EstServiceTimeAccThreads[aux]; 00897 //averageservicetime[aux] = m_AvgServiceTimeAccThreads[aux]; 00898 00899 } 00900 m_NumberOfSaveMeasures++; 00901 00902 00903 if( m_NumberOfSaveMeasures % 5 == 0 && m_CollectMeasures ) 00904 { 00905 char filename[350]; 00906 FILE *handle; 00907 sprintf( filename, "%s.%d", logname, m_NumberOfSaveMeasures ); 00908 00909 if( ( handle = fopen( filename, "w")) == NULL ) 00910 { 00911 if( m_log.is_open() ) 00912 m_log << " Could not create logtimes!" << strerror ( errno ) 00913 << endl; 00914 } 00915 else 00916 { 00917 fprintf( handle, "# EstimatedServiceTime of Disk %10.10f msec\n", m_EstServiceTime ); 00918 fprintf( handle, "# DeviationServiceTime %f\n", m_DevServiceTime ); 00919 fprintf( handle, "\n# AverageServiceTime of Disk %10.10f msec\n", m_AvgServiceTime_Model ); 00920 fprintf( handle, "# VarianceServiceTime %f \n",m_VarServiceTime_Model ); 00921 fprintf( handle, "# Number of Samples %lld \n",m_NSamples ); 00922 fprintf( handle, "\n# EstimatedResponseTime of Disk %10.10f msec\n", m_EstResponseTime ); 00923 fprintf( handle, "# DeviationResponseTime %f\n", m_DevResponseTime ); 00924 fprintf( handle, "\n# Measures in msec.\n"); 00925 fprintf( handle, "#\tEstServT\tDevServT\tAvgServT\tVarServT\tSamples\t\tEstRespT\tDevRespT\tNRespTSamples\n"); 00926 for( int aux = 1; aux < 301; aux++ ) 00927 { 00928 fprintf( handle, "%3d\t%6.6f\t%6.6f\t%6.6f\t%6.6f\t%lld\t\t%6.6f\t%6.6f\t%lld\n", 00929 aux, 00930 m_EstServiceTimeAccThreads[aux], 00931 m_DevServiceTimeAccThreads[aux], 00932 m_AvgServiceTimeAccThreads[aux], 00933 m_VarServiceTimeAccThreads[aux], 00934 m_NSamplesAccThreads[aux], 00935 m_EstResponseTimeAccThreads[aux], 00936 m_DevResponseTimeAccThreads[aux], 00937 m_NRespTSamplesAccThreads[aux]); 00938 } 00939 fclose( handle ); 00940 } 00941 } 00942 return; 00943 }
double CStorageDevice::GetEstimatedTime | ( | ) |
Definition at line 946 of file storagedevice.cpp.
00947 { 00948 return m_EstServiceTime; 00949 }
int CStorageDevice::Initialize | ( | char * | DeviceName, | |
char * | ip, | |||
int | nDiskThreads, | |||
double | EstimatedParameter, | |||
bool | CollectMeasures, | |||
char * | LogsDirectory | |||
) |
Definition at line 160 of file storagedevice.cpp.
00166 { 00167 char device_name[200] = "" , log[ MaxPathSize ] = ""; // 200 00168 00169 // changes '/' by '_' 00170 int i = 0; 00171 while( DeviceName[i] != '\0' ) 00172 { 00173 if( DeviceName[i] == '/' ) 00174 device_name[i] = '_'; 00175 else 00176 device_name[i] = DeviceName[i]; 00177 i++; 00178 if( i == 190 ) break; 00179 } 00180 00181 RioErr << "[StorageDevice] CollectMeasures: \t" << CollectMeasures 00182 << endl; 00183 if( CollectMeasures ) 00184 { 00185 // Alteracao para armazenar os logs em um diretorio diferente daquele com 00186 // o codigo do storage server. 00187 if( LogsDirectory == NULL ) 00188 // Arquivo armazenado no diretorio com o codigo do storage 00189 // server 00190 sprintf( log, "RIOdisk.%s.%s.log", ip, device_name ); 00191 else if( LogsDirectory[ strlen( LogsDirectory ) - 1 ] != '/' ) 00192 // Devemos colocar a "/" entre o nome do arquivo e do diretorio. 00193 sprintf( log, "%s/RIOdisk.%s.%s.log", LogsDirectory, ip, device_name ); 00194 else 00195 // Nao devemos colocar a "/" entre o nome do arquivo e do diretorio. 00196 sprintf( log, "%sRIOdisk.%s.%s.log", LogsDirectory, ip, device_name ); 00197 logopen = 1; 00198 } 00199 00200 if( m_Status == DeviceActive ) 00201 { 00202 return ERROR_SS_PROGRAM_ERROR; 00203 } 00204 00205 m_Status = DeviceActive; 00206 00207 int stat; 00208 00209 stat = m_Device.Open( DeviceName ); 00210 if(stat < 0) 00211 { 00212 Stop(); 00213 return ERROR_SS_INVALID_DEVICE; 00214 } 00215 00216 // Save device name for message trace purposes 00217 m_Name = DeviceName; 00218 m_MaxActiveDiskThreads = nDiskThreads; 00219 00220 if( CollectMeasures ) 00221 { 00222 m_log.open( log ); 00223 m_log << "Device Name : " << m_Name <<endl; 00224 m_log << "Max Active Threads : " << m_MaxActiveDiskThreads <<endl; 00225 } 00226 00227 // Alteracao para armazenar os logs em um diretorio diferente daquele com 00228 // o codigo do storage server. 
00229 if( LogsDirectory == NULL ) 00230 // Arquivo armazenado no diretorio com o codigo do storage server 00231 sprintf( logname, "RIOdisk.%s.%s.log.times", ip, device_name ); 00232 else if( LogsDirectory[ strlen( LogsDirectory ) - 1 ] != '/' ) 00233 // Devemos colocar a "/" entre o nome do arquivo e do diretorio. 00234 sprintf( logname, "%s/RIOdisk.%s.%s.log.times", LogsDirectory, ip, 00235 device_name ); 00236 else 00237 // Nao devemos colocar a "/" entre o nome do arquivo e do diretorio. 00238 sprintf( logname, "%sRIOdisk.%s.%s.log.times", LogsDirectory, ip, 00239 device_name ); 00240 00241 stat = m_Thread.Create(Thread,(void*) this); 00242 if( stat < 0 ) 00243 { 00244 Stop(); 00245 return ERROR_SS_THREAD_CREATE; 00246 } 00247 00248 m_CollectMeasures = CollectMeasures; 00249 m_EstimatedParameter = EstimatedParameter; 00250 00251 if( CollectMeasures ) 00252 { 00253 m_log << "CollectMeasures \t" << m_CollectMeasures <<endl; 00254 m_log << "EstimatedTimeParameter \t" << m_EstimatedParameter <<endl; 00255 } 00256 00257 for( int i = 0; i < m_MaxActiveDiskThreads; i++ ) 00258 { 00259 pthread_t thread_id; 00260 pthread_attr_t attr; 00261 ThreadParameter *Param = new ThreadParameter; 00262 Param->DeviceName = m_Name; 00263 Param->DeviceSize = m_Device.Size(); 00264 Param->Device = this; 00265 Param->ThreadNumber = i; 00266 00267 pthread_attr_init( &attr ); 00268 pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); 00269 pthread_attr_setstacksize( &attr, 2*PTHREAD_STACK_MIN ); 00270 if((pthread_create( &thread_id, &attr, &BlockThread,(void*) Param )) != 0 ) 00271 { 00272 RioErr << "Storage: Could not start BlockThread " << i << "(" 00273 << strerror(errno) << ")!"<<endl; 00274 00275 return ERROR_SS_THREAD_CREATE; 00276 } 00277 } 00278 LoadMeasures( logname ); 00279 return 0; 00280 }
int CStorageDevice::Initialize | ( | char * | DeviceName | ) |
Definition at line 124 of file storagedevice.cpp.
00125 { 00126 if( m_Status == DeviceActive ) 00127 { 00128 return ERROR_SS_PROGRAM_ERROR; 00129 } 00130 00131 m_Status = DeviceActive; 00132 00133 int stat; 00134 stat = m_Device.Open( DeviceName ); 00135 if(stat < 0) 00136 { 00137 Stop(); 00138 return ERROR_SS_INVALID_DEVICE; 00139 } 00140 00141 stat = m_Thread.Create(Thread,(void*) this); 00142 if( stat < 0 ) 00143 { 00144 Stop(); 00145 return ERROR_SS_THREAD_CREATE; 00146 } 00147 00148 // Save device name for message trace purposes 00149 m_Name = DeviceName; 00150 00151 return 0; 00152 }
void CStorageDevice::LoadMeasures | ( | char * | filename | ) |
Definition at line 951 of file storagedevice.cpp.
00952 { 00953 FILE *handle; 00954 int i; 00955 00956 if( ( handle = fopen( filename, "r")) == NULL ) 00957 { 00958 if( m_log.is_open() ) 00959 m_log << " Could not read logtimes(" << filename << ")!" 00960 << strerror ( errno )<< endl; 00961 } 00962 else 00963 { 00964 if( fscanf( handle, "# EstimatedServiceTime of Disk %lf msec\n", 00965 &m_EstServiceTime ) != 1 ) 00966 { 00967 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 00968 << "ler o arquivo " << filename << endl; 00969 } 00970 if( fscanf( handle, "# DeviationServiceTime %lf\n", 00971 &m_DevServiceTime ) != 1 ) 00972 { 00973 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 00974 << "ler o arquivo " << filename << endl; 00975 } 00976 if( fscanf( handle, "\n# AverageServiceTime of Disk %lf msec\n", 00977 &m_AvgServiceTime_Model ) != 1 ) 00978 { 00979 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 00980 << "ler o arquivo " << filename << endl; 00981 } 00982 if( fscanf( handle, "# VarianceServiceTime %lf \n", 00983 &m_VarServiceTime_Model ) != 1 ) 00984 { 00985 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 00986 << "ler o arquivo " << filename << endl; 00987 } 00988 if( fscanf( handle, "# Number of Samples %lld \n", 00989 &m_NSamples ) != 1 ) 00990 { 00991 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 00992 << "ler o arquivo " << filename << endl; 00993 } 00994 if( fscanf( handle, "\n# EstimatedResponseTime of Disk %lf msec\n", 00995 &m_EstResponseTime ) != 1 ) 00996 { 00997 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 00998 << "ler o arquivo " << filename << endl; 00999 } 01000 if( fscanf( handle, "# DeviationResponseTime %lf\n", 01001 &m_DevResponseTime ) != 1 ) 01002 { 01003 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 01004 << "ler o arquivo " << filename << endl; 01005 } 01006 if( fscanf( handle, "\n# Measures in msec.\n") != 0 ) 01007 { 01008 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 01009 << "ler o 
arquivo " << filename << endl; 01010 } 01011 if( fscanf( handle, "#\tEstServT\tDevServT\tAvgServT\tVarServT\tSamples\t\tEstRespT\tDevRespT\tNRespTSamples\n") 01012 != 0 ) 01013 { 01014 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 01015 << "ler o arquivo " << filename << endl; 01016 } 01017 for( int aux = 1; (aux < 301) && (!feof( handle )); aux++ ) 01018 { 01019 if( fscanf( handle, "%3d\t%lf\t%lf\t%lf\t%lf\t%lld\t\t%lf\t%lf\t%lld\n", 01020 &i, 01021 &m_EstServiceTimeAccThreads[aux], 01022 &m_DevServiceTimeAccThreads[aux], 01023 &m_AvgServiceTimeAccThreads[aux], 01024 &m_VarServiceTimeAccThreads[aux], 01025 &m_NSamplesAccThreads[aux], 01026 &m_EstResponseTimeAccThreads[aux], 01027 &m_DevResponseTimeAccThreads[aux], 01028 &m_NRespTSamplesAccThreads[aux] ) != 9 ) 01029 { 01030 RioErr << "CStorageDevice::LoadMeasures: aviso - erro ao " 01031 << "ler o arquivo " << filename << endl; 01032 } 01033 01034 } 01035 fclose( handle ); 01036 } 01037 01038 return; 01039 }
void CStorageDevice::ProcessRequest | ( | StrRequest * | Request | ) |
Definition at line 301 of file storagedevice.cpp.
00302 { 00303 #ifdef RIO_DEBUG2 00304 struct in_addr client_IP; 00305 struct sockaddr_in client_PORT; 00306 if( Request != NULL ) 00307 { 00308 client_IP.s_addr = Request->IPaddress; 00309 client_PORT.sin_port = Request->Port; 00310 RioErr << " ----A requisicao abaixo sera colocada da fila do disco----" 00311 << endl; 00312 RioErr << " ClientId: " << Request->ClientId 00313 << endl; 00314 RioErr << " IPaddress " << inet_ntoa( client_IP ) 00315 << endl; 00316 RioErr << " Port " << htons(client_PORT.sin_port) 00317 << endl; 00318 RioErr << " Request->Disk " << Request->Disk << endl; 00319 } 00320 else 00321 RioErr << " ----A requisicao sera nula colocada da fila do disco----" 00322 << endl; 00323 #endif 00324 00325 m_Queue.Put(Request); 00326 00327 #ifdef RIO_DEBUG2 00328 if( Request != NULL ) 00329 { 00330 client_IP.s_addr = Request->IPaddress; 00331 client_PORT.sin_port = Request->Port; 00332 RioErr << " ----A requisicao abaixo foi colocada com sucesso na fila " 00333 << "do disco----" << endl; 00334 RioErr << " ClientId: " << Request->ClientId 00335 << endl; 00336 RioErr << " IPaddress " << inet_ntoa( client_IP ) 00337 << endl; 00338 RioErr << " Port " << htons(client_PORT.sin_port) 00339 << endl; 00340 RioErr << " Request->Disk " << Request->Disk << endl; 00341 } 00342 else 00343 RioErr << " ----A requisicao nula foi colocada da fila do disco----" 00344 << endl; 00345 #endif 00346 }
int CStorageDevice::SectorSize | ( | ) |
Definition at line 880 of file storagedevice.cpp.
00881 { 00882 return m_Device.SectorSize(); 00883 }
u64 CStorageDevice::Size | ( | ) |
Definition at line 886 of file storagedevice.cpp.
void CStorageDevice::Stop | ( | void | ) |
Definition at line 284 of file storagedevice.cpp.
void * CStorageDevice::Thread | ( | void * | Param | ) | [static, private] |
Definition at line 351 of file storagedevice.cpp.
00352 { 00353 int i; 00354 00355 // Get this class pointer 00356 CStorageDevice* Device = (CStorageDevice*) Param; 00357 00358 // Get members of this class 00359 CRequestQueue &m_Queue = Device->m_Queue; 00360 int &m_Status = Device->m_Status; 00361 00362 if( Device->m_log.is_open() ) 00363 { 00364 ofstream &m_log = Device->m_log; 00365 00366 m_log << "Device Thread running ..." << endl; 00367 } 00368 00369 RioErr << "DEVICETHREADID " << syscall( SYS_gettid ) << endl; 00370 00371 // Loop forever 00372 while( 1 ) 00373 { 00374 pthread_mutex_lock(&Device->m_Mutex); 00375 if( Device->m_ActiveDiskThreads < Device->m_MaxActiveDiskThreads ) 00376 { 00377 00378 pthread_mutex_unlock(&Device->m_Mutex); 00379 00380 #ifdef RIO_DEBUG2 00381 // Variavel booleana usada para indicar que o pedido foi atribuido 00382 // a alguma thread de bloco. 00383 bool isServed = false; 00384 #endif 00385 00386 // Wait and get next request 00387 00388 #ifdef RIO_DEBUG2 00389 RioErr << " ----Tentando obter e esperando, se necessario, uma " 00390 << "requisicao da fila----" << endl; 00391 #endif 00392 StrRequest* Request = m_Queue.Get(); 00393 00394 #ifdef RIO_DEBUG2 00395 struct in_addr client_IP; 00396 struct sockaddr_in client_PORT; 00397 if( Request != NULL ) 00398 { 00399 client_IP.s_addr = Request->IPaddress; 00400 client_PORT.sin_port = Request->Port; 00401 RioErr << " ----A requisicao abaixo foi removida da fila----" 00402 << endl; 00403 RioErr << " ClientId: " << Request->ClientId 00404 << endl; 00405 RioErr << " IPaddress " << inet_ntoa( client_IP ) 00406 << endl; 00407 RioErr << " Port " << htons(client_PORT.sin_port) 00408 << endl; 00409 RioErr << " Request->Disk " << Request->Disk << endl; 00410 } 00411 else 00412 RioErr << " ----A requisicao nula foi removida da fila----" 00413 << endl; 00414 #endif 00415 00416 if(( m_Status != DeviceActive ) || ( Request == 0 )) 00417 { 00418 if(m_Status != DeviceActive) 00419 RioErr << "Storage: loop: Interface is not active!" 
<< endl; 00420 else 00421 RioErr << "Storage: loop: Request = 0!" << endl; 00422 return( 0 ); 00423 } 00424 00425 pthread_mutex_lock(&Device->m_Mutex); 00426 //increments number of active threads 00427 Device->m_ActiveDiskThreads++; 00428 if( Device->m_ActiveDiskThreads == 1 ) 00429 Device->m_StartingServiceInterval = true; 00430 00431 //Device->m_log << "vai pegar mutex "<<endl ; 00432 pthread_mutex_unlock( &Device->m_Mutex ); 00433 00434 for( i = 0; i < Device->m_MaxActiveDiskThreads; i++ ) 00435 { 00436 if( Device->m_RequestsForThreads[ i ].Available == true ) 00437 { 00438 pthread_mutex_lock( &Device->m_MutexThreads[i] ); 00439 Device->m_RequestsForThreads[ i ].Request = Request; 00440 Device->m_RequestsForThreads[ i ].Available = false; 00441 Device->m_RequestsForThreads[ i ].ActiveThreads = 00442 Device->m_ActiveDiskThreads; 00443 00444 #ifdef RIO_DEBUG2 00445 RioErr << " ----Liberando thread no storagedevice para " 00446 << " a requisicao abaixo----" << endl; 00447 RioErr << " ClientId: " << Request->ClientId 00448 << endl; 00449 RioErr << " IPaddress " << inet_ntoa( client_IP ) 00450 << endl; 00451 RioErr << " Port " << htons(client_PORT.sin_port) 00452 << endl; 00453 RioErr << " Request->Disk " << Request->Disk << endl; 00454 00455 isServed = true; 00456 #endif 00457 00458 pthread_cond_broadcast(&Device->m_ConditionNew[i]); 00459 pthread_mutex_unlock( &Device->m_MutexThreads[i] ); 00460 break; 00461 } 00462 } 00463 #ifdef RIO_DEBUG2 00464 if( !isServed ) 00465 { 00466 RioErr << " ----Aviso: a requisicao abaixo foi ignorada, pois " 00467 << "incorretamente achamos que existiam de disco " 00468 << "disponiveis! 
threads ativas = " 00469 << Device->m_ActiveDiskThreads << " e threads totais = " 00470 << Device->m_MaxActiveDiskThreads << " ----" << endl; 00471 RioErr << " ClientId: " << Request->ClientId << endl; 00472 RioErr << " IPaddress " << inet_ntoa( client_IP ) << endl; 00473 RioErr << " Port " << htons( client_PORT.sin_port ) << endl; 00474 RioErr << " Request->Disk " << Request->Disk << endl; 00475 } 00476 #endif 00477 } 00478 else // else do if( Device->m_ActiveDiskThreads < Device->m_MaxActiv... 00479 { 00480 RioErr << "Storage: Waiting Available Thread!" << endl; 00481 pthread_cond_wait( &Device->m_Condition,&Device->m_Mutex ); 00482 pthread_mutex_unlock( &Device->m_Mutex ); 00483 } 00484 } // fim do while( 1 )*/ 00485 }
char CStorageDevice::logname[MaxPathSize] |
Definition at line 87 of file storagedevice.h.
int CStorageDevice::logopen |
Definition at line 88 of file storagedevice.h.
int CStorageDevice::m_ActiveDiskThreads |
Definition at line 94 of file storagedevice.h.
double CStorageDevice::m_AvgServiceTime_Model |
Definition at line 108 of file storagedevice.h.
double CStorageDevice::m_AvgServiceTimeAccThreads[301] |
Definition at line 111 of file storagedevice.h.
bool CStorageDevice::m_CollectMeasures |
Definition at line 116 of file storagedevice.h.
pthread_cond_t CStorageDevice::m_Condition |
Definition at line 91 of file storagedevice.h.
pthread_cond_t CStorageDevice::m_ConditionExitThread |
Definition at line 93 of file storagedevice.h.
pthread_cond_t CStorageDevice::m_ConditionNew[300] |
Definition at line 92 of file storagedevice.h.
CvsiDevice CStorageDevice::m_Device [private] |
Definition at line 124 of file storagedevice.h.
double CStorageDevice::m_DevResponseTime |
Definition at line 103 of file storagedevice.h.
double CStorageDevice::m_DevResponseTimeAccThreads[301] |
Definition at line 105 of file storagedevice.h.
double CStorageDevice::m_DevServiceTime |
Definition at line 98 of file storagedevice.h.
double CStorageDevice::m_DevServiceTimeAccThreads[301] |
Definition at line 100 of file storagedevice.h.
double CStorageDevice::m_EstimatedParameter |
Definition at line 115 of file storagedevice.h.
double CStorageDevice::m_EstResponseTime |
Definition at line 102 of file storagedevice.h.
double CStorageDevice::m_EstResponseTimeAccThreads[301] |
Definition at line 104 of file storagedevice.h.
double CStorageDevice::m_EstServiceTime |
Definition at line 97 of file storagedevice.h.
double CStorageDevice::m_EstServiceTimeAccThreads[301] |
Definition at line 99 of file storagedevice.h.
struct timeval CStorageDevice::m_LastServiceTime |
Definition at line 117 of file storagedevice.h.
ofstream CStorageDevice::m_log |
Definition at line 86 of file storagedevice.h.
int CStorageDevice::m_MaxActiveDiskThreads |
Definition at line 89 of file storagedevice.h.
pthread_mutex_t CStorageDevice::m_Mutex |
Definition at line 90 of file storagedevice.h.
pthread_mutex_t CStorageDevice::m_MutexThreads[300] |
Definition at line 90 of file storagedevice.h.
char* CStorageDevice::m_Name [private] |
Definition at line 127 of file storagedevice.h.
unsigned long long int CStorageDevice::m_NRespTSamplesAccThreads[301] |
Definition at line 106 of file storagedevice.h.
unsigned long long int CStorageDevice::m_NSamples |
Definition at line 110 of file storagedevice.h.
unsigned long long int CStorageDevice::m_NSamplesAccThreads[301] |
Definition at line 113 of file storagedevice.h.
unsigned int CStorageDevice::m_NumberOfSaveMeasures [private] |
Definition at line 128 of file storagedevice.h.
CRequestQueue CStorageDevice::m_Queue [private] |
Definition at line 125 of file storagedevice.h.
ThreadRequestParameter CStorageDevice::m_RequestsForThreads[300] |
Definition at line 95 of file storagedevice.h.
bool CStorageDevice::m_StartingServiceInterval |
Definition at line 118 of file storagedevice.h.
int CStorageDevice::m_Status [private] |
Definition at line 126 of file storagedevice.h.
CvsiThread CStorageDevice::m_Thread [private] |
Definition at line 123 of file storagedevice.h.
double CStorageDevice::m_VarServiceTime_Model |
Definition at line 109 of file storagedevice.h.
double CStorageDevice::m_VarServiceTimeAccThreads[301] |
Definition at line 112 of file storagedevice.h.