#include <DiskMgr.h>
Public Member Functions | |
void | SendStorageNode (EventStorageRequest *event, u16 DiskSeq) |
void | GetDiskServiceTime (EventStorageRequest *event, u16 DiskSeq) |
Static Public Attributes | |
static const int | sn_maxdisks = MAXSTORAGEDISKSARRAY |
Private Member Functions | |
SNode (DiskMgr *mgr, char *hostname) | |
~SNode () | |
int | txthread () |
int | rxthread () |
void | rxthread_nodeinfo (EventStorageReply *event) |
void | rxthread_diskinfo (EventStorageReply *event) |
void | rxthread_confirmation (EventStorageReply *event) |
Nova funcao para processar o recebimento da resposta a mensagem de inicializacao enviada na funcao DiskMgr::BuildNode. | |
void | rxthread_diskservicetimeinfo (EventStorageReply *event) |
Static Private Member Functions | |
static void * | txthreadep (void *parm) |
static void * | rxthreadep (void *parm) |
Private Attributes | |
SNode * | sn_link |
DiskMgr * | sn_mgr |
char * | sn_hostname |
vsiIPaddress | sn_IPaddress |
CvsiTCPsocket | sn_socket |
EventQueue | sn_sendqueue |
pthread_t | sn_txthread |
pthread_t | sn_rxthread |
int | sn_termflag |
int | sn_disks |
int | sn_diskidorg |
long long int | sn_disksize [sn_maxdisks] |
bool | sn_restart |
bool | sn_isenabled |
bool | sn_sendevent |
pthread_mutex_t | sn_mutex |
pthread_cond_t | sn_waitstorage |
bool | sn_readfreeblocksfile |
Friends | |
class | DiskMgr |
Definition at line 222 of file DiskMgr.h.
// SNode::SNode [private]
// (Definition at line 1643 of file DiskMgr.cpp.)
//
// Builds the bookkeeping object for one storage node and appends it to the
// end of the manager's singly linked node list.
//
// mgr      - owning DiskMgr; its m_mutex protects m_nodelist.
// hostname - NUL-terminated host name of the storage node; a private copy is
//            kept in sn_hostname. A NULL pointer is stored as an empty name.
//
// The object is fully initialised (including sn_mutex and sn_waitstorage)
// BEFORE it is linked into the list, so anything that walks the list under
// m_mutex never observes a half-built node. The host name is allocated
// before m_mutex is taken so that a throwing new[] cannot leave the manager
// mutex locked.
SNode::SNode( DiskMgr *mgr, char *hostname )
{
    int i;

    sn_mgr = mgr;

    // Private copy of the host name (new[] throws on failure, so no NULL
    // check is needed on the result).
    const char *name = ( hostname != NULL ) ? hostname : "";
    const size_t namesize = strlen( name ) + 1;
    sn_hostname = new char[namesize];
    memcpy( sn_hostname, name, namesize );

    sn_txthread = 0;
    sn_rxthread = 0;
    sn_termflag = false;

    sn_link = NULL;
    memset( (char *)&sn_IPaddress, 0, sizeof( vsiIPaddress ) );
    // sn_socket and sn_sendqueue are initialised by their own constructors.
    sn_diskidorg = 0;

    sn_disks = 0;
    for( i = 0; i < sn_maxdisks; i++ )
    {
        sn_disksize[i] = 0;
    }

    // State used by the storage-server recovery support.
    sn_restart = false;
    sn_isenabled = false;
    sn_sendevent = false;
    sn_readfreeblocksfile = false;

    // Mutex and condition variable used by the storage-server recovery
    // support; they must exist before the node becomes reachable.
    pthread_mutex_init( &sn_mutex, NULL );
    pthread_cond_init( &sn_waitstorage, NULL );

    // Publish the node: append it to the tail of the manager's node list.
    pthread_mutex_lock( &mgr->m_mutex );

    SNode *nodelast = VIRTORG( mgr->m_nodelist, SNode, sn_link );

    while( nodelast->sn_link != 0 )
        nodelast = nodelast->sn_link;

    nodelast->sn_link = this;

    pthread_mutex_unlock( &mgr->m_mutex );
}
// SNode::~SNode [private]
// (Definition at line 1694 of file DiskMgr.cpp.)
//
// Stops the transmit and receive threads of this storage node, unlinks the
// node from the manager's node list and releases its resources.
//
// Shutdown sequence:
//   1. sn_termflag is raised (under sn_mutex).
//   2. The socket is closed, so a receive thread blocked in Receive returns
//      and the transmit thread stops reaching the storage server.
//   3. An EventTypeFinalizeThread event is queued for the transmit thread and
//      sn_waitstorage is signalled in case it is waiting for a reconnect.
//   4. Both threads are joined.
SNode::~SNode()
{
    // Raise the termination flag while holding sn_mutex so the store is
    // ordered with respect to code that inspects node state under sn_mutex.
    pthread_mutex_lock( &sn_mutex );
    sn_termflag = true;
    pthread_mutex_unlock( &sn_mutex );

    // Close the socket for reading and writing.
    sn_socket.Close();

    if( sn_txthread != 0 )
    {
        // Special event telling the transmit thread to finish.
        Event *event = EventManager.New( EventTypeFinalizeThread );
        sn_sendqueue.Put( event );

        // Wake the transmit thread if it is waiting for the storage server
        // to come back. Signalling while holding sn_mutex guarantees the
        // wakeup cannot slip in between the thread's check of sn_isenabled
        // and its call to pthread_cond_wait (which would lose the signal and
        // make the join below hang).
        pthread_mutex_lock( &sn_mutex );
        pthread_cond_signal( &sn_waitstorage );
        pthread_mutex_unlock( &sn_mutex );

        // Wait for the transmit thread to finish.
        pthread_join( sn_txthread, NULL );
    }

    if( sn_rxthread != 0 ) // Wait for the receive thread to finish.
        pthread_join( sn_rxthread, NULL );

    // ### needs to check that there aren't any refs from a Disk to this SNode
    pthread_mutex_lock( &sn_mgr->m_mutex );
    SNode *node = VIRTORG( sn_mgr->m_nodelist, SNode, sn_link );

    while( node->sn_link != 0 )
    {
        if( node->sn_link == this )
        {
            node->sn_link = this->sn_link;
            break;
        }
        // Advance to the next node (earlier versions never advanced and so
        // never removed the node).
        node = node->sn_link;
    }

    pthread_mutex_unlock( &sn_mgr->m_mutex );

    delete [] sn_hostname;

    // Destroy the mutex and condition variable used by the storage-server
    // recovery support.
    pthread_mutex_destroy( &sn_mutex );
    pthread_cond_destroy( &sn_waitstorage );
}
void SNode::GetDiskServiceTime | ( | EventStorageRequest * | event, | |
u16 | DiskSeq | |||
) |
Definition at line 2559 of file DiskMgr.cpp.
02560 { 02561 if( event->Header.Type != EventTypeStorageRequest ) 02562 { 02563 if( sn_mgr->m_log.is_open() ) 02564 sn_mgr->m_log << "SNode::GetDiskServiceTime ERROR: " 02565 << "Event has invalid type:" 02566 << (int) (event->Header.Type) << endl; 02567 02568 EventManager.Free((Event*)event); 02569 return; 02570 } 02571 u16 type = event->StorageRequest.Header.Type; 02572 if( type == MSG_RSS_DISKSERVICETIMEINFO_REQ ) 02573 { 02574 event->StorageRequest.DiskServiceTimeInfo.Disk = DiskSeq; 02575 } 02576 else 02577 { 02578 if( sn_mgr->m_log.is_open() ) 02579 sn_mgr->m_log << "SNode::GetDiskServiceTime ERROR: Storage request" 02580 << " has invalid type: " 02581 << event->StorageRequest.Header.Type << endl; 02582 02583 EventManager.Free((Event*)event); 02584 return; 02585 } 02586 02587 #ifdef RIO_DEBUG2 02588 if( sn_mgr->m_log.is_open() ) 02589 sn_mgr->m_log << "SNode::GetDiskServiceTime getting service time for " 02590 << "disk " 02591 << event->StorageRequest.DiskServiceTimeInfo.Disk << " : " 02592 << event->StorageRequest.DiskServiceTimeInfo.DiskId 02593 << endl; 02594 #endif 02595 02596 sn_sendqueue.Put((Event*) event); 02597 02598 return; 02599 }
int SNode::rxthread | ( | ) | [private] |
Definition at line 1756 of file DiskMgr.cpp.
01757 { 01758 int rc; 01759 EventStorageReply *event; 01760 //const int pfxsize = sizeof(event->StorageReply.Header.Type) + 01761 // sizeof(event->StorageReply.Header.Size); 01762 const unsigned long pfxsize = (unsigned long) &event->StorageReply.Header.Token - 01763 (unsigned long) &event->StorageReply.Header; 01764 bool isenabled; 01765 01766 RioErr << "RXTHREADID " << syscall( SYS_gettid ) << endl; 01767 01768 while( 1 ) 01769 { 01770 if( sn_termflag ) 01771 { 01772 RioErr << " DiskMgr rxthread finished. " << endl; 01773 pthread_exit( NULL ); 01774 } 01775 01776 // Obtem o valor de isenabled. 01777 // Obs: obter o mutex e necessario? ou podemos fazer como antes, 01778 // retirando as 3 linhas de codigo a seguir e usando o if comentado? 01779 // Obtem o mutex para o acesso exclusivo a sn_isenabled. 01780 pthread_mutex_lock( &sn_mutex ); 01781 // Obtem o valor atual de sn_isenabled. 01782 isenabled = sn_isenabled; 01783 // Libera o mutex para o acesso exclusivo a sn_isenabled. 01784 pthread_mutex_unlock( &sn_mutex ); 01785 01786 // Verifica se e necessario recriar a conexao com o servidor de 01787 // armazenamento, ou se podemos enviar os dados. 01788 if( isenabled ) 01789 { 01790 // A conexao esta habilitada, logo podemos receber novas mensagens. 01791 event = ( EventStorageReply* ) EventManager.New( 01792 EventTypeStorageReply ); 01793 rc = sn_socket.Receive( ( char * ) &event->StorageReply.Header.Type, 01794 pfxsize ); 01795 01796 if( sn_termflag ) 01797 { 01798 EventManager.Free( ( Event* ) event ); 01799 RioErr << " DiskMgr rxthread finished. 
" << endl; 01800 pthread_exit( NULL ); 01801 } 01802 01803 if( rc ) 01804 { 01805 unsigned int storageid; 01806 01807 EventManager.Free( ( Event* ) event ); 01808 01809 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks; 01810 01811 #ifdef RIO_DEBUG2 01812 RioErr << "SNode::rxthread receive retornou o erro " << errno 01813 << " (" << strerror( errno ) << ") para o storage na " 01814 << "maquina " << sn_hostname << endl; 01815 #endif 01816 01817 if( sn_mgr->m_log.is_open() ) 01818 sn_mgr->m_log << "DiskMgr.rxthread receive1 failed." 01819 << endl; 01820 01821 RioErr << " DiskMgr rxthread error. See log file. Storage " 01822 << "server " << sn_hostname << " is down" << endl; 01823 01824 // Define o estado do servidor de armazenamento como 01825 // temporariamente parado. 01826 // Obtem o mutex para o acesso exclusivo a sn_isenabled. 01827 pthread_mutex_lock( &sn_mutex ); 01828 // Define o estado do servidor de armazenamento como parado. 01829 sn_isenabled = false; 01830 // Libera o mutex para o acesso exclusivo a sn_isenabled. 01831 pthread_mutex_unlock( &sn_mutex ); 01832 // Gera um evento para informar ao router da queda do servidor 01833 // de armazenamento 01834 EventStorageDown* event; 01835 event = ( EventStorageDown * ) EventManager.New( 01836 EventTypeStorageDown ); 01837 01838 event->StorageId = storageid; 01839 event->EmptyStorageQueue = true; 01840 01841 #ifdef RIO_DEBUG2 01842 RioErr << "SNode::rxthread enviando a mensagem " 01843 << "EventStorageDown ao Router do servidor de " 01844 << "armazenamento com a ID " << event->StorageId << endl; 01845 #endif 01846 01847 sn_mgr->m_Router->Put( ( Event * ) event ); 01848 01849 } 01850 else 01851 { 01852 if( event->StorageReply.Header.Size > 01853 sizeof( event->StorageReply ) ) 01854 { 01855 if( sn_mgr->m_log.is_open() ) 01856 sn_mgr->m_log << "DiskMgr.rxthread msg too large " 01857 << event->StorageReply.Header.Size 01858 << endl; 01859 01860 RioErr << " DiskMgr rxthread error. See log file. 
" << endl; 01861 } 01862 else 01863 { 01864 rc = sn_socket.Receive( (char *) &event->StorageReply. 01865 Header.Token, 01866 event->StorageReply.Header.Size - 01867 pfxsize ); 01868 01869 if( sn_termflag ) 01870 { 01871 EventManager.Free( ( Event* ) event ); 01872 RioErr << " DiskMgr rxthread finished. " << endl; 01873 pthread_exit( NULL ); 01874 } 01875 01876 if( rc ) 01877 { 01878 unsigned int storageid; 01879 01880 EventManager.Free( ( Event* ) event ); 01881 01882 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks; 01883 01884 if( sn_mgr->m_log.is_open() ) 01885 sn_mgr->m_log << "DiskMgr.rxthread receive2 failed." 01886 << endl; 01887 01888 RioErr << " DiskMgr rxthread error. See log file. " 01889 << "Storage server " << sn_hostname 01890 << " is down" << endl; 01891 01892 // Obtem o mutex para o acesso exclusivo a sn_isenabled. 01893 pthread_mutex_lock( &sn_mutex ); 01894 // Define o estado do servidor de armazenamento como 01895 // parado. 01896 sn_isenabled = false; 01897 // Libera o mutex para o acesso exclusivo a 01898 // sn_isenabled. 01899 pthread_mutex_unlock( &sn_mutex ); 01900 01901 // Gera um evento para informar ao router da queda do 01902 // servidor de armazenamento 01903 EventStorageDown* event; 01904 event = ( EventStorageDown * ) EventManager.New( 01905 EventTypeStorageDown ); 01906 01907 event->StorageId = storageid; 01908 event->EmptyStorageQueue = true; 01909 01910 #ifdef RIO_DEBUG2 01911 RioErr << "SNode::rxthread enviando a mensagem " 01912 << "EventStorageDown ao Router do servidor de " 01913 << "armazenamento com a ID " << event->StorageId 01914 << endl; 01915 #endif 01916 01917 sn_mgr->m_Router->Put( ( Event * ) event ); 01918 } 01919 else 01920 { 01921 01922 // Esta impressao foi comentada porque estes logs ainda 01923 // nao sao necessarios. Se for necessario usar estes 01924 // logs, esta linha do codigo pode ser descomentada 01925 // (ja existe um script para processar estes logs). 
01926 //#ifdef RIO_DEBUG_EMUL 01927 //struct in_addr ip; 01928 //ip.s_addr = sn_IPaddress.Host; 01929 //RioErr << "RECEIVESTORAGE " << ( int ) time( NULL ) 01930 // << " " << inet_ntoa( ip ) << " " 01931 // << event->StorageReply.Header.Size << endl; 01932 //#endif 01933 01934 if( sn_termflag ) 01935 { 01936 EventManager.Free(( Event* )event ); 01937 RioErr << " DiskMgr rxthread finished. " << endl; 01938 return 0; 01939 } 01940 01941 switch( event->StorageReply.Header.Type ) 01942 { 01943 case MSG_RSS_NODEINFO: 01944 rxthread_nodeinfo( event ); 01945 break; 01946 01947 case MSG_RSS_DISKINFO: 01948 rxthread_diskinfo( event ); 01949 break; 01950 01951 case MSG_RSS_DISKSERVICETIMEINFO: 01952 rxthread_diskservicetimeinfo( event ); 01953 break; 01954 01955 // Processa a nova mensagem indicando que a 01956 // inicializacao do servidor de armazenamento foi 01957 // bem sucedida. 01958 case MSG_RSS_CONFIRMATION: 01959 rxthread_confirmation( event ); 01960 break; 01961 01962 #ifdef RIO_DEGUG2 01963 // Processa as mensagens associadas ao gerenciamento 01964 // dos logs. 01965 // Obs: atualmente estas mensagens sao somente 01966 // usadas para depurar o codigo. 01967 case MSG_RSS_SEARCHLOGS_STATUS: 01968 rxthread_searchlogsstatus( event ); 01969 break; 01970 #endif 01971 // ------------------------------------------------- 01972 01973 default: 01974 sn_mgr->m_Router->Put( ( Event* )event ); 01975 break; 01976 } 01977 } 01978 } 01979 } 01980 } 01981 else 01982 { 01983 // A conexao caiu ou ainda nao foi inicializada, logo precisamos 01984 // restabelecer a conexao com o servidor de armazenamento. 01985 // Antes, porem, fechamos a conexao anterior com o servidor de 01986 // armazenamento, se ela estiver aberta. 01987 sn_socket.Close(); 01988 // Tenta reiniciar a conexao com o servidor de armazenamento. 
01989 rc = sn_socket.Connect( &sn_IPaddress ); 01990 if( rc ) 01991 { 01992 // Se a conexao falhar, tentamos mais tarde 01993 RioErr << " connect failed for " << sn_hostname << ": " 01994 << GetErrorDescription( rc ) << ". Trying again!" 01995 << endl; 01996 // Dorme por algum tempo. 01997 sleep( sn_mgr->m_TimeBetweenAttempts ); 01998 } 01999 else 02000 { 02001 // Remove todos os eventos do servidor de armazenamento. 02002 Event *oldevent; 02003 do 02004 { 02005 oldevent = ( Event * ) sn_sendqueue.Remove(); 02006 if( oldevent != NULL ) 02007 EventManager.Free( oldevent ); 02008 } while( oldevent != NULL ); 02009 02010 RioErr << "SNode::rxthread conexao executada com sucesso para " 02011 << "o servidor de armazenamento " << sn_hostname 02012 << ". Enviando a mensagem MSG_RSS_INITIALIZATION_REQ " 02013 << "para reiniciar o servidor de armazenamento " 02014 << endl; 02015 // Obtem o mutex para o acesso exclusivo a sn_isenabled. 02016 pthread_mutex_lock( &sn_mutex ); 02017 // Define o estado do servidor de armazenamento como em 02018 // funcionamento. 02019 sn_isenabled = true; 02020 // Define que precisamos enviar o evento EventTypeStorageUp 02021 // ao router 02022 sn_sendevent = true; 02023 // Envia um sinal para acordar a thread que envia dados ao 02024 // servidor de armazenamento. 02025 pthread_cond_signal( &sn_waitstorage ); 02026 // Envia a mensagem de reinicio. 02027 EventStorageRequest *event; 02028 event = ( EventStorageRequest* ) EventManager.New( 02029 EventTypeStorageRequest ); 02030 event->StorageRequest.Header.Type = MSG_RSS_INITIALIZATION_REQ; 02031 event->StorageRequest.Header.Size = SizeMsgRSSInitializationReq; 02032 event->StorageRequest.Header.Token = RSS_TOKEN_ROUTER; 02033 MsgRSSInitializationReq* msg = & ( event->StorageRequest. 02034 Initialization ); 02035 msg->BlockSize = sn_mgr->m_BlockSize; 02036 sn_sendqueue.Put( ( Event* ) event ); 02037 // Libera o mutex para o acesso exclusivo a sn_isenabled. 
02038 pthread_mutex_unlock( &sn_mutex ); 02039 02040 } 02041 } 02042 } 02043 }
void SNode::rxthread_confirmation | ( | EventStorageReply * | event | ) | [private] |
Nova funcao para processar o recebimento da resposta a mensagem de inicializacao enviada na funcao DiskMgr::BuildNode.
event | ponteiro para evento a ser processado, com a mensagem recebida. |
Definition at line 2048 of file DiskMgr.cpp.
02049 { 02050 EventStorageRequest *TxEvent; 02051 bool isenabled; 02052 02053 if( ( event->StorageReply.NodeInfo.Size != SizeMsgRSSConfirmation ) 02054 || ( event->StorageReply.NodeInfo.Token != RSS_TOKEN_STORAGE )) 02055 { 02056 RioErr << "event->StorageReply.NodeInfo.Size = " 02057 << event->StorageReply.NodeInfo.Size << endl; 02058 RioErr << "SizeMsgRSSConfirmation = " << SizeMsgRSSConfirmation 02059 << endl; 02060 RioErr << "event->StorageReply.NodeInfo.Token = " << hex 02061 << event->StorageReply.NodeInfo.Token << dec << endl; 02062 RioErr << "RSS_TOKEN_STORAGE = " << hex << RSS_TOKEN_STORAGE << dec 02063 << endl; 02064 if( sn_mgr->m_log.is_open() ) 02065 sn_mgr->m_log << "DiskMgr.rxthread invalid confirmation msg " 02066 << "ignored for " << sn_hostname << endl; 02067 02068 EventManager.Free(( Event* ) event ); 02069 return; 02070 } 02071 02072 EventManager.Free(( Event* ) event ); 02073 02074 // Obs: obter o mutex e necessario? ou podemos fazer como antes, 02075 // retirando as 3 linhas de codigo a seguir e usando o if comentado? 02076 // Obtem o mutex para o acesso exclusivo a sn_isenabled. 02077 pthread_mutex_lock( &sn_mutex ); 02078 // Define o estado do servidor de armazenamento como parado. 02079 isenabled = sn_isenabled; 02080 // Libera o mutex para o acesso exclusivo a sn_isenabled. 02081 pthread_mutex_unlock( &sn_mutex ); 02082 02083 // Envia uma mensagem ao roteador informando que o servidor de armazenamento 02084 // inicializou com sucesso. A mensagem somente devera ser enviada nesta 02085 // funcao se o servidor estiver reinicializando, isto e, o servidor 02086 // estava ativo antes de inicializarmos o servidor de gerenciamento. Em caso 02087 // contrario (quando o servidor de armazenamento inicializar depois do 02088 // servidor de gerenciamento), esta mensagem somente devera ser enviada apos 02089 // recebermos e inicializarmos todos os discos deste servidor. 
02090 if( ( sn_sendevent ) && ( sn_restart ) ) 02091 { 02092 unsigned int storageid; 02093 02094 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks; 02095 02096 // O servidor de armazenamento reiniciou com sucesso. Precisamos entao 02097 // enviar o evento, ao Router, informando que este servidor esta 02098 // novamente ativo. 02099 EventStorageUp* event; 02100 // Depois de restabelecida a conexao com sucesso, enviamos um 02101 // outro evento indicando que o servidor de armazenamento voltou 02102 // a ficar disponivel. 02103 event = ( EventStorageUp * ) EventManager.New( EventTypeStorageUp ); 02104 event->StorageId = storageid; 02105 02106 #ifdef RIO_DEBUG2 02107 RioErr << "SNode::rxthread_confirmation enviando a mensagem " 02108 << "EventStorageUp ao Router do servidor de armazenamento com " 02109 << "a ID " << event->StorageId << endl; 02110 #endif 02111 02112 sn_mgr->m_Router->Put( ( Event * ) event ); 02113 02114 // Somente para evitar o envio de um outro evento. 02115 sn_sendevent = false; 02116 } 02117 02118 if( !sn_restart ) 02119 { 02120 // Se for a primeira vez que o servidor de armazenamento iniciou, envia 02121 // a mensagem originalmente enviada pela funcao DiskMgr::BuildNode: 02122 02123 // send node info request to this node to start things off... 02124 TxEvent = ( EventStorageRequest* ) EventManager.New( 02125 EventTypeStorageRequest ); 02126 TxEvent->StorageRequest.Header.Type = MSG_RSS_NODEINFO_REQ; 02127 TxEvent->StorageRequest.Header.Size = SizeMsgRSSnodeInfoReq; 02128 TxEvent->StorageRequest.Header.Token = RSS_TOKEN_ROUTER; 02129 02130 sn_sendqueue.Put( (Event*) TxEvent ); 02131 02132 // Se esta funcao for chamada novamente, entao estaremos tentado 02133 // reiniciar a conexao com o servidor de armazenamento. 02134 sn_restart = true; 02135 } 02136 }
void SNode::rxthread_diskinfo | ( | EventStorageReply * | event | ) | [private] |
Definition at line 2225 of file DiskMgr.cpp.
02226 { 02227 int diskid; 02228 Disk *dp; 02229 bool sendevent; 02230 02231 #ifdef RIO_DEBUG1 02232 RioErr << "### [SNode - rxthread_diskinfo] Start" << endl; 02233 #endif 02234 02235 if( ( event->StorageReply.DiskInfo.Size != SizeMsgRSSdiskInfo ) 02236 || ( event->StorageReply.DiskInfo.Token != RSS_TOKEN_STORAGE )) 02237 { 02238 if( sn_mgr->m_log.is_open() ) 02239 sn_mgr->m_log << "DiskMgr.rxthread invalid diskinfo msg " 02240 << "ignored for " << sn_hostname << endl; 02241 02242 EventManager.Free((Event*) event); 02243 return; 02244 } 02245 02246 if( sn_mgr->m_log.is_open() ) 02247 sn_mgr->m_log << " DiskMgr.rxthread_diskinfo: node " << sn_hostname 02248 << " Disk " << event->StorageReply.DiskInfo.Disk 02249 << " DiskSize " << event->StorageReply.DiskInfo.DiskSize 02250 << " DiskName " << event->StorageReply.DiskInfo.DiskName; 02251 // ---------------------------------------------------------- 02252 02253 char diskname[256] = "", diskdevice[256] = ""; 02254 strcpy( diskname, event->StorageReply.DiskInfo.DiskName ); 02255 for( int aux = 0 ; diskname[aux] != '\0'; aux++ ) 02256 if( diskname[aux] == '/' ) 02257 diskname[aux] = '.'; 02258 sprintf( diskdevice, "%s/%s.%s", sn_mgr->m_MetaRoot, sn_hostname, diskname); 02259 02260 if( sn_mgr->m_log.is_open() ) 02261 sn_mgr->m_log << " DiskBitsName " << diskdevice << endl; 02262 // ------------------------------------------------------------------------ 02263 02264 if( event->StorageReply.DiskInfo.Disk > sn_maxdisks ) 02265 { 02266 if( sn_mgr->m_log.is_open() ) 02267 sn_mgr->m_log << "DiskMgr.rxthread Disk > maxdisks " 02268 << "ignored for " << sn_hostname << endl; 02269 EventManager.Free((Event*) event); 02270 return; 02271 } 02272 02273 sn_disksize[event->StorageReply.DiskInfo.Disk] = 02274 event->StorageReply.DiskInfo.DiskSize; 02275 02276 diskid = sn_diskidorg + event->StorageReply.DiskInfo.Disk; 02277 02278 if( sn_mgr->m_disks[diskid] ) 02279 { 02280 if( sn_mgr->m_log.is_open() ) 02281 sn_mgr->m_log << 
"Diskmgr.rxthread duplicate disk info for disk " 02282 << diskid << " from " << sn_hostname 02283 << " ignored " << endl; 02284 02285 EventManager.Free((Event*) event); 02286 return; 02287 } 02288 02289 // ### should locking be here or in Disk ctor/dtor?? 02290 pthread_mutex_lock( &sn_mgr->m_mutex ); 02291 dp = new Disk( sn_mgr, this ); 02292 dp->d_diskid = diskid; 02293 dp->d_diskseq = event->StorageReply.DiskInfo.Disk; 02294 dp->d_ntot = sn_disksize[dp->d_diskseq] / sn_mgr->m_BlockSize; 02295 // ### use log file instead of cerr (?) - need to pass *ostream? 02296 02297 // the diskbits file name will be -------------------- 02298 // storagenode.devicename instead of DiskBits1 and so on 02299 // It will avoid errors if the device order is modified at disk.cfg 02300 dp->d_BitMap.Initialize( diskdevice, dp->d_ntot, &cerr ); 02301 02302 //added to support df command ---------------------------------- 02303 strcpy( dp->d_DiskName, event->StorageReply.DiskInfo.DiskName ); 02304 // ------------------------------------------------------------------------ 02305 02306 // ### should delay allowing alloc until load or format has been called 02307 // for this d_BitMap? 02308 dp->d_status = Disk::STAT_READ | Disk::STAT_WRITE | Disk::STAT_ALLOC; 02309 sn_mgr->m_disks[diskid] = dp; 02310 02311 // Verifica se todos os discos ja foram inicializados. Existe um modo melhor 02312 // do que percorrer todas as entradas para este servidor de armazenamento? 02313 sendevent = true; 02314 for( int d = 0; d < sn_disks; d++ ) 02315 sendevent = sendevent && ( sn_mgr->m_disks[ d + sn_diskidorg ] != 0 ); 02316 02317 pthread_mutex_unlock( &sn_mgr->m_mutex ); 02318 02319 // Why this was not here before ???????? 
02320 EventManager.Free(( Event* ) event ); 02321 02322 // Send add disk message to Router 02323 EventAddDisk* eventdisk; 02324 eventdisk = ( EventAddDisk* ) EventManager.New( EventTypeAddDisk ); 02325 eventdisk->Disk = diskid; 02326 sn_mgr->m_Router->Put(( Event* )eventdisk ); 02327 02328 // Envia uma mensagem ao roteador informando que o servidor de armazenamento 02329 // inicializou com sucesso. Isso somente sera feito se este servidor 02330 // inicializar apos o servidor de gerenciamento (isso e indicado pela 02331 // variavel booleana sn_sendevent, que indica a necessidade de enviarmos 02332 // um evento ao Router). A mensagem somente devera ser enviada apos todas 02333 // as mensagens de todos os discos forem recebidas com sucesso. Note que, 02334 // no caso de o servidor esta inicializando pela primeira vez, apos o 02335 // servidor de gerenciamento, precisamos carregar todos os bitmaps dos 02336 // discos. 02337 if( ( sn_sendevent ) && ( sendevent ) ) 02338 { 02339 unsigned int storageid; 02340 02341 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks; 02342 02343 // Carrega os bitmaps dos discos deste servidor de armazenamento. Note 02344 // que quando o servidor de armazenamento iniciar antes do servidor de 02345 // gerenciamento, isso sera feito na funcao Start da classe DiskMgr. 02346 sn_mgr->LoadDisksBitMaps( storageid ); 02347 02348 // Libera todos os blocos associados a este servidor de armazenamento 02349 // que acabou de inicializar, caso existam blocos a serem liberados. 02350 if( sn_readfreeblocksfile ) 02351 { 02352 sn_mgr->FreeSalvedBlocks( sn_hostname ); 02353 sn_readfreeblocksfile = false; 02354 } 02355 02356 // Contabiliza os discos que passaram a ficar ativos. 02357 sn_mgr->m_NumberOfActiveDisks = sn_mgr->m_NumberOfActiveDisks + 02358 sn_disks; 02359 02360 // O servidor de armazenamento reiniciou com sucesso. Precisamos entao 02361 // enviar o evento, ao Router, informando que este servidor esta 02362 // novamente ativo. 
02363 EventStorageUp* event; 02364 // Depois de restabelecida a conexao com sucesso, enviamos um 02365 // outro evento indicando que o servidor de armazenamento voltou 02366 // a ficar disponivel. 02367 event = ( EventStorageUp * ) EventManager.New( EventTypeStorageUp ); 02368 event->StorageId = storageid; 02369 02370 #ifdef RIO_DEBUG2 02371 RioErr << "SNode::rxthread_confirmation enviando a mensagem " 02372 << "EventStorageUp ao Router do servidor de armazenamento com " 02373 << "a ID " << event->StorageId << endl; 02374 #endif 02375 02376 sn_mgr->m_Router->Put( ( Event * ) event ); 02377 02378 // Somente para evitar o envio de um outro evento. 02379 sn_sendevent = false; 02380 } 02381 02382 #ifdef RIO_DEBUG1 02383 RioErr << "### [SNode - rxthread_diskinfo] End" << endl; 02384 #endif 02385 }
void SNode::rxthread_diskservicetimeinfo | ( | EventStorageReply * | event | ) | [private] |
Definition at line 2389 of file DiskMgr.cpp.
02390 { 02391 if( ( event->StorageReply.DiskServiceTimeInfo.Size != 02392 SizeMsgRSSdiskServiceTimeInfo ) || 02393 ( event->StorageReply.DiskServiceTimeInfo.Token != RSS_TOKEN_STORAGE )) 02394 { 02395 if( sn_mgr->m_log.is_open() ) 02396 sn_mgr->m_log << "DiskMgr.rxthread invalid diskServiceTimeinfo msg " 02397 << "ignored for " << sn_hostname << endl; 02398 EventManager.Free((Event*) event); 02399 return; 02400 } 02401 02402 if( event->StorageReply.DiskServiceTimeInfo.Disk > sn_maxdisks ) 02403 { 02404 if( sn_mgr->m_log.is_open() ) 02405 sn_mgr->m_log << "DiskMgr.rxthread Disk > maxdisks " 02406 << "ignored for " << sn_hostname 02407 << endl; 02408 02409 EventManager.Free((Event*) event); 02410 return; 02411 } 02412 02413 #ifdef RIO_DEBUG2 02414 if( sn_mgr->m_log.is_open() ) 02415 sn_mgr->m_log << "DiskMgr.rxthread received servicetimeinfo of disk " 02416 << event->StorageReply.DiskServiceTimeInfo.Disk << ":" 02417 << event->StorageReply.DiskServiceTimeInfo.DiskId 02418 << endl; 02419 #endif 02420 02421 02422 sn_mgr->m_Router->SetDiskServiceTime( 02423 event->StorageReply.DiskServiceTimeInfo.DiskId, 02424 event->StorageReply.DiskServiceTimeInfo.EstimatedDiskServTime, 02425 event->StorageReply.DiskServiceTimeInfo.EstimatedServTimeAccT); 02426 EventManager.Free((Event*)event); 02427 }
void SNode::rxthread_nodeinfo | ( | EventStorageReply * | event | ) | [private] |
Definition at line 2168 of file DiskMgr.cpp.
02169 { 02170 EventStorageRequest *TxEvent; 02171 int i; 02172 02173 if( ( event->StorageReply.NodeInfo.Size != SizeMsgRSSnodeInfo ) 02174 || ( event->StorageReply.NodeInfo.Token != RSS_TOKEN_STORAGE )) 02175 { 02176 if( sn_mgr->m_log.is_open() ) 02177 sn_mgr->m_log << "DiskMgr.rxthread invalid nodeinfo msg " 02178 << "ignored for " << sn_hostname << endl; 02179 02180 EventManager.Free(( Event* ) event ); 02181 return; 02182 } 02183 02184 if( sn_disks != 0 ) 02185 { 02186 if( sn_mgr->m_log.is_open() ) 02187 sn_mgr->m_log << "DiskMgr.rxthread duplicate nodeinfo " 02188 << "ignored for " << sn_hostname << endl; 02189 02190 EventManager.Free(( Event* ) event ); 02191 return; 02192 } 02193 02194 i = event->StorageReply.NodeInfo.nDisks; 02195 02196 if( i > sn_maxdisks ) 02197 { 02198 if( sn_mgr->m_log.is_open() ) 02199 sn_mgr->m_log << "DiskMgr.rxthread nDisks " << i 02200 << " > sn_maxdisks " << sn_maxdisks 02201 << " ignored for " << sn_hostname << endl; 02202 02203 EventManager.Free((Event*) event); 02204 return; 02205 } 02206 02207 sn_disks = i; 02208 EventManager.Free(( Event* ) event ); 02209 02210 // send disk info request for each disk 02211 for( i = 0; i < sn_disks; i++ ) 02212 { 02213 TxEvent = ( EventStorageRequest* ) 02214 EventManager.New(EventTypeStorageRequest); 02215 TxEvent->StorageRequest.DiskInfo.Type = MSG_RSS_DISKINFO_REQ; 02216 TxEvent->StorageRequest.DiskInfo.Size = SizeMsgRSSdiskInfoReq; 02217 TxEvent->StorageRequest.DiskInfo.Token = RSS_TOKEN_ROUTER; 02218 TxEvent->StorageRequest.DiskInfo.Disk = i; 02219 sn_sendqueue.Put(( Event* )TxEvent ); 02220 } 02221 }
void * SNode::rxthreadep | ( | void * | parm | ) | [static, private] |
Definition at line 1747 of file DiskMgr.cpp.
01748 { 01749 ((SNode *)parm)->rxthread(); 01750 return 0; 01751 }
void SNode::SendStorageNode | ( | EventStorageRequest * | event, | |
u16 | DiskSeq | |||
) |
Definition at line 2521 of file DiskMgr.cpp.
02522 { 02523 if( event->Header.Type != EventTypeStorageRequest ) 02524 { 02525 if( sn_mgr->m_log.is_open() ) 02526 sn_mgr->m_log << "SNode::SendStorageNode ERROR: Event has " 02527 << "invalid type:" << (int) (event->Header.Type) 02528 << endl; 02529 02530 EventManager.Free((Event*)event); 02531 return; 02532 } 02533 u16 type = event->StorageRequest.Header.Type & 0xf; 02534 02535 if( ( type == MSG_RSS_READ ) || ( type == MSG_RSS_WRITE ) || 02536 ( type == MSG_RSS_FETCH ) || ( type == MSG_RSS_RECEIVE )) 02537 { 02538 event->StorageRequest.Request.New.Disk = DiskSeq; 02539 } 02540 //bug: "if" changed - "or" by "and" 02541 else if( ( type != MSG_RSS_SEND ) && 02542 ( type != MSG_RSS_FLUSH ) && 02543 ( type != MSG_RSS_CANCEL ) ) 02544 { 02545 if( sn_mgr->m_log.is_open() ) 02546 sn_mgr->m_log << "SNode::SendStorageNode ERROR: Storage request" 02547 << " has invalid type: " 02548 << event->StorageRequest.Header.Type << endl; 02549 02550 EventManager.Free((Event*)event); 02551 return; 02552 } 02553 sn_sendqueue.Put((Event*) event); 02554 02555 return; 02556 }
int SNode::txthread | ( | ) | [private] |
Definition at line 2439 of file DiskMgr.cpp.
02440 { 02441 EventStorageRequest *event = NULL; 02442 Event *auxevent; 02443 int rc; 02444 02445 RioErr << "TXTHREADID " << syscall( SYS_gettid ) << endl; 02446 02447 while( true ) 02448 { 02449 //if( sn_termflag ) return 0; 02450 02451 // Obtem o mutex para o acesso exclusivo a sn_isenabled. 02452 pthread_mutex_lock( &sn_mutex ); 02453 // Verifica se precisamos esperar pela conexao. 02454 if( !sn_isenabled ) // Neste caso, esperamos ate que a conexao seja 02455 // restabelecida. 02456 pthread_cond_wait( &sn_waitstorage, &sn_mutex ); 02457 // Libera o mutex para o acesso exclusivo a sn_isenabled. 02458 pthread_mutex_unlock( &sn_mutex ); 02459 02460 // O Get abaixo retorna um Event 02461 //event = (EventStorageRequest*) sn_sendqueue.Get(); 02462 auxevent = sn_sendqueue.Get(); 02463 // Verifica se e um evento gerado para terminar a thread. 02464 if( auxevent->Type == EventTypeFinalizeThread ) 02465 { 02466 EventManager.Free( (Event*) auxevent ); 02467 RioErr << " DiskMgr txthread finished. " << endl; 02468 // Como recebemos um evento para terminar a thread, devemos entao 02469 // termina-la. 
02470 //pthread_exit( NULL ); 02471 return 0; 02472 } 02473 02474 event = ( EventStorageRequest* ) auxevent; 02475 #ifdef RIO_DEBUG2 02476 DataRequest& Request = (( EventDataRequest* )( event ))->Request; 02477 struct in_addr clientip; 02478 02479 clientip.s_addr = Request.Target.IPaddress; 02480 RioErr << "Enviando ao storage agora!: " 02481 << "IP " << inet_ntoa(clientip) 02482 << " port " << Request.Target.Port 02483 << " reqid " << Request.Reqid 02484 << " block " << Request.Block 02485 << endl; 02486 #endif 02487 02488 /*if( sn_termflag ) 02489 { 02490 EventManager.Free((Event*)event); 02491 return 0; 02492 }*/ 02493 02494 rc = sn_socket.Send( (char *) &(event->StorageRequest.Header.Type), 02495 event->StorageRequest.Header.Size ); 02496 if( rc ) 02497 { 02498 if( sn_mgr->m_log.is_open() ) 02499 sn_mgr->m_log << "SNode.txthread send error " << rc << endl; 02500 RioErr << " DiskMgr txthread error. See log file. " << endl; 02501 } 02502 02503 // Esta impressao foi comentada porque estes logs ainda nao sao 02504 // necessarios. Se for necessario usar estes logs, esta linha do codigo 02505 // pode ser descomentada (ja existe um script para processar estes 02506 // logs). 02507 //#ifdef RIO_DEBUG_EMUL 02508 //struct in_addr ip; 02509 //ip.s_addr = sn_IPaddress.Host; 02510 //RioErr << "SENDSTORAGE " << ( int ) time( NULL ) << " " 02511 // << inet_ntoa( ip ) << " " << event->StorageRequest.Header.Size 02512 // << endl; 02513 //#endif 02514 02515 EventManager.Free((Event*)event); 02516 } // fim do while( true ); 02517 }
void * SNode::txthreadep | ( | void * | parm | ) | [static, private] |
Definition at line 2431 of file DiskMgr.cpp.
02432 { 02433 (( SNode * )parm )->txthread(); 02434 return 0; 02435 }
int SNode::sn_diskidorg [private] |
int SNode::sn_disks [private] |
long long int SNode::sn_disksize[sn_maxdisks] [private] |
char* SNode::sn_hostname [private] |
vsiIPaddress SNode::sn_IPaddress [private] |
bool SNode::sn_isenabled [private] |
SNode* SNode::sn_link [private] |
const int SNode::sn_maxdisks = MAXSTORAGEDISKSARRAY [static] |
DiskMgr* SNode::sn_mgr [private] |
pthread_mutex_t SNode::sn_mutex [private] |
bool SNode::sn_readfreeblocksfile [private] |
bool SNode::sn_restart [private] |
pthread_t SNode::sn_rxthread [private] |
bool SNode::sn_sendevent [private] |
EventQueue SNode::sn_sendqueue [private] |
CvsiTCPsocket SNode::sn_socket [private] |
int SNode::sn_termflag [private] |
pthread_t SNode::sn_txthread [private] |
pthread_cond_t SNode::sn_waitstorage [private] |