00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <iostream>
00019 #include <sys/time.h>
00020 #include <sys/types.h>
00021 #include <sys/socket.h>
00022 #include <unistd.h>
00023 #include <string.h>
00024 #include <arpa/inet.h>
00025 #include <netinet/in.h>
00026 #include <stdio.h>
00027 #include <errno.h>
00028 #include <fcntl.h>
00029
00030 using namespace std;
00031
00032 #include "RioInterfaceTypes.h"
00033 #include "RioInterface.h"
00034 #include "PLSessionManager.h"
00035 #include "CommMulticastStream.h"
00036 #include "BufferStream.h"
00037 #include "RioFile.h"
00038 #include "RioMMObject.h"
00039 #include "RioMMVideo.h"
00040 #include "RioError.h"
00041
00042
00043 const char* const LOGFILE = "RIOPLFluxos.log";
00044
00045
00046
00047
00048
00049
00050 CommunicationStream::CommunicationStream( CRioObject *object,
00051 PLSessionManager *PLSession,
00052 void *callback,
00053 BufferStream *bufferStream,
00054 CRioMMVideo *video,
00055 char *LogsDirectory )
00056 {
00057 #ifdef RIO_DEBUG1
00058 RioErr << "### [CommunicationStream - Constructor] Start" << endl;
00059 #endif
00060
00061 this->object = object;
00062 table = new CommunicationTable();
00063 readfds = new fd_set;
00064 this->PLSession = PLSession;
00065 this->callback = callback;
00066 this->bufferStream = bufferStream;
00067 this->video = video;
00068 init = 0;
00069
00070 pthread_mutex_init( &MessageMutex, NULL );
00071 pthread_mutex_init( &TableMutex, NULL );
00072
00073 #ifdef RIO_DEBUG2
00074 gettimeofday( &initial_time, 0 );
00075 gettimeofday( &time_list, 0 );
00076 gettimeofday( &final_time, 0 );
00077
00078 patching_number = 0;
00079 multicast_number = 0;
00080 begin = false;
00081
00082 pthread_mutex_init( &log_mutex, NULL );
00083 if( LogsDirectory == NULL )
00084 sprintf( FLUXOSLOG, "%s", LOGFILE );
00085 else
00086 sprintf( FLUXOSLOG, "%s%s", LogsDirectory, LOGFILE );
00087 m_log_fluxo.open( FLUXOSLOG );
00088
00089 calculeted_time = 0;
00090 #endif
00091
00092 #ifdef RIO_DEBUG1
00093 RioErr << "### [CommunicationStream - Constructor] Finish" << endl;
00094 #endif
00095 }
00096
00097 PLSessionManager *CommunicationStream::getPLSession()
00098 {
00099 #ifdef RIO_DEBUG1
00100 RioErr << "### [CommunicationStream - getPLSession] Single" << endl;
00101 #endif
00102
00103 return PLSession;
00104 }
00105
00106 CRioObject *CommunicationStream::getObject()
00107 {
00108 #ifdef RIO_DEBUG1
00109 RioErr << "### [CommunicationStream - getObject] Single" << endl;
00110 #endif
00111
00112 return object;
00113 }
00114
00115 void *CommunicationStream::startListenThread( void *param )
00116 {
00117 #ifdef RIO_DEBUG1
00118 RioErr << "### [CommunicationStream - startListenThread] Start" << endl;
00119 #endif
00120
00121 if( param == NULL )
00122 {
00123 #ifdef RIO_DEBUG1
00124 RioErr << "### [CommunicationStream - startListenThread] Finish 1" << endl;
00125 #endif
00126
00127 return NULL;
00128 }
00129
00130 CommunicationStreamThreadParam *p;
00131 p = ( CommunicationStreamThreadParam * ) param;
00132
00133 p->stream->createListenSocket( p->port );
00134
00135 #ifdef RIO_DEBUG1
00136 RioErr << "### [CommunicationStream - startListenThread] Finish 2" << endl;
00137 #endif
00138
00139 return NULL;
00140 }
00141
00142 void *CommunicationStream::startReceiveThread( void *param )
00143 {
00144 #ifdef RIO_DEBUG1
00145 RioErr << "### [CommunicationStream - startReceiveThread] Start" << endl;
00146 #endif
00147
00148 if( param == NULL )
00149 {
00150 #ifdef RIO_DEBUG1
00151 RioErr << "### [CommunicationStream - startReceiveThread] Finish 1" << endl;
00152 #endif
00153
00154 return NULL;
00155 }
00156
00157 CommunicationStreamThreadParam *p;
00158 p = ( CommunicationStreamThreadParam * ) param;
00159
00160 p->stream->socketReceiveHandler();
00161
00162 #ifdef RIO_DEBUG1
00163 RioErr << "### [CommunicationStream - startReceiveThread] Finish 2" << endl;
00164 #endif
00165
00166 return NULL;
00167 }
00168
00169 int CommunicationStream::createSocket( AddressIP *address )
00170 {
00171 #ifdef RIO_DEBUG1
00172 RioErr << "### [CommunicationStream - createSocket] Start" << endl;
00173 #endif
00174
00175 if( address == NULL )
00176 {
00177 #ifdef RIO_DEBUG1
00178 RioErr << "### [CommunicationStream - createSocket] Finish 1" << endl;
00179 #endif
00180
00181 return 0;
00182 }
00183
00184 int sd;
00185 struct sockaddr_in servaddr;
00186
00187 memset( &servaddr, 0, sizeof( servaddr ) );
00188 servaddr.sin_family = AF_INET;
00189 servaddr.sin_port = address->getPort();
00190 servaddr.sin_addr.s_addr = address->getAddress();
00191
00192 if( ( sd = socket( PF_INET, SOCK_STREAM, 0 ) ) == -1 )
00193 {
00194 Rioperror("CommunicationStream:createSocket");
00195
00196 #ifdef RIO_DEBUG1
00197 RioErr << "### [CommunicationStream - createSocket] Finish 2" << endl;
00198 #endif
00199
00200 return 0;
00201 }
00202
00203
00204
00205 if( fcntl( sd, F_SETFD, FD_CLOEXEC ) < 0 )
00206 {
00207 Rioperror("CommunicationStream:fcntl");
00208
00209 #ifdef RIO_DEBUG1
00210 RioErr << "### [CommunicationStream - createSocket] Finish 3" << endl;
00211 #endif
00212
00213 return 0;
00214 }
00215
00216 if( connect( sd, ( struct sockaddr * ) &servaddr, sizeof(servaddr) ) == -1 )
00217 {
00218 Rioperror("CommunicationStream:connectSocket");
00219
00220 #ifdef RIO_DEBUG1
00221 RioErr << "### [CommunicationStream - createSocket] Finish 4" << endl;
00222 #endif
00223
00224 return 0;
00225 }
00226
00227 #ifdef RIO_DEBUG1
00228 RioErr << "### [CommunicationStream - createSocket] Finish 5" << endl;
00229 #endif
00230
00231 return sd;
00232 }
00233
00234 int CommunicationStream::createListenSocket( unsigned short port )
00235 {
00236 #ifdef RIO_DEBUG1
00237 RioErr << "### [CommunicationStream - createListenSocket] Start" << endl;
00238 #endif
00239
00240 #ifdef RIO_DEBUG2
00241 RioErr << "PID da createListenSocket = " << pthread_self() << endl;
00242 #endif
00243
00244 struct sockaddr_in servaddr;
00245 int sd;
00246 int nsd;
00247
00248 memset( &servaddr, 0, sizeof( servaddr ) );
00249 servaddr.sin_family = AF_INET;
00250 servaddr.sin_port = htons( port );
00251 servaddr.sin_addr.s_addr = htonl( INADDR_ANY );
00252
00253 if( ( sd = socket( PF_INET, SOCK_STREAM, 0 ) ) == -1 )
00254 {
00255 Rioperror("CommunicationStream:createListenSocket");
00256
00257 #ifdef RIO_DEBUG1
00258 RioErr << "### [CommunicationStream - createListenSocket] Finish 1" << endl;
00259 #endif
00260
00261 return 0;
00262 }
00263
00264 int aux = 1;
00265 if( setsockopt ( sd, SOL_SOCKET, SO_REUSEADDR,
00266 (char*)&aux, sizeof(aux)) == -1 )
00267 {
00268 Rioperror( "CommunicationStream:setsockopt");
00269 }
00270
00271
00272 if( bind( sd, ( struct sockaddr * ) &servaddr, sizeof( servaddr ) ) == -1 )
00273 {
00274 Rioperror("CommunicationStream:bindSocket");
00275
00276 #ifdef RIO_DEBUG1
00277 RioErr << "### [CommunicationStream - createListenSocket] Finish 2" << endl;
00278 #endif
00279
00280 return 0;
00281 }
00282
00283
00284 int rc;
00285 rc = listen( sd, SOMAXCONN );
00286 if( rc != 0 )
00287 RioErr << "Erro listen commulticast " << strerror( errno ) << endl;
00288
00289 FD_ZERO( readfds );
00290 while( 1 )
00291 {
00292 if( ( nsd = accept( sd, NULL, NULL ) ) != -1 )
00293 {
00294 TableMutexLock( "crlissock" );
00295 getTable()->insertNullPid( nsd );
00296 TableMutexUnlock(" crlissock" );
00297
00298 }
00299 else
00300 {
00301 RioErr << "Erro no accept: " << strerror( errno ) << endl;
00302 }
00303 }
00304 }
00305
00306 void CommunicationStream::socketReceiveHandler()
00307 {
00308 #ifdef RIO_DEBUG1
00309 RioErr << "### [CommunicationStream - socketReceiveHandler] Start" << endl;
00310 #endif
00311
00312 #ifdef RIO_DEBUG2
00313 RioErr << "PID da socketReceiveHandler = " << pthread_self() << endl;
00314 #endif
00315
00316 struct timeval rTimeout;
00317
00318 int ns;
00319 int maxSock;
00320
00321 fd_set auxReadSet;
00322 CommunicationItem *aux;
00323
00324 while( 1 )
00325 {
00326 TableMutexLock();
00327
00328 if( !getTable()->getLastItem() )
00329 maxSock = 0;
00330 else
00331 maxSock = getTable()->getLastItem()->getSocket();
00332
00333 TableMutexUnlock();
00334
00335 if( maxSock == 0 )
00336 {
00337 usleep( 100 );
00338 continue;
00339 }
00340
00341 rTimeout.tv_sec = 1;
00342 rTimeout.tv_usec = 0;
00343
00344 FD_ZERO( &auxReadSet );
00345
00346 TableMutexLock( "sorcvhand2" );
00347 aux = getTable()->getFirstItem();
00348 while( aux )
00349 {
00350 FD_SET( aux->getSocket(), &auxReadSet );
00351 aux = aux->getNext();
00352 }
00353
00354 TableMutexUnlock( "sorcvhand2" );
00355
00356 ns = select( maxSock + 1, &auxReadSet, NULL, (fd_set *)0, &rTimeout );
00357
00358 if( ns > 0 )
00359 {
00360 TableMutexLock( "sorcvhand3" );
00361 aux = getTable()->getFirstItem();
00362
00363 while( aux )
00364 {
00365
00366
00367
00368
00369 CommunicationItem *nextaux = aux->getNext();
00370
00371 if( ( aux->getSocket() > 0 ) &&
00372 ( FD_ISSET( aux->getSocket(), &auxReadSet ) )
00373 )
00374 {
00375 socketReceiveMessageHandler( aux->getSocket() );
00376 }
00377 aux = nextaux;
00378 }
00379
00380 TableMutexUnlock( "sorcvhand3" );
00381 }
00382 else if( ns!= 0 )
00383 {
00384 Rioperror( "socketReceiveHandler:select" );
00385 }
00386 }
00387
00388 #ifdef RIO_DEBUG1
00389 RioErr << "### [CommunicationStream - socketReceiveHandler] Finish" << endl;
00390 #endif
00391 }
00392
00393 void CommunicationStream::socketReceiveMessageHandler( int sd )
00394 {
00395 #ifdef RIO_DEBUG1
00396 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Start" << endl;
00397 #endif
00398
00399 static unsigned int cont = 0;
00400 unsigned char readByte;
00401 unsigned char readSeq[ MAXLEN_MSGSTRUCT ];
00402 InfoClient *NewMember = NULL;
00403 InfoClient *member = NULL;
00404 CommunicationItem *current = NULL;
00405 CommunicationItem *previous = NULL;
00406 int result;
00407 CRioObject *obj;
00408
00409 int i = 0;
00410 int size = 0;
00411 unsigned char *code = 0;
00412
00413 #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00414 int member_mode = UNAVAILABLE;
00415 int leave = 0;
00416 InfoClient *member_next = 0;
00417 #endif
00418
00419
00420
00421
00422
00423 while( read( sd, &readByte, 1 ) )
00424 {
00425 readSeq[ i ] = readByte;
00426
00427
00428 if( i == 0 )
00429 {
00430 code = readSeq;
00431
00432 switch( *code )
00433 {
00434 case MSGCODE_IP:
00435 size = sizeof( msgIP );
00436 break;
00437 case MSGCODE_BLOCK:
00438 size = sizeof( msgBlock );
00439 break;
00440 case MSGCODE_PID:
00441 size = sizeof( msgPID );
00442 break;
00443 case MSGCODE_MOVE:
00444 size = sizeof( msgMove );
00445 break;
00446 case MSGCODE_MODE:
00447 size = sizeof( msgMode );
00448 break;
00449
00450 #ifdef RIO_EMUL
00451 case MSGCODE_FLUXO:
00452 size = sizeof( msgFluxo );
00453 break;
00454 #endif
00455
00456 default:
00457 #ifdef RIO_DEBUG2
00458 RioErr << "[CommMulticast] Erro: Mensagem intrat�vel! ("
00459 << *code << ")" << endl;
00460 #endif
00461
00462 #ifdef RIO_DEBUG1
00463 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 1" << endl;
00464 #endif
00465
00466 return;
00467 break;
00468 }
00469 }
00470
00471 i++;
00472 if( i >= size )
00473 break;
00474 }
00475
00476 if( size == 0 )
00477 {
00478
00479 RioErr << "[CommMulticast]: Removing client socket " << endl;
00480
00481 FD_CLR( sd, readfds );
00482 getTable()->removeSocket( sd );
00483
00484 close( sd );
00485
00486 #ifdef RIO_DEBUG1
00487 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 2" << endl;
00488 #endif
00489
00490 return;
00491 }
00492
00493 if( i < size )
00494 {
00495 #ifdef RIO_DEBUG1
00496 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 3" << endl;
00497 #endif
00498
00499 #ifdef RIO_DEBUG2
00500 RioErr << "[CommunicationStream - socketReceiveMessageHandler] Nao foi "
00501 << "possivel ler a mensagem por completo." << endl;
00502 #endif
00503
00504 return;
00505 }
00506
00507 MessageMutexLock( "SoRecMe1" );
00508
00509 switch( *code )
00510 {
00511
00512 case MSGCODE_IP:
00513 {
00514 msgIP ipData;
00515 memcpy( &ipData, readSeq, sizeof( msgIP ) );
00516
00517 RioErr << "[CommMulticast]: MSGCODE_IP Received" << endl;
00518 RioErr << "[CommMulticast]: IP Address = " << ipData.address << endl;
00519 RioErr << "[CommMulticast]: IP Port = " << ipData.port << endl;
00520 RioErr << "[CommMulticast]: IP Mode = "
00521 << ClientMode2String( ipData.mode ) << endl;
00522 RioErr << "[CommMulticast]: IP BLOCK = " << ipData.block << endl;
00523 RioErr << "[CommMulticast]: IP Action = "
00524 << IPAction2String( ipData.ipAction ) << endl;
00525 RioErr << "[CommMulticast]: IP Target = " << ipData.target << endl;
00526
00527 obj = getObject();
00528
00529 if( !obj )
00530 {
00531 RioErr << "Not found" << endl;
00532 MessageMutexUnlock( "SoRecMe2" );
00533
00534 #ifdef RIO_DEBUG1
00535 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 4" << endl;
00536 #endif
00537
00538 return;
00539 }
00540 else
00541 {
00542 RioErr << "OK (" << cont++ << ")" << endl;
00543 }
00544
00545 result = getTable()->searchSocket( sd, ¤t, &previous );
00546 if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00547 {
00548 RioErr << "[CommMulticast]: Error result communication " << endl;
00549 MessageMutexUnlock( "SoRecMe3" );
00550
00551 #ifdef RIO_DEBUG1
00552 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 5" << endl;
00553 #endif
00554
00555 return;
00556 }
00557
00558
00559 current->setMode( ipData.mode );
00560
00561 if( ( ipData.mode == LEADER ) || ( ipData.mode == SUBLEADER) )
00562 video->first = true;
00563 else
00564 video->first = false;
00565
00566 switch( ipData.ipAction )
00567 {
00568 case LEAVE_THIS:
00569 {
00570 RioErr << "[CommMulticast]: Leave this" << endl;
00571 obj->LeaveGroup( ipData.port );
00572 break;
00573 }
00574
00575 case LEAVE_ALL:
00576 {
00577 RioErr << "[CommMulticast]: Leave all" << endl;
00578 obj->LeaveAllGroups();
00579 break;
00580 }
00581
00582 case JOIN_THIS:
00583 {
00584 RioErr << "[CommMulticast]: Join This" << endl;
00585 int enable_join = 1;
00586 obj->SetMulticastSocket( ipData.port, ipData.address,
00587 callback, bufferStream,
00588 enable_join );
00589 break;
00590 }
00591
00592 case KEEP_THIS:
00593 {
00594 RioErr << "[CommMulticast]: Keep This" << endl;
00595 break;
00596 }
00597
00598 default:
00599 RioErr << "[CommMulticast]: Invalid mode "
00600 << ipData.ipAction << "!!" << endl;
00601 break;
00602 }
00603
00604 if( ipData.block > -1 )
00605 {
00606
00607 RioErr << "[CommMulticast]: Delta before: "
00608 << ipData.block << endl;
00609
00610 video->SetNextRequestBlock( ipData.block );
00611 }
00612
00613 if( ipData.target > -1 )
00614 {
00615
00616 RioErr << "[CommMulticast]: New client target: "
00617 << ipData.target << endl;
00618
00619 video->SetTarget( ipData.target );
00620
00621
00622
00623
00624
00625
00626
00627
00628 if( ( ipData.target != INT_MAX ) && !video->isPatching() )
00629 video->Stream_Control( "plsoreme1" );
00630
00631 else if( ( ipData.target == INT_MAX ) && video->isPatching() )
00632 video->Stream_Control( "plsoreme2" );
00633 }
00634
00635
00636
00637
00638
00639 if( !( ( ipData.ipAction == LEAVE_ALL ) &&
00640 ( ipData.mode != INACTIVE )
00641 )
00642 )
00643 {
00644 video->MulticastMutexLock( "comtcs1" );
00645 video->MulticastReleaseCondition( "comtcs1" );
00646 video->MulticastMutexUnlock( "comtcs1" );
00647 }
00648
00649 RioErr << "[CommMulticast]: BroadCast End" << endl;
00650 break;
00651 }
00652
00653
00654 case MSGCODE_MOVE:
00655 {
00656 msgMove moveData;
00657 memcpy( &moveData, readSeq, sizeof( msgMove ) );
00658
00659 RioErr << "[CommMulticast] MSGCODE_MOVE Received..." << endl;
00660
00661 if( getPLSession() == NULL )
00662 {
00663 MessageMutexUnlock( "SoRecMe4" );
00664
00665 #ifdef RIO_DEBUG1
00666 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 6" << endl;
00667 #endif
00668
00669 return;
00670 }
00671
00672 result = getTable()->searchSocket( sd, ¤t, &previous );
00673 if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00674 {
00675 MessageMutexUnlock( "SoRecMe5" );
00676
00677 #ifdef RIO_DEBUG1
00678 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 7" << endl;
00679 #endif
00680
00681 return;
00682 }
00683
00684 getPLSession()->MoveMutexLock( "comtcs2" );
00685 member = getPLSession()->FindClient( current->getPid() );
00686 getPLSession()->MoveMutexUnlock( "comtcs2" );
00687
00688 if( !member )
00689 {
00690 RioErr << "[CommMulticast]: member " << current->getPid()
00691 << " nao encontrado. Saindo da "
00692 << "socketReceiveMessageHandler..." << endl;
00693
00694 MessageMutexUnlock( "SoRecMe6" );
00695
00696 #ifdef RIO_DEBUG1
00697 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 8" << endl;
00698 #endif
00699
00700 return;
00701 }
00702
00703 #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00704 member_mode = member->mode;
00705
00706 if( ( moveData.action == ACTION_PAUSE ) ||
00707 ( moveData.action == ACTION_STOP ) ||
00708 ( moveData.block < 0 )
00709 )
00710 {
00711 leave = 1;
00712 }
00713 #endif
00714
00715 RioErr << "[CommMulticast]: MOVE CLIENT" << endl;
00716 RioErr << "[CommMulticast]: PID " << current->getPid() << endl;
00717 RioErr << "[CommMulticast]: GROUP " << member->multicast_port
00718 << endl;
00719 RioErr << "[CommMulticast]: action = "
00720 << MoveAction2String( moveData.action ) << endl;
00721 RioErr << "[CommMulticast]: block = " << moveData.block
00722 << endl;
00723 RioErr << "[CommMulticast]: mode = "
00724 << ClientMode2String( member->mode ) << endl;
00725
00726 result = getPLSession()->AllocClient( member->PIDClientThread,
00727 &moveData );
00728
00729 if( result == PL_ERROR )
00730 {
00731 MessageMutexUnlock( "SoRecMe7" );
00732
00733 #ifdef RIO_DEBUG1
00734 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 9" << endl;
00735 #endif
00736
00737 return;
00738 }
00739
00740 #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00741
00742
00743 if( ( member_mode == LEADER ) &&
00744 ( member->mode != LEADER ) &&
00745 ( member_next == NULL )
00746 )
00747 {
00748 pthread_mutex_lock( &log_mutex );
00749 multicast_number--;
00750 pthread_mutex_unlock( &log_mutex );
00751 }
00752
00753 gettimeofday( &time_list, 0 );
00754
00755 final_time.tv_sec = time_list.tv_sec - initial_time.tv_sec;
00756 final_time.tv_usec = time_list.tv_usec - initial_time.tv_usec;
00757
00758 if( final_time.tv_usec < 0 )
00759 {
00760 final_time.tv_sec -= 1;
00761 final_time.tv_usec += 1000000;
00762 }
00763 calculeted_time = ( unsigned int )(final_time.tv_sec * 1000.0 +
00764 final_time.tv_usec/1000.0);
00765 calculeted_time = calculeted_time/1000;
00766
00767 if( ( !leave ) && ( member->mode == LEADER ) )
00768 {
00769 pthread_mutex_lock( &log_mutex );
00770 multicast_number++;
00771 pthread_mutex_unlock( &log_mutex );
00772 }
00773
00774 m_log_fluxo << " Multicast " << multicast_number
00775 << " Patching " << patching_number
00776 << " Total_fluxo "
00777 << multicast_number + patching_number
00778 << " Tempo " << calculeted_time << endl;
00779 #endif
00780
00781 #ifdef RIO_DEBUG2
00782 RioErr << "OK (" << cont++ << ")" << endl;
00783 #endif
00784
00785 break;
00786 }
00787
00788
00789 case MSGCODE_PID:
00790 {
00791 msgPID pidData;
00792 memcpy( &pidData, readSeq, sizeof( msgPID ) );
00793
00794 RioErr << "[CommMulticast] MSGCODE_PID Received..." << endl;
00795 RioErr << "[CommMulticast] PID: " << pidData.pid << endl;
00796
00797 getTable()->changeSocketPid( sd, pidData.pid );
00798
00799 if( getPLSession() == NULL )
00800 {
00801 MessageMutexUnlock( "SoRecMe8" );
00802
00803 #ifdef RIO_DEBUG1
00804 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 10" << endl;
00805 #endif
00806
00807 return;
00808 }
00809
00810 getPLSession()->MoveMutexLock( "comtcs3" );
00811 NewMember = getPLSession()->FindClient( pidData.pid );
00812 getPLSession()->MoveMutexUnlock( "comtcs3" );
00813
00814 #ifdef RIO_DEBUG2
00815 RioErr << "[CommMulticast]: Mensagem MSGCODE_IP enviada para o "
00816 << "cliente " << NewMember->PIDClientThread << " (mode "
00817 << ClientMode2String( NewMember->mode ) << ", group "
00818 << NewMember->multicast_port << ", ip "
00819 << NewMember->multicast_ip << ", block -1, action KEEP_THIS)"
00820 << endl;
00821 #endif
00822
00823
00824
00825
00826
00827
00828
00829 sendIPMsg( NewMember->PIDClientThread,
00830 NewMember->multicast_ip,
00831 NewMember->multicast_port,
00832 NewMember->mode );
00833
00834 #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
00835 if( NewMember->mode == LEADER )
00836 {
00837 pthread_mutex_lock( &log_mutex );
00838 multicast_number++;
00839 pthread_mutex_unlock( &log_mutex );
00840 }
00841 #endif
00842
00843 break;
00844 }
00845
00846
00847 case MSGCODE_MODE:
00848 {
00849
00850
00851
00852
00853
00854
00855
00856 msgMode modeData;
00857 memcpy( &modeData, readSeq, sizeof( msgMode ) );
00858
00859 result = getTable()->searchSocket( sd, ¤t, &previous );
00860 if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00861 {
00862 RioErr << "[CommMulticast] Erro!!! Cliente n�o encontrado na "
00863 << "tabela" << endl;
00864
00865 MessageMutexUnlock( "SoRecMe9" );
00866
00867 #ifdef RIO_DEBUG1
00868 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 11" << endl;
00869 #endif
00870
00871 return;
00872 }
00873
00874 if( video->waitingMSGCODE_IP )
00875 {
00876 #ifdef RIO_DEBUG2
00877 RioErr << "[CommMulticast] MSGCODE_MODE Received and will be ignored..." << endl;
00878 RioErr << "[CommMulticast]: Setaria mode para "
00879 << ClientMode2String( modeData.mode ) << endl;
00880 RioErr << "[CommMulticast]: Setaria Bloco para "
00881 << ( int )modeData.block << endl;
00882 #endif
00883
00884 MessageMutexUnlock( "SoRecMe9" );
00885 return;
00886 }
00887
00888 #ifdef RIO_DEBUG2
00889 RioErr << "[CommMulticast] MSGCODE_MODE Received..." << endl;
00890 RioErr << "[CommMulticast]: Setando mode para "
00891 << ClientMode2String( modeData.mode ) << endl;
00892 RioErr << "[CommMulticast]: Setando group_block para "
00893 << ( int )modeData.block << endl;
00894 #endif
00895
00896 if( modeData.mode == LEADER )
00897 {
00898 video->first = true;
00899 video->group_block = ( int )modeData.block;
00900
00901
00902
00903
00904
00905 video->SetTarget( INT_MAX );
00906
00907
00908 if( video->isPatching() )
00909 video->Stream_Control( "plsoreme3" );
00910
00911 if( current->getMode() == PASSIVE )
00912 {
00913
00914
00915
00916 burstRequest( modeData.block );
00917 }
00918 }
00919 else if( modeData.mode == SUBLEADER )
00920 {
00921 video->first = true;
00922 video->group_block = ( int )modeData.block;
00923
00924
00925
00926
00927
00928
00929
00930 RioErr << "[CommMulticast]: New client target: "
00931 << modeData.target << endl;
00932
00933 video->SetTarget( modeData.target );
00934
00935
00936 if( !video->isPatching() )
00937 video->Stream_Control( "plsoreme4" );
00938 }
00939 else
00940 {
00941
00942
00943
00944 video->first = false;
00945 video->group_block = -1;
00946 video->SetTarget( INT_MAX );
00947
00948
00949 if( video->isPatching() )
00950 video->Stream_Control( "plsoreme5" );
00951 }
00952
00953 current->setMode( modeData.mode );
00954
00955 break;
00956 }
00957
00958 #if defined(RIO_DEBUG2) && defined(RIO_EMUL)
00959 case MSGCODE_FLUXO:
00960 {
00961 msgFluxo fluxoData;
00962 memcpy( &fluxoData, readSeq, sizeof( msgFluxo ) );
00963
00964 m_log_fluxo << " " << endl;
00965
00966 m_log_fluxo << "Recebimento de mensagem de mudanca de fluxo "
00967 << endl;
00968 result = getTable()->searchSocket( sd, ¤t, &previous );
00969 if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
00970 {
00971 MessageMutexUnlock( "SoRecMe10" );
00972
00973 #ifdef RIO_DEBUG1
00974 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 12" << endl;
00975 #endif
00976
00977 return;
00978 }
00979
00980 gettimeofday( &time_list, 0 );
00981
00982 final_time.tv_sec = time_list.tv_sec - initial_time.tv_sec;
00983 final_time.tv_usec = time_list.tv_usec - initial_time.tv_usec;
00984
00985 if( final_time.tv_usec < 0 )
00986 {
00987 final_time.tv_sec -= 1;
00988 final_time.tv_usec += 1000000;
00989 }
00990
00991 calculeted_time = ( unsigned int ) (final_time.tv_sec * 1000.0 +
00992 final_time.tv_usec/1000.0);
00993
00994 calculeted_time = calculeted_time/1000;
00995
00996 pthread_mutex_lock( &log_mutex );
00997 if( fluxoData.patching == 1 )
00998 {
00999 patching_number++;
01000 m_log_fluxo << "Cliente entrando no patching. "
01001 << " patching_number depois: "
01002 << patching_number << endl;
01003 }
01004 else
01005 {
01006 patching_number--;
01007 m_log_fluxo << "Cliente saindo no patching. "
01008 << " patching_number depois: "
01009 << patching_number << endl;
01010 }
01011
01012 m_log_fluxo << " Multicast " << multicast_number
01013 << " Patching " << patching_number
01014 << " Total_fluxo "
01015 << multicast_number + patching_number
01016 << " Tempo " << calculeted_time << endl;
01017
01018 getPLSession()->MoveMutexLock( "msgcdflux" );
01019 getPLSession()->PrintLogFile();
01020 getPLSession()->MoveMutexUnlock( "msgcdflux" );
01021
01022 pthread_mutex_unlock( &log_mutex );
01023
01024 break;
01025 }
01026 #endif
01027 }
01028
01029 MessageMutexUnlock( "SoRecMe" );
01030
01031 #ifdef RIO_DEBUG1
01032 RioErr << "### [CommunicationStream - socketReceiveMessageHandler] Finish 13" << endl;
01033 #endif
01034 }
01035
01036 #if defined(RIO_DEBUG2) || defined(RIO_EMUL)
01037 int CommunicationStream::GetNPatchingStream()
01038 {
01039 #ifdef RIO_DEBUG1
01040 RioErr << "### [CommunicationStream - GetNPatchingStream] Single" << endl;
01041 #endif
01042
01043 return( patching_number );
01044 }
01045 #endif
01046
01047 void CommunicationStream::sendIPMsgToSocket( int sd, char *address,
01048 unsigned short port,
01049 ClientMode mode, int block )
01050 {
01051 #ifdef RIO_DEBUG1
01052 RioErr << "### [CommunicationStream - sendIPMsgToSocket] Start" << endl;
01053 #endif
01054
01055 int escritos;
01056 msgIP msg;
01057
01058 msg.code = MSGCODE_IP;
01059 strcpy( msg.address, address );
01060 msg.port = port;
01061 msg.mode = mode;
01062 msg.block = block;
01063
01064 escritos = write( sd, &msg, sizeof( msgIP ) );
01065
01066 if( escritos != sizeof( msgIP ) )
01067 {
01068 RioErr << "CommunicationStream::sendIPMsgToSocket erro ao escrever o "
01069 << "valor de msgIP: escritos "<< escritos << " bytes de um "
01070 << "total de " << sizeof( msgIP ) << " bytes." << endl;
01071 }
01072 else if( escritos == -1 )
01073 {
01074 RioErr << "CommunicationStream::sendIPMsgToSocket erro " << errno
01075 << "(" << strerror( errno ) << ") ao escrever o valor de msgIP"
01076 << endl;
01077 }
01078
01079 #ifdef RIO_DEBUG1
01080 RioErr << "### [CommunicationStream - sendIPMsgToSocket] Finish" << endl;
01081 #endif
01082 }
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104 void CommunicationStream::sendPIDMsgToSocket( int sd, int myPid )
01105 {
01106 #ifdef RIO_DEBUG1
01107 RioErr << "### [CommunicationStream - sendPIDMsgToSocket] Start" << endl;
01108 #endif
01109
01110 int escritos;
01111 msgPID msg;
01112 msg.code = MSGCODE_PID;
01113 msg.pid = myPid;
01114
01115 escritos = write( sd, &msg, sizeof( msgPID ) );
01116
01117 if( escritos != sizeof( msgPID ) )
01118 {
01119 RioErr << "CommunicationStream::sendPIDMsgToSocket erro ao escrever o "
01120 << "valor de msgPID: escritos "<< escritos << " bytes de um "
01121 << "total de " << sizeof( msgPID ) << " bytes." << endl;
01122 }
01123 else if( escritos == -1 )
01124 {
01125 RioErr << "CommunicationStream::sendPIDMsgToSocket erro " << errno
01126 << "(" << strerror( errno ) << ") ao escrever o valor de msgPID"
01127 << endl;
01128 }
01129
01130 #ifdef RIO_DEBUG1
01131 RioErr << "### [CommunicationStream - sendPIDMsgToSocket] Finish" << endl;
01132 #endif
01133 }
01134
01135 #ifdef RIO_EMUL
01136 void CommunicationStream::sendFluxoMsg( int pid, int patch, int multicast )
01137 {
01138 #ifdef RIO_DEBUG1
01139 RioErr << "### [CommunicationStream - sendFluxoMsg] Start" << endl;
01140 #endif
01141
01142 msgFluxo msg;
01143 int result;
01144 int escritos;
01145 CommunicationItem *current = NULL;
01146 CommunicationItem *previous = NULL;
01147
01148 msg.code = MSGCODE_FLUXO;
01149 msg.patching = patch;
01150 msg.multicast = multicast;
01151 msg.pid = pid;
01152
01153 result = getTable()->searchPid( pid, ¤t, &previous );
01154
01155 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01156 {
01157 escritos = write( current->getSocket(), &msg, sizeof( msgFluxo ) );
01158
01159 if( escritos != sizeof( msgFluxo ) )
01160 {
01161 RioErr << "CommunicationStream::sendFluxoMsg erro ao escrever "
01162 << "o valor de msgFluxo: escritos "<< escritos << " bytes "
01163 << "de um total de " << sizeof( msgFluxo ) << " bytes."
01164 << endl;
01165 }
01166 else if( escritos == -1 )
01167 {
01168 RioErr << "CommunicationStream::sendFluxoMsg erro " << errno
01169 << "(" << strerror( errno ) << ") ao escrever o valor de "
01170 << "msgFluxo" << endl;
01171 }
01172
01173 }
01174
01175 #ifdef RIO_DEBUG1
01176 RioErr << "### [CommunicationStream - sendFluxoMsg] Finish" << endl;
01177 #endif
01178 }
01179 #endif
01180
01181 void CommunicationStream::sendModeMsg( int pid, ClientMode mode, int block,
01182 int target )
01183 {
01184 #ifdef RIO_DEBUG1
01185 RioErr << "### [CommunicationStream - sendModeMsg] Start" << endl;
01186 #endif
01187
01188 #ifdef RIO_DEBUG2
01189 RioErr << "[sendModeMsg] Sending mode message to client " << pid
01190 << ", mode " << ClientMode2String( mode ) << ", block " << block
01191 << ", target " << target << "." << endl;
01192 #endif
01193
01194 int result;
01195 int escritos;
01196 msgMode msg;
01197 CommunicationItem *current = NULL;
01198 CommunicationItem *previous = NULL;
01199
01200 msg.code = MSGCODE_MODE;
01201 msg.mode = mode;
01202 msg.block = block;
01203 msg.target = target;
01204
01205 result = getTable()->searchPid( pid, ¤t, &previous );
01206
01207 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01208 {
01209 escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01210
01211 if( escritos != sizeof( msg ) )
01212 {
01213 RioErr << "CommunicationStream::sendModeMsg erro ao escrever "
01214 << "o valor de msg: escritos "<< escritos << " bytes "
01215 << "de um total de " << sizeof( msg ) << " bytes." << endl;
01216 }
01217 else if( escritos == -1 )
01218 {
01219 RioErr << "CommunicationStream::sendModeMsg erro " << errno
01220 << "(" << strerror( errno ) << ") ao escrever o valor de "
01221 << "msg" << endl;
01222 }
01223
01224 }
01225
01226 #ifdef RIO_DEBUG1
01227 RioErr << "### [CommunicationStream - sendModeMsg] Finish" << endl;
01228 #endif
01229 }
01230
01231 void CommunicationStream::sendIPMsg( int pid, char *address,
01232 unsigned short port, ClientMode mode,
01233 int block, IPAction ipAction,
01234 int target )
01235 {
01236 #ifdef RIO_DEBUG1
01237 RioErr << "### [CommunicationStream - sendIPMsg] Start" << endl;
01238 #endif
01239
01240 #ifdef RIO_DEBUG2
01241 RioErr << "[sendIPMsg] Sending message to client " << pid << ", address "
01242 << address << ", port " << port << ", mode "
01243 << ClientMode2String( mode ) << ", block " << block << ", IPaction "
01244 << IPAction2String( ipAction ) << ", target " << target << "." << endl;
01245 #endif
01246
01247 int result;
01248 int escritos;
01249 msgIP msg;
01250 CommunicationItem *current = NULL;
01251 CommunicationItem *previous = NULL;
01252
01253 memset( &msg, 0, sizeof( msg ) );
01254 msg.code = MSGCODE_IP;
01255 strcpy( msg.address, address );
01256 msg.port = port;
01257 msg.mode = mode;
01258 msg.block = block;
01259 msg.target = target;
01260 msg.ipAction = ipAction;
01261
01262 result = getTable()->searchPid( pid, ¤t, &previous );
01263
01264 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01265 {
01266 escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01267
01268 if( escritos != sizeof( msg ) )
01269 {
01270 RioErr << "CommunicationStream::sendIPMsg erro ao escrever "
01271 << "o valor de msg: escritos " << escritos << " bytes "
01272 << "de um total de " << sizeof( msg ) << " bytes." << endl;
01273 }
01274 else if( escritos == -1 )
01275 {
01276 RioErr << "CommunicationStream::sendIPMsg erro " << errno
01277 << "(" << strerror( errno ) << ") ao escrever o valor de "
01278 << "msg" << endl;
01279 }
01280
01281 }
01282
01283 #ifdef RIO_DEBUG1
01284 RioErr << "### [CommunicationStream - sendIPMsg] Finish" << endl;
01285 #endif
01286 }
01287
01288 void CommunicationStream::sendMoveMsg( int pid, MoveAction action, int block )
01289 {
01290 #ifdef RIO_DEBUG1
01291 RioErr << "### [CommunicationStream - sendMoveMsg] Start" << endl;
01292 #endif
01293
01294 #ifdef RIO_DEBUG2
01295 RioErr << "[sendMoveMsg] Sending move message to PL: action "
01296 << MoveAction2String( action ) << ", block " << block
01297 << "." << endl;
01298 #endif
01299
01300 int result;
01301 int escritos;
01302 msgMove msg;
01303 CommunicationItem *current = NULL;
01304 CommunicationItem *previous = NULL;
01305
01306 msg.code = MSGCODE_MOVE;
01307 msg.action = action;
01308 msg.block = block;
01309
01310 result = getTable()->searchPid( pid, ¤t, &previous );
01311
01312 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01313 {
01314 escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01315
01316 if( escritos != sizeof( msg ) )
01317 {
01318 RioErr << "CommunicationStream::sendMoveMsg erro ao escrever "
01319 << "o valor de msg: escritos "<< escritos << " bytes "
01320 << "de um total de " << sizeof( msg ) << " bytes." << endl;
01321 }
01322 else if( escritos == -1 )
01323 {
01324 RioErr << "CommunicationStream::sendMoveMsg erro " << errno
01325 << "(" << strerror( errno ) << ") ao escrever o valor de "
01326 << "msg" << endl;
01327 }
01328
01329 }
01330
01331 #ifdef RIO_DEBUG1
01332 RioErr << "### [CommunicationStream - sendMoveMsg] Finish" << endl;
01333 #endif
01334 }
01335
01336
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361
01362
01363
01364
01365
01366
01367
01368
01369 void CommunicationStream::sendPIDMsg( int myPid )
01370 {
01371 #ifdef RIO_DEBUG1
01372 RioErr << "### [CommunicationStream - sendPIDMsg] Start" << endl;
01373 #endif
01374
01375 #ifdef RIO_DEBUG2
01376 RioErr << "[sendPIDMsg] Sending my PID to PL: " << myPid << "." << endl;
01377 #endif
01378
01379 int result;
01380 int escritos;
01381 msgPID msg;
01382 CommunicationItem *current = NULL;
01383 CommunicationItem *previous = NULL;
01384
01385 msg.code = MSGCODE_PID;
01386 msg.pid = myPid;
01387
01388 result = getTable()->searchPid( myPid, ¤t, &previous );
01389
01390 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01391 {
01392 escritos = write( current->getSocket(), &msg, sizeof( msg ) );
01393
01394 if( escritos != sizeof( msg ) )
01395 {
01396 RioErr << "CommunicationStream::sendPIDMsg erro ao escrever "
01397 << "o valor de msg: escritos "<< escritos << " bytes "
01398 << "de um total de " << sizeof( msg ) << " bytes." << endl;
01399 }
01400 else if( escritos == -1 )
01401 {
01402 RioErr << "CommunicationStream::sendPIDMsg erro " << errno
01403 << "(" << strerror( errno ) << ") ao escrever o valor de "
01404 << "msg" << endl;
01405 }
01406
01407 }
01408
01409 #ifdef RIO_DEBUG1
01410 RioErr << "### [CommunicationStream - sendPIDMsg] Finish" << endl;
01411 #endif
01412 }
01413
01414 void CommunicationStream::setTable( CommunicationTable *table )
01415 {
01416
01417
01418
01419 this->table = table;
01420 }
01421
01422 CommunicationTable *CommunicationStream::getTable()
01423 {
01424
01425
01426
01427
01428 return table;
01429 }
01430
01431 void CommunicationStream::burstRequest( int block )
01432 {
01433 #ifdef RIO_DEBUG1
01434 RioErr << "### [CommunicationStream - burstRequest] Start" << endl;
01435 #endif
01436
01437
01438
01439
01440
01441
01442 RioBlock clientNextRequestBlock = video->GetNextRequestBlock();
01443
01444 if( ( block > -1 ) &&
01445 ( clientNextRequestBlock > (RioBlock) block )
01446 )
01447 {
01448 #ifdef RIO_DEBUG2
01449 RioErr << "[CommMulticastStream] Requisitando blocos entre "
01450 << block << " e " << clientNextRequestBlock
01451 << " pois ultrapassei o lider e roubei sua lideran�a"
01452 << endl;
01453 #endif
01454
01455
01456
01457
01458
01459 bool readOk = true;
01460 for( RioBlock i = block + 1;
01461 ( i < clientNextRequestBlock ) && readOk;
01462 i++
01463 )
01464 {
01465 readOk = readOk && video->MulticastRead( i,
01466 MULTICASTTRAFFIC );
01467
01468 #ifdef RIO_DEBUG2
01469 if( !readOk )
01470 RioErr << "[burstRequest] Erro no pedido de bloco " << i
01471 << endl;
01472 else
01473 RioErr << "[burstRequest] Bloco " << i << " pedido com sucesso."
01474 << endl;
01475 #endif
01476 }
01477
01478
01479
01480
01481
01482 video->group_block = -1;
01483
01484
01485
01486
01487 if( video->isPatching() )
01488 video->Stream_Control( "plbure1" );
01489
01490 }
01491
01492 #ifdef RIO_DEBUG1
01493 RioErr << "### [CommunicationStream - burstRequest] Finish" << endl;
01494 #endif
01495 }
01496
01497 void CommunicationStream::MessageMutexLock( string msg )
01498 {
01499 #ifdef RIO_DEBUG1
01500 RioErr << "### [CommunicationStream - MessageMutexLock] Start" << endl;
01501 #endif
01502
01503 #ifdef RIO_DEBUG2
01504 if( msg.length() )
01505 RioErr << "[MessageMutexLock] " << msg << "... " << endl;
01506 #endif
01507
01508 pthread_mutex_lock( &MessageMutex );
01509
01510 #ifdef RIO_DEBUG2
01511 if( msg.length() )
01512 RioErr << "..." << msg << " OK!" << endl;
01513 #endif
01514
01515 #ifdef RIO_DEBUG1
01516 RioErr << "### [CommunicationStream - MessageMutexLock] Finish" << endl;
01517 #endif
01518 }
01519
01520 void CommunicationStream::MessageMutexUnlock( string msg )
01521 {
01522 #ifdef RIO_DEBUG1
01523 RioErr << "### [CommunicationStream - MessageMutexUnlock] Start" << endl;
01524 #endif
01525
01526 #ifdef RIO_DEBUG2
01527 if( msg.length() )
01528 RioErr << "[MessageMutexUnlock] " << msg << endl;
01529 #endif
01530
01531 pthread_mutex_unlock( &MessageMutex );
01532
01533 #ifdef RIO_DEBUG1
01534 RioErr << "### [CommunicationStream - MessageMutexUnlock] Finish" << endl;
01535 #endif
01536 }
01537
01538 void CommunicationStream::TableMutexLock( string msg )
01539 {
01540
01541
01542
01543
01544 #ifdef RIO_DEBUG2
01545 if( msg.length() )
01546 RioErr << "[TableMutexLock] " << msg << "... " << endl;
01547 #endif
01548
01549 pthread_mutex_lock( &TableMutex );
01550
01551 #ifdef RIO_DEBUG2
01552 if( msg.length() )
01553 RioErr << "..." << msg << " OK!" << endl;
01554 #endif
01555
01556
01557
01558
01559 }
01560
01561 void CommunicationStream::TableMutexUnlock( string msg )
01562 {
01563
01564
01565
01566
01567 #ifdef RIO_DEBUG2
01568 if( msg.length() )
01569 RioErr << "[TableMutexUnlock] " << msg << endl;
01570 #endif
01571
01572 pthread_mutex_unlock( &TableMutex );
01573
01574
01575
01576
01577 }
01578
01579 string CommunicationStream::MoveAction2String( MoveAction action )
01580 {
01581 #ifdef RIO_DEBUG1
01582 RioErr << "### [CommunicationStream - MoveAction2String] Start" << endl;
01583 #endif
01584
01585 string result;
01586
01587 switch( action )
01588 {
01589 case ACTION_PLAY: result = "PLAY"; break;
01590 case ACTION_STOP: result = "STOP"; break;
01591 case ACTION_PAUSE: result = "PAUSE"; break;
01592 case ACTION_FORWARD: result = "FORWARD"; break;
01593 case ACTION_REWIND: result = "REWIND"; break;
01594 case ACTION_READBUFFER: result = "READBUFFER"; break;
01595 case ACTION_REACHLEADER: result = "REACHLEADER"; break;
01596 default: result = "UNKNOWN!!!"; break;
01597 }
01598
01599 #ifdef RIO_DEBUG1
01600 RioErr << "### [CommunicationStream - MoveAction2String] Finish" << endl;
01601 #endif
01602
01603 return result;
01604 }
01605
01606 string CommunicationStream::ClientMode2String( ClientMode mode )
01607 {
01608
01609
01610
01611
01612 string result;
01613
01614 switch( mode )
01615 {
01616 case PASSIVE: result = "PASSIVE"; break;
01617 case LEADER: result = "LEADER"; break;
01618 case SUBLEADER: result = "SUBLEADER"; break;
01619 case INACTIVE: result = "INACTIVE"; break;
01620 case UNAVAILABLE: result = "UNAVAILABLE"; break;
01621 default: result = "UNKNOWN!!!"; break;
01622 }
01623
01624
01625
01626
01627
01628 return result;
01629 }
01630
01631 string CommunicationStream::IPAction2String( IPAction action )
01632 {
01633
01634
01635
01636
01637 string result;
01638
01639 switch( action )
01640 {
01641 case JOIN_THIS: result = "JOIN_THIS"; break;
01642 case LEAVE_THIS: result = "LEAVE_THIS"; break;
01643 case LEAVE_ALL: result = "LEAVE_ALL"; break;
01644 case KEEP_THIS: result = "KEEP_THIS"; break;
01645 default: result = "UNKNOWN!!!"; break;
01646 }
01647
01648
01649
01650
01651
01652 return result;
01653 }
01654
01655
01656
01657
01658
01659
01660 CommunicationTable::CommunicationTable()
01661 {
01662 #ifdef RIO_DEBUG1
01663 RioErr << "### [CommunicationTable - Constructor] Start" << endl;
01664 #endif
01665
01666
01667 firstItem = NULL;
01668 lastItem = NULL;
01669
01670 #ifdef RIO_DEBUG1
01671 RioErr << "### [CommunicationTable - Constructor] Finish" << endl;
01672 #endif
01673 }
01674
01675 int CommunicationTable::insert( CommunicationItem *item )
01676 {
01677 #ifdef RIO_DEBUG1
01678 RioErr << "### [CommunicationTable - insert] Start" << endl;
01679 #endif
01680
01681 int result;
01682 CommunicationItem *current;
01683 CommunicationItem *previous;
01684
01685 if( item == NULL )
01686 {
01687 #ifdef RIO_DEBUG1
01688 RioErr << "### [CommunicationTable - insert] Finish 1" << endl;
01689 #endif
01690
01691 return RESULT_COMMUNICATION_INSERT_ERROR;
01692 }
01693
01694 current = NULL;
01695 previous = NULL;
01696
01697 result = searchSocket( item->getSocket(), ¤t, &previous );
01698
01699 if( result == RESULT_COMMUNICATION_SEARCH_EMPTY_LIST )
01700 {
01701 firstItem = item;
01702 lastItem = item;
01703
01704 #ifdef RIO_DEBUG1
01705 RioErr << "### [CommunicationTable - insert] Finish 2" << endl;
01706 #endif
01707
01708 return RESULT_COMMUNICATION_INSERT_OK;
01709 }
01710
01711 if( result == RESULT_COMMUNICATION_SEARCH_NOT_FOUND )
01712 {
01713 item->setNext( current );
01714 if( previous )
01715 {
01716 previous->setNext( item );
01717 }
01718 else
01719 firstItem = item;
01720
01721
01722 if( current == NULL )
01723 {
01724 lastItem = item;
01725 }
01726
01727 #ifdef RIO_DEBUG1
01728 RioErr << "### [CommunicationTable - insert] Finish 3" << endl;
01729 #endif
01730
01731 return RESULT_COMMUNICATION_INSERT_OK;
01732 }
01733
01734 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01735 {
01736 #ifdef RIO_DEBUG1
01737 RioErr << "### [CommunicationTable - insert] Finish 4" << endl;
01738 #endif
01739
01740 return RESULT_COMMUNICATION_INSERT_ALREADY_EXIST;
01741 }
01742
01743 #ifdef RIO_DEBUG1
01744 RioErr << "### [CommunicationTable - insert] Finish 5" << endl;
01745 #endif
01746
01747 return RESULT_COMMUNICATION_INSERT_ERROR;
01748 }
01749
01750 int CommunicationTable::insertNullPid( int socket )
01751 {
01752 #ifdef RIO_DEBUG1
01753 RioErr << "### [CommunicationTable - insertNullPid] Start" << endl;
01754 #endif
01755
01756 int result;
01757 CommunicationItem *current = NULL;
01758 CommunicationItem *previous = NULL;
01759
01760 result = searchSocket( socket, ¤t, &previous );
01761
01762 if( result == RESULT_COMMUNICATION_SEARCH_EMPTY_LIST ||
01763 result == RESULT_COMMUNICATION_SEARCH_NOT_FOUND
01764 )
01765 {
01766 CommunicationItem *item = new CommunicationItem( 0, socket );
01767 insert( item );
01768 }
01769
01770 #ifdef RIO_DEBUG1
01771 RioErr << "### [CommunicationTable - insertNullPid] Finish 1" << endl;
01772 #endif
01773
01774 return RESULT_COMMUNICATION_INSERT_ERROR;
01775 }
01776
01777 int CommunicationTable::changeSocketPid( int socket, int pid )
01778 {
01779 #ifdef RIO_DEBUG1
01780 RioErr << "### [CommunicationTable - changeSocketPid] Start" << endl;
01781 #endif
01782
01783 int result;
01784 CommunicationItem *current = NULL;
01785 CommunicationItem *previous = NULL;
01786
01787 result = searchSocket( socket, ¤t, &previous );
01788
01789 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01790 {
01791 current->setPid( pid );
01792
01793 #ifdef RIO_DEBUG1
01794 RioErr << "### [CommunicationTable - insertNullPid] Finish 1" << endl;
01795 #endif
01796
01797 return RESULT_COMMUNICATION_CHANGE_OK;
01798 }
01799
01800 #ifdef RIO_DEBUG1
01801 RioErr << "### [CommunicationTable - insertNullPid] Finish 2" << endl;
01802 #endif
01803
01804 return RESULT_COMMUNICATION_CHANGE_ERROR;
01805 }
01806
01807
01808
01809
01810
01811
01812
01813
01814
01815
01816
01817
01818
01819
01820
01821
01822
01823
01824
01825
01826
01827
01828
01829
01830
01831
01832
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849
01850
01851
01852
01853
01854
01855
01856
01857
01858
01859
01860
01861
01862
01863
01864
01865
01866
01867
01868
01869 int CommunicationTable::removeSocket( int socket )
01870 {
01871 #ifdef RIO_DEBUG1
01872 RioErr << "### [CommunicationTable - removeSocket] Start" << endl;
01873 #endif
01874
01875 int result;
01876 CommunicationItem *current = NULL;
01877 CommunicationItem *previous = NULL;
01878
01879 result = searchSocket( socket, ¤t, &previous );
01880
01881 if( result == RESULT_COMMUNICATION_SEARCH_NOT_FOUND )
01882 {
01883 #ifdef RIO_DEBUG1
01884 RioErr << "### [CommunicationTable - removeSocket] Finish 1" << endl;
01885 #endif
01886
01887 return RESULT_COMMUNICATION_REMOVE_NOT_FOUND;
01888 }
01889
01890 if( result == RESULT_COMMUNICATION_SEARCH_FOUND )
01891 {
01892 if( previous == NULL )
01893 firstItem = current->getNext();
01894 else
01895 previous->setNext( current->getNext() );
01896
01897 if( current->getNext() == NULL )
01898 lastItem = previous;
01899
01900
01901
01902 current->setNext( NULL );
01903 delete( current );
01904 }
01905
01906 #ifdef RIO_DEBUG1
01907 RioErr << "### [CommunicationTable - removeSocket] Finish 2" << endl;
01908 #endif
01909
01910 return RESULT_COMMUNICATION_REMOVE_ERROR;
01911 }
01912
01913 ClientMode CommunicationTable::searchPidForMode( int pid )
01914 {
01915 #ifdef RIO_DEBUG1
01916 RioErr << "### [CommunicationTable - searchPidForMode] Start" << endl;
01917 #endif
01918
01919 int result;
01920 CommunicationItem *current = NULL;
01921 CommunicationItem *previous = NULL;
01922
01923 result = searchPid( pid, ¤t, &previous );
01924
01925 if( result != RESULT_COMMUNICATION_SEARCH_FOUND )
01926 {
01927 #ifdef RIO_DEBUG1
01928 RioErr << "### [CommunicationTable - searchPidForMode] Finish 1" << endl;
01929 #endif
01930
01931 return UNAVAILABLE;
01932 }
01933
01934 #ifdef RIO_DEBUG1
01935 RioErr << "### [CommunicationTable - searchPidForMode] Finish 2" << endl;
01936 #endif
01937
01938 return current->getMode();
01939 }
01940
01941 int CommunicationTable::searchPid( int pid, CommunicationItem **current,
01942 CommunicationItem **previous )
01943 {
01944 #ifdef RIO_DEBUG1
01945 RioErr << "### [CommunicationTable - searchPid] Start" << endl;
01946 #endif
01947
01948 if( current == NULL || previous == NULL )
01949 {
01950 #ifdef RIO_DEBUG1
01951 RioErr << "### [CommunicationTable - searchPid] Finish 1" << endl;
01952 #endif
01953
01954 return RESULT_COMMUNICATION_SEARCH_NULL_REFERENCE;
01955 }
01956
01957 bool found = false;
01958 CommunicationItem *aux = this->firstItem;
01959 CommunicationItem *auxPrevious = NULL;
01960
01961 if( aux == NULL )
01962 {
01963 #ifdef RIO_DEBUG1
01964 RioErr << "### [CommunicationTable - searchPid] Finish 2" << endl;
01965 #endif
01966
01967 return RESULT_COMMUNICATION_SEARCH_EMPTY_LIST;
01968 }
01969
01970 while( aux != NULL && !found )
01971 {
01972 if( pid == aux->getPid() )
01973 {
01974 found = true;
01975 }
01976 else
01977 {
01978 auxPrevious = aux;
01979 aux = aux->getNext();
01980 }
01981 }
01982
01983 *previous = auxPrevious;
01984 *current = aux;
01985
01986 if( aux == NULL )
01987 {
01988 #ifdef RIO_DEBUG1
01989 RioErr << "### [CommunicationTable - searchPid] Finish 3" << endl;
01990 #endif
01991
01992 return RESULT_COMMUNICATION_SEARCH_NOT_FOUND;
01993 }
01994
01995 #ifdef RIO_DEBUG1
01996 RioErr << "### [CommunicationTable - searchPid] Finish 4" << endl;
01997 #endif
01998
01999 return RESULT_COMMUNICATION_SEARCH_FOUND;
02000 }
02001
02002 int CommunicationTable::searchSocket( int socket, CommunicationItem **current,
02003 CommunicationItem **previous )
02004 {
02005 #ifdef RIO_DEBUG1
02006 RioErr << "### [CommunicationTable - searchSocket] Start" << endl;
02007 #endif
02008
02009 if( current == NULL || previous == NULL )
02010 {
02011 #ifdef RIO_DEBUG1
02012 RioErr << "### [CommunicationTable - searchSocket] Finish 1" << endl;
02013 #endif
02014
02015 return RESULT_COMMUNICATION_SEARCH_NULL_REFERENCE;
02016 }
02017
02018 bool found = false;
02019 CommunicationItem *aux = this->firstItem;
02020 CommunicationItem *auxPrevious = NULL;
02021
02022 if( aux == NULL )
02023 {
02024 #ifdef RIO_DEBUG1
02025 RioErr << "### [CommunicationTable - searchSocket] Finish 2" << endl;
02026 #endif
02027
02028 return RESULT_COMMUNICATION_SEARCH_EMPTY_LIST;
02029 }
02030
02031 while( aux != NULL && !found )
02032 {
02033 if( socket > aux->getSocket() )
02034 {
02035 auxPrevious = aux;
02036 aux = aux->getNext();
02037 }
02038 else
02039 {
02040 found = true;
02041 }
02042 }
02043
02044 *previous = auxPrevious;
02045 *current = aux;
02046
02047 if( aux == NULL )
02048 {
02049 #ifdef RIO_DEBUG1
02050 RioErr << "### [CommunicationTable - searchSocket] Finish 3" << endl;
02051 #endif
02052
02053 return RESULT_COMMUNICATION_SEARCH_NOT_FOUND;
02054 }
02055
02056 if( socket < aux->getSocket() )
02057 {
02058 #ifdef RIO_DEBUG1
02059 RioErr << "### [CommunicationTable - searchSocket] Finish 4" << endl;
02060 #endif
02061
02062 return RESULT_COMMUNICATION_SEARCH_NOT_FOUND;
02063 }
02064
02065 #ifdef RIO_DEBUG1
02066 RioErr << "### [CommunicationTable - searchSocket] Finish 6" << endl;
02067 #endif
02068
02069 return RESULT_COMMUNICATION_SEARCH_FOUND;
02070 }
02071
02072
02073 void CommunicationTable::setFirstItem( CommunicationItem *firstItem )
02074 {
02075 #ifdef RIO_DEBUG1
02076 RioErr << "### [CommunicationTable - setFirstItem] Single" << endl;
02077 #endif
02078
02079 this->firstItem = firstItem;
02080 }
02081
02082 void CommunicationTable::setLastItem( CommunicationItem *lastItem )
02083 {
02084 #ifdef RIO_DEBUG1
02085 RioErr << "### [CommunicationTable - setLastItem] Single" << endl;
02086 #endif
02087
02088 this->lastItem = lastItem;
02089 }
02090
02091 CommunicationItem *CommunicationTable::getFirstItem()
02092 {
02093
02094
02095
02096
02097 return firstItem;
02098 }
02099
02100 CommunicationItem *CommunicationTable::getLastItem()
02101 {
02102
02103
02104
02105
02106 return lastItem;
02107 }
02108
02109
02110
02111
02112
02113
02114 CommunicationItem::CommunicationItem( int pid, int socket )
02115 : CommunicationItemData( pid, socket )
02116 {
02117 #ifdef RIO_DEBUG1
02118 RioErr << "### [CommunicationItem - Constructor 1] Start" << endl;
02119 #endif
02120
02121 setNext( NULL );
02122
02123 #ifdef RIO_DEBUG1
02124 RioErr << "### [CommunicationItem - Constructor 1] Finish" << endl;
02125 #endif
02126 }
02127
02128 CommunicationItem::CommunicationItem( CommunicationItem *next, int pid,
02129 int socket )
02130 : CommunicationItemData( pid, socket )
02131 {
02132 #ifdef RIO_DEBUG1
02133 RioErr << "### [CommunicationItem - Constructor 2] Start" << endl;
02134 #endif
02135
02136 setNext( next );
02137
02138 #ifdef RIO_DEBUG1
02139 RioErr << "### [CommunicationItem - Constructor 2] Finish" << endl;
02140 #endif
02141 }
02142
02143 CommunicationItem::CommunicationItem( CommunicationItem *previous,
02144 CommunicationItem *next,
02145 int pid, int socket )
02146 : CommunicationItemData( pid, socket )
02147 {
02148 #ifdef RIO_DEBUG1
02149 RioErr << "### [CommunicationItem - Constructor 3] Start" << endl;
02150 #endif
02151
02152 setNext( next );
02153
02154 if( previous )
02155 previous->setNext( this );
02156
02157 #ifdef RIO_DEBUG1
02158 RioErr << "### [CommunicationItem - Constructor 3] Finish" << endl;
02159 #endif
02160 }
02161
02162 void CommunicationItem::setNext( CommunicationItem *next )
02163 {
02164 #ifdef RIO_DEBUG1
02165 RioErr << "### [CommunicationItem - setNext] Single" << endl;
02166 #endif
02167
02168 this->next = next;
02169 }
02170
02171 CommunicationItem *CommunicationItem::getNext()
02172 {
02173
02174
02175
02176
02177 return this->next;
02178 }
02179
02180
02181 CommunicationItemData::CommunicationItemData( int pid, int socket,
02182 ClientMode mode )
02183 {
02184 #ifdef RIO_DEBUG1
02185 RioErr << "### [CommunicationItemData - CommunicationItemData] Start" << endl;
02186 #endif
02187
02188 setPid( pid );
02189 setSocket( socket );
02190 setMode( mode );
02191
02192 #ifdef RIO_DEBUG1
02193 RioErr << "### [CommunicationItemData - CommunicationItemData] Finish" << endl;
02194 #endif
02195 }
02196
02197 void CommunicationItemData::setPid( int pid )
02198 {
02199 #ifdef RIO_DEBUG1
02200 RioErr << "### [CommunicationItemData - setPid] Single" << endl;
02201 #endif
02202
02203 this->pid = pid;
02204 }
02205
02206 int CommunicationItemData::getPid()
02207 {
02208
02209
02210
02211
02212 return this->pid;
02213 }
02214
02215 void CommunicationItemData::setSocket( int socket )
02216 {
02217 #ifdef RIO_DEBUG1
02218 RioErr << "### [CommunicationItemData - setSocket] Single" << endl;
02219 #endif
02220
02221 this->socket = socket;
02222 }
02223
02224 int CommunicationItemData::getSocket()
02225 {
02226
02227
02228
02229
02230 return socket;
02231 }
02232
02233 void CommunicationItemData::setMode( ClientMode mode )
02234 {
02235 #ifdef RIO_DEBUG1
02236 RioErr << "### [CommunicationItemData - setMode] Single" << endl;
02237 #endif
02238
02239 this->mode = mode;
02240 }
02241
02242 ClientMode CommunicationItemData::getMode()
02243 {
02244
02245
02246
02247
02248 return mode;
02249 }
02250
02251
02252
02253
02254
02255 AddressIP::AddressIP( char *address, unsigned short port )
02256 {
02257 #ifdef RIO_DEBUG1
02258 RioErr << "### [AddressIP - Constructor 1] Start" << endl;
02259 #endif
02260
02261 setAddress( address );
02262 setPort( port );
02263
02264 #ifdef RIO_DEBUG1
02265 RioErr << "### [AddressIP - Constructor 1] Finish" << endl;
02266 #endif
02267 }
02268
02269 AddressIP::AddressIP( in_addr_t address, unsigned short port )
02270 {
02271 #ifdef RIO_DEBUG1
02272 RioErr << "### [AddressIP - Constructor 2] Start" << endl;
02273 #endif
02274
02275 setAddress( address );
02276 setPort( port );
02277
02278 #ifdef RIO_DEBUG1
02279 RioErr << "### [AddressIP - Constructor 2] Finish" << endl;
02280 #endif
02281 }
02282
02283 void AddressIP::setPort( unsigned short port )
02284 {
02285 #ifdef RIO_DEBUG1
02286 RioErr << "### [AddressIP - setPort] Single" << endl;
02287 #endif
02288
02289 this->port = htons( port );
02290 }
02291
02292 void AddressIP::setAddress( char *address )
02293 {
02294 #ifdef RIO_DEBUG1
02295 RioErr << "### [AddressIP - setAddress 1] Single" << endl;
02296 #endif
02297
02298 this->address = inet_addr( address );
02299 }
02300
02301 void AddressIP::setAddress( in_addr_t address )
02302 {
02303 #ifdef RIO_DEBUG1
02304 RioErr << "### [AddressIP - setAddress 2] Single" << endl;
02305 #endif
02306
02307 this->address = address;
02308 }
02309
02310 unsigned short AddressIP::getPort()
02311 {
02312
02313
02314
02315
02316 return port;
02317 }
02318
02319 in_addr_t AddressIP::getAddress()
02320 {
02321
02322
02323
02324
02325 return address;
02326 }
02327