#include <CommMulticastStream.h>
Public Member Functions | |
CommunicationStream (CRioObject *object=0, PLSessionManager *PLSession=0, void *callback=0, BufferStream *bufferStream=0, CRioMMVideo *video=0, char *LogsDirectory=NULL) | |
int | createSocket (AddressIP *address) |
int | createListenSocket (unsigned short port) |
void | socketReceiveHandler () |
void | socketReceiveMessageHandler (int sd) |
void | sendIPMsgToSocket (int sd, char *ip, unsigned short port, ClientMode mode, int block=-1) |
void | sendPIDMsgToSocket (int sd, int myPid) |
void | sendIPMsg (int pid, char *ip, unsigned short port, ClientMode mode, int block=-1, IPAction ipAction=KEEP_THIS, int target=-1) |
void | sendMoveMsg (int pid, MoveAction action, int block=-1) |
void | sendPIDMsg (int myPid) |
Este m�todo n�o est� sendo usado nunca! void CommunicationStream::sendBlockMsg( int pid, unsigned int block ) { ifdef RIO_DEBUG1 RioErr << "### [CommunicationStream - sendBlockMsg] Start" << endl; endif. | |
void | sendModeMsg (int pid, ClientMode mode, int block=-1, int target=-1) |
void | burstRequest (int block) |
string | MoveAction2String (MoveAction action) |
string | ClientMode2String (ClientMode mode) |
string | IPAction2String (IPAction action) |
CommunicationTable * | getTable () |
void | setTable (CommunicationTable *table) |
CRioObject * | getObject () |
PLSessionManager * | getPLSession () |
Static Public Member Functions | |
static void * | startListenThread (void *param) |
static void * | startReceiveThread (void *param) |
Data Fields | |
fd_set * | readfds |
Protected Member Functions | |
void | MessageMutexLock (string msg="") |
void | MessageMutexUnlock (string msg="") |
void | TableMutexLock (string msg="") |
void | TableMutexUnlock (string msg="") |
Protected Attributes | |
CommunicationTable * | table |
CRioObject * | object |
PLSessionManager * | PLSession |
void * | callback |
BufferStream * | bufferStream |
CRioMMVideo * | video |
int | init |
pthread_mutex_t | MessageMutex |
pthread_mutex_t | TableMutex |
Definition at line 184 of file CommMulticastStream.h.
CommunicationStream::CommunicationStream | ( | CRioObject * | object = 0 , |
|
PLSessionManager * | PLSession = 0 , |
|||
void * | callback = 0 , |
|||
BufferStream * | bufferStream = 0 , |
|||
CRioMMVideo * | video = 0 , |
|||
char * | LogsDirectory = NULL | |||
) |
Definition at line 50 of file CommMulticastStream.cpp.
00056 { 00057 #ifdef RIO_DEBUG1 00058 RioErr << "### [CommunicationStream - Constructor] Start" << endl; 00059 #endif 00060 00061 this->object = object; 00062 table = new CommunicationTable(); 00063 readfds = new fd_set; 00064 this->PLSession = PLSession; 00065 this->callback = callback; 00066 this->bufferStream = bufferStream; 00067 this->video = video; 00068 init = 0; 00069 00070 pthread_mutex_init( &MessageMutex, NULL ); 00071 pthread_mutex_init( &TableMutex, NULL ); 00072 00073 #ifdef RIO_DEBUG2 00074 gettimeofday( &initial_time, 0 ); 00075 gettimeofday( &time_list, 0 ); 00076 gettimeofday( &final_time, 0 ); 00077 00078 patching_number = 0; 00079 multicast_number = 0; 00080 begin = false; 00081 00082 pthread_mutex_init( &log_mutex, NULL ); 00083 if( LogsDirectory == NULL ) 00084 sprintf( FLUXOSLOG, "%s", LOGFILE ); 00085 else 00086 sprintf( FLUXOSLOG, "%s%s", LogsDirectory, LOGFILE ); 00087 m_log_fluxo.open( FLUXOSLOG ); 00088 00089 calculeted_time = 0; 00090 #endif 00091 00092 #ifdef RIO_DEBUG1 00093 RioErr << "### [CommunicationStream - Constructor] Finish" << endl; 00094 #endif 00095 }
void CommunicationStream::burstRequest | ( | int | block | ) |
Definition at line 1431 of file CommMulticastStream.cpp.
01432 { 01433 #ifdef RIO_DEBUG1 01434 RioErr << "### [CommunicationStream - burstRequest] Start" << endl; 01435 #endif 01436 01437 //Abaixo uso a vari�vel clientNextRequestBlock pois ao usar 01438 //video->GetNextRequestBlock tem-se o problema de que o membro 01439 //NextRequestBlock da RiomMVideo � atualizado a todo instante, 01440 //mas neste ponto aqui precisa-se apenas do valor deste membro 01441 //neste exato momento. 01442 RioBlock clientNextRequestBlock = video->GetNextRequestBlock(); 01443 01444 if( ( block > -1 ) && 01445 ( clientNextRequestBlock > (RioBlock) block ) 01446 ) 01447 { 01448 #ifdef RIO_DEBUG2 01449 RioErr << "[CommMulticastStream] Requisitando blocos entre " 01450 << block << " e " << clientNextRequestBlock 01451 << " pois ultrapassei o lider e roubei sua lideran�a" 01452 << endl; 01453 #endif 01454 01455 //Quando um cliente recebe a mensagem de heranca de lideranca, 01456 //pode ser que ele esteja tocando � frente de tal grupo. Neste 01457 //caso, os blocos que estiverem entre o bloco do grupo e o do 01458 //cliente devem ser solicitados. isto � feito no loop abaixo. 01459 bool readOk = true; 01460 for( RioBlock i = block + 1; 01461 ( i < clientNextRequestBlock ) && readOk; 01462 i++ 01463 ) 01464 { 01465 readOk = readOk && video->MulticastRead( i, 01466 MULTICASTTRAFFIC ); 01467 01468 #ifdef RIO_DEBUG2 01469 if( !readOk ) 01470 RioErr << "[burstRequest] Erro no pedido de bloco " << i 01471 << endl; 01472 else 01473 RioErr << "[burstRequest] Bloco " << i << " pedido com sucesso." 01474 << endl; 01475 #endif 01476 } 01477 01478 //Tao logo o cliente normalize os pedidos, ser� um lider 01479 //natural, portanto deve deixar de usar group_block, j� que 01480 //este membro indica que client � um lider for�ado que toca 01481 //um bloco diferente do que solicita para o grupo. 01482 video->group_block = -1; 01483 01484 //se o cliente estiver fazendo patching, sair� dele neste 01485 //momento pois acaba de ultrapassar o l�der e, por isto, deve 01486 //assumir a lideran�a. 01487 if( video->isPatching() ) 01488 video->Stream_Control( "plbure1" ); 01489 01490 } // fim do if( ( block > -1 ) && ( clientNextRequestBlock > (RioBlock)... 01491 01492 #ifdef RIO_DEBUG1 01493 RioErr << "### [CommunicationStream - burstRequest] Finish" << endl; 01494 #endif 01495 }
string CommunicationStream::ClientMode2String | ( | ClientMode | mode | ) |
Definition at line 1606 of file CommMulticastStream.cpp.
01607 { 01608 //#ifdef RIO_DEBUG1 01609 //RioErr << "### [CommunicationStream - ClientMode2String] Start" << endl; 01610 //#endif 01611 01612 string result; 01613 01614 switch( mode ) 01615 { 01616 case PASSIVE: result = "PASSIVE"; break; 01617 case LEADER: result = "LEADER"; break; 01618 case SUBLEADER: result = "SUBLEADER"; break; 01619 case INACTIVE: result = "INACTIVE"; break; 01620 case UNAVAILABLE: result = "UNAVAILABLE"; break; 01621 default: result = "UNKNOWN!!!"; break; 01622 } 01623 01624 //#ifdef RIO_DEBUG1 01625 //RioErr << "### [CommunicationStream - ClientMode2String] Finish" << endl; 01626 //#endif 01627 01628 return result; 01629 }
int CommunicationStream::createListenSocket | ( | unsigned short | port | ) |
Definition at line 234 of file CommMulticastStream.cpp.
00235 { 00236 #ifdef RIO_DEBUG1 00237 RioErr << "### [CommunicationStream - createListenSocket] Start" << endl; 00238 #endif 00239 00240 #ifdef RIO_DEBUG2 00241 RioErr << "PID da createListenSocket = " << pthread_self() << endl; 00242 #endif 00243 00244 struct sockaddr_in servaddr; 00245 int sd; 00246 int nsd; 00247 00248 memset( &servaddr, 0, sizeof( servaddr ) ); 00249 servaddr.sin_family = AF_INET; 00250 servaddr.sin_port = htons( port ); 00251 servaddr.sin_addr.s_addr = htonl( INADDR_ANY ); 00252 00253 if( ( sd = socket( PF_INET, SOCK_STREAM, 0 ) ) == -1 ) 00254 { 00255 Rioperror("CommunicationStream:createListenSocket"); 00256 00257 #ifdef RIO_DEBUG1 00258 RioErr << "### [CommunicationStream - createListenSocket] Finish 1" << endl; 00259 #endif 00260 00261 return 0; 00262 } 00263 00264 int aux = 1; 00265 if( setsockopt ( sd, SOL_SOCKET, SO_REUSEADDR, 00266 (char*)&aux, sizeof(aux)) == -1 ) 00267 { 00268 Rioperror( "CommunicationStream:setsockopt"); 00269 } 00270 00271 00272 if( bind( sd, ( struct sockaddr * ) &servaddr, sizeof( servaddr ) ) == -1 ) 00273 { 00274 Rioperror("CommunicationStream:bindSocket"); 00275 00276 #ifdef RIO_DEBUG1 00277 RioErr << "### [CommunicationStream - createListenSocket] Finish 2" << endl; 00278 #endif 00279 00280 return 0; 00281 } 00282 00283 //listen( sd, 10 ); 00284 int rc; 00285 rc = listen( sd, SOMAXCONN ); 00286 if( rc != 0 ) 00287 RioErr << "Erro listen commulticast " << strerror( errno ) << endl; 00288 00289 FD_ZERO( readfds ); 00290 while( 1 ) 00291 { 00292 if( ( nsd = accept( sd, NULL, NULL ) ) != -1 ) 00293 { 00294 TableMutexLock( "crlissock" ); 00295 getTable()->insertNullPid( nsd ); 00296 TableMutexUnlock(" crlissock" ); 00297 //FD_SET( nsd, readfds ); 00298 } 00299 else 00300 { 00301 RioErr << "Erro no accept: " << strerror( errno ) << endl; 00302 } 00303 } 00304 }
int CommunicationStream::createSocket | ( | AddressIP * | address | ) |
Definition at line 169 of file CommMulticastStream.cpp.
00170 { 00171 #ifdef RIO_DEBUG1 00172 RioErr << "### [CommunicationStream - createSocket] Start" << endl; 00173 #endif 00174 00175 if( address == NULL ) 00176 { 00177 #ifdef RIO_DEBUG1 00178 RioErr << "### [CommunicationStream - createSocket] Finish 1" << endl; 00179 #endif 00180 00181 return 0; 00182 } 00183 00184 int sd; 00185 struct sockaddr_in servaddr; 00186 00187 memset( &servaddr, 0, sizeof( servaddr ) ); 00188 servaddr.sin_family = AF_INET; 00189 servaddr.sin_port = address->getPort(); 00190 servaddr.sin_addr.s_addr = address->getAddress(); 00191 00192 if( ( sd = socket( PF_INET, SOCK_STREAM, 0 ) ) == -1 ) 00193 { 00194 Rioperror("CommunicationStream:createSocket"); 00195 00196 #ifdef RIO_DEBUG1 00197 RioErr << "### [CommunicationStream - createSocket] Finish 2" << endl; 00198 #endif 00199 00200 return 0; 00201 } 00202 00203 // Faz com que o socket seja exclusivo do processo, e nao seja passado 00204 // para um outro processo criado pelo processo que abriu o socket. 00205 if( fcntl( sd, F_SETFD, FD_CLOEXEC ) < 0 ) 00206 { 00207 Rioperror("CommunicationStream:fcntl"); 00208 00209 #ifdef RIO_DEBUG1 00210 RioErr << "### [CommunicationStream - createSocket] Finish 3" << endl; 00211 #endif 00212 00213 return 0; 00214 } 00215 00216 if( connect( sd, ( struct sockaddr * ) &servaddr, sizeof(servaddr) ) == -1 ) 00217 { 00218 Rioperror("CommunicationStream:connectSocket"); 00219 00220 #ifdef RIO_DEBUG1 00221 RioErr << "### [CommunicationStream - createSocket] Finish 4" << endl; 00222 #endif 00223 00224 return 0; 00225 } 00226 00227 #ifdef RIO_DEBUG1 00228 RioErr << "### [CommunicationStream - createSocket] Finish 5" << endl; 00229 #endif 00230 00231 return sd; 00232 }
CRioObject * CommunicationStream::getObject | ( | ) |
Definition at line 106 of file CommMulticastStream.cpp.
PLSessionManager * CommunicationStream::getPLSession | ( | ) |
Definition at line 97 of file CommMulticastStream.cpp.
CommunicationTable * CommunicationStream::getTable | ( | ) |
Definition at line 1422 of file CommMulticastStream.cpp.
01423 { 01424 //#ifdef RIO_DEBUG1 01425 //RioErr << "### [CommunicationStream - getTable] Single" << endl; 01426 //#endif 01427 01428 return table; 01429 }
string CommunicationStream::IPAction2String | ( | IPAction | action | ) |
Definition at line 1631 of file CommMulticastStream.cpp.
01632 { 01633 //#ifdef RIO_DEBUG1 01634 //RioErr << "### [CommunicationStream - IPAction2String] Start" << endl; 01635 //#endif 01636 01637 string result; 01638 01639 switch( action ) 01640 { 01641 case JOIN_THIS: result = "JOIN_THIS"; break; 01642 case LEAVE_THIS: result = "LEAVE_THIS"; break; 01643 case LEAVE_ALL: result = "LEAVE_ALL"; break; 01644 case KEEP_THIS: result = "KEEP_THIS"; break; 01645 default: result = "UNKNOWN!!!"; break; 01646 } 01647 01648 //#ifdef RIO_DEBUG1 01649 //RioErr << "### [CommunicationStream - IPAction2String] Finish" << endl; 01650 //#endif 01651 01652 return result; 01653 }
void CommunicationStream::MessageMutexLock | ( | string | msg = "" |
) | [protected] |
Definition at line 1497 of file CommMulticastStream.cpp.
01498 { 01499 #ifdef RIO_DEBUG1 01500 RioErr << "### [CommunicationStream - MessageMutexLock] Start" << endl; 01501 #endif 01502 01503 #ifdef RIO_DEBUG2 01504 if( msg.length() ) 01505 RioErr << "[MessageMutexLock] " << msg << "... " << endl; 01506 #endif 01507 01508 pthread_mutex_lock( &MessageMutex ); 01509 01510 #ifdef RIO_DEBUG2 01511 if( msg.length() ) 01512 RioErr << "..." << msg << " OK!" << endl; 01513 #endif 01514 01515 #ifdef RIO_DEBUG1 01516 RioErr << "### [CommunicationStream - MessageMutexLock] Finish" << endl; 01517 #endif 01518 }
void CommunicationStream::MessageMutexUnlock | ( | string | msg = "" |
) | [protected] |
Definition at line 1520 of file CommMulticastStream.cpp.
01521 { 01522 #ifdef RIO_DEBUG1 01523 RioErr << "### [CommunicationStream - MessageMutexUnlock] Start" << endl; 01524 #endif 01525 01526 #ifdef RIO_DEBUG2 01527 if( msg.length() ) 01528 RioErr << "[MessageMutexUnlock] " << msg << endl; 01529 #endif 01530 01531 pthread_mutex_unlock( &MessageMutex ); 01532 01533 #ifdef RIO_DEBUG1 01534 RioErr << "### [CommunicationStream - MessageMutexUnlock] Finish" << endl; 01535 #endif 01536 }
string CommunicationStream::MoveAction2String | ( | MoveAction | action | ) |
Definition at line 1579 of file CommMulticastStream.cpp.
01580 { 01581 #ifdef RIO_DEBUG1 01582 RioErr << "### [CommunicationStream - MoveAction2String] Start" << endl; 01583 #endif 01584 01585 string result; 01586 01587 switch( action ) 01588 { 01589 case ACTION_PLAY: result = "PLAY"; break; 01590 case ACTION_STOP: result = "STOP"; break; 01591 case ACTION_PAUSE: result = "PAUSE"; break; 01592 case ACTION_FORWARD: result = "FORWARD"; break; 01593 case ACTION_REWIND: result = "REWIND"; break; 01594 case ACTION_READBUFFER: result = "READBUFFER"; break; 01595 case ACTION_REACHLEADER: result = "REACHLEADER"; break; 01596 default: result = "UNKNOWN!!!"; break; 01597 } 01598 01599 #ifdef RIO_DEBUG1 01600 RioErr << "### [CommunicationStream - MoveAction2String] Finish" << endl; 01601 #endif 01602 01603 return result; 01604 }
void CommunicationStream::sendIPMsg | ( | int | pid, | |
char * | ip, | |||
unsigned short | port, | |||
ClientMode | mode, | |||
int | block = -1 , |
|||
IPAction | ipAction = KEEP_THIS , |
|||
int | target = -1 | |||
) |
Definition at line 1231 of file CommMulticastStream.cpp.
01235 { 01236 #ifdef RIO_DEBUG1 01237 RioErr << "### [CommunicationStream - sendIPMsg] Start" << endl; 01238 #endif 01239 01240 #ifdef RIO_DEBUG2 01241 RioErr << "[sendIPMsg] Sending message to client " << pid << ", address " 01242 << address << ", port " << port << ", mode " 01243 << ClientMode2String( mode ) << ", block " << block << ", IPaction " 01244 << IPAction2String( ipAction ) << ", target " << target << "." << endl; 01245 #endif 01246 01247 int result; 01248 int escritos; 01249 msgIP msg; 01250 CommunicationItem *current = NULL; 01251 CommunicationItem *previous = NULL; 01252 01253 memset( &msg, 0, sizeof( msg ) ); 01254 msg.code = MSGCODE_IP; 01255 strcpy( msg.address, address ); 01256 msg.port = port; 01257 msg.mode = mode; 01258 msg.block = block; 01259 msg.target = target; 01260 msg.ipAction = ipAction; 01261 01262 result = getTable()->searchPid( pid, ¤t, &previous ); 01263 01264 if( result == RESULT_COMMUNICATION_SEARCH_FOUND ) 01265 { 01266 escritos = write( current->getSocket(), &msg, sizeof( msg ) ); 01267 01268 if( escritos != sizeof( msg ) ) 01269 { 01270 RioErr << "CommunicationStream::sendIPMsg erro ao escrever " 01271 << "o valor de msg: escritos " << escritos << " bytes " 01272 << "de um total de " << sizeof( msg ) << " bytes." << endl; 01273 } 01274 else if( escritos == -1 ) 01275 { 01276 RioErr << "CommunicationStream::sendIPMsg erro " << errno 01277 << "(" << strerror( errno ) << ") ao escrever o valor de " 01278 << "msg" << endl; 01279 } 01280 01281 } 01282 01283 #ifdef RIO_DEBUG1 01284 RioErr << "### [CommunicationStream - sendIPMsg] Finish" << endl; 01285 #endif 01286 }
void CommunicationStream::sendIPMsgToSocket | ( | int | sd, | |
char * | ip, | |||
unsigned short | port, | |||
ClientMode | mode, | |||
int | block = -1 | |||
) |
Definition at line 1047 of file CommMulticastStream.cpp.
01050 { 01051 #ifdef RIO_DEBUG1 01052 RioErr << "### [CommunicationStream - sendIPMsgToSocket] Start" << endl; 01053 #endif 01054 01055 int escritos; 01056 msgIP msg; 01057 01058 msg.code = MSGCODE_IP; 01059 strcpy( msg.address, address ); 01060 msg.port = port; 01061 msg.mode = mode; 01062 msg.block = block; 01063 01064 escritos = write( sd, &msg, sizeof( msgIP ) ); 01065 01066 if( escritos != sizeof( msgIP ) ) 01067 { 01068 RioErr << "CommunicationStream::sendIPMsgToSocket erro ao escrever o " 01069 << "valor de msgIP: escritos "<< escritos << " bytes de um " 01070 << "total de " << sizeof( msgIP ) << " bytes." << endl; 01071 } 01072 else if( escritos == -1 ) 01073 { 01074 RioErr << "CommunicationStream::sendIPMsgToSocket erro " << errno 01075 << "(" << strerror( errno ) << ") ao escrever o valor de msgIP" 01076 << endl; 01077 } 01078 01079 #ifdef RIO_DEBUG1 01080 RioErr << "### [CommunicationStream - sendIPMsgToSocket] Finish" << endl; 01081 #endif 01082 }
void CommunicationStream::sendModeMsg | ( | int | pid, | |
ClientMode | mode, | |||
int | block = -1 , |
|||
int | target = -1 | |||
) |
Definition at line 1181 of file CommMulticastStream.cpp.
01183 { 01184 #ifdef RIO_DEBUG1 01185 RioErr << "### [CommunicationStream - sendModeMsg] Start" << endl; 01186 #endif 01187 01188 #ifdef RIO_DEBUG2 01189 RioErr << "[sendModeMsg] Sending mode message to client " << pid 01190 << ", mode " << ClientMode2String( mode ) << ", block " << block 01191 << ", target " << target << "." << endl; 01192 #endif 01193 01194 int result; 01195 int escritos; 01196 msgMode msg; 01197 CommunicationItem *current = NULL; 01198 CommunicationItem *previous = NULL; 01199 01200 msg.code = MSGCODE_MODE; 01201 msg.mode = mode; 01202 msg.block = block; 01203 msg.target = target; 01204 01205 result = getTable()->searchPid( pid, ¤t, &previous ); 01206 01207 if( result == RESULT_COMMUNICATION_SEARCH_FOUND ) 01208 { 01209 escritos = write( current->getSocket(), &msg, sizeof( msg ) ); 01210 01211 if( escritos != sizeof( msg ) ) 01212 { 01213 RioErr << "CommunicationStream::sendModeMsg erro ao escrever " 01214 << "o valor de msg: escritos "<< escritos << " bytes " 01215 << "de um total de " << sizeof( msg ) << " bytes." << endl; 01216 } 01217 else if( escritos == -1 ) 01218 { 01219 RioErr << "CommunicationStream::sendModeMsg erro " << errno 01220 << "(" << strerror( errno ) << ") ao escrever o valor de " 01221 << "msg" << endl; 01222 } 01223 01224 } 01225 01226 #ifdef RIO_DEBUG1 01227 RioErr << "### [CommunicationStream - sendModeMsg] Finish" << endl; 01228 #endif 01229 }
void CommunicationStream::sendMoveMsg | ( | int | pid, | |
MoveAction | action, | |||
int | block = -1 | |||
) |
Definition at line 1288 of file CommMulticastStream.cpp.
01289 { 01290 #ifdef RIO_DEBUG1 01291 RioErr << "### [CommunicationStream - sendMoveMsg] Start" << endl; 01292 #endif 01293 01294 #ifdef RIO_DEBUG2 01295 RioErr << "[sendMoveMsg] Sending move message to PL: action " 01296 << MoveAction2String( action ) << ", block " << block 01297 << "." << endl; 01298 #endif 01299 01300 int result; 01301 int escritos; 01302 msgMove msg; 01303 CommunicationItem *current = NULL; 01304 CommunicationItem *previous = NULL; 01305 01306 msg.code = MSGCODE_MOVE; 01307 msg.action = action; 01308 msg.block = block; 01309 01310 result = getTable()->searchPid( pid, ¤t, &previous ); 01311 01312 if( result == RESULT_COMMUNICATION_SEARCH_FOUND ) 01313 { 01314 escritos = write( current->getSocket(), &msg, sizeof( msg ) ); 01315 01316 if( escritos != sizeof( msg ) ) 01317 { 01318 RioErr << "CommunicationStream::sendMoveMsg erro ao escrever " 01319 << "o valor de msg: escritos "<< escritos << " bytes " 01320 << "de um total de " << sizeof( msg ) << " bytes." << endl; 01321 } 01322 else if( escritos == -1 ) 01323 { 01324 RioErr << "CommunicationStream::sendMoveMsg erro " << errno 01325 << "(" << strerror( errno ) << ") ao escrever o valor de " 01326 << "msg" << endl; 01327 } 01328 01329 } 01330 01331 #ifdef RIO_DEBUG1 01332 RioErr << "### [CommunicationStream - sendMoveMsg] Finish" << endl; 01333 #endif 01334 }
void CommunicationStream::sendPIDMsg | ( | int | myPid | ) |
Este m�todo n�o est� sendo usado nunca! void CommunicationStream::sendBlockMsg( int pid, unsigned int block ) { ifdef RIO_DEBUG1 RioErr << "### [CommunicationStream - sendBlockMsg] Start" << endl; endif.
ifdef RIO_DEBUG2 RioErr << "[sendBlockMsg] Sending message to client " << pid << block << "." << endl; endif
int result; msgBlock msg; CommunicationItem *current = NULL; CommunicationItem *previous = NULL;
msg.code = MSGCODE_BLOCK; msg.block = block;
result = getTable()->searchPid( pid, ¤t, &previous );
if( result == RESULT_COMMUNICATION_SEARCH_FOUND ) { write( current->getSocket(), &msg, sizeof( msg ) ); }
ifdef RIO_DEBUG1 RioErr << "### [CommunicationStream - sendBlockMsg] Finish" << endl; endif }
Definition at line 1369 of file CommMulticastStream.cpp.
01370 { 01371 #ifdef RIO_DEBUG1 01372 RioErr << "### [CommunicationStream - sendPIDMsg] Start" << endl; 01373 #endif 01374 01375 #ifdef RIO_DEBUG2 01376 RioErr << "[sendPIDMsg] Sending my PID to PL: " << myPid << "." << endl; 01377 #endif 01378 01379 int result; 01380 int escritos; 01381 msgPID msg; 01382 CommunicationItem *current = NULL; 01383 CommunicationItem *previous = NULL; 01384 01385 msg.code = MSGCODE_PID; 01386 msg.pid = myPid; 01387 01388 result = getTable()->searchPid( myPid, ¤t, &previous ); 01389 01390 if( result == RESULT_COMMUNICATION_SEARCH_FOUND ) 01391 { 01392 escritos = write( current->getSocket(), &msg, sizeof( msg ) ); 01393 01394 if( escritos != sizeof( msg ) ) 01395 { 01396 RioErr << "CommunicationStream::sendPIDMsg erro ao escrever " 01397 << "o valor de msg: escritos "<< escritos << " bytes " 01398 << "de um total de " << sizeof( msg ) << " bytes." << endl; 01399 } 01400 else if( escritos == -1 ) 01401 { 01402 RioErr << "CommunicationStream::sendPIDMsg erro " << errno 01403 << "(" << strerror( errno ) << ") ao escrever o valor de " 01404 << "msg" << endl; 01405 } 01406 01407 } 01408 01409 #ifdef RIO_DEBUG1 01410 RioErr << "### [CommunicationStream - sendPIDMsg] Finish" << endl; 01411 #endif 01412 }
void CommunicationStream::sendPIDMsgToSocket | ( | int | sd, | |
int | myPid | |||
) |
Definition at line 1104 of file CommMulticastStream.cpp.
01105 { 01106 #ifdef RIO_DEBUG1 01107 RioErr << "### [CommunicationStream - sendPIDMsgToSocket] Start" << endl; 01108 #endif 01109 01110 int escritos; 01111 msgPID msg; 01112 msg.code = MSGCODE_PID; 01113 msg.pid = myPid; 01114 01115 escritos = write( sd, &msg, sizeof( msgPID ) ); 01116 01117 if( escritos != sizeof( msgPID ) ) 01118 { 01119 RioErr << "CommunicationStream::sendPIDMsgToSocket erro ao escrever o " 01120 << "valor de msgPID: escritos "<< escritos << " bytes de um " 01121 << "total de " << sizeof( msgPID ) << " bytes." << endl; 01122 } 01123 else if( escritos == -1 ) 01124 { 01125 RioErr << "CommunicationStream::sendPIDMsgToSocket erro " << errno 01126 << "(" << strerror( errno ) << ") ao escrever o valor de msgPID" 01127 << endl; 01128 } 01129 01130 #ifdef RIO_DEBUG1 01131 RioErr << "### [CommunicationStream - sendPIDMsgToSocket] Finish" << endl; 01132 #endif 01133 }
void CommunicationStream::setTable | ( | CommunicationTable * | table | ) |
Definition at line 1414 of file CommMulticastStream.cpp.
void CommunicationStream::socketReceiveHandler | ( | ) |
Definition at line 306 of file CommMulticastStream.cpp.
00307 { 00308 #ifdef RIO_DEBUG1 00309 RioErr << "### [CommunicationStream - socketReceiveHandler] Start" << endl; 00310 #endif 00311 00312 #ifdef RIO_DEBUG2 00313 RioErr << "PID da socketReceiveHandler = " << pthread_self() << endl; 00314 #endif 00315 00316 struct timeval rTimeout; 00317 00318 int ns; 00319 int maxSock; 00320 00321 fd_set auxReadSet; 00322 CommunicationItem *aux; 00323 00324 while( 1 ) 00325 { 00326 TableMutexLock(); 00327 00328 if( !getTable()->getLastItem() ) 00329 maxSock = 0; 00330 else 00331 maxSock = getTable()->getLastItem()->getSocket(); 00332 00333 TableMutexUnlock(); 00334 00335 if( maxSock == 0 ) 00336 { 00337 usleep( 100 ); 00338 continue; 00339 } 00340 00341 rTimeout.tv_sec = 1; 00342 rTimeout.tv_usec = 0; 00343 00344 FD_ZERO( &auxReadSet ); 00345 00346 TableMutexLock( "sorcvhand2" ); 00347 aux = getTable()->getFirstItem(); 00348 while( aux ) 00349 { 00350 FD_SET( aux->getSocket(), &auxReadSet ); 00351 aux = aux->getNext(); 00352 } 00353 00354 TableMutexUnlock( "sorcvhand2" ); 00355 00356 ns = select( maxSock + 1, &auxReadSet, NULL, (fd_set *)0, &rTimeout ); 00357 00358 if( ns > 0 ) 00359 { 00360 TableMutexLock( "sorcvhand3" ); 00361 aux = getTable()->getFirstItem(); 00362 00363 while( aux ) 00364 { 00365 //H� a necessidade de se armazenar aux->getNext() neste ponto 00366 //pois esta informa��o ser� �til ap�s a chamada a 00367 //socketReceiveMessageHandler(), que dependendo da atividade a 00368 //ser feita deletar� aux da tabela. 00369 CommunicationItem *nextaux = aux->getNext(); 00370 00371 if( ( aux->getSocket() > 0 ) && 00372 ( FD_ISSET( aux->getSocket(), &auxReadSet ) ) 00373 ) 00374 { 00375 socketReceiveMessageHandler( aux->getSocket() ); 00376 } 00377 aux = nextaux; 00378 } 00379 00380 TableMutexUnlock( "sorcvhand3" ); 00381 } 00382 else if( ns!= 0 ) 00383 { 00384 Rioperror( "socketReceiveHandler:select" ); 00385 } 00386 } // fim do while( 1 ) 00387 00388 #ifdef RIO_DEBUG1 00389 RioErr << "### [CommunicationStream - socketReceiveHandler] Finish" << endl; 00390 #endif 00391 }
void CommunicationStream::socketReceiveMessageHandler | ( | int | sd | ) |
Definition at line 393 of file CommMulticastStream.cpp.
00394 { 00395 #ifdef RIO_DEBUG1 00396 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Start" << endl; 00397 #endif 00398 00399 static unsigned int cont = 0; 00400 unsigned char readByte; 00401 unsigned char readSeq[ MAXLEN_MSGSTRUCT ]; 00402 InfoClient *NewMember = NULL; 00403 InfoClient *member = NULL; 00404 CommunicationItem *current = NULL; 00405 CommunicationItem *previous = NULL; 00406 int result; 00407 CRioObject *obj; 00408 00409 int i = 0; 00410 int size = 0; 00411 unsigned char *code = 0; 00412 00413 #if defined(RIO_DEBUG2) || defined(RIO_EMUL) 00414 int member_mode = UNAVAILABLE; 00415 int leave = 0; 00416 InfoClient *member_next = 0; 00417 #endif 00418 00419 /* O loop abaixo est� lendo um byte por vez. Segundo o man do select (man 00420 * select_tut) � extremamente desrecomendada esta pr�tica. Reimplementar o 00421 * loop abaixo para fazer uma leitura �nica, se poss�vel. 00422 * */ 00423 while( read( sd, &readByte, 1 ) ) 00424 { 00425 readSeq[ i ] = readByte; 00426 00427 // We might get the message type to know when it end 00428 if( i == 0 ) 00429 { 00430 code = readSeq; 00431 00432 switch( *code ) 00433 { 00434 case MSGCODE_IP: 00435 size = sizeof( msgIP ); 00436 break; 00437 case MSGCODE_BLOCK: 00438 size = sizeof( msgBlock ); 00439 break; 00440 case MSGCODE_PID: 00441 size = sizeof( msgPID ); 00442 break; 00443 case MSGCODE_MOVE: 00444 size = sizeof( msgMove ); 00445 break; 00446 case MSGCODE_MODE: 00447 size = sizeof( msgMode ); 00448 break; 00449 00450 #ifdef RIO_EMUL 00451 case MSGCODE_FLUXO: 00452 size = sizeof( msgFluxo ); 00453 break; 00454 #endif 00455 00456 default: 00457 #ifdef RIO_DEBUG2 00458 RioErr << "[CommMulticast] Erro: Mensagem intrat�vel! (" 00459 << *code << ")" << endl; 00460 #endif 00461 00462 #ifdef RIO_DEBUG1 00463 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 1" << endl; 00464 #endif 00465 00466 return; 00467 break; 00468 } // fim do switch( *code ) 00469 } // fim do if( i == 0 ) 00470 00471 i++; 00472 if( i >= size ) 00473 break; 00474 } // fim do while( read( sd, &readByte, 1 ) ) 00475 00476 if( size == 0 ) 00477 { 00478 // Connection End, now we must remove socket from table 00479 RioErr << "[CommMulticast]: Removing client socket " << endl; 00480 00481 FD_CLR( sd, readfds ); 00482 getTable()->removeSocket( sd ); 00483 00484 close( sd ); 00485 00486 #ifdef RIO_DEBUG1 00487 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 2" << endl; 00488 #endif 00489 00490 return; 00491 } 00492 00493 if( i < size ) 00494 { 00495 #ifdef RIO_DEBUG1 00496 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 3" << endl; 00497 #endif 00498 00499 #ifdef RIO_DEBUG2 00500 RioErr << "[CommunicationStream - socketReceiveMessageHandler] Nao foi " 00501 << "possivel ler a mensagem por completo." << endl; 00502 #endif 00503 00504 return; 00505 } 00506 00507 MessageMutexLock( "SoRecMe1" ); 00508 00509 switch( *code ) 00510 { 00511 // Message received by client 00512 case MSGCODE_IP: 00513 { 00514 msgIP ipData; 00515 memcpy( &ipData, readSeq, sizeof( msgIP ) ); 00516 00517 RioErr << "[CommMulticast]: MSGCODE_IP Received" << endl; 00518 RioErr << "[CommMulticast]: IP Address = " << ipData.address << endl; 00519 RioErr << "[CommMulticast]: IP Port = " << ipData.port << endl; 00520 RioErr << "[CommMulticast]: IP Mode = " 00521 << ClientMode2String( ipData.mode ) << endl; 00522 RioErr << "[CommMulticast]: IP BLOCK = " << ipData.block << endl; 00523 RioErr << "[CommMulticast]: IP Action = " 00524 << IPAction2String( ipData.ipAction ) << endl; 00525 RioErr << "[CommMulticast]: IP Target = " << ipData.target << endl; 00526 00527 obj = getObject(); 00528 00529 if( !obj ) 00530 { 00531 RioErr << "Not found" << endl; 00532 MessageMutexUnlock( "SoRecMe2" ); 00533 00534 #ifdef RIO_DEBUG1 00535 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 4" << endl; 00536 #endif 00537 00538 return; 00539 } 00540 else 00541 { 00542 RioErr << "OK (" << cont++ << ")" << endl; 00543 } 00544 00545 result = getTable()->searchSocket( sd, ¤t, &previous ); 00546 if( result != RESULT_COMMUNICATION_SEARCH_FOUND ) 00547 { 00548 RioErr << "[CommMulticast]: Error result communication " << endl; 00549 MessageMutexUnlock( "SoRecMe3" ); 00550 00551 #ifdef RIO_DEBUG1 00552 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 5" << endl; 00553 #endif 00554 00555 return; 00556 } 00557 00558 //current � um CommunicationItem 00559 current->setMode( ipData.mode ); 00560 00561 if( ( ipData.mode == LEADER ) || ( ipData.mode == SUBLEADER) ) 00562 video->first = true; 00563 else 00564 video->first = false; 00565 00566 switch( ipData.ipAction ) 00567 { 00568 case LEAVE_THIS: 00569 { 00570 RioErr << "[CommMulticast]: Leave this" << endl; 00571 obj->LeaveGroup( ipData.port ); 00572 break; 00573 } 00574 00575 case LEAVE_ALL: 00576 { 00577 RioErr << "[CommMulticast]: Leave all" << endl; 00578 obj->LeaveAllGroups(); 00579 break; 00580 } 00581 00582 case JOIN_THIS: 00583 { 00584 RioErr << "[CommMulticast]: Join This" << endl; 00585 int enable_join = 1; 00586 obj->SetMulticastSocket( ipData.port, ipData.address, 00587 callback, bufferStream, 00588 enable_join ); 00589 break; 00590 } 00591 00592 case KEEP_THIS: 00593 { 00594 RioErr << "[CommMulticast]: Keep This" << endl; 00595 break; 00596 } 00597 00598 default: 00599 RioErr << "[CommMulticast]: Invalid mode " 00600 << ipData.ipAction << "!!" << endl; 00601 break; 00602 } 00603 00604 if( ipData.block > -1 ) 00605 { 00606 //Delta Before 00607 RioErr << "[CommMulticast]: Delta before: " 00608 << ipData.block << endl; 00609 00610 video->SetNextRequestBlock( ipData.block ); 00611 } 00612 00613 if( ipData.target > -1 ) 00614 { 00615 //Setting target 00616 RioErr << "[CommMulticast]: New client target: " 00617 << ipData.target << endl; 00618 00619 video->SetTarget( ipData.target ); 00620 00621 //Nota: Todo submembro e todo l�der � considerado est�vel, 00622 //isto �, n�o est� em estado de patching ( no caso do submembro, 00623 //esta � uma condi��o para fazer o merging entre dois grupos), 00624 //logo n�o h� a necessidade de se enviar um target infinito 00625 //(INT_MAX) para estes tipos de clientes. 00626 00627 //Cliente entrando em patching 00628 if( ( ipData.target != INT_MAX ) && !video->isPatching() ) 00629 video->Stream_Control( "plsoreme1" ); 00630 //Cliente saindo do patching 00631 else if( ( ipData.target == INT_MAX ) && video->isPatching() ) 00632 video->Stream_Control( "plsoreme2" ); 00633 } 00634 00635 //Se cliente recebeu um LEAVE_ALL que n�o indique que ele deve ir 00636 //para o modo INACTIVE, ent�o ele receber� uma nova mensagem do tipo 00637 //JOIN_THIS. Neste caso, apenas esta segunda mensagem dever� liberar 00638 //o mutex do cliente, chamando a MulticastReleaseCondition(). 00639 if( !( ( ipData.ipAction == LEAVE_ALL ) && 00640 ( ipData.mode != INACTIVE ) 00641 ) 00642 ) 00643 { 00644 video->MulticastMutexLock( "comtcs1" ); 00645 video->MulticastReleaseCondition( "comtcs1" ); 00646 video->MulticastMutexUnlock( "comtcs1" ); 00647 } 00648 00649 RioErr << "[CommMulticast]: BroadCast End" << endl; 00650 break; 00651 } // fim do case MSGCODE_IP: 00652 00653 // Message received by PI 00654 case MSGCODE_MOVE: 00655 { 00656 msgMove moveData; 00657 memcpy( &moveData, readSeq, sizeof( msgMove ) ); 00658 00659 RioErr << "[CommMulticast] MSGCODE_MOVE Received..." << endl; 00660 00661 if( getPLSession() == NULL ) 00662 { 00663 MessageMutexUnlock( "SoRecMe4" ); 00664 00665 #ifdef RIO_DEBUG1 00666 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 6" << endl; 00667 #endif 00668 00669 return; 00670 } 00671 00672 result = getTable()->searchSocket( sd, ¤t, &previous ); 00673 if( result != RESULT_COMMUNICATION_SEARCH_FOUND ) 00674 { 00675 MessageMutexUnlock( "SoRecMe5" ); 00676 00677 #ifdef RIO_DEBUG1 00678 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 7" << endl; 00679 #endif 00680 00681 return; 00682 } 00683 00684 getPLSession()->MoveMutexLock( "comtcs2" ); 00685 member = getPLSession()->FindClient( current->getPid() ); 00686 getPLSession()->MoveMutexUnlock( "comtcs2" ); 00687 00688 if( !member ) 00689 { 00690 RioErr << "[CommMulticast]: member " << current->getPid() 00691 << " nao encontrado. Saindo da " 00692 << "socketReceiveMessageHandler..." << endl; 00693 00694 MessageMutexUnlock( "SoRecMe6" ); 00695 00696 #ifdef RIO_DEBUG1 00697 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 8" << endl; 00698 #endif 00699 00700 return; 00701 } 00702 00703 #if defined(RIO_DEBUG2) || defined(RIO_EMUL) 00704 member_mode = member->mode; 00705 00706 if( ( moveData.action == ACTION_PAUSE ) || 00707 ( moveData.action == ACTION_STOP ) || 00708 ( moveData.block < 0 ) 00709 ) 00710 { 00711 leave = 1; 00712 } 00713 #endif 00714 00715 RioErr << "[CommMulticast]: MOVE CLIENT" << endl; 00716 RioErr << "[CommMulticast]: PID " << current->getPid() << endl; 00717 RioErr << "[CommMulticast]: GROUP " << member->multicast_port 00718 << endl; 00719 RioErr << "[CommMulticast]: action = " 00720 << MoveAction2String( moveData.action ) << endl; 00721 RioErr << "[CommMulticast]: block = " << moveData.block 00722 << endl; 00723 RioErr << "[CommMulticast]: mode = " 00724 << ClientMode2String( member->mode ) << endl; 00725 00726 result = getPLSession()->AllocClient( member->PIDClientThread, 00727 &moveData ); 00728 00729 if( result == PL_ERROR ) 00730 { 00731 MessageMutexUnlock( "SoRecMe7" ); 00732 00733 #ifdef RIO_DEBUG1 00734 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 9" << endl; 00735 #endif 00736 00737 return; 00738 } 00739 00740 #if defined(RIO_DEBUG2) || defined(RIO_EMUL) 00741 //se member era lider, se moveu e nao havia quem herdasse 00742 //a lideranca em seu lugar, temos um fluxo multicast a menos. 00743 if( ( member_mode == LEADER ) && 00744 ( member->mode != LEADER ) && 00745 ( member_next == NULL ) 00746 ) 00747 { 00748 pthread_mutex_lock( &log_mutex ); 00749 multicast_number--; 00750 pthread_mutex_unlock( &log_mutex ); 00751 } 00752 00753 gettimeofday( &time_list, 0 ); 00754 00755 final_time.tv_sec = time_list.tv_sec - initial_time.tv_sec; 00756 final_time.tv_usec = time_list.tv_usec - initial_time.tv_usec; 00757 00758 if( final_time.tv_usec < 0 ) 00759 { 00760 final_time.tv_sec -= 1; 00761 final_time.tv_usec += 1000000; 00762 } 00763 calculeted_time = ( unsigned int )(final_time.tv_sec * 1000.0 + 00764 final_time.tv_usec/1000.0); 00765 calculeted_time = calculeted_time/1000; 00766 00767 if( ( !leave ) && ( member->mode == LEADER ) ) 00768 { 00769 pthread_mutex_lock( &log_mutex ); 00770 multicast_number++; 00771 pthread_mutex_unlock( &log_mutex ); 00772 } 00773 00774 m_log_fluxo << " Multicast " << multicast_number 00775 << " Patching " << patching_number 00776 << " Total_fluxo " 00777 << multicast_number + patching_number 00778 << " Tempo " << calculeted_time << endl; 00779 #endif 00780 00781 #ifdef RIO_DEBUG2 00782 RioErr << "OK (" << cont++ << ")" << endl; 00783 #endif 00784 00785 break; 00786 } // fim do case MSGCODE_MOVE: 00787 00788 // Message received by PI 00789 case MSGCODE_PID: 00790 { 00791 msgPID pidData; 00792 memcpy( &pidData, readSeq, sizeof( msgPID ) ); 00793 00794 RioErr << "[CommMulticast] MSGCODE_PID Received..." << endl; 00795 RioErr << "[CommMulticast] PID: " << pidData.pid << endl; 00796 00797 getTable()->changeSocketPid( sd, pidData.pid ); 00798 00799 if( getPLSession() == NULL ) 00800 { 00801 MessageMutexUnlock( "SoRecMe8" ); 00802 00803 #ifdef RIO_DEBUG1 00804 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 10" << endl; 00805 #endif 00806 00807 return; 00808 } 00809 00810 getPLSession()->MoveMutexLock( "comtcs3" ); 00811 NewMember = getPLSession()->FindClient( pidData.pid ); 00812 getPLSession()->MoveMutexUnlock( "comtcs3" ); 00813 00814 #ifdef RIO_DEBUG2 00815 RioErr << "[CommMulticast]: Mensagem MSGCODE_IP enviada para o " 00816 << "cliente " << NewMember->PIDClientThread << " (mode " 00817 << ClientMode2String( NewMember->mode ) << ", group " 00818 << NewMember->multicast_port << ", ip " 00819 << NewMember->multicast_ip << ", block -1, action KEEP_THIS)" 00820 << endl; 00821 #endif 00822 00823 //A mensagem abaixo s� � enviada para que o cliente libere o mutex 00824 //multicastMutex: Assim que o cliente envia seu PID pela 00825 //MSGCODE_PID, ele suspende sua execu��o ao executar 00826 //pthread_cond_wait( &multicastCondition, &multicastMutex ). A 00827 //execu��o continua t�o logo pthread_cond_broadcast seja executada 00828 //(o que acontece ao receber uma MSGCODE_IP) 00829 sendIPMsg( NewMember->PIDClientThread, 00830 NewMember->multicast_ip, 00831 NewMember->multicast_port, 00832 NewMember->mode ); 00833 00834 #if defined(RIO_DEBUG2) || defined(RIO_EMUL) 00835 if( NewMember->mode == LEADER ) 00836 { 00837 pthread_mutex_lock( &log_mutex ); 00838 multicast_number++; 00839 pthread_mutex_unlock( &log_mutex ); 00840 } 00841 #endif 00842 00843 break; 00844 } // fim do case MSGCODE_PID: 00845 00846 // Message received by client 00847 case MSGCODE_MODE: 00848 { 00849 //MSGCODE_MODE � diferente de MSGCODE_IP no seguinte: MSGCODE_MODE 00850 //s� � enviada para promover lientes passivos a subl�deres ou a 00851 //l�deres ou para informar a um l�der que ele foi ultrapassado e, 00852 //portanto, deve deixar a lideran�a. Em s�ntese, MSGCODE_MODE 00853 //informa ao cliente (que n�o fez nenhuma intera�ao) que ele deve 00854 //mudar de fun��o no sistema (tornar-se l�der, subl�der ou passivo). 00855 00856 msgMode modeData; 00857 memcpy( &modeData, readSeq, sizeof( msgMode ) ); 00858 00859 result = getTable()->searchSocket( sd, ¤t, &previous ); 00860 if( result != RESULT_COMMUNICATION_SEARCH_FOUND ) 00861 { 00862 RioErr << "[CommMulticast] Erro!!! Cliente n�o encontrado na " 00863 << "tabela" << endl; 00864 00865 MessageMutexUnlock( "SoRecMe9" ); 00866 00867 #ifdef RIO_DEBUG1 00868 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 11" << endl; 00869 #endif 00870 00871 return; 00872 } 00873 00874 if( video->waitingMSGCODE_IP ) 00875 { 00876 #ifdef RIO_DEBUG2 00877 RioErr << "[CommMulticast] MSGCODE_MODE Received and will be ignored..." << endl; 00878 RioErr << "[CommMulticast]: Setaria mode para " 00879 << ClientMode2String( modeData.mode ) << endl; 00880 RioErr << "[CommMulticast]: Setaria Bloco para " 00881 << ( int )modeData.block << endl; 00882 #endif 00883 00884 MessageMutexUnlock( "SoRecMe9" ); 00885 return; 00886 } 00887 00888 #ifdef RIO_DEBUG2 00889 RioErr << "[CommMulticast] MSGCODE_MODE Received..." << endl; 00890 RioErr << "[CommMulticast]: Setando mode para " 00891 << ClientMode2String( modeData.mode ) << endl; 00892 RioErr << "[CommMulticast]: Setando group_block para " 00893 << ( int )modeData.block << endl; 00894 #endif 00895 00896 if( modeData.mode == LEADER ) 00897 { 00898 video->first = true; 00899 video->group_block = ( int )modeData.block; 00900 00901 //No caso de um MSGCODE_MODE, o cliente ser� um l�der for�ado. 00902 //Ele n�o ter� mais um alvo pois abaixo, na chamada a 00903 //burstRequest, ser�o solicitados todos os blocos que este 00904 //cliente n�o tem at� o atual l�der. 00905 video->SetTarget( INT_MAX ); 00906 00907 //Cliente saindo do patching. 00908 if( video->isPatching() ) 00909 video->Stream_Control( "plsoreme3" ); 00910 00911 if( current->getMode() == PASSIVE ) 00912 { 00913 //Se cliente era passivo e foi eleito l�der ent�o pode ser 00914 //que tenha que requisitar uma rajada de blocos. O m�todo 00915 //burstRequest() trata isto. 00916 burstRequest( modeData.block ); 00917 } 00918 } 00919 else if( modeData.mode == SUBLEADER ) 00920 { 00921 video->first = true; 00922 video->group_block = ( int )modeData.block; 00923 00924 //No caso de um MSGCODE_MODE, o cliente ser� um subl�der 00925 //for�ado. Por estar se tornando um subl�der, ele dever�, em 00926 //algum momento, alcan�ar seu l�der; portanto ter� seu target 00927 //alterado para modeData.target, que � o bloco que o atual 00928 //l�der do grupo est� pedindo para o grupo. 00929 00930 RioErr << "[CommMulticast]: New client target: " 00931 << modeData.target << endl; 00932 00933 video->SetTarget( modeData.target ); 00934 00935 //Cliente entrando no patching. 00936 if( !video->isPatching() ) 00937 video->Stream_Control( "plsoreme4" ); 00938 } 00939 else 00940 { 00941 //O �nico caso em que esta condi��o � executada � quando este 00942 //cliente � l�der e � ultrapassado, quando, ent�o, deve perder a 00943 //lideran�a. 00944 video->first = false; 00945 video->group_block = -1; 00946 video->SetTarget( INT_MAX ); 00947 00948 //Cliente saindo do patching 00949 if( video->isPatching() ) 00950 video->Stream_Control( "plsoreme5" ); 00951 } 00952 00953 current->setMode( modeData.mode ); 00954 00955 break; 00956 } // fim do case MSGCODE_MODE: 00957 00958 #if defined(RIO_DEBUG2) && defined(RIO_EMUL) 00959 case MSGCODE_FLUXO: 00960 { 00961 msgFluxo fluxoData; 00962 memcpy( &fluxoData, readSeq, sizeof( msgFluxo ) ); 00963 00964 m_log_fluxo << " " << endl; 00965 00966 m_log_fluxo << "Recebimento de mensagem de mudanca de fluxo " 00967 << endl; 00968 result = getTable()->searchSocket( sd, ¤t, &previous ); 00969 if( result != RESULT_COMMUNICATION_SEARCH_FOUND ) 00970 { 00971 MessageMutexUnlock( "SoRecMe10" ); 00972 00973 #ifdef RIO_DEBUG1 00974 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 12" << endl; 00975 #endif 00976 00977 return; 00978 } 00979 00980 gettimeofday( &time_list, 0 ); 00981 00982 final_time.tv_sec = time_list.tv_sec - initial_time.tv_sec; 00983 final_time.tv_usec = time_list.tv_usec - initial_time.tv_usec; 00984 00985 if( final_time.tv_usec < 0 ) 00986 { 00987 final_time.tv_sec -= 1; 00988 final_time.tv_usec += 1000000; 00989 } 00990 00991 calculeted_time = ( unsigned int ) (final_time.tv_sec * 1000.0 + 00992 final_time.tv_usec/1000.0); 00993 00994 calculeted_time = calculeted_time/1000; 00995 00996 pthread_mutex_lock( &log_mutex ); 00997 if( fluxoData.patching == 1 ) 00998 { 00999 patching_number++; 01000 m_log_fluxo << "Cliente entrando no patching. " 01001 << " patching_number depois: " 01002 << patching_number << endl; 01003 } 01004 else // if( fluxoData.patching == -1 ) 01005 { 01006 patching_number--; 01007 m_log_fluxo << "Cliente saindo no patching. " 01008 << " patching_number depois: " 01009 << patching_number << endl; 01010 } 01011 01012 m_log_fluxo << " Multicast " << multicast_number 01013 << " Patching " << patching_number 01014 << " Total_fluxo " 01015 << multicast_number + patching_number 01016 << " Tempo " << calculeted_time << endl; 01017 01018 getPLSession()->MoveMutexLock( "msgcdflux" ); 01019 getPLSession()->PrintLogFile(); 01020 getPLSession()->MoveMutexUnlock( "msgcdflux" ); 01021 01022 pthread_mutex_unlock( &log_mutex ); 01023 01024 break; 01025 } // fim do case MSGCODE_FLUXO: 01026 #endif 01027 } //fim do switch( *code ) 01028 01029 MessageMutexUnlock( "SoRecMe" ); 01030 01031 #ifdef RIO_DEBUG1 01032 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 13" << endl; 01033 #endif 01034 }
void * CommunicationStream::startListenThread | ( | void * | param | ) | [static] |
Definition at line 115 of file CommMulticastStream.cpp.
00116 { 00117 #ifdef RIO_DEBUG1 00118 RioErr << "### [CommunicationStream - startListenThread] Start" << endl; 00119 #endif 00120 00121 if( param == NULL ) 00122 { 00123 #ifdef RIO_DEBUG1 00124 RioErr << "### [CommunicationStream - startListenThread] Finish 1" << endl; 00125 #endif 00126 00127 return NULL; 00128 } 00129 00130 CommunicationStreamThreadParam *p; 00131 p = ( CommunicationStreamThreadParam * ) param; 00132 00133 p->stream->createListenSocket( p->port ); 00134 00135 #ifdef RIO_DEBUG1 00136 RioErr << "### [CommunicationStream - startListenThread] Finish 2" << endl; 00137 #endif 00138 00139 return NULL; 00140 }
void * CommunicationStream::startReceiveThread | ( | void * | param | ) | [static] |
Definition at line 142 of file CommMulticastStream.cpp.
00143 { 00144 #ifdef RIO_DEBUG1 00145 RioErr << "### [CommunicationStream - startReceiveThread] Start" << endl; 00146 #endif 00147 00148 if( param == NULL ) 00149 { 00150 #ifdef RIO_DEBUG1 00151 RioErr << "### [CommunicationStream - startReceiveThread] Finish 1" << endl; 00152 #endif 00153 00154 return NULL; 00155 } 00156 00157 CommunicationStreamThreadParam *p; 00158 p = ( CommunicationStreamThreadParam * ) param; 00159 00160 p->stream->socketReceiveHandler(); 00161 00162 #ifdef RIO_DEBUG1 00163 RioErr << "### [CommunicationStream - startReceiveThread] Finish 2" << endl; 00164 #endif 00165 00166 return NULL; 00167 }
void CommunicationStream::TableMutexLock | ( | string | msg = "" |
) | [protected] |
Definition at line 1538 of file CommMulticastStream.cpp.
01539 { 01540 //#ifdef RIO_DEBUG1 01541 //RioErr << "### [CommunicationStream - TableMutexLock] Start" << endl; 01542 //#endif 01543 01544 #ifdef RIO_DEBUG2 01545 if( msg.length() ) 01546 RioErr << "[TableMutexLock] " << msg << "... " << endl; 01547 #endif 01548 01549 pthread_mutex_lock( &TableMutex ); 01550 01551 #ifdef RIO_DEBUG2 01552 if( msg.length() ) 01553 RioErr << "..." << msg << " OK!" << endl; 01554 #endif 01555 01556 //#ifdef RIO_DEBUG1 01557 //RioErr << "### [CommunicationStream - TableMutexLock] Finish" << endl; 01558 //#endif 01559 }
void CommunicationStream::TableMutexUnlock | ( | string | msg = "" |
) | [protected] |
Definition at line 1561 of file CommMulticastStream.cpp.
01562 { 01563 //#ifdef RIO_DEBUG1 01564 //RioErr << "### [CommunicationStream - TableMutexUnlock] Start" << endl; 01565 //#endif 01566 01567 #ifdef RIO_DEBUG2 01568 if( msg.length() ) 01569 RioErr << "[TableMutexUnlock] " << msg << endl; 01570 #endif 01571 01572 pthread_mutex_unlock( &TableMutex ); 01573 01574 //#ifdef RIO_DEBUG1 01575 //RioErr << "### [CommunicationStream - TableMutexUnlock] Finish" << endl; 01576 //#endif 01577 }
BufferStream* CommunicationStream::bufferStream [protected] |
Definition at line 191 of file CommMulticastStream.h.
void* CommunicationStream::callback [protected] |
Definition at line 190 of file CommMulticastStream.h.
int CommunicationStream::init [protected] |
Definition at line 193 of file CommMulticastStream.h.
pthread_mutex_t CommunicationStream::MessageMutex [protected] |
Definition at line 194 of file CommMulticastStream.h.
CRioObject* CommunicationStream::object [protected] |
Definition at line 188 of file CommMulticastStream.h.
PLSessionManager* CommunicationStream::PLSession [protected] |
Definition at line 189 of file CommMulticastStream.h.
fd_set* CommunicationStream::readfds |
Definition at line 227 of file CommMulticastStream.h.
CommunicationTable* CommunicationStream::table [protected] |
Definition at line 187 of file CommMulticastStream.h.
pthread_mutex_t CommunicationStream::TableMutex [protected] |
Definition at line 195 of file CommMulticastStream.h.
CRioMMVideo* CommunicationStream::video [protected] |
Definition at line 192 of file CommMulticastStream.h.