CommunicationStream Class Reference

#include <CommMulticastStream.h>

Public Member Functions

 CommunicationStream (CRioObject *object=0, PLSessionManager *PLSession=0, void *callback=0, BufferStream *bufferStream=0, CRioMMVideo *video=0, char *LogsDirectory=NULL)
int createSocket (AddressIP *address)
int createListenSocket (unsigned short port)
void socketReceiveHandler ()
void socketReceiveMessageHandler (int sd)
void sendIPMsgToSocket (int sd, char *ip, unsigned short port, ClientMode mode, int block=-1)
void sendPIDMsgToSocket (int sd, int myPid)
void sendIPMsg (int pid, char *ip, unsigned short port, ClientMode mode, int block=-1, IPAction ipAction=KEEP_THIS, int target=-1)
void sendMoveMsg (int pid, MoveAction action, int block=-1)
void sendPIDMsg (int myPid)
 Este método não está sendo usado nunca! (This method is never used!) void CommunicationStream::sendBlockMsg( int pid, unsigned int block ) { #ifdef RIO_DEBUG1 RioErr << "### [CommunicationStream - sendBlockMsg] Start" << endl; #endif.
void sendModeMsg (int pid, ClientMode mode, int block=-1, int target=-1)
void burstRequest (int block)
string MoveAction2String (MoveAction action)
string ClientMode2String (ClientMode mode)
string IPAction2String (IPAction action)
CommunicationTable * getTable ()
void setTable (CommunicationTable *table)
CRioObject * getObject ()
PLSessionManager * getPLSession ()

Static Public Member Functions

static void * startListenThread (void *param)
static void * startReceiveThread (void *param)

Data Fields

fd_set * readfds

Protected Member Functions

void MessageMutexLock (string msg="")
void MessageMutexUnlock (string msg="")
void TableMutexLock (string msg="")
void TableMutexUnlock (string msg="")

Protected Attributes

CommunicationTable * table
CRioObject * object
PLSessionManager * PLSession
void * callback
BufferStream * bufferStream
CRioMMVideo * video
int init
pthread_mutex_t MessageMutex
pthread_mutex_t TableMutex

Detailed Description

Definition at line 184 of file CommMulticastStream.h.


Constructor & Destructor Documentation

CommunicationStream::CommunicationStream ( CRioObject * object = 0,
PLSessionManager * PLSession = 0,
void *  callback = 0,
BufferStream * bufferStream = 0,
CRioMMVideo * video = 0,
char *  LogsDirectory = NULL 
)

Definition at line 50 of file CommMulticastStream.cpp.

00056 {   // Constructor body: stores the caller-supplied pointers, allocates the table and fd_set, and initializes both mutexes.
00057     #ifdef RIO_DEBUG1
00058     RioErr << "### [CommunicationStream - Constructor] Start" << endl;
00059     #endif
00060 
00061     this->object       = object;
00062     table              = new CommunicationTable();
00063     readfds            = new fd_set;   // not cleared here; createListenSocket() runs FD_ZERO on it
00064     this->PLSession    = PLSession;
00065     this->callback     = callback;
00066     this->bufferStream = bufferStream;
00067     this->video        = video;
00068     init               = 0;
00069     // Both mutexes are created with default attributes (NULL attribute argument).
00070     pthread_mutex_init( &MessageMutex, NULL );
00071     pthread_mutex_init( &TableMutex,  NULL );
00072     // Everything below is compiled only in RIO_DEBUG2 builds: timers, counters and the flow log file.
00073     #ifdef RIO_DEBUG2
00074     gettimeofday( &initial_time, 0 );
00075     gettimeofday( &time_list, 0 );
00076     gettimeofday( &final_time, 0 );
00077 
00078     patching_number  = 0;
00079     multicast_number = 0;
00080     begin            = false;
00081     // LogsDirectory, when given, is prepended to LOGFILE with no separator inserted.
00082     pthread_mutex_init( &log_mutex, NULL );
00083     if( LogsDirectory == NULL )
00084         sprintf( FLUXOSLOG, "%s", LOGFILE );
00085     else
00086         sprintf( FLUXOSLOG, "%s%s", LogsDirectory, LOGFILE );   // NOTE(review): unbounded sprintf; FLUXOSLOG's capacity is not visible here
00087     m_log_fluxo.open( FLUXOSLOG );
00088 
00089     calculeted_time = 0;
00090     #endif
00091 
00092     #ifdef RIO_DEBUG1
00093     RioErr << "### [CommunicationStream - Constructor] Finish" << endl;
00094     #endif
00095 }


Member Function Documentation

void CommunicationStream::burstRequest ( int  block  ) 

Definition at line 1431 of file CommMulticastStream.cpp.

01432 {   // Requests, through video->MulticastRead(), every block strictly between `block` and the client's next request block.
01433     #ifdef RIO_DEBUG1
01434     RioErr << "### [CommunicationStream - burstRequest] Start" << endl;
01435     #endif
01436     // NOTE(review): video is dereferenced without a null check although the constructor defaults it to 0.
01437     // A local snapshot (clientNextRequestBlock) is taken below because the
01438     // NextRequestBlock member behind video->GetNextRequestBlock() is
01439     // updated all the time, while this function only needs the value
01440     // that member holds at this exact moment.
01441     //
01442     RioBlock clientNextRequestBlock = video->GetNextRequestBlock();
01443 
01444     if( ( block > -1  ) &&
01445         ( clientNextRequestBlock > (RioBlock) block )
01446       )
01447     {
01448         #ifdef RIO_DEBUG2
01449         RioErr << "[CommMulticastStream] Requisitando blocos entre "
01450                << block << " e " << clientNextRequestBlock
01451                << " pois ultrapassei o lider e roubei sua lideran�a"
01452                << endl;
01453         #endif
01454 
01455         // When a client receives the leadership-inheritance message it
01456         // may be playing ahead of that group. In that case the blocks
01457         // between the group's block and the client's block must be
01458         // requested; the loop below does that.
01459         bool readOk = true;
01460         for( RioBlock i = block + 1;
01461              ( i < clientNextRequestBlock ) && readOk;
01462              i++
01463            )
01464         {
01465             readOk = readOk && video->MulticastRead( i,
01466                                               MULTICASTTRAFFIC );
01467             // The loop condition stops the burst at the first failed request.
01468             #ifdef RIO_DEBUG2
01469             if( !readOk )
01470                 RioErr << "[burstRequest] Erro no pedido de bloco " << i
01471                        << endl;
01472             else
01473                 RioErr << "[burstRequest] Bloco " << i << " pedido com sucesso."
01474                        << endl;
01475             #endif
01476         }
01477 
01478         // As soon as the client's requests are back to normal it is a
01479         // natural leader, so it must stop using group_block: that member
01480         // marks the client as a forced leader that plays a block
01481         // different from the one it requests for the group.
01482         video->group_block = -1;
01483 
01484         // If the client is patching it leaves patching now, because it
01485         // has just overtaken the leader and must therefore take over
01486         // the leadership.
01487         if( video->isPatching() )
01488             video->Stream_Control( "plbure1" );
01489 
01490     } // end of if( ( block > -1  ) && ( clientNextRequestBlock > (RioBlock)...
01491 
01492     #ifdef RIO_DEBUG1
01493     RioErr << "### [CommunicationStream - burstRequest] Finish" << endl;
01494     #endif
01495 }

string CommunicationStream::ClientMode2String ( ClientMode  mode  ) 

Definition at line 1606 of file CommMulticastStream.cpp.

01607 {   // Returns the name of the given ClientMode; any value without a case below yields "UNKNOWN!!!".
01608     //#ifdef RIO_DEBUG1
01609     //RioErr << "### [CommunicationStream - ClientMode2String] Start" << endl;
01610     //#endif
01611 
01612     string result;
01613     // Each label is the enumerator's own spelling.
01614     switch( mode )
01615     {
01616         case PASSIVE:     result = "PASSIVE";     break;
01617         case LEADER:      result = "LEADER";      break;
01618         case SUBLEADER:   result = "SUBLEADER";   break;
01619         case INACTIVE:    result = "INACTIVE";    break;
01620         case UNAVAILABLE: result = "UNAVAILABLE"; break;
01621         default:          result = "UNKNOWN!!!";  break;
01622     }
01623 
01624     //#ifdef RIO_DEBUG1
01625     //RioErr << "### [CommunicationStream - ClientMode2String] Finish" << endl;
01626     //#endif
01627 
01628     return result;
01629 }

int CommunicationStream::createListenSocket ( unsigned short  port  ) 

Definition at line 234 of file CommMulticastStream.cpp.

00235 {   // Opens a TCP listening socket on `port` (INADDR_ANY) and accepts connections forever, inserting each accepted descriptor in the table.
00236     #ifdef RIO_DEBUG1
00237     RioErr << "### [CommunicationStream - createListenSocket] Start" << endl;
00238     #endif
00239     // Returns 0 when socket() or bind() fails; once the accept loop starts the function never returns.
00240     #ifdef RIO_DEBUG2
00241     RioErr << "PID da createListenSocket = " << pthread_self() << endl;
00242     #endif
00243 
00244     struct sockaddr_in servaddr;
00245     int sd;
00246     int nsd;
00247 
00248     memset( &servaddr, 0, sizeof( servaddr ) );
00249     servaddr.sin_family      = AF_INET;
00250     servaddr.sin_port        = htons( port );
00251     servaddr.sin_addr.s_addr = htonl( INADDR_ANY );
00252 
00253     if( ( sd = socket( PF_INET, SOCK_STREAM, 0 ) ) == -1 )
00254     {
00255         Rioperror("CommunicationStream:createListenSocket");
00256 
00257         #ifdef RIO_DEBUG1
00258         RioErr << "### [CommunicationStream - createListenSocket] Finish 1" << endl;
00259         #endif
00260 
00261         return 0;
00262     }
00263     // A SO_REUSEADDR failure is only reported; bind() is still attempted.
00264     int aux = 1;
00265     if( setsockopt ( sd, SOL_SOCKET, SO_REUSEADDR,
00266                   (char*)&aux, sizeof(aux)) == -1 )
00267     {
00268         Rioperror( "CommunicationStream:setsockopt");
00269     }
00270 
00271 
00272     if( bind( sd, ( struct sockaddr * ) &servaddr, sizeof( servaddr ) ) == -1 )
00273     {
00274         Rioperror("CommunicationStream:bindSocket");
00275         close( sd );   // release the descriptor: it was leaked on this error path
00276         #ifdef RIO_DEBUG1
00277         RioErr << "### [CommunicationStream - createListenSocket] Finish 2" << endl;
00278         #endif
00279 
00280         return 0;
00281     }
00282 
00283     //listen( sd, 10 );
00284     int rc;
00285     rc = listen( sd, SOMAXCONN );
00286     if( rc != 0 )
00287         RioErr << "Erro listen commulticast " << strerror( errno ) << endl;
00288     // NOTE(review): a listen() failure is only logged; the accept loop below still runs.
00289     FD_ZERO( readfds );
00290     while( 1 )
00291     {
00292         if( ( nsd = accept( sd, NULL, NULL ) ) != -1 )
00293         {
00294             TableMutexLock( "crlissock" );
00295             getTable()->insertNullPid( nsd );
00296             TableMutexUnlock( "crlissock" );   // tag now matches the lock tag (it had a stray leading space)
00297             //FD_SET( nsd, readfds );
00298         }
00299         else
00300         {
00301             RioErr << "Erro no accept: " << strerror( errno ) << endl;
00302         }
00303     }
00304 }

int CommunicationStream::createSocket ( AddressIP * address  ) 

Definition at line 169 of file CommMulticastStream.cpp.

00170 {
00171     #ifdef RIO_DEBUG1
00172     RioErr << "### [CommunicationStream - createSocket] Start" << endl;
00173     #endif
00174 
00175     if( address == NULL )
00176     {
00177         #ifdef RIO_DEBUG1
00178         RioErr << "### [CommunicationStream - createSocket] Finish 1" << endl;
00179         #endif
00180 
00181         return 0;
00182     }
00183 
00184     int sd;
00185     struct sockaddr_in servaddr;
00186 
00187     memset( &servaddr, 0, sizeof( servaddr ) );
00188     servaddr.sin_family      = AF_INET;
00189     servaddr.sin_port        = address->getPort();
00190     servaddr.sin_addr.s_addr = address->getAddress();
00191 
00192     if( ( sd = socket( PF_INET,  SOCK_STREAM, 0 ) ) == -1 )
00193     {
00194         Rioperror("CommunicationStream:createSocket");
00195 
00196         #ifdef RIO_DEBUG1
00197         RioErr << "### [CommunicationStream - createSocket] Finish 2" << endl;
00198         #endif
00199 
00200         return 0;
00201     }
00202 
00203     // Faz com que o socket seja exclusivo do processo, e nao seja passado 
00204     // para um outro processo criado pelo processo que abriu o socket.
00205     if( fcntl( sd, F_SETFD, FD_CLOEXEC ) < 0 )
00206     {
00207         Rioperror("CommunicationStream:fcntl");
00208 
00209         #ifdef RIO_DEBUG1
00210         RioErr << "### [CommunicationStream - createSocket] Finish 3" << endl;
00211         #endif
00212 
00213         return 0;
00214     }
00215 
00216     if( connect( sd, ( struct sockaddr * ) &servaddr, sizeof(servaddr) ) == -1 )
00217     {
00218         Rioperror("CommunicationStream:connectSocket");
00219 
00220         #ifdef RIO_DEBUG1
00221         RioErr << "### [CommunicationStream - createSocket] Finish 4" << endl;
00222         #endif
00223 
00224         return 0;
00225     }
00226 
00227     #ifdef RIO_DEBUG1
00228     RioErr << "### [CommunicationStream - createSocket] Finish 5" << endl;
00229     #endif
00230 
00231     return sd;
00232 }

CRioObject * CommunicationStream::getObject (  ) 

Definition at line 106 of file CommMulticastStream.cpp.

00107 {   // Accessor: returns the CRioObject pointer stored by the constructor.
00108     #ifdef RIO_DEBUG1
00109     RioErr << "### [CommunicationStream - getObject] Single" << endl;
00110     #endif
00111     // The pointer is returned as stored; it is 0 when the constructor default was used.
00112     return object;
00113 }

PLSessionManager * CommunicationStream::getPLSession (  ) 

Definition at line 97 of file CommMulticastStream.cpp.

00098 {   // Accessor: returns the PLSessionManager pointer stored by the constructor.
00099     #ifdef RIO_DEBUG1
00100     RioErr << "### [CommunicationStream - getPLSession] Single" << endl;
00101     #endif
00102     // The pointer is returned as stored; it is 0 when the constructor default was used.
00103     return PLSession;
00104 }

CommunicationTable * CommunicationStream::getTable (  ) 

Definition at line 1422 of file CommMulticastStream.cpp.

01423 {   // Accessor: returns the current CommunicationTable pointer.
01424     //#ifdef RIO_DEBUG1
01425     //RioErr << "### [CommunicationStream - getTable] Single" << endl;
01426     //#endif
01427     // No locking is done here; the body only reads the member.
01428     return table;
01429 }

string CommunicationStream::IPAction2String ( IPAction  action  ) 

Definition at line 1631 of file CommMulticastStream.cpp.

01632 {   // Returns the name of the given IPAction; any value without a case below yields "UNKNOWN!!!".
01633     //#ifdef RIO_DEBUG1
01634     //RioErr << "### [CommunicationStream - IPAction2String] Start" << endl;
01635     //#endif
01636 
01637     string result;
01638     // Each label is the enumerator's own spelling.
01639     switch( action )
01640     {
01641         case JOIN_THIS:  result = "JOIN_THIS";  break;
01642         case LEAVE_THIS: result = "LEAVE_THIS"; break;
01643         case LEAVE_ALL:  result = "LEAVE_ALL";  break;
01644         case KEEP_THIS:  result = "KEEP_THIS";  break;
01645         default:         result = "UNKNOWN!!!"; break;
01646     }
01647 
01648     //#ifdef RIO_DEBUG1
01649     //RioErr << "### [CommunicationStream - IPAction2String] Finish" << endl;
01650     //#endif
01651 
01652     return result;
01653 }

void CommunicationStream::MessageMutexLock ( string  msg = ""  )  [protected]

Definition at line 1497 of file CommMulticastStream.cpp.

01498 {   // Locks MessageMutex; in RIO_DEBUG2 builds a non-empty msg is logged before and after the lock is taken.
01499     #ifdef RIO_DEBUG1
01500     RioErr << "### [CommunicationStream - MessageMutexLock] Start" << endl;
01501     #endif
01502     // msg is only a log tag: it is not used outside the RIO_DEBUG2 blocks.
01503     #ifdef RIO_DEBUG2
01504     if( msg.length() )
01505         RioErr << "[MessageMutexLock] " << msg << "... " << endl;
01506     #endif
01507 
01508     pthread_mutex_lock( &MessageMutex );   // return value is not checked
01509 
01510     #ifdef RIO_DEBUG2
01511     if( msg.length() )
01512         RioErr << "..." << msg << " OK!" << endl;
01513     #endif
01514 
01515     #ifdef RIO_DEBUG1
01516     RioErr << "### [CommunicationStream - MessageMutexLock] Finish" << endl;
01517     #endif
01518 }

void CommunicationStream::MessageMutexUnlock ( string  msg = ""  )  [protected]

Definition at line 1520 of file CommMulticastStream.cpp.

01521 {   // Unlocks MessageMutex; in RIO_DEBUG2 builds a non-empty msg is logged before the unlock.
01522     #ifdef RIO_DEBUG1
01523     RioErr << "### [CommunicationStream - MessageMutexUnlock] Start" << endl;
01524     #endif
01525     // msg is only a log tag: it is not used outside the RIO_DEBUG2 block.
01526     #ifdef RIO_DEBUG2
01527     if( msg.length() )
01528         RioErr << "[MessageMutexUnlock] " << msg << endl;
01529     #endif
01530 
01531     pthread_mutex_unlock( &MessageMutex );   // return value is not checked
01532 
01533     #ifdef RIO_DEBUG1
01534     RioErr << "### [CommunicationStream - MessageMutexUnlock] Finish" << endl;
01535     #endif
01536 }

string CommunicationStream::MoveAction2String ( MoveAction  action  ) 

Definition at line 1579 of file CommMulticastStream.cpp.

01580 {   // Returns a short name for the given MoveAction; any value without a case below yields "UNKNOWN!!!".
01581     #ifdef RIO_DEBUG1
01582     RioErr << "### [CommunicationStream - MoveAction2String] Start" << endl;
01583     #endif
01584 
01585     string result;
01586     // Each label is the enumerator name without its ACTION_ prefix.
01587     switch( action )
01588     {
01589         case ACTION_PLAY:        result = "PLAY";        break;
01590         case ACTION_STOP:        result = "STOP";        break;
01591         case ACTION_PAUSE:       result = "PAUSE";       break;
01592         case ACTION_FORWARD:     result = "FORWARD";     break;
01593         case ACTION_REWIND:      result = "REWIND";      break;
01594         case ACTION_READBUFFER:  result = "READBUFFER";  break;
01595         case ACTION_REACHLEADER: result = "REACHLEADER"; break;
01596         default:                 result = "UNKNOWN!!!";  break;
01597     }
01598 
01599     #ifdef RIO_DEBUG1
01600     RioErr << "### [CommunicationStream - MoveAction2String] Finish" << endl;
01601     #endif
01602 
01603     return result;
01604 }

void CommunicationStream::sendIPMsg ( int  pid,
char *  ip,
unsigned short  port,
ClientMode  mode,
int  block = -1,
IPAction  ipAction = KEEP_THIS,
int  target = -1 
)

Definition at line 1231 of file CommMulticastStream.cpp.

01235 {   // Builds a msgIP from the arguments and writes it to the socket of the table entry whose pid matches; does nothing if the pid is not found.
01236     #ifdef RIO_DEBUG1
01237     RioErr << "### [CommunicationStream - sendIPMsg] Start" << endl;
01238     #endif
01239 
01240     #ifdef RIO_DEBUG2
01241     RioErr << "[sendIPMsg] Sending message to client " << pid << ", address "
01242            << address << ", port " << port << ", mode "
01243            << ClientMode2String( mode ) << ", block " << block << ", IPaction "
01244            << IPAction2String( ipAction ) << ", target " << target << "." << endl;
01245     #endif
01246 
01247     int                result;
01248     int                escritos;
01249     msgIP              msg;
01250     CommunicationItem *current  = NULL;
01251     CommunicationItem *previous = NULL;
01252     // The struct is zeroed first, so bytes not assigned below are sent as zero.
01253     memset( &msg, 0, sizeof( msg ) );
01254     msg.code = MSGCODE_IP;
01255     strcpy( msg.address, address );   // NOTE(review): unbounded copy; the size of msg.address is not visible here
01256     msg.port      = port;
01257     msg.mode      = mode;
01258     msg.block     = block;
01259     msg.target    = target;
01260     msg.ipAction  = ipAction;
01261 
01262     result = getTable()->searchPid( pid, &current, &previous );
01263 
01264     if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01265     {
01266         escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01267         // Test the -1 failure first: -1 also differs from sizeof( msg ), so the old order never reached the errno branch.
01268         if( escritos == -1 )
01269         {
01270             RioErr << "CommunicationStream::sendIPMsg erro " << errno
01271                    << "(" << strerror( errno ) << ") ao escrever o valor de "
01272                    << "msg" << endl;
01273         }
01274         else if( escritos != static_cast<int>( sizeof( msg ) ) )
01275         {
01276             RioErr << "CommunicationStream::sendIPMsg erro ao escrever "
01277                    << "o valor de msg: escritos " << escritos << " bytes "
01278                    << "de um total de " << sizeof( msg ) << " bytes." << endl;
01279         }
01280 
01281     }
01282 
01283     #ifdef RIO_DEBUG1
01284     RioErr << "### [CommunicationStream - sendIPMsg] Finish" << endl;
01285     #endif
01286 }

void CommunicationStream::sendIPMsgToSocket ( int  sd,
char *  ip,
unsigned short  port,
ClientMode  mode,
int  block = -1 
)

Definition at line 1047 of file CommMulticastStream.cpp.

01050 {   // Builds a msgIP (code, address, port, mode, block) and writes it directly to descriptor `sd`; failures are only logged.
01051     #ifdef RIO_DEBUG1
01052     RioErr << "### [CommunicationStream - sendIPMsgToSocket] Start" << endl;
01053     #endif
01054 
01055     int escritos;
01056     msgIP msg;
01057     memset( &msg, 0, sizeof( msg ) );   // zero the fields not set below (target, ipAction), as sendIPMsg() does
01058     msg.code  = MSGCODE_IP;
01059     strcpy( msg.address, address );   // NOTE(review): unbounded copy; the size of msg.address is not visible here
01060     msg.port  = port;
01061     msg.mode  = mode;
01062     msg.block = block;
01063     // NOTE(review): target/ipAction stay 0 here; confirm whether -1 / KEEP_THIS (the sendIPMsg defaults) were intended.
01064     escritos = write( sd, &msg, sizeof( msgIP ) );
01065     // Test the -1 failure first: -1 also differs from sizeof( msgIP ), so the old order never reached the errno branch.
01066     if( escritos == -1 )
01067     {
01068         RioErr << "CommunicationStream::sendIPMsgToSocket erro " << errno
01069                << "(" << strerror( errno ) << ") ao escrever o valor de msgIP"
01070                << endl;
01071     }
01072     else if( escritos != static_cast<int>( sizeof( msgIP ) ) )
01073     {
01074         RioErr << "CommunicationStream::sendIPMsgToSocket erro ao escrever o "
01075                << "valor de msgIP: escritos "<< escritos << " bytes de um "
01076                << "total de " << sizeof( msgIP ) << " bytes." << endl;
01077     }
01078 
01079     #ifdef RIO_DEBUG1
01080     RioErr << "### [CommunicationStream - sendIPMsgToSocket] Finish" << endl;
01081     #endif
01082 }

void CommunicationStream::sendModeMsg ( int  pid,
ClientMode  mode,
int  block = -1,
int  target = -1 
)

Definition at line 1181 of file CommMulticastStream.cpp.

01183 {   // Builds a msgMode (mode, block, target) and writes it to the socket of the table entry whose pid matches; does nothing if the pid is not found.
01184     #ifdef RIO_DEBUG1
01185     RioErr << "### [CommunicationStream - sendModeMsg] Start" << endl;
01186     #endif
01187 
01188     #ifdef RIO_DEBUG2
01189     RioErr << "[sendModeMsg] Sending mode message to client " << pid
01190            << ", mode " << ClientMode2String( mode ) << ", block " << block
01191            << ", target " << target << "." << endl;
01192     #endif
01193 
01194     int                result;
01195     int                escritos;
01196     msgMode            msg;
01197     CommunicationItem *current  = NULL;
01198     CommunicationItem *previous = NULL;
01199 
01200     msg.code   = MSGCODE_MODE;
01201     msg.mode   = mode;
01202     msg.block  = block;
01203     msg.target = target;
01204 
01205     result     = getTable()->searchPid( pid, &current, &previous );
01206 
01207     if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01208     {
01209         escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01210         // Test the -1 failure first: -1 also differs from sizeof( msg ), so the old order never reached the errno branch.
01211         if( escritos == -1 )
01212         {
01213             RioErr << "CommunicationStream::sendModeMsg erro " << errno
01214                    << "(" << strerror( errno ) << ") ao escrever o valor de "
01215                    << "msg" << endl;
01216         }
01217         else if( escritos != static_cast<int>( sizeof( msg ) ) )
01218         {
01219             RioErr << "CommunicationStream::sendModeMsg erro ao escrever "
01220                    << "o valor de msg: escritos "<< escritos << " bytes "
01221                    << "de um total de " << sizeof( msg ) << " bytes." << endl;
01222         }
01223 
01224     }
01225 
01226     #ifdef RIO_DEBUG1
01227     RioErr << "### [CommunicationStream - sendModeMsg] Finish" << endl;
01228     #endif
01229 }

void CommunicationStream::sendMoveMsg ( int  pid,
MoveAction  action,
int  block = -1 
)

Definition at line 1288 of file CommMulticastStream.cpp.

01289 {   // Builds a msgMove (action, block) and writes it to the socket of the table entry whose pid matches; does nothing if the pid is not found.
01290     #ifdef RIO_DEBUG1
01291     RioErr << "### [CommunicationStream - sendMoveMsg] Start" << endl;
01292     #endif
01293 
01294     #ifdef RIO_DEBUG2
01295     RioErr << "[sendMoveMsg] Sending move message to PL: action "
01296            << MoveAction2String( action ) << ", block " << block
01297            << "." << endl;
01298     #endif
01299 
01300     int                result;
01301     int                escritos;
01302     msgMove            msg;
01303     CommunicationItem *current  = NULL;
01304     CommunicationItem *previous = NULL;
01305 
01306     msg.code   = MSGCODE_MOVE;
01307     msg.action = action;
01308     msg.block  = block;
01309 
01310     result     = getTable()->searchPid( pid, &current, &previous );
01311 
01312     if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01313     {
01314         escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01315         // Test the -1 failure first: -1 also differs from sizeof( msg ), so the old order never reached the errno branch.
01316         if( escritos == -1 )
01317         {
01318             RioErr << "CommunicationStream::sendMoveMsg erro " << errno
01319                    << "(" << strerror( errno ) << ") ao escrever o valor de "
01320                    << "msg" << endl;
01321         }
01322         else if( escritos != static_cast<int>( sizeof( msg ) ) )
01323         {
01324             RioErr << "CommunicationStream::sendMoveMsg erro ao escrever "
01325                    << "o valor de msg: escritos "<< escritos << " bytes "
01326                    << "de um total de " << sizeof( msg ) << " bytes." << endl;
01327         }
01328 
01329     }
01330 
01331     #ifdef RIO_DEBUG1
01332     RioErr << "### [CommunicationStream - sendMoveMsg] Finish" << endl;
01333     #endif
01334 }

void CommunicationStream::sendPIDMsg ( int  myPid  ) 

Este método não está sendo usado nunca! (This method is never used!) void CommunicationStream::sendBlockMsg( int pid, unsigned int block ) { #ifdef RIO_DEBUG1 RioErr << "### [CommunicationStream - sendBlockMsg] Start" << endl; #endif.

#ifdef RIO_DEBUG2 RioErr << "[sendBlockMsg] Sending message to client " << pid << block << "." << endl; #endif

int result; msgBlock msg; CommunicationItem *current = NULL; CommunicationItem *previous = NULL;

msg.code = MSGCODE_BLOCK; msg.block = block;

result = getTable()->searchPid( pid, &current, &previous );

if( result == RESULT_COMMUNICATION_SEARCH_FOUND ) { write( current->getSocket(), &msg, sizeof( msg ) ); }

#ifdef RIO_DEBUG1 RioErr << "### [CommunicationStream - sendBlockMsg] Finish" << endl; #endif }

Definition at line 1369 of file CommMulticastStream.cpp.

01370 {   // Looks up the table entry registered under myPid and writes a msgPID carrying myPid to that entry's socket; does nothing if it is not found.
01371     #ifdef RIO_DEBUG1
01372     RioErr << "### [CommunicationStream - sendPIDMsg] Start" << endl;
01373     #endif
01374 
01375     #ifdef RIO_DEBUG2
01376     RioErr << "[sendPIDMsg] Sending my PID to PL: " << myPid << "." << endl;
01377     #endif
01378 
01379     int                result;
01380     int                escritos;
01381     msgPID             msg;
01382     CommunicationItem *current  = NULL;
01383     CommunicationItem *previous = NULL;
01384 
01385     msg.code = MSGCODE_PID;
01386     msg.pid  = myPid;
01387 
01388     result = getTable()->searchPid( myPid, &current, &previous );
01389 
01390     if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01391     {
01392         escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01393         // Test the -1 failure first: -1 also differs from sizeof( msg ), so the old order never reached the errno branch.
01394         if( escritos == -1 )
01395         {
01396             RioErr << "CommunicationStream::sendPIDMsg erro " << errno
01397                    << "(" << strerror( errno ) << ") ao escrever o valor de "
01398                    << "msg" << endl;
01399         }
01400         else if( escritos != static_cast<int>( sizeof( msg ) ) )
01401         {
01402             RioErr << "CommunicationStream::sendPIDMsg erro ao escrever "
01403                    << "o valor de msg: escritos "<< escritos << " bytes "
01404                    << "de um total de " << sizeof( msg ) << " bytes." << endl;
01405         }
01406 
01407     }
01408 
01409     #ifdef RIO_DEBUG1
01410     RioErr << "### [CommunicationStream - sendPIDMsg] Finish" << endl;
01411     #endif
01412 }

void CommunicationStream::sendPIDMsgToSocket ( int  sd,
int  myPid 
)

Definition at line 1104 of file CommMulticastStream.cpp.

01105 {   // Builds a msgPID carrying myPid and writes it directly to descriptor `sd`; failures are only logged.
01106     #ifdef RIO_DEBUG1
01107     RioErr << "### [CommunicationStream - sendPIDMsgToSocket] Start" << endl;
01108     #endif
01109 
01110     int escritos;
01111     msgPID msg;
01112     msg.code = MSGCODE_PID;
01113     msg.pid  = myPid;
01114 
01115     escritos = write( sd, &msg, sizeof( msgPID ) );
01116     // Test the -1 failure first: -1 also differs from sizeof( msgPID ), so the old order never reached the errno branch.
01117     if( escritos == -1 )
01118     {
01119         RioErr << "CommunicationStream::sendPIDMsgToSocket erro " << errno
01120                << "(" << strerror( errno ) << ") ao escrever o valor de msgPID"
01121                << endl;
01122     }
01123     else if( escritos != static_cast<int>( sizeof( msgPID ) ) )
01124     {
01125         RioErr << "CommunicationStream::sendPIDMsgToSocket erro ao escrever o "
01126                << "valor de msgPID: escritos "<< escritos << " bytes de um "
01127                << "total de " << sizeof( msgPID ) << " bytes." << endl;
01128     }
01129 
01130     #ifdef RIO_DEBUG1
01131     RioErr << "### [CommunicationStream - sendPIDMsgToSocket] Finish" << endl;
01132     #endif
01133 }

void CommunicationStream::setTable ( CommunicationTable * table  ) 

Definition at line 1414 of file CommMulticastStream.cpp.

01415 {   // Mutator: replaces the stored CommunicationTable pointer with `table`.
01416     //#ifdef RIO_DEBUG1
01417     //RioErr << "### [CommunicationStream - setTable] Single" << endl;
01418     //#endif
01419     this->table = table;   // the previously held table is not deleted here
01420 }

void CommunicationStream::socketReceiveHandler (  ) 

Definition at line 306 of file CommMulticastStream.cpp.

00307 {   // Endless loop: select()s (1 s timeout) on the sockets of all table items and passes each readable one to socketReceiveMessageHandler().
00308     #ifdef RIO_DEBUG1
00309     RioErr << "### [CommunicationStream - socketReceiveHandler] Start" << endl;
00310     #endif
00311 
00312     #ifdef RIO_DEBUG2
00313     RioErr << "PID da socketReceiveHandler = " << pthread_self() << endl;
00314     #endif
00315 
00316     struct timeval rTimeout;
00317 
00318     int ns;
00319     int maxSock;
00320 
00321     fd_set auxReadSet;
00322     CommunicationItem *aux;
00323 
00324     while( 1 )
00325     {
00326         TableMutexLock();
00327 
00328         if( !getTable()->getLastItem() )
00329             maxSock = 0;   // empty table: nothing to wait on
00330         else
00331             maxSock = getTable()->getLastItem()->getSocket();   // NOTE(review): assumes the last item holds the highest descriptor — confirm
00332 
00333         TableMutexUnlock();
00334 
00335         if( maxSock == 0 )
00336         {
00337             usleep( 100 );   // sleep 100 microseconds, then poll the table again
00338             continue;
00339         }
00340         // The timeout is re-armed on every pass (select() is allowed to modify it).
00341         rTimeout.tv_sec = 1;
00342         rTimeout.tv_usec = 0;
00343         // The read set is rebuilt from the table on every pass.
00344         FD_ZERO( &auxReadSet );
00345 
00346         TableMutexLock( "sorcvhand2" );
00347         aux = getTable()->getFirstItem();
00348         while( aux )
00349         {
00350             FD_SET( aux->getSocket(), &auxReadSet );   // NOTE(review): no socket > 0 guard here, unlike the FD_ISSET test below
00351             aux = aux->getNext();
00352         }
00353 
00354         TableMutexUnlock( "sorcvhand2" );
00355         // select() runs outside the TableMutexLock/TableMutexUnlock pairs.
00356         ns = select( maxSock + 1, &auxReadSet, NULL, (fd_set *)0, &rTimeout );
00357 
00358         if( ns > 0 )
00359         {
00360             TableMutexLock( "sorcvhand3" );
00361             aux = getTable()->getFirstItem();
00362 
00363             while( aux )
00364             {
00365                 // aux->getNext() must be saved at this point because it is
00366                 // needed after the call to socketReceiveMessageHandler(),
00367                 // which, depending on the work to be done, will delete aux
00368                 // from the table.
00369                 CommunicationItem *nextaux = aux->getNext();
00370                 // The handler is called between TableMutexLock( "sorcvhand3" ) and its matching unlock.
00371                 if( ( aux->getSocket() > 0 ) &&
00372                     ( FD_ISSET( aux->getSocket(), &auxReadSet ) )
00373                   )
00374                 {
00375                     socketReceiveMessageHandler( aux->getSocket() );
00376                 }
00377                 aux = nextaux;
00378             }
00379 
00380             TableMutexUnlock( "sorcvhand3" );
00381         }
00382         else if( ns!= 0 )
00383         {
00384             Rioperror( "socketReceiveHandler:select" );   // ns < 0: select() reported an error
00385         }
00386     } // end of while( 1 )
00387     // Not reachable: the loop above has no break or return.
00388     #ifdef RIO_DEBUG1
00389     RioErr << "### [CommunicationStream - socketReceiveHandler] Finish" << endl;
00390     #endif
00391 }

void CommunicationStream::socketReceiveMessageHandler ( int  sd  ) 

Definition at line 393 of file CommMulticastStream.cpp.

00394 {
          // Reads one control message from socket sd, one byte at a time. The
          // first byte is the message code and selects the expected message
          // length; once the whole message is in readSeq it is dispatched on that
          // code (MSGCODE_IP, MSGCODE_MOVE, MSGCODE_PID, MSGCODE_MODE and, under
          // the preprocessor guards below, MSGCODE_FLUXO) while MessageMutex is
          // held. Every return path after MessageMutexLock() unlocks the mutex.
          // If no byte could be read, sd is removed from the table and closed.
          // An unknown message code returns immediately without touching sd.
00395     #ifdef RIO_DEBUG1
00396     RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Start" << endl;
00397     #endif
00398 
00399     static unsigned int  cont           = 0;
00400     unsigned char        readByte;
00401     unsigned char        readSeq[ MAXLEN_MSGSTRUCT ];
00402     InfoClient          *NewMember      = NULL;
00403     InfoClient          *member         = NULL;
00404     CommunicationItem   *current        = NULL;
00405     CommunicationItem   *previous       = NULL;
00406     int                  result;
00407     CRioObject          *obj;
00408 
00409     int            i              = 0;
00410     int            size           = 0;
00411     unsigned char *code           = 0;
00412 
00413     #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00414     int            member_mode    = UNAVAILABLE;
00415     int            leave          = 0;
00416     InfoClient    *member_next    = 0;
00417     #endif
00418 
00419     /* The loop below reads one byte at a time. According to the select man
00420      * page (man select_tut) this practice is strongly discouraged.
00421      * Reimplement the loop below to do a single read, if possible.
00422      * */    
          // NOTE(review): read() returns -1 on error, which this condition treats
          // as true; readByte is then stored without having been written.
00423     while( read( sd, &readByte, 1 ) )
00424     {
00425         readSeq[ i ] = readByte;
00426 
00427         // The first byte is the message code; it tells us how long the message is
00428         if( i == 0 )
00429         {
00430             code = readSeq;
00431 
00432             switch( *code )
00433             {
00434                 case MSGCODE_IP:
00435                     size = sizeof( msgIP );
00436                     break;
00437                 case MSGCODE_BLOCK:
00438                     size = sizeof( msgBlock );
00439                     break;
00440                 case MSGCODE_PID:
00441                     size = sizeof( msgPID );
00442                     break;
00443                 case MSGCODE_MOVE:
00444                     size = sizeof( msgMove );
00445                     break;
00446                 case MSGCODE_MODE:
00447                     size = sizeof( msgMode );
00448                     break;
00449 
00450                 #ifdef RIO_EMUL
00451                 case MSGCODE_FLUXO:
00452                     size = sizeof( msgFluxo );
00453                     break;
00454                 #endif
00455 
00456                 default:
00457                     #ifdef RIO_DEBUG2
00458                     RioErr << "[CommMulticast] Erro: Mensagem intrat�vel! ("
00459                            << *code << ")" << endl;
00460                     #endif
00461 
00462                     #ifdef RIO_DEBUG1
00463                     RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 1" << endl;
00464                     #endif
00465 
00466                     return;
00467                     break;
00468             } // end of switch( *code )
00469         } // end of if( i == 0 )
00470 
00471         i++;
00472         if( i >= size )
00473             break;
00474     } // end of while( read( sd, &readByte, 1 ) )
00475 
00476     if( size == 0 )
00477     {
00478         // Connection ended (no byte was read): remove the socket from the table
00479         RioErr << "[CommMulticast]: Removing client socket " << endl;
00480 
00481         FD_CLR( sd, readfds );
00482         getTable()->removeSocket( sd );
00483 
00484         close( sd );
00485 
00486         #ifdef RIO_DEBUG1
00487         RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 2" << endl;
00488         #endif
00489 
00490         return;
00491     }
00492 
          // The read loop stopped before the whole message arrived: give up on it.
00493     if( i < size )
00494     {
00495         #ifdef RIO_DEBUG1
00496         RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 3" << endl;
00497         #endif
00498 
00499         #ifdef RIO_DEBUG2
00500         RioErr << "[CommunicationStream - socketReceiveMessageHandler] Nao foi "
00501                << "possivel ler a mensagem por completo." << endl;
00502         #endif
00503 
00504         return;
00505     }
00506 
00507     MessageMutexLock( "SoRecMe1" );
00508 
00509     switch( *code )
00510     {
00511         // Message received by client
00512         case MSGCODE_IP:
00513         {
00514             msgIP ipData;
00515             memcpy( &ipData, readSeq, sizeof( msgIP ) );
00516 
00517             RioErr << "[CommMulticast]: MSGCODE_IP Received" << endl;
00518             RioErr << "[CommMulticast]: IP Address = " << ipData.address << endl;
00519             RioErr << "[CommMulticast]: IP Port    = " << ipData.port << endl;
00520             RioErr << "[CommMulticast]: IP Mode    = "
00521                    << ClientMode2String( ipData.mode ) << endl;
00522             RioErr << "[CommMulticast]: IP BLOCK   = " << ipData.block << endl;
00523             RioErr << "[CommMulticast]: IP Action  = "
00524                    << IPAction2String( ipData.ipAction ) << endl;
00525             RioErr << "[CommMulticast]: IP Target  = " << ipData.target << endl;
00526 
00527             obj = getObject();
00528 
00529             if( !obj )
00530             {
00531                 RioErr << "Not found" << endl;
00532                 MessageMutexUnlock( "SoRecMe2" );
00533 
00534                 #ifdef RIO_DEBUG1
00535                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 4" << endl;
00536                 #endif
00537 
00538                 return;
00539             }
00540             else
00541             {
00542                 RioErr << "OK (" << cont++ << ")" << endl;
00543             }
00544 
00545             result = getTable()->searchSocket( sd, &current, &previous );
00546             if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00547             {
00548                 RioErr << "[CommMulticast]: Error result communication " << endl;
00549                 MessageMutexUnlock( "SoRecMe3" );
00550 
00551                 #ifdef RIO_DEBUG1
00552                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 5" << endl;
00553                 #endif
00554 
00555                 return;
00556             }
00557 
00558             //current is a CommunicationItem
00559             current->setMode( ipData.mode );
00560 
00561             if( ( ipData.mode == LEADER ) || ( ipData.mode == SUBLEADER) )
00562                 video->first = true;
00563             else
00564                 video->first = false;
00565 
00566             switch( ipData.ipAction )
00567             {
00568                 case LEAVE_THIS:
00569                 {
00570                     RioErr << "[CommMulticast]: Leave this" << endl;
00571                     obj->LeaveGroup( ipData.port );
00572                     break;
00573                 }
00574 
00575                 case LEAVE_ALL:
00576                 {
00577                     RioErr << "[CommMulticast]: Leave all" << endl;
00578                     obj->LeaveAllGroups();
00579                     break;
00580                 }
00581 
00582                 case JOIN_THIS:
00583                 {
00584                     RioErr << "[CommMulticast]: Join This" << endl;
00585                     int enable_join = 1;
00586                     obj->SetMulticastSocket( ipData.port, ipData.address,
00587                                              callback, bufferStream,
00588                                              enable_join );
00589                     break;
00590                 }
00591 
00592                 case KEEP_THIS:
00593                 {
00594                     RioErr << "[CommMulticast]: Keep This" << endl;
00595                     break;
00596                 }
00597 
00598                 default:
00599                     RioErr << "[CommMulticast]: Invalid mode " 
00600                            << ipData.ipAction << "!!" << endl;
00601                     break;
00602             }
00603 
00604             if( ipData.block > -1 )
00605             {
00606                 //Delta Before
00607                 RioErr << "[CommMulticast]: Delta before: "
00608                        << ipData.block << endl;
00609 
00610                 video->SetNextRequestBlock( ipData.block );
00611             }
00612 
00613             if( ipData.target > -1 )
00614             {
00615                 //Setting target
00616                 RioErr << "[CommMulticast]: New client target: "
00617                        << ipData.target << endl;
00618 
00619                 video->SetTarget( ipData.target );
00620 
00621                 //Note: every sub-member and every leader is considered stable,
00622                 //i.e. it is not in the patching state (for a sub-member this
00623                 //is a precondition for merging two groups), so there is no
00624                 //need to send an infinite target (INT_MAX) to these kinds of
00625                 //clients.
00626 
00627                 //Client entering patching 
00628                 if( ( ipData.target != INT_MAX ) && !video->isPatching() )
00629                     video->Stream_Control( "plsoreme1" );
00630                 //Client leaving patching 
00631                 else if( ( ipData.target == INT_MAX ) && video->isPatching() )
00632                     video->Stream_Control( "plsoreme2" );
00633             }
00634 
00635             //If the client received a LEAVE_ALL that does not tell it to go to
00636             //INACTIVE mode, it will receive a further JOIN_THIS message. In
00637             //that case only that second message should release the client's
00638             //mutex, by calling MulticastReleaseCondition().
00639             if( !(  ( ipData.ipAction == LEAVE_ALL )    &&
00640                     ( ipData.mode != INACTIVE )
00641                 )
00642               )
00643             {
00644                 video->MulticastMutexLock( "comtcs1" );
00645                 video->MulticastReleaseCondition( "comtcs1" );
00646                 video->MulticastMutexUnlock( "comtcs1" );
00647             }
00648 
00649             RioErr << "[CommMulticast]: BroadCast End" << endl;
00650             break;
00651         } // end of case MSGCODE_IP:
00652 
00653         // Message received by PI
00654         case MSGCODE_MOVE:
00655         {
00656             msgMove moveData;
00657             memcpy( &moveData, readSeq, sizeof( msgMove ) );
00658 
00659             RioErr << "[CommMulticast] MSGCODE_MOVE Received..." << endl;
00660 
00661             if( getPLSession() == NULL )
00662             {
00663                 MessageMutexUnlock( "SoRecMe4" );
00664 
00665                 #ifdef RIO_DEBUG1
00666                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 6" << endl;
00667                 #endif
00668 
00669                 return;
00670             }
00671 
00672             result = getTable()->searchSocket( sd, &current, &previous );
00673             if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00674             {
00675                 MessageMutexUnlock( "SoRecMe5" );
00676 
00677                 #ifdef RIO_DEBUG1
00678                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 7" << endl;
00679                 #endif
00680 
00681                 return;
00682             }
00683 
00684             getPLSession()->MoveMutexLock( "comtcs2" );
00685             member = getPLSession()->FindClient( current->getPid() );
00686             getPLSession()->MoveMutexUnlock( "comtcs2" );
00687 
00688             if( !member )
00689             {
00690                 RioErr << "[CommMulticast]: member " << current->getPid()
00691                        << " nao encontrado. Saindo da "
00692                        << "socketReceiveMessageHandler..." << endl;
00693 
00694                 MessageMutexUnlock( "SoRecMe6" );
00695 
00696                 #ifdef RIO_DEBUG1
00697                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 8" << endl;
00698                 #endif
00699 
00700                 return;
00701             }
00702 
00703             #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00704             member_mode    = member->mode;
00705 
00706             if( ( moveData.action == ACTION_PAUSE ) ||
00707                 ( moveData.action == ACTION_STOP  ) ||
00708                 ( moveData.block < 0 )
00709               )
00710             {
00711                 leave          = 1;
00712             }
00713             #endif
00714 
00715             RioErr << "[CommMulticast]: MOVE CLIENT" << endl;
00716             RioErr << "[CommMulticast]: PID " << current->getPid() << endl;
00717             RioErr << "[CommMulticast]: GROUP " << member->multicast_port
00718                    << endl;
00719             RioErr << "[CommMulticast]: action = "
00720                    << MoveAction2String( moveData.action ) << endl;
00721             RioErr << "[CommMulticast]: block = " << moveData.block
00722                    << endl;
00723             RioErr << "[CommMulticast]: mode = "
00724                    << ClientMode2String( member->mode ) << endl;
00725 
00726             result = getPLSession()->AllocClient( member->PIDClientThread,
00727                                                   &moveData );
00728 
00729             if( result == PL_ERROR )
00730             {
00731                 MessageMutexUnlock( "SoRecMe7" );
00732 
00733                 #ifdef RIO_DEBUG1
00734                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 9" << endl;
00735                 #endif
00736 
00737                 return;
00738             }
00739 
00740             #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00741             //if member was the leader, moved, and nobody inherited the
00742             //leadership in its place, there is one multicast stream fewer.
                  //NOTE(review): member_next is initialised to 0 and never assigned
                  //in this function, so the third test below is always true.
00743             if( ( member_mode == LEADER ) &&
00744                 ( member->mode != LEADER ) &&
00745                 ( member_next == NULL )
00746               )
00747             {
00748                 pthread_mutex_lock( &log_mutex );
00749                 multicast_number--;
00750                 pthread_mutex_unlock( &log_mutex );
00751             }
00752 
00753             gettimeofday( &time_list, 0 );
00754 
00755             final_time.tv_sec  = time_list.tv_sec - initial_time.tv_sec;
00756             final_time.tv_usec = time_list.tv_usec - initial_time.tv_usec;
00757 
00758             if( final_time.tv_usec < 0 )
00759             {
00760               final_time.tv_sec  -= 1;
00761               final_time.tv_usec += 1000000;
00762             }
00763             calculeted_time = ( unsigned int )(final_time.tv_sec * 1000.0 +
00764                                                 final_time.tv_usec/1000.0);
00765             calculeted_time = calculeted_time/1000;
00766 
00767             if( ( !leave ) && ( member->mode == LEADER ) )
00768             {
00769                 pthread_mutex_lock( &log_mutex );
00770                 multicast_number++;
00771                 pthread_mutex_unlock( &log_mutex );
00772             }
00773 
00774             m_log_fluxo << " Multicast " << multicast_number
00775                         << " Patching " << patching_number
00776                         << " Total_fluxo "
00777                         << multicast_number + patching_number
00778                         << " Tempo " << calculeted_time << endl;
00779             #endif
00780 
00781             #ifdef RIO_DEBUG2
00782             RioErr << "OK (" << cont++ << ")" << endl;
00783             #endif
00784 
00785             break;
00786         } // end of case MSGCODE_MOVE:
00787 
00788         // Message received by PI
00789         case MSGCODE_PID:
00790         {
00791             msgPID pidData;
00792             memcpy( &pidData, readSeq, sizeof( msgPID ) );
00793 
00794             RioErr << "[CommMulticast] MSGCODE_PID Received..." << endl;
00795             RioErr << "[CommMulticast] PID: " << pidData.pid << endl;
00796 
00797             getTable()->changeSocketPid( sd, pidData.pid );
00798 
00799             if( getPLSession() == NULL )
00800             {
00801                 MessageMutexUnlock( "SoRecMe8" );
00802 
00803                 #ifdef RIO_DEBUG1
00804                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 10" << endl;
00805                 #endif
00806 
00807                 return;
00808             }
00809 
00810             getPLSession()->MoveMutexLock( "comtcs3" );
00811             NewMember = getPLSession()->FindClient( pidData.pid );
00812             getPLSession()->MoveMutexUnlock( "comtcs3" );
00813 
                  //NOTE(review): unlike member in the MSGCODE_MOVE case, NewMember is
                  //dereferenced below without a NULL check - confirm that FindClient
                  //cannot return NULL for a PID that was just announced.
00814             #ifdef RIO_DEBUG2
00815             RioErr << "[CommMulticast]: Mensagem MSGCODE_IP enviada para o "
00816                    << "cliente " << NewMember->PIDClientThread << " (mode "
00817                    << ClientMode2String( NewMember->mode ) << ", group "
00818                    << NewMember->multicast_port << ", ip "
00819                    << NewMember->multicast_ip << ", block -1, action KEEP_THIS)"
00820                    << endl;
00821             #endif
00822 
00823             //The message below is sent only so that the client releases the
00824             //multicastMutex mutex: as soon as the client sends its PID via
00825             //MSGCODE_PID, it suspends its execution by calling
00826             //pthread_cond_wait( &multicastCondition, &multicastMutex ).
00827             //Execution resumes as soon as pthread_cond_broadcast is executed
00828             //(which happens when a MSGCODE_IP is received)
00829             sendIPMsg( NewMember->PIDClientThread,
00830                        NewMember->multicast_ip,
00831                        NewMember->multicast_port,
00832                        NewMember->mode );
00833 
00834             #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00835             if( NewMember->mode == LEADER )
00836             {
00837                 pthread_mutex_lock( &log_mutex );
00838                 multicast_number++;
00839                 pthread_mutex_unlock( &log_mutex );
00840             }
00841             #endif
00842 
00843             break;
00844         } // end of case MSGCODE_PID:
00845 
00846         // Message received by client
00847         case MSGCODE_MODE:
00848         {
00849             //MSGCODE_MODE differs from MSGCODE_IP as follows: MSGCODE_MODE is
00850             //only sent to promote passive clients to subleaders or to
00851             //leaders, or to tell a leader that it has been overtaken and
00852             //must therefore give up the leadership. In short, MSGCODE_MODE
00853             //tells the client (which performed no interaction) that it must
00854             //change its role in the system (become leader, subleader or passive).
00855 
00856             msgMode modeData;
00857             memcpy( &modeData, readSeq, sizeof( msgMode ) );
00858 
00859             result = getTable()->searchSocket( sd, &current, &previous );
00860             if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00861             {
00862                 RioErr << "[CommMulticast] Erro!!! Cliente n�o encontrado na "
00863                        << "tabela" << endl;
00864 
00865                 MessageMutexUnlock( "SoRecMe9" );
00866 
00867                 #ifdef RIO_DEBUG1
00868                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 11" << endl;
00869                 #endif
00870 
00871                 return;
00872             }
00873 
00874             if( video->waitingMSGCODE_IP )
00875             {
00876                 #ifdef RIO_DEBUG2
00877                 RioErr << "[CommMulticast] MSGCODE_MODE Received and will be ignored..." << endl;
00878                 RioErr << "[CommMulticast]: Setaria mode para "
00879                        << ClientMode2String( modeData.mode ) << endl;
00880                 RioErr << "[CommMulticast]: Setaria Bloco para "
00881                        << ( int )modeData.block << endl;
00882                 #endif
00883 
00884                 MessageMutexUnlock( "SoRecMe9" );
00885                 return;
00886             }
00887 
00888             #ifdef RIO_DEBUG2
00889             RioErr << "[CommMulticast] MSGCODE_MODE Received..." << endl;
00890             RioErr << "[CommMulticast]: Setando mode para "
00891                    << ClientMode2String( modeData.mode ) << endl;
00892             RioErr << "[CommMulticast]: Setando group_block para "
00893                    << ( int )modeData.block << endl;
00894             #endif
00895 
00896             if( modeData.mode == LEADER )
00897             {
00898                 video->first       = true;
00899                 video->group_block = ( int )modeData.block;
00900 
00901                 //With a MSGCODE_MODE the client becomes a forced leader.
00902                 //It no longer has a target because below, in the call to
00903                 //burstRequest, all the blocks this client does not have up
00904                 //to the current leader are requested.
00905                 video->SetTarget( INT_MAX );
00906 
00907                 //Client leaving patching.
00908                 if( video->isPatching() )
00909                     video->Stream_Control( "plsoreme3" );
00910 
00911                 if( current->getMode() == PASSIVE )
00912                 {
00913                     //If the client was passive and was elected leader it may
00914                     //have to request a burst of blocks. The burstRequest()
00915                     //method handles this.
00916                     burstRequest( modeData.block );
00917                 }
00918             }
00919             else if( modeData.mode == SUBLEADER )
00920             {
00921                 video->first       = true;
00922                 video->group_block = ( int )modeData.block;
00923 
00924                 //With a MSGCODE_MODE the client becomes a forced subleader.
00925                 //Because it is becoming a subleader it must, at some point,
00926                 //catch up with its leader; so its target is changed to
00927                 //modeData.target, which is the block that the group's current
00928                 //leader is requesting for the group.
00929 
00930                 RioErr << "[CommMulticast]: New client target: "
00931                        << modeData.target << endl;
00932 
00933                 video->SetTarget( modeData.target );
00934 
00935                 //Client entering patching.
00936                 if( !video->isPatching() )
00937                     video->Stream_Control( "plsoreme4" );
00938             }
00939             else
00940             {
00941                 //The only case in which this branch is executed is when this
00942                 //client is leader and is overtaken, in which case it must
00943                 //lose the leadership.
00944                 video->first       = false;
00945                 video->group_block = -1;
00946                 video->SetTarget( INT_MAX );
00947 
00948                 //Client leaving patching
00949                 if( video->isPatching() )
00950                     video->Stream_Control( "plsoreme5" );
00951             }
00952 
00953             current->setMode( modeData.mode );
00954 
00955             break;
00956         } // end of case MSGCODE_MODE:
00957 
              // NOTE(review): this case is compiled only when both RIO_DEBUG2 and
              // RIO_EMUL are defined, while the size lookup in the read loop above
              // accepts MSGCODE_FLUXO whenever RIO_EMUL alone is defined.
00958         #if defined(RIO_DEBUG2) && defined(RIO_EMUL)
00959         case MSGCODE_FLUXO:
00960         {
00961             msgFluxo fluxoData;
00962             memcpy( &fluxoData, readSeq, sizeof( msgFluxo ) );
00963 
00964             m_log_fluxo << " " << endl;
00965 
00966             m_log_fluxo << "Recebimento de mensagem de mudanca de fluxo "
00967                         << endl;
00968             result = getTable()->searchSocket( sd, &current, &previous );
00969             if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00970             {
00971                 MessageMutexUnlock( "SoRecMe10" );
00972 
00973                 #ifdef RIO_DEBUG1
00974                 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 12" << endl;
00975                 #endif
00976 
00977                 return;
00978             }
00979 
00980             gettimeofday( &time_list, 0 );
00981 
00982             final_time.tv_sec  = time_list.tv_sec - initial_time.tv_sec;
00983             final_time.tv_usec = time_list.tv_usec - initial_time.tv_usec;
00984 
00985             if( final_time.tv_usec < 0 )
00986             {
00987               final_time.tv_sec  -= 1;
00988               final_time.tv_usec += 1000000;
00989             }
00990 
00991             calculeted_time = ( unsigned int ) (final_time.tv_sec * 1000.0 +
00992                                                  final_time.tv_usec/1000.0);
00993 
00994             calculeted_time = calculeted_time/1000;
00995 
00996             pthread_mutex_lock( &log_mutex );
00997             if( fluxoData.patching == 1 )
00998             {
00999                 patching_number++;
01000                 m_log_fluxo << "Cliente entrando no patching. "
01001                             << " patching_number depois: "
01002                             << patching_number << endl;
01003             }
01004             else // if( fluxoData.patching == -1 )
01005             {
01006                 patching_number--;
01007                 m_log_fluxo << "Cliente saindo no patching. "
01008                             << " patching_number depois: "
01009                             << patching_number << endl;
01010             }
01011 
01012             m_log_fluxo << " Multicast " << multicast_number
01013                         << " Patching " << patching_number
01014                         << " Total_fluxo "
01015                         << multicast_number + patching_number
01016                         << " Tempo " << calculeted_time << endl;
01017 
01018             getPLSession()->MoveMutexLock( "msgcdflux" );
01019             getPLSession()->PrintLogFile();
01020             getPLSession()->MoveMutexUnlock( "msgcdflux" );
01021 
01022             pthread_mutex_unlock( &log_mutex );
01023 
01024             break;
01025         } // end of case MSGCODE_FLUXO:
01026         #endif
01027     } //end of switch( *code )
01028 
01029     MessageMutexUnlock( "SoRecMe" );
01030 
01031     #ifdef RIO_DEBUG1
01032     RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 13" << endl;
01033     #endif
01034 }

void * CommunicationStream::startListenThread ( void *  param  )  [static]

Definition at line 115 of file CommMulticastStream.cpp.

00116 {
          // Static thread start routine (presumably handed to pthread_create -
          // confirm at the call site). Treats param as a
          // CommunicationStreamThreadParam and runs createListenSocket( p->port )
          // on p->stream. Always returns NULL; a NULL param returns immediately.
00117     #ifdef RIO_DEBUG1
00118     RioErr << "### [CommunicationStream - startListenThread] Start" << endl;
00119     #endif
00120 
00121     if( param == NULL )
00122     {
00123         #ifdef RIO_DEBUG1
00124         RioErr << "### [CommunicationStream - startListenThread] Finish 1" << endl;
00125         #endif
00126 
00127         return NULL;
00128     }
00129 
00130     CommunicationStreamThreadParam *p;
00131     p = ( CommunicationStreamThreadParam * ) param;
00132 
          // The return value of createListenSocket() is discarded here.
00133     p->stream->createListenSocket( p->port );
00134 
00135     #ifdef RIO_DEBUG1
00136     RioErr << "### [CommunicationStream - startListenThread] Finish 2" << endl;
00137     #endif
00138 
00139     return NULL;
00140 }

void * CommunicationStream::startReceiveThread ( void *  param  )  [static]

Definition at line 142 of file CommMulticastStream.cpp.

00143 {
          // Static thread start routine (presumably handed to pthread_create -
          // confirm at the call site). Treats param as a
          // CommunicationStreamThreadParam and runs socketReceiveHandler() on
          // p->stream. Always returns NULL; a NULL param returns immediately.
00144     #ifdef RIO_DEBUG1
00145     RioErr << "### [CommunicationStream - startReceiveThread] Start" << endl;
00146     #endif
00147 
00148     if( param == NULL )
00149     {
00150         #ifdef RIO_DEBUG1
00151         RioErr << "### [CommunicationStream - startReceiveThread] Finish 1" << endl;
00152         #endif
00153 
00154         return NULL;
00155     }
00156 
00157     CommunicationStreamThreadParam *p;
00158     p = ( CommunicationStreamThreadParam * ) param;
00159 
00160     p->stream->socketReceiveHandler();
00161 
00162     #ifdef RIO_DEBUG1
00163     RioErr << "### [CommunicationStream - startReceiveThread] Finish 2" << endl;
00164     #endif
00165 
00166     return NULL;
00167 }

void CommunicationStream::TableMutexLock ( string  msg = ""  )  [protected]

Definition at line 1538 of file CommMulticastStream.cpp.

01539 {
          // Locks TableMutex. When RIO_DEBUG2 is defined and msg is non-empty,
          // msg is logged to RioErr once before and once after the lock is taken.
          // The return value of pthread_mutex_lock() is not checked.
01540     //#ifdef RIO_DEBUG1
01541     //RioErr << "### [CommunicationStream - TableMutexLock] Start" << endl;
01542     //#endif
01543     
01544     #ifdef RIO_DEBUG2
01545     if( msg.length() )
01546         RioErr << "[TableMutexLock] " << msg << "... " << endl;
01547     #endif
01548 
01549     pthread_mutex_lock( &TableMutex );
01550 
01551     #ifdef RIO_DEBUG2
01552     if( msg.length() )
01553         RioErr << "..." << msg << " OK!" << endl;
01554     #endif
01555 
01556     //#ifdef RIO_DEBUG1
01557     //RioErr << "### [CommunicationStream - TableMutexLock] Finish" << endl;
01558     //#endif
01559 }

void CommunicationStream::TableMutexUnlock ( string  msg = ""  )  [protected]

Definition at line 1561 of file CommMulticastStream.cpp.

01562 {
          // Unlocks TableMutex. When RIO_DEBUG2 is defined and msg is non-empty,
          // msg is logged to RioErr before the unlock. The return value of
          // pthread_mutex_unlock() is not checked.
01563     //#ifdef RIO_DEBUG1
01564     //RioErr << "### [CommunicationStream - TableMutexUnlock] Start" << endl;
01565     //#endif
01566 
01567     #ifdef RIO_DEBUG2
01568     if( msg.length() )
01569         RioErr << "[TableMutexUnlock] " << msg << endl;
01570     #endif
01571 
01572     pthread_mutex_unlock( &TableMutex );
01573 
01574     //#ifdef RIO_DEBUG1
01575     //RioErr << "### [CommunicationStream - TableMutexUnlock] Finish" << endl;
01576     //#endif
01577 }


Field Documentation

BufferStream* CommunicationStream::bufferStream [protected]

Definition at line 191 of file CommMulticastStream.h.

void* CommunicationStream::callback [protected]

Definition at line 190 of file CommMulticastStream.h.

int CommunicationStream::init [protected]

Definition at line 193 of file CommMulticastStream.h.

pthread_mutex_t CommunicationStream::MessageMutex [protected]

Definition at line 194 of file CommMulticastStream.h.

CRioObject* CommunicationStream::object [protected]

Definition at line 188 of file CommMulticastStream.h.

PLSessionManager* CommunicationStream::PLSession [protected]

Definition at line 189 of file CommMulticastStream.h.

fd_set* CommunicationStream::readfds

Definition at line 227 of file CommMulticastStream.h.

CommunicationTable* CommunicationStream::table [protected]

Definition at line 187 of file CommMulticastStream.h.

pthread_mutex_t CommunicationStream::TableMutex [protected]

Definition at line 195 of file CommMulticastStream.h.

CRioMMVideo* CommunicationStream::video [protected]

Definition at line 192 of file CommMulticastStream.h.


The documentation for this class was generated from the following files:
Generated on Wed Jul 4 16:03:32 2012 for RIO by  doxygen 1.6.3