SNode Class Reference

#include <DiskMgr.h>

Public Member Functions

void SendStorageNode (EventStorageRequest *event, u16 DiskSeq)
void GetDiskServiceTime (EventStorageRequest *event, u16 DiskSeq)

Static Public Attributes

static const int sn_maxdisks = MAXSTORAGEDISKSARRAY

Private Member Functions

 SNode (DiskMgr *mgr, char *hostname)
 ~SNode ()
int txthread ()
int rxthread ()
void rxthread_nodeinfo (EventStorageReply *event)
void rxthread_diskinfo (EventStorageReply *event)
void rxthread_confirmation (EventStorageReply *event)
 Nova funcao para processar o recebimento da resposta a mensagem de inicializacao enviada na funcao DiskMgr::BuildNode.
void rxthread_diskservicetimeinfo (EventStorageReply *event)

Static Private Member Functions

static void * txthreadep (void *parm)
static void * rxthreadep (void *parm)

Private Attributes

SNodesn_link
DiskMgrsn_mgr
char * sn_hostname
vsiIPaddress sn_IPaddress
CvsiTCPsocket sn_socket
EventQueue sn_sendqueue
pthread_t sn_txthread
pthread_t sn_rxthread
int sn_termflag
int sn_disks
int sn_diskidorg
long long int sn_disksize [sn_maxdisks]
bool sn_restart
bool sn_isenabled
bool sn_sendevent
pthread_mutex_t sn_mutex
pthread_cond_t sn_waitstorage
bool sn_readfreeblocksfile

Friends

class DiskMgr

Detailed Description

Definition at line 222 of file DiskMgr.h.


Constructor & Destructor Documentation

SNode::SNode ( DiskMgr mgr,
char *  hostname 
) [private]

Definition at line 1643 of file DiskMgr.cpp.

01644 {
01645     int i;
01646 
01647     sn_mgr = mgr;
01648     pthread_mutex_lock( &mgr->m_mutex );
01649     i = strlen( hostname ) + 1;
01650     sn_hostname = new char[i];
01651     if( sn_hostname )
01652     {
01653         memcpy( sn_hostname, hostname, i );
01654     }
01655 
01656     sn_txthread = 0;
01657     sn_rxthread = 0;
01658     sn_termflag = false;
01659 
01660     sn_link = NULL;
01661     memset( (char *)&sn_IPaddress, 0, sizeof( vsiIPaddress ) );
01662     // sn_socket does not need to be initialized (done by it's constructorv)
01663     // sn_sendqueue does not need to be initialized (done by it's constructorv)
01664     sn_diskidorg = 0;
01665 
01666     SNode *nodelast = VIRTORG( mgr->m_nodelist, SNode, sn_link );
01667 
01668     while( nodelast->sn_link != 0 )
01669         nodelast = nodelast->sn_link;
01670 
01671     nodelast->sn_link = this;
01672 
01673     sn_disks = 0;
01674     for( i = 0; i < sn_maxdisks; i++ )
01675     {
01676         sn_disksize[i] = 0;
01677     }
01678     
01679     // Inicializa as novas variaveis criadas pela implementacao que permite a
01680     // recuperacao dos servidores de armazenamento.
01681     sn_restart = false;
01682     sn_isenabled = false;
01683     sn_sendevent = false;
01684     sn_readfreeblocksfile = false;
01685 
01686     pthread_mutex_unlock( &mgr->m_mutex );
01687     
01688     // Inicializa o mutex e a variavel de condicao, usados pela nova 
01689     // implementacao que permite a recuperacao dos storages.
01690     pthread_mutex_init( &sn_mutex, NULL );
01691     pthread_cond_init( &sn_waitstorage, NULL );
01692 }

SNode::~SNode (  )  [private]

Definition at line 1694 of file DiskMgr.cpp.

01695 {
01696     Event *event;
01697     // Cancela as threads de leitura e de escrita do servidor de armazenameto
01698     // (storage) associado a este no.
01699     sn_termflag = true;
01700     // Fecha o socket para leitura e escrita (com isso, a thread rxthread de 
01701     // recepcao terminara caso esteja esperando na funcao receive, e a thread
01702     // txthread de transmissao nao enviara mais pacotes ao servidor de 
01703     // armazenamento).
01704     sn_socket.Close();
01705     // Coloca um evento especial EventTypeFinalizeThread na fila de eventos 
01706     // para informar a thread que ela deve terminar a sua execucao.
01707     if( sn_txthread != 0 )
01708     {
01709         event = EventManager.New( EventTypeFinalizeThread );
01710         sn_sendqueue.Put( event );
01711         // Desbloquea a thread txthread caso ela esteja esperando pelo reinicio do
01712         // servidor de armazenamento associado a esta classe.
01713         pthread_cond_signal( &sn_waitstorage );
01714         // Espera que a thread de envio termine.
01715         pthread_join( sn_txthread, NULL );
01716     }
01717     if( sn_rxthread != 0 ) // Espera que a thread de recebimento termine.
01718         pthread_join( sn_rxthread, NULL ); 
01719     
01720     // ### needs to check that there aren't any refs from a Disk to this SNode
01721     pthread_mutex_lock( &sn_mgr->m_mutex );
01722     SNode *node = VIRTORG( sn_mgr->m_nodelist, SNode, sn_link );
01723 
01724     while( node->sn_link != 0 )
01725     {
01726         if( node->sn_link == this )
01727         {
01728             node->sn_link = this->sn_link;
01729             break;
01730         }
01731         // added to fix bug --------------------------------------
01732         // it was not removing snode
01733         node = node->sn_link;
01734         // --------------------------------------------------------------------
01735     }
01736 
01737     pthread_mutex_unlock( &sn_mgr->m_mutex );
01738 
01739     delete [] sn_hostname;
01740 
01741     // Destroi o mutex e a variavel de condicao, usados pela nova implementacao 
01742     // que permite a recuperacao dos storages.
01743     pthread_mutex_destroy( &sn_mutex );
01744     pthread_cond_destroy( &sn_waitstorage );
01745 }


Member Function Documentation

void SNode::GetDiskServiceTime ( EventStorageRequest event,
u16  DiskSeq 
)

Definition at line 2559 of file DiskMgr.cpp.

02560 {
02561     if( event->Header.Type != EventTypeStorageRequest )
02562     {
02563         if( sn_mgr->m_log.is_open() )
02564             sn_mgr->m_log << "SNode::GetDiskServiceTime ERROR: "
02565                           << "Event has invalid type:"
02566                           << (int) (event->Header.Type) << endl;
02567 
02568         EventManager.Free((Event*)event);
02569         return;
02570     }
02571     u16 type = event->StorageRequest.Header.Type;
02572     if( type == MSG_RSS_DISKSERVICETIMEINFO_REQ )
02573     {
02574         event->StorageRequest.DiskServiceTimeInfo.Disk = DiskSeq;
02575     }
02576     else
02577     {
02578         if( sn_mgr->m_log.is_open() )
02579             sn_mgr->m_log << "SNode::GetDiskServiceTime ERROR: Storage request"
02580                           << " has invalid type: "
02581                           << event->StorageRequest.Header.Type << endl;
02582 
02583         EventManager.Free((Event*)event);
02584         return;
02585     }
02586 
02587     #ifdef RIO_DEBUG2
02588     if( sn_mgr->m_log.is_open() )
02589         sn_mgr->m_log << "SNode::GetDiskServiceTime getting service time for "
02590                       << "disk "
02591                       << event->StorageRequest.DiskServiceTimeInfo.Disk << " : "
02592                       << event->StorageRequest.DiskServiceTimeInfo.DiskId
02593                       << endl;
02594     #endif
02595 
02596     sn_sendqueue.Put((Event*) event);
02597 
02598     return;
02599 }

int SNode::rxthread (  )  [private]

Definition at line 1756 of file DiskMgr.cpp.

01757 {
01758     int rc;
01759     EventStorageReply *event;
01760     //const int pfxsize = sizeof(event->StorageReply.Header.Type) +
01761     //                    sizeof(event->StorageReply.Header.Size);
01762     const unsigned long pfxsize = (unsigned long) &event->StorageReply.Header.Token -
01763                                   (unsigned long) &event->StorageReply.Header;
01764     bool isenabled;                    
01765 
01766     RioErr << "RXTHREADID " << syscall( SYS_gettid ) << endl;
01767 
01768     while( 1 )
01769     {
01770         if( sn_termflag )
01771         {
01772             RioErr << " DiskMgr rxthread finished. " << endl;
01773             pthread_exit( NULL );
01774         }
01775         
01776         // Obtem o valor de isenabled.
01777         // Obs: obter o mutex e necessario? ou podemos fazer como antes, 
01778         // retirando as 3 linhas de codigo a seguir e usando o if comentado?
01779         // Obtem o mutex para o acesso exclusivo a sn_isenabled.
01780         pthread_mutex_lock( &sn_mutex );
01781         // Obtem o valor atual de sn_isenabled.
01782         isenabled = sn_isenabled;
01783         // Libera o mutex para o acesso exclusivo a sn_isenabled.
01784         pthread_mutex_unlock( &sn_mutex );
01785         
01786         // Verifica se e necessario recriar a conexao com o servidor de 
01787         // armazenamento, ou se podemos enviar os dados.
01788         if( isenabled )
01789         {
01790             // A conexao esta habilitada, logo podemos receber novas mensagens.
01791             event = ( EventStorageReply* ) EventManager.New( 
01792                                                         EventTypeStorageReply );
01793             rc = sn_socket.Receive( ( char * ) &event->StorageReply.Header.Type, 
01794                                     pfxsize );
01795 
01796             if( sn_termflag )
01797             {
01798                 EventManager.Free( ( Event* ) event );
01799                 RioErr << " DiskMgr rxthread finished. " << endl;
01800                 pthread_exit( NULL );
01801             }
01802 
01803             if( rc )
01804             {
01805                 unsigned int storageid;
01806 
01807                 EventManager.Free( ( Event* ) event );
01808                 
01809                 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
01810                 
01811                 #ifdef RIO_DEBUG2
01812                 RioErr << "SNode::rxthread receive retornou o erro " << errno 
01813                        << " (" << strerror( errno ) << ") para o storage na "
01814                        << "maquina " << sn_hostname << endl;
01815                 #endif        
01816 
01817                 if( sn_mgr->m_log.is_open() )
01818                     sn_mgr->m_log << "DiskMgr.rxthread receive1 failed." 
01819                                   << endl;
01820 
01821                 RioErr << " DiskMgr rxthread error. See log file. Storage "     
01822                        << "server " << sn_hostname << " is down" << endl;
01823                        
01824                 // Define o estado do servidor de armazenamento como 
01825                 // temporariamente parado.
01826                 // Obtem o mutex para o acesso exclusivo a sn_isenabled.
01827                 pthread_mutex_lock( &sn_mutex );
01828                 // Define o estado do servidor de armazenamento como parado.
01829                 sn_isenabled = false;
01830                 // Libera o mutex para o acesso exclusivo a sn_isenabled.
01831                 pthread_mutex_unlock( &sn_mutex );
01832                 // Gera um evento para informar ao router da queda do servidor 
01833                 // de armazenamento
01834                 EventStorageDown* event;
01835                 event = ( EventStorageDown * ) EventManager.New( 
01836                                                          EventTypeStorageDown );
01837 
01838                 event->StorageId = storageid;
01839                 event->EmptyStorageQueue = true;
01840 
01841                 #ifdef RIO_DEBUG2
01842                 RioErr << "SNode::rxthread enviando a mensagem "
01843                        << "EventStorageDown ao Router do servidor de "
01844                        << "armazenamento com a ID " << event->StorageId << endl;
01845                 #endif       
01846 
01847                 sn_mgr->m_Router->Put( ( Event * ) event );
01848                 
01849             } 
01850             else
01851             {
01852                 if( event->StorageReply.Header.Size > 
01853                     sizeof( event->StorageReply ) )
01854                 {
01855                     if( sn_mgr->m_log.is_open() )
01856                         sn_mgr->m_log << "DiskMgr.rxthread msg too large "
01857                                       << event->StorageReply.Header.Size
01858                                       << endl;
01859 
01860                     RioErr << " DiskMgr rxthread error. See log file. " << endl;
01861                 } 
01862                 else
01863                 {
01864                     rc = sn_socket.Receive( (char *) &event->StorageReply.
01865                                                      Header.Token,
01866                                              event->StorageReply.Header.Size - 
01867                                              pfxsize );
01868 
01869                     if( sn_termflag )
01870                     {
01871                         EventManager.Free( ( Event* ) event );
01872                         RioErr << " DiskMgr rxthread finished. " << endl;
01873                         pthread_exit( NULL );
01874                     }
01875                                              
01876                     if( rc )
01877                     {
01878                         unsigned int storageid;
01879 
01880                         EventManager.Free( ( Event* ) event );
01881 
01882                         storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
01883                 
01884                         if( sn_mgr->m_log.is_open() )
01885                             sn_mgr->m_log << "DiskMgr.rxthread receive2 failed." 
01886                                           << endl;
01887                                           
01888                         RioErr << " DiskMgr rxthread error. See log file. "     
01889                                << "Storage server " << sn_hostname 
01890                                << " is down" << endl;  
01891 
01892                         // Obtem o mutex para o acesso exclusivo a sn_isenabled.
01893                         pthread_mutex_lock( &sn_mutex );
01894                         // Define o estado do servidor de armazenamento como 
01895                         // parado.
01896                         sn_isenabled = false;
01897                         // Libera o mutex para o acesso exclusivo a 
01898                         // sn_isenabled.
01899                         pthread_mutex_unlock( &sn_mutex );
01900 
01901                         // Gera um evento para informar ao router da queda do 
01902                         // servidor de armazenamento
01903                         EventStorageDown* event;
01904                         event = ( EventStorageDown * ) EventManager.New( 
01905                                                          EventTypeStorageDown );
01906 
01907                         event->StorageId = storageid;
01908                         event->EmptyStorageQueue = true;                   
01909 
01910                         #ifdef RIO_DEBUG2 
01911                         RioErr << "SNode::rxthread enviando a mensagem "
01912                                << "EventStorageDown ao Router do servidor de "
01913                                << "armazenamento com a ID " << event->StorageId 
01914                                << endl;
01915                         #endif       
01916 
01917                         sn_mgr->m_Router->Put( ( Event * ) event ); 
01918                     }
01919                     else
01920                     {
01921 
01922                         // Esta impressao foi comentada porque estes logs ainda
01923                         // nao sao necessarios. Se for necessario usar estes
01924                         // logs, esta linha do codigo pode ser descomentada 
01925                         // (ja existe um script para processar estes logs).
01926                         //#ifdef RIO_DEBUG_EMUL
01927                         //struct in_addr ip;
01928                         //ip.s_addr = sn_IPaddress.Host;
01929                         //RioErr << "RECEIVESTORAGE " << ( int ) time( NULL ) 
01930                         //       << " " << inet_ntoa( ip ) << " " 
01931                         //       << event->StorageReply.Header.Size << endl;
01932                         //#endif
01933 
01934                         if( sn_termflag )
01935                         {
01936                             EventManager.Free(( Event* )event );
01937                             RioErr << " DiskMgr rxthread finished. " << endl;
01938                             return 0;
01939                         }
01940 
01941                         switch( event->StorageReply.Header.Type )
01942                         {
01943                             case MSG_RSS_NODEINFO:
01944                                 rxthread_nodeinfo( event );
01945                                 break;
01946 
01947                             case MSG_RSS_DISKINFO:
01948                                 rxthread_diskinfo( event );
01949                                 break;
01950 
01951                             case MSG_RSS_DISKSERVICETIMEINFO:
01952                                 rxthread_diskservicetimeinfo( event );
01953                                 break;
01954                 
01955                             // Processa a nova mensagem indicando que a 
01956                             // inicializacao do servidor de armazenamento foi 
01957                             // bem sucedida.
01958                             case MSG_RSS_CONFIRMATION:
01959                                 rxthread_confirmation( event );
01960                                 break; 
01961                                
01962                             #ifdef RIO_DEGUG2 
01963                             // Processa as mensagens associadas ao gerenciamento 
01964                             // dos logs. 
01965                             // Obs: atualmente estas mensagens sao somente 
01966                             // usadas para depurar o codigo. 
01967                             case MSG_RSS_SEARCHLOGS_STATUS:
01968                                  rxthread_searchlogsstatus( event );    
01969                                  break;
01970                             #endif    
01971                             // -------------------------------------------------
01972 
01973                            default: 
01974                                sn_mgr->m_Router->Put( ( Event* )event );
01975                                break;
01976                         }
01977                     }
01978                 }
01979             }
01980         }
01981         else
01982         {
01983             // A conexao caiu ou ainda nao foi inicializada, logo precisamos 
01984             // restabelecer a conexao com o servidor de armazenamento.
01985             // Antes, porem, fechamos a conexao anterior com o servidor de 
01986             // armazenamento, se ela estiver aberta.
01987             sn_socket.Close();  
01988             // Tenta reiniciar a conexao com o servidor de armazenamento.
01989             rc =  sn_socket.Connect( &sn_IPaddress );
01990             if( rc )
01991             {
01992                 // Se a conexao falhar, tentamos mais tarde
01993                 RioErr << " connect failed for " << sn_hostname << ": " 
01994                        << GetErrorDescription( rc ) << ". Trying again!" 
01995                        << endl;
01996                 // Dorme por algum tempo.
01997                 sleep( sn_mgr->m_TimeBetweenAttempts );
01998             } 
01999             else
02000             {
02001                 // Remove todos os eventos do servidor de armazenamento.
02002                 Event *oldevent;
02003                 do
02004                 {
02005                     oldevent = ( Event * ) sn_sendqueue.Remove();
02006                     if( oldevent != NULL )
02007                         EventManager.Free( oldevent );
02008                 } while( oldevent != NULL );
02009                 
02010                 RioErr << "SNode::rxthread conexao executada com sucesso para "
02011                        << "o servidor de armazenamento " << sn_hostname
02012                        << ". Enviando a mensagem MSG_RSS_INITIALIZATION_REQ "
02013                        << "para reiniciar o servidor de armazenamento "
02014                        << endl; 
02015                 // Obtem o mutex para o acesso exclusivo a sn_isenabled.
02016                 pthread_mutex_lock( &sn_mutex );
02017                 // Define o estado do servidor de armazenamento como em 
02018                 // funcionamento.
02019                 sn_isenabled = true;
02020                 // Define que precisamos enviar o evento EventTypeStorageUp
02021                 // ao router
02022                 sn_sendevent = true;
02023                 // Envia um sinal para acordar a thread que envia dados ao
02024                 // servidor de armazenamento.
02025                 pthread_cond_signal( &sn_waitstorage );
02026                 // Envia a mensagem de reinicio.
02027                 EventStorageRequest *event;
02028                 event = ( EventStorageRequest* ) EventManager.New( 
02029                                                       EventTypeStorageRequest );
02030                 event->StorageRequest.Header.Type = MSG_RSS_INITIALIZATION_REQ;
02031                 event->StorageRequest.Header.Size = SizeMsgRSSInitializationReq;
02032                 event->StorageRequest.Header.Token = RSS_TOKEN_ROUTER;
02033                 MsgRSSInitializationReq* msg = & ( event->StorageRequest.
02034                                                           Initialization );   
02035                 msg->BlockSize = sn_mgr->m_BlockSize;
02036                 sn_sendqueue.Put( ( Event* ) event );
02037                 // Libera o mutex para o acesso exclusivo a sn_isenabled.
02038                 pthread_mutex_unlock( &sn_mutex );
02039 
02040             }  
02041         }
02042     }
02043 }

void SNode::rxthread_confirmation ( EventStorageReply event  )  [private]

Nova funcao para processar o recebimento da resposta a mensagem de inicializacao enviada na funcao DiskMgr::BuildNode.

Parameters:
event ponteiro para evento a ser processado, com a mensagem recebida.

Definition at line 2048 of file DiskMgr.cpp.

02049 {
02050     EventStorageRequest *TxEvent;
02051     bool isenabled;
02052 
02053     if( ( event->StorageReply.NodeInfo.Size != SizeMsgRSSConfirmation )
02054         || ( event->StorageReply.NodeInfo.Token != RSS_TOKEN_STORAGE ))
02055     {
02056         RioErr << "event->StorageReply.NodeInfo.Size = " 
02057                << event->StorageReply.NodeInfo.Size << endl;
02058         RioErr << "SizeMsgRSSConfirmation = " << SizeMsgRSSConfirmation 
02059                << endl;
02060         RioErr << "event->StorageReply.NodeInfo.Token = " << hex 
02061                << event->StorageReply.NodeInfo.Token << dec << endl;
02062         RioErr << "RSS_TOKEN_STORAGE = " << hex << RSS_TOKEN_STORAGE << dec 
02063                << endl;
02064         if( sn_mgr->m_log.is_open() )
02065             sn_mgr->m_log << "DiskMgr.rxthread invalid confirmation msg "
02066                           << "ignored for " << sn_hostname  << endl;
02067 
02068         EventManager.Free(( Event* ) event );
02069         return;
02070     }
02071 
02072     EventManager.Free(( Event* ) event );
02073 
02074     // Obs: obter o mutex e necessario? ou podemos fazer como antes, 
02075     // retirando as 3 linhas de codigo a seguir e usando o if comentado?
02076     // Obtem o mutex para o acesso exclusivo a sn_isenabled.
02077     pthread_mutex_lock( &sn_mutex );
02078     // Define o estado do servidor de armazenamento como parado.
02079     isenabled = sn_isenabled;
02080     // Libera o mutex para o acesso exclusivo a sn_isenabled.
02081     pthread_mutex_unlock( &sn_mutex );
02082 
02083     // Envia uma mensagem ao roteador informando que o servidor de armazenamento
02084     // inicializou com sucesso. A mensagem somente devera ser enviada nesta 
02085     // funcao se o servidor estiver reinicializando, isto e, o servidor 
02086     // estava ativo antes de inicializarmos o servidor de gerenciamento. Em caso
02087     // contrario (quando o servidor de armazenamento inicializar depois do 
02088     // servidor de gerenciamento), esta mensagem somente devera ser enviada apos
02089     // recebermos e inicializarmos todos os discos deste servidor. 
02090     if( ( sn_sendevent ) && ( sn_restart ) )
02091     {
02092         unsigned int storageid;
02093                  
02094         storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
02095                 
02096         // O servidor de armazenamento reiniciou com sucesso. Precisamos entao
02097         // enviar o evento, ao Router, informando que este servidor esta 
02098         // novamente ativo.   
02099         EventStorageUp* event;
02100         // Depois de restabelecida a conexao com sucesso, enviamos um 
02101         // outro evento indicando que o servidor de armazenamento voltou 
02102         // a ficar disponivel.
02103         event = ( EventStorageUp * ) EventManager.New( EventTypeStorageUp );
02104         event->StorageId = storageid;
02105 
02106         #ifdef RIO_DEBUG2
02107         RioErr << "SNode::rxthread_confirmation enviando a mensagem "
02108                << "EventStorageUp ao Router do servidor de armazenamento com "
02109                << "a ID " << event->StorageId << endl;
02110         #endif       
02111 
02112         sn_mgr->m_Router->Put( ( Event * ) event );
02113 
02114         // Somente para evitar o envio de um outro evento.
02115         sn_sendevent = false;
02116     }
02117     
02118     if( !sn_restart )
02119     {
02120         // Se for a primeira vez que o servidor de armazenamento iniciou, envia 
02121         // a mensagem originalmente enviada pela funcao DiskMgr::BuildNode:
02122        
02123         // send node info request to this node to start things off...
02124         TxEvent = ( EventStorageRequest* ) EventManager.New( 
02125                                                       EventTypeStorageRequest );
02126         TxEvent->StorageRequest.Header.Type  = MSG_RSS_NODEINFO_REQ;
02127         TxEvent->StorageRequest.Header.Size  = SizeMsgRSSnodeInfoReq;
02128         TxEvent->StorageRequest.Header.Token = RSS_TOKEN_ROUTER;
02129 
02130         sn_sendqueue.Put( (Event*) TxEvent );
02131         
02132         // Se esta funcao for chamada novamente, entao estaremos tentado 
02133         // reiniciar a conexao com o servidor de armazenamento.
02134         sn_restart = true;
02135     }  
02136 }

void SNode::rxthread_diskinfo ( EventStorageReply event  )  [private]

Definition at line 2225 of file DiskMgr.cpp.

02226 {
02227     int diskid;
02228     Disk *dp;
02229     bool sendevent;
02230 
02231     #ifdef RIO_DEBUG1
02232     RioErr << "### [SNode - rxthread_diskinfo] Start" << endl;
02233     #endif
02234 
02235     if( ( event->StorageReply.DiskInfo.Size != SizeMsgRSSdiskInfo )
02236         || ( event->StorageReply.DiskInfo.Token != RSS_TOKEN_STORAGE ))
02237     {
02238         if( sn_mgr->m_log.is_open() )
02239             sn_mgr->m_log << "DiskMgr.rxthread invalid diskinfo msg "
02240                           << "ignored for " << sn_hostname  << endl;
02241 
02242         EventManager.Free((Event*) event);
02243         return;
02244     }
02245 
02246     if( sn_mgr->m_log.is_open() )
02247         sn_mgr->m_log << " DiskMgr.rxthread_diskinfo: node " << sn_hostname
02248                       << " Disk "     << event->StorageReply.DiskInfo.Disk
02249                       << " DiskSize " << event->StorageReply.DiskInfo.DiskSize
02250                       << " DiskName " << event->StorageReply.DiskInfo.DiskName;
02251                   // ----------------------------------------------------------
02252 
02253     char diskname[256] = "", diskdevice[256] = "";
02254     strcpy( diskname, event->StorageReply.DiskInfo.DiskName );
02255     for( int aux = 0 ; diskname[aux] != '\0'; aux++ )
02256         if( diskname[aux] == '/' )
02257             diskname[aux] = '.';
02258     sprintf( diskdevice, "%s/%s.%s", sn_mgr->m_MetaRoot, sn_hostname, diskname);
02259 
02260     if( sn_mgr->m_log.is_open() )
02261         sn_mgr->m_log << " DiskBitsName " << diskdevice << endl;
02262     // ------------------------------------------------------------------------
02263 
02264     if( event->StorageReply.DiskInfo.Disk > sn_maxdisks )
02265     {
02266         if( sn_mgr->m_log.is_open() )
02267             sn_mgr->m_log << "DiskMgr.rxthread Disk > maxdisks "
02268                           << "ignored for " << sn_hostname << endl;
02269         EventManager.Free((Event*) event);
02270         return;
02271     }
02272 
02273     sn_disksize[event->StorageReply.DiskInfo.Disk] =
02274                                         event->StorageReply.DiskInfo.DiskSize;
02275 
02276     diskid = sn_diskidorg + event->StorageReply.DiskInfo.Disk;
02277 
02278     if( sn_mgr->m_disks[diskid] )
02279     {
02280         if( sn_mgr->m_log.is_open() )
02281             sn_mgr->m_log << "Diskmgr.rxthread duplicate disk info for disk "
02282                           << diskid << " from " << sn_hostname
02283                           << " ignored " << endl;
02284 
02285         EventManager.Free((Event*) event);
02286         return;
02287     }
02288 
02289     // ### should locking be here or in Disk ctor/dtor??
02290     pthread_mutex_lock( &sn_mgr->m_mutex );
02291     dp = new Disk( sn_mgr, this );
02292     dp->d_diskid = diskid;
02293     dp->d_diskseq = event->StorageReply.DiskInfo.Disk;
02294     dp->d_ntot = sn_disksize[dp->d_diskseq] / sn_mgr->m_BlockSize;
02295     // ### use log file instead of cerr (?) - need to pass *ostream?
02296 
02297     // the diskbits file name will be --------------------
02298     // storagenode.devicename instead of DiskBits1 and so on
02299     // It will avoid errors if the device order is modified at disk.cfg
02300     dp->d_BitMap.Initialize( diskdevice, dp->d_ntot, &cerr );
02301     
02302     //added to support df command ----------------------------------
02303     strcpy( dp->d_DiskName, event->StorageReply.DiskInfo.DiskName );
02304     // ------------------------------------------------------------------------
02305 
02306     // ### should delay allowing alloc until load or format has been called
02307     //     for this d_BitMap?
02308     dp->d_status = Disk::STAT_READ | Disk::STAT_WRITE | Disk::STAT_ALLOC;
02309     sn_mgr->m_disks[diskid] = dp;
02310     
02311     // Verifica se todos os discos ja foram inicializados. Existe um modo melhor
02312     // do que percorrer todas as entradas para este servidor de armazenamento?
02313     sendevent = true;
02314     for( int d = 0; d < sn_disks; d++ )
02315         sendevent = sendevent && ( sn_mgr->m_disks[ d + sn_diskidorg ] != 0 );     
02316     
02317     pthread_mutex_unlock( &sn_mgr->m_mutex );
02318 
02319     // Why this was not here before ????????
02320     EventManager.Free(( Event* ) event );
02321 
02322      // Send add disk message to Router
02323     EventAddDisk* eventdisk;
02324     eventdisk = ( EventAddDisk* ) EventManager.New( EventTypeAddDisk );
02325     eventdisk->Disk = diskid;
02326     sn_mgr->m_Router->Put(( Event* )eventdisk );
02327 
02328     // Envia uma mensagem ao roteador informando que o servidor de armazenamento
02329     // inicializou com sucesso. Isso somente sera feito se este servidor 
02330     // inicializar apos o servidor de gerenciamento (isso e indicado pela 
02331     // variavel booleana sn_sendevent, que indica a necessidade de enviarmos
02332     // um evento ao Router). A mensagem somente devera ser enviada apos todas
02333     // as mensagens de todos os discos forem recebidas com sucesso. Note que,
02334     // no caso de o servidor esta inicializando pela primeira vez, apos o 
02335     // servidor de gerenciamento, precisamos carregar todos os bitmaps dos
02336     // discos.
02337     if( ( sn_sendevent ) && ( sendevent ) ) 
02338     {
02339         unsigned int storageid;
02340                  
02341         storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
02342 
02343         // Carrega os bitmaps dos discos deste servidor de armazenamento. Note
02344         // que quando o servidor de armazenamento iniciar antes do servidor de
02345         // gerenciamento, isso sera feito na funcao Start da classe DiskMgr.                
02346         sn_mgr->LoadDisksBitMaps( storageid );
02347         
02348         // Libera todos os blocos associados a este servidor de armazenamento
02349         // que acabou de inicializar, caso existam blocos a serem liberados.
02350         if( sn_readfreeblocksfile )
02351         {
02352             sn_mgr->FreeSalvedBlocks( sn_hostname );
02353             sn_readfreeblocksfile = false;
02354         }
02355         
02356         // Contabiliza os discos que passaram a ficar ativos.
02357         sn_mgr->m_NumberOfActiveDisks = sn_mgr->m_NumberOfActiveDisks + 
02358                                         sn_disks;
02359 
02360         // O servidor de armazenamento reiniciou com sucesso. Precisamos entao
02361         // enviar o evento, ao Router, informando que este servidor esta 
02362         // novamente ativo.   
02363         EventStorageUp* event;
02364         // Depois de restabelecida a conexao com sucesso, enviamos um 
02365         // outro evento indicando que o servidor de armazenamento voltou 
02366         // a ficar disponivel.
02367         event = ( EventStorageUp * ) EventManager.New( EventTypeStorageUp );
02368         event->StorageId = storageid;
02369 
02370         #ifdef RIO_DEBUG2
02371         RioErr << "SNode::rxthread_confirmation enviando a mensagem "
02372                << "EventStorageUp ao Router do servidor de armazenamento com "
02373                << "a ID " << event->StorageId << endl;
02374         #endif       
02375 
02376         sn_mgr->m_Router->Put( ( Event * ) event );
02377         
02378         // Somente para evitar o envio de um outro evento.
02379         sn_sendevent = false;
02380     }
02381 
02382     #ifdef RIO_DEBUG1
02383     RioErr << "### [SNode - rxthread_diskinfo] End" << endl;
02384     #endif
02385 }

void SNode::rxthread_diskservicetimeinfo ( EventStorageReply event  )  [private]

Definition at line 2389 of file DiskMgr.cpp.

02390 {
02391     if( ( event->StorageReply.DiskServiceTimeInfo.Size !=
02392          SizeMsgRSSdiskServiceTimeInfo ) ||
02393        ( event->StorageReply.DiskServiceTimeInfo.Token != RSS_TOKEN_STORAGE ))
02394     {
02395         if( sn_mgr->m_log.is_open() )
02396             sn_mgr->m_log << "DiskMgr.rxthread invalid diskServiceTimeinfo msg "
02397                           << "ignored for " << sn_hostname << endl;
02398         EventManager.Free((Event*) event);
02399         return;
02400     }
02401 
02402     if( event->StorageReply.DiskServiceTimeInfo.Disk > sn_maxdisks )
02403     {
02404         if( sn_mgr->m_log.is_open() )
02405             sn_mgr->m_log << "DiskMgr.rxthread Disk > maxdisks "
02406                           << "ignored for " << sn_hostname
02407                           << endl;
02408 
02409         EventManager.Free((Event*) event);
02410         return;
02411     }
02412 
02413     #ifdef RIO_DEBUG2
02414     if( sn_mgr->m_log.is_open() )
02415         sn_mgr->m_log << "DiskMgr.rxthread received servicetimeinfo of disk "
02416                       << event->StorageReply.DiskServiceTimeInfo.Disk << ":"
02417                       << event->StorageReply.DiskServiceTimeInfo.DiskId
02418                       << endl;
02419     #endif
02420 
02421 
02422     sn_mgr->m_Router->SetDiskServiceTime(
02423                 event->StorageReply.DiskServiceTimeInfo.DiskId,
02424                 event->StorageReply.DiskServiceTimeInfo.EstimatedDiskServTime,
02425                 event->StorageReply.DiskServiceTimeInfo.EstimatedServTimeAccT);
02426     EventManager.Free((Event*)event);
02427 }

void SNode::rxthread_nodeinfo ( EventStorageReply event  )  [private]

Definition at line 2168 of file DiskMgr.cpp.

02169 {
02170     EventStorageRequest *TxEvent;
02171     int i;
02172 
02173     if( ( event->StorageReply.NodeInfo.Size != SizeMsgRSSnodeInfo )
02174         || ( event->StorageReply.NodeInfo.Token != RSS_TOKEN_STORAGE ))
02175     {
02176         if( sn_mgr->m_log.is_open() )
02177             sn_mgr->m_log << "DiskMgr.rxthread invalid nodeinfo msg "
02178                           << "ignored for " << sn_hostname  << endl;
02179 
02180         EventManager.Free(( Event* ) event );
02181         return;
02182     }
02183 
02184     if( sn_disks != 0 )
02185     {
02186         if( sn_mgr->m_log.is_open() )
02187             sn_mgr->m_log << "DiskMgr.rxthread duplicate nodeinfo "
02188                           << "ignored for " << sn_hostname  << endl;
02189 
02190         EventManager.Free(( Event* ) event );
02191         return;
02192     }
02193 
02194     i = event->StorageReply.NodeInfo.nDisks;
02195 
02196     if( i > sn_maxdisks )
02197     {
02198         if( sn_mgr->m_log.is_open() )
02199             sn_mgr->m_log << "DiskMgr.rxthread nDisks " << i
02200                           << " > sn_maxdisks "  << sn_maxdisks
02201                           << " ignored for "    << sn_hostname << endl;
02202 
02203         EventManager.Free((Event*) event);
02204         return;
02205     }
02206 
02207     sn_disks = i;
02208     EventManager.Free(( Event* ) event );
02209 
02210     // send disk info request for each disk
02211     for( i = 0; i < sn_disks; i++ )
02212     {
02213         TxEvent = ( EventStorageRequest* )
02214                     EventManager.New(EventTypeStorageRequest);
02215         TxEvent->StorageRequest.DiskInfo.Type   = MSG_RSS_DISKINFO_REQ;
02216         TxEvent->StorageRequest.DiskInfo.Size   = SizeMsgRSSdiskInfoReq;
02217         TxEvent->StorageRequest.DiskInfo.Token  = RSS_TOKEN_ROUTER;
02218         TxEvent->StorageRequest.DiskInfo.Disk   = i;
02219         sn_sendqueue.Put(( Event* )TxEvent );
02220     }
02221 }

void * SNode::rxthreadep ( void *  parm  )  [static, private]

Definition at line 1747 of file DiskMgr.cpp.

01748 {
01749     ((SNode *)parm)->rxthread();
01750     return 0;
01751 }

void SNode::SendStorageNode ( EventStorageRequest event,
u16  DiskSeq 
)

Definition at line 2521 of file DiskMgr.cpp.

02522 {
02523     if( event->Header.Type != EventTypeStorageRequest )
02524     {
02525         if( sn_mgr->m_log.is_open() )
02526             sn_mgr->m_log << "SNode::SendStorageNode ERROR: Event has "
02527                           << "invalid type:" << (int) (event->Header.Type)
02528                           << endl;
02529 
02530         EventManager.Free((Event*)event);
02531         return;
02532     }
02533     u16 type = event->StorageRequest.Header.Type & 0xf;
02534 
02535     if( ( type == MSG_RSS_READ  ) || ( type == MSG_RSS_WRITE   ) ||
02536        ( type == MSG_RSS_FETCH ) || ( type == MSG_RSS_RECEIVE ))
02537     {
02538         event->StorageRequest.Request.New.Disk = DiskSeq;
02539     }
02540     //bug: "if" changed - "or" by "and"
02541     else if( ( type != MSG_RSS_SEND   ) &&
02542              ( type != MSG_RSS_FLUSH  ) &&
02543              ( type != MSG_RSS_CANCEL ) )
02544     {
02545         if( sn_mgr->m_log.is_open() )
02546             sn_mgr->m_log << "SNode::SendStorageNode ERROR: Storage request"
02547                           << " has invalid type: "
02548                           << event->StorageRequest.Header.Type << endl;
02549 
02550         EventManager.Free((Event*)event);
02551         return;
02552     }
02553     sn_sendqueue.Put((Event*) event);
02554 
02555     return;
02556 }

int SNode::txthread (  )  [private]

Definition at line 2439 of file DiskMgr.cpp.

02440 {
02441     EventStorageRequest *event = NULL;
02442     Event *auxevent;
02443     int rc;
02444     
02445     RioErr << "TXTHREADID " << syscall( SYS_gettid ) << endl;
02446 
02447     while( true )
02448     {
02449         //if( sn_termflag ) return 0;
02450         
02451         // Obtem o mutex para o acesso exclusivo a sn_isenabled.
02452         pthread_mutex_lock( &sn_mutex );
02453         // Verifica se precisamos esperar pela conexao.
02454         if( !sn_isenabled ) // Neste caso, esperamos ate que a conexao seja
02455                             // restabelecida.
02456             pthread_cond_wait( &sn_waitstorage, &sn_mutex );
02457         // Libera o mutex para o acesso exclusivo a sn_isenabled.
02458         pthread_mutex_unlock( &sn_mutex );
02459         
02460         // O Get abaixo retorna um Event
02461         //event = (EventStorageRequest*) sn_sendqueue.Get();
02462         auxevent = sn_sendqueue.Get();
02463         // Verifica se e um evento gerado para terminar a thread.
02464         if( auxevent->Type == EventTypeFinalizeThread )
02465         {
02466             EventManager.Free( (Event*) auxevent );
02467             RioErr << " DiskMgr txthread finished. " << endl;
02468             // Como recebemos um evento para terminar a thread, devemos entao
02469             // termina-la.
02470             //pthread_exit( NULL );
02471             return 0;
02472         } 
02473 
02474         event = ( EventStorageRequest* ) auxevent;
02475         #ifdef RIO_DEBUG2
02476         DataRequest& Request = (( EventDataRequest* )( event ))->Request;
02477         struct in_addr clientip;
02478         
02479         clientip.s_addr = Request.Target.IPaddress;
02480         RioErr << "Enviando ao storage agora!: "
02481                << "IP " << inet_ntoa(clientip)
02482                << " port "  << Request.Target.Port
02483                << " reqid " << Request.Reqid
02484                << " block " << Request.Block
02485                << endl;
02486         #endif
02487 
02488         /*if( sn_termflag )
02489         {
02490             EventManager.Free((Event*)event);
02491             return 0;
02492         }*/
02493 
02494         rc = sn_socket.Send( (char *) &(event->StorageRequest.Header.Type),
02495                              event->StorageRequest.Header.Size );
02496         if( rc )
02497         {
02498             if( sn_mgr->m_log.is_open() )
02499                 sn_mgr->m_log << "SNode.txthread send error " << rc << endl;
02500              RioErr << " DiskMgr txthread error. See log file. " << endl;
02501         }
02502 
02503         // Esta impressao foi comentada porque estes logs ainda nao sao 
02504         // necessarios. Se for necessario usar estes logs, esta linha do codigo 
02505         // pode ser descomentada (ja existe um script para processar estes 
02506         // logs).
02507         //#ifdef RIO_DEBUG_EMUL
02508         //struct in_addr ip;
02509         //ip.s_addr = sn_IPaddress.Host;
02510         //RioErr << "SENDSTORAGE " << ( int ) time( NULL ) << " " 
02511         //       << inet_ntoa( ip ) << " " << event->StorageRequest.Header.Size 
02512         //       << endl;
02513         //#endif
02514 
02515         EventManager.Free((Event*)event);
02516     } // fim do while( true );
02517 }

void * SNode::txthreadep ( void *  parm  )  [static, private]

Definition at line 2431 of file DiskMgr.cpp.

02432 {
02433     (( SNode * )parm )->txthread();
02434     return 0;
02435 }


Friends And Related Function Documentation

friend class DiskMgr [friend]

Definition at line 294 of file DiskMgr.h.


Field Documentation

int SNode::sn_diskidorg [private]

Definition at line 269 of file DiskMgr.h.

int SNode::sn_disks [private]

Definition at line 268 of file DiskMgr.h.

long long int SNode::sn_disksize[sn_maxdisks] [private]

Definition at line 271 of file DiskMgr.h.

char* SNode::sn_hostname [private]

Definition at line 261 of file DiskMgr.h.

Definition at line 262 of file DiskMgr.h.

bool SNode::sn_isenabled [private]

Definition at line 279 of file DiskMgr.h.

SNode* SNode::sn_link [private]

Definition at line 259 of file DiskMgr.h.

const int SNode::sn_maxdisks = MAXSTORAGEDISKSARRAY [static]

Definition at line 225 of file DiskMgr.h.

DiskMgr* SNode::sn_mgr [private]

Definition at line 260 of file DiskMgr.h.

pthread_mutex_t SNode::sn_mutex [private]

Definition at line 284 of file DiskMgr.h.

Definition at line 292 of file DiskMgr.h.

bool SNode::sn_restart [private]

Definition at line 275 of file DiskMgr.h.

pthread_t SNode::sn_rxthread [private]

Definition at line 266 of file DiskMgr.h.

bool SNode::sn_sendevent [private]

Definition at line 282 of file DiskMgr.h.

Definition at line 264 of file DiskMgr.h.

Definition at line 263 of file DiskMgr.h.

int SNode::sn_termflag [private]

Definition at line 267 of file DiskMgr.h.

pthread_t SNode::sn_txthread [private]

Definition at line 265 of file DiskMgr.h.

pthread_cond_t SNode::sn_waitstorage [private]

Definition at line 287 of file DiskMgr.h.


The documentation for this class was generated from the following files:
Generated on Wed Jul 4 16:03:38 2012 for RIO by  doxygen 1.6.3