00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107 #include <stdio.h>
00108 #include <stdlib.h>
00109 #include <iostream>
00110 #include <unistd.h>
00111 #include <sys/ioctl.h>
00112 #include <net/if.h>
00113 #include <netinet/in.h>
00114 #include <netinet/tcp.h>
00115
00116
00117 #include <sys/syscall.h>
00118
00119 using namespace std;
00120
00121 #include "BufferStream.h"
00122 #include "RioNeti.h"
00123 #include "RioNeti.auth"
00124 #include "RioNeti.pkt"
00125 #include "RioError.h"
00126 #include "RioTypes.h"
00127
// Capacity of the buffer that receives the text form of an IPv4 address
// (see findNetInterface, which fills it from inet_ntoa).
#define IP_STRING_SIZE 16

#ifndef WINDOWS

// Nothing extra to define outside of WINDOWS builds.

#else

#include "stdafx.h"

// Out-of-class definitions of the static constants, compiled only for WINDOWS.
// NOTE(review): presumably other platforms give these their values inside the
// class declarations -- confirm against RioNeti.h.

const int RioNeti::TimeShift     = 10;

// Bit flags that are tested against m_debug.
const int RioNeti::DEBUG_CNT     = 1 << 0;
const int RioNeti::DEBUG_FRAG    = 1 << 2;
const int RioNeti::DEBUG_TIM     = 1 << 3;
const int RioNeti::DEBUG_DATA    = 1 << 4;
const int RioNeti::DEBUG_CMDS    = 1 << 5;
const int RioNeti::DEBUG_MISC    = 1 << 6;
const int RioNeti::DEBUG_MORE    = 1 << 8;
const int RioNeti::DEBUG_TIMEOUT = 1 << 10;

// Request table geometry: XM_REQNUM slots (reqarray is walked up to this
// bound) and the mask that goes with a power-of-two table size.
const int RioNeti::XM_REQBITS    = 8;
const int RioNeti::XM_REQNUM     = 1 << XM_REQBITS;
const int RioNeti::XM_REQMASK    = XM_REQNUM - 1;

// NetBuf tuning values (burst sizes / ack cadence, judging by the names --
// their use is not visible in this part of the file).
const int NetBuf::RQ_BURSTSTART  = 4;
const int NetBuf::RQ_BURSTMIN    = 4;
const int NetBuf::RQ_BURSTMAX    = 20;
const int NetBuf::RQ_ACKEVERY    = 4;
const int NetBuf::RQ_MAXACKLIST  = 10;

#endif
00154
// File-local numeric constants.
// USEC / USECBITS: one million and the bit width that covers it
// (presumably microseconds per second -- their use is outside this view).
const int USECBITS  = 20;
const int USEC      = 1000 * 1000;
// Header sizes subtracted from the maximum packet length to obtain the
// payload length (see the debug dump in Start and FreeBlock).
const int udphdrlen = 8;
const int iphdrlen  = 20;
00159
00160
00161
00162 #ifdef RIO_DEBUG_FILE
00163 ofstream RioNeti::m_log;
00164 #else
00165 debugCerr RioNeti::m_log;
00166 #endif
00167
00168
00169 RioNeti::RioNeti( bool serverInstance )
00170 {
00171 #ifdef RIO_DEBUG1
00172 RioErr << "[RioNeti - Construtor] Start" << endl;
00173 #endif
00174
00175 this->serverInstance = serverInstance;
00176 reqid_table = new RioNetiReqIdTable();
00177 m_maxpktl = FRAGMENTSIZE;
00178 m_maxdiskl = 128 * 1024;
00179 m_threadstop = 1;
00180 buffer_stream = NULL;
00181 callback = NULL;
00182 callback_transport = NULL;
00183 m_thread = 0;
00184
00185 m_cmdparm = NULL;
00186 xm_rstsec = 0;
00187 xm_rstip = 0;
00188 xm_rstport = 0;
00189 xm_rstreqid = 0;
00190 mbc_num = 0;
00191 memset( &readfds, 0, sizeof( fd_set ) );
00192 memset( &writefds, 0, sizeof( fd_set ) );
00193 reqseq = 0;
00194
00195 pthread_mutex_init( &socketMulticastMutex, NULL );
00196
00197
00198 m_sock = 0;
00199 for( int socki = 0; socki < MULTICAST_SOCKETS; socki++ )
00200 {
00201 multicastInfo[socki].socket = 0;
00202 multicastInfo[socki].isConnected = false;
00203 memset( &(multicastInfo[socki].myaddr_in), 0,
00204 sizeof( multicastInfo[socki].myaddr_in ) );
00205 memset( &(multicastInfo[socki].remoteaddr_in), 0,
00206 sizeof( multicastInfo[socki].remoteaddr_in ) );
00207 }
00208
00209 #ifdef WINDOWS
00210
00211 HashMutex = CreateMutex( NULL, FALSE, NULL );
00212 m_buf_lock = CreateMutex( NULL, FALSE, NULL );
00213 m_dns_lock = CreateMutex( NULL, FALSE, NULL );
00214 #else
00215
00216 pthread_mutex_init( &HashMutex, NULL );
00217 pthread_mutex_init( &m_buf_lock, NULL );
00218 pthread_mutex_init( &m_dns_lock, NULL );
00219 #endif
00220
00221 m_toaddrl = sizeof( m_toaddr );
00222 memset( &m_toaddr, 0, sizeof( m_toaddr ) );
00223 m_toaddr.sin_family = AF_INET;
00224 m_fraddrl = sizeof( m_fraddr );
00225 memset( &m_fraddr, 0, sizeof( m_fraddr ) );
00226 m_fraddr.sin_family = AF_INET;
00227 m_cmdproc = 0;
00228 cnt_sndpkt = 0;
00229 cnt_rcvpkt = 0;
00230 cnt_retry = 0;
00231 cnt_rst1 = 0;
00232 cnt_rst2 = 0;
00233 cnt_rcvfragack = 0;
00234 cnt_rcvfragacknc = 0;
00235 err_dupfrag = 0;
00236 err_overfrag = 0;
00237 err_pktsize = 0;
00238 err_pktmagic = 0;
00239 err_pktnoreq = 0;
00240 err_pktcmd = 0;
00241 err_pktfmt = 0;
00242
00243 for( int i = 0; i < XM_REQNUM; i++ )
00244 reqarray[ i ] = NULL;
00245
00246 DQUEUE_INIT(xm_timeq, NetBuf, nb_tidq);
00247 m_cmpltf = NULL;
00248 m_cmpltl = VIRTORG(m_cmpltf, NetBuf, nb_link);
00249 m_outqf = NULL;
00250 m_outql = VIRTORG(m_outqf, NetBuf, nb_outnext);
00251 mbc_free = NULL;
00252 m_debug = 0;
00253
00254
00255
00256
00257 m_threadmap = 0;
00258
00259 m_MappingInfo = NULL;
00260 m_MappingInfoSize = 0;
00261 m_ipaddrmap = NULL;
00262 m_ipportmap = NULL;
00263
00264
00265 m_StreamControl = NULL;
00266
00267
00268
00269 m_LogRotation = NULL;
00270
00271 #ifdef RIO_DEBUG1
00272 RioErr << "[RioNeti - Construtor] Finish" << endl;
00273 #endif
00274 }
00275
00276
00277 RioNeti::~RioNeti()
00278 {
00279 #ifdef RIO_DEBUG1
00280 m_log << "[RioNeti - Destructor] Start" << endl;
00281 #endif
00282
00283 if( !m_threadstop )
00284 Stop();
00285
00286 #ifdef WINDOWS
00287
00288 CloseHandle( HashMutex );
00289 CloseHandle( m_buf_lock );
00290 CloseHandle( m_dns_lock );
00291 CloseHandle( m_thread );
00292 #else
00293
00294 pthread_mutex_destroy( &HashMutex );
00295 pthread_mutex_destroy( &m_buf_lock );
00296 pthread_mutex_destroy( &m_dns_lock );
00297 #endif
00298
00299 delete reqid_table;
00300
00301
00302
00303 if( m_StreamControl != NULL )
00304 delete m_StreamControl;
00305
00306
00307
00308 if( m_LogRotation != NULL )
00309 delete m_LogRotation;
00310
00311 if( ( m_ipaddrmap != NULL ) && ( m_ipaddrmap != ( &m_ipaddr ) ) )
00312 delete m_ipaddrmap;
00313
00314 if( ( m_ipportmap != NULL ) && ( m_ipportmap != ( &m_ipport ) ) )
00315 delete m_ipportmap;
00316
00317 if( m_MappingInfo != NULL )
00318 delete m_MappingInfo;
00319
00320
00321 mbterm();
00322
00323 #ifdef RIO_DEBUG1
00324 m_log << "[RioNeti - Destructor] Finish" << endl;
00325 #endif
00326 }
00327
00328
00329
00330
00331
00332
00333 void RioNeti::allocMappingVectors( int size )
00334 {
00335 #ifdef RIO_DEBUG1
00336 m_log << "[RioNeti - allocMappingVectors] Start" << endl;
00337 #endif
00338
00339 m_ipaddrmap = new int[ size ];
00340 m_ipportmap = new int[ size ];
00341 m_MappingInfo = new NATMappingInfo[ size ];
00342 m_MappingInfoSize = size;
00343
00344
00345 for( int i = 0; i < size; i++ )
00346 {
00347 m_ipaddrmap[ i ] = -1;
00348 m_ipportmap[ i ] = 0;
00349 m_MappingInfo[ i ].IsEnabled = false;
00350 }
00351
00352 #ifdef RIO_DEBUG1
00353 m_log << "[RioNeti - allocMappingVectors] Finish" << endl;
00354 #endif
00355 }
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370 #ifdef RIO_DEBUG_FILE
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384 int RioNeti::Start( int port, int maxdiskl, int maxpktl,
00385 const char *RioNetiLogPath,
00386 unsigned int MaxClientsCredits,
00387 unsigned int MaxNetworkCredits, unsigned int NetworkRate )
00388 #else
00389 int RioNeti::Start( int port, int maxdiskl, int maxpktl,
00390 unsigned int MaxClientsCredits,
00391 unsigned int MaxNetworkCredits, unsigned int NetworkRate )
00392 #endif
00393 {
00394 #ifdef RIO_DEBUG1
00395 RioErr << "### [RioNeti - Start] Start" << endl;
00396 #endif
00397
00398
00399 #ifdef RIO_DEBUG_FILE
00400 if( RioNetiLogPath == NULL )
00401 {
00402 char tmpName[256];
00403 char tmpDomain[256];
00404 gethostname( tmpName, 255 );
00405 getdomainname( tmpDomain, 255 );
00406 string file;
00407 if( serverInstance )
00408 {
00409 int myPort = ntohs( port );
00410 if( myPort == STORAGESERVERUDPPORT )
00411 file = "RIOStorageEmul_";
00412 else if( myPort == RIOSERVERUDPPORT )
00413 file = "RIOServerEmul_";
00414 }
00415 else
00416 {
00417 char tmpPid[ 20 ];
00418
00419
00420 pid_t myPid;
00421 myPid = getpid();
00422 sprintf( tmpPid, "%d", ( int ) myPid );
00423 file = "RIOClientEmul_";
00424 file += tmpPid;
00425 file += "_";
00426 }
00427
00428 file += tmpName;
00429 if( strstr( tmpName, tmpDomain ) == NULL )
00430 {
00431 file += ".";
00432 file += tmpDomain;
00433 }
00434 file += ".log";
00435
00436 m_log.open( file.c_str() );
00437 }
00438 else
00439 m_log.open( RioNetiLogPath );
00440 #endif
00441
00442 int rc;
00443 struct sockaddr_in wk_addr;
00444 pthread_attr_t attribMonitor;
00445 pthread_attr_t attribNetMgr;
00446
00447 pthread_attr_init( &attribMonitor );
00448 pthread_attr_init( &attribNetMgr );
00449 pthread_attr_setstacksize( &attribMonitor, 2*PTHREAD_STACK_MIN );
00450 pthread_attr_setstacksize( &attribNetMgr, 2*PTHREAD_STACK_MIN );
00451
00452 #ifdef WINDOWS
00453
00454 u_long FAR i;
00455 DWORD THID;
00456 #else
00457
00458 int i;
00459 #endif
00460
00461 if( !m_threadstop )
00462 {
00463 #ifdef RIO_DEBUG1
00464 m_log << "[RioNeti - Start] Finish 1" << endl;
00465 #endif
00466
00467 return -1;
00468 }
00469
00470
00471 m_debug = 0;
00472 char *envdbgp = getenv( " RIONETI_DEBUG" );
00473 if( envdbgp )
00474 {
00475 m_debug = atoi( envdbgp );
00476 if( m_debug )
00477 {
00478 m_log << " RioNeti m_debug " << m_debug << endl;
00479 }
00480 }
00481
00482 i = 1000;
00483 envdbgp = getenv( " RIONETI_TRACEL" );
00484 if( envdbgp )
00485 {
00486 i = atoi( envdbgp );
00487 if( i < 1000 || i > 10000000 )
00488 i = 5000;
00489 if( m_debug )
00490 m_log << " RioNeti " << i << " trace entries" << endl;
00491 }
00492
00493 m_ipaddr = findNetInterface();
00494 m_ipport = port;
00495 m_maxdiskl = maxdiskl;
00496 m_maxpktl = maxpktl;
00497
00498 #ifdef WINDOWS
00499
00500 if( m_debug & DEBUG_MISC )
00501 {
00502 int pktdatal = m_maxpktl - (iphdrlen + udphdrlen);
00503 int fraglen = pktdatal - sizeof(RioPkt::pktfx);
00504 int fraglen0 = pktdatal - sizeof(RioPkt::pktf0);
00505 m_log << "m_maxpktl " << maxpktl << " pktdatal " << pktdatal
00506 << " fraglen " << fraglen << " fraglen0 " << fraglen0 << endl;
00507
00508 m_log << "sizeof(RioPkt::pktp) " << sizeof(RioPkt::pktp)
00509 << " sizeof(RioPkt::pktf0) " << sizeof(RioPkt::pktf0) << endl;
00510 m_log << "sizeof(RioPkt::pktfx) " << sizeof(RioPkt::pktfx)
00511 << " sizeof(RioPkt::pktfa) " << sizeof(RioPkt::pktfa) << endl;
00512 }
00513 #else
00514
00515 #ifdef RIO_DEBUG2
00516 int pktdatal = m_maxpktl - (iphdrlen + udphdrlen);
00517 int fraglen = pktdatal - sizeof(RioPkt::pktfx);
00518 int fraglen0 = pktdatal - sizeof(RioPkt::pktf0);
00519 m_log << "m_maxpktl " << maxpktl << " pktdatal " << pktdatal
00520 << " fraglen " << fraglen << " fraglen0 " << fraglen0 << endl;
00521
00522 m_log << "sizeof(RioPkt::pktp) " << sizeof(RioPkt::pktp)
00523 << " sizeof(RioPkt::pktf0) " << sizeof(RioPkt::pktf0) << endl;
00524 m_log << "sizeof(RioPkt::pktfx) " << sizeof(RioPkt::pktfx)
00525 << " sizeof(RioPkt::pktfa) " << sizeof(RioPkt::pktfa) << endl;
00526 #endif
00527 #endif
00528
00529
00530
00531
00532
00533
00534 if( ( MaxClientsCredits != 0 ) && ( MaxNetworkCredits != 0 ) &&
00535 ( NetworkRate != 0 ) ) {
00536
00537
00538 m_StreamControl = new CStreamControl;
00539 if( m_StreamControl == NULL ) {
00540 Rioperror( "creation of StreamControl failed ");
00541
00542 #ifdef RIO_DEBUG1
00543 m_log << "[RioNeti - Start] Finish 2" << endl;
00544 #endif
00545
00546 exit( -1 );
00547
00548 }
00549
00550
00551
00552
00553 unsigned long long int NetworkRateFS, TimeBetweenCredits;
00554
00555
00556 NetworkRateFS = ( ( unsigned long long int ) NetworkRate ) * 131072ull;
00557
00558 NetworkRateFS = NetworkRateFS / ( ( unsigned long long int ) FRAGMENTSIZE );
00559
00560
00561
00562
00563 TimeBetweenCredits = 1000000ull / NetworkRateFS;
00564
00565 if( m_StreamControl->Start( this, MaxClientsCredits, MaxNetworkCredits,
00566 TimeBetweenCredits ) != ERROR_NO_RIO_ERROR )
00567 {
00568 Rioperror( "start of StreamControl failed ");
00569
00570 #ifdef RIO_DEBUG1
00571 m_log << "[RioNeti - Start] Finish 3" << endl;
00572 #endif
00573
00574 exit( -1 );
00575 }
00576 }
00577
00578
00579 mbinit( 10 );
00580
00581 rc = socket( AF_INET, SOCK_DGRAM, 0 );
00582 if( rc < 0 )
00583 {
00584 Rioperror( "socket called failed" );
00585
00586 #ifdef RIO_DEBUG1
00587 m_log << "[RioNeti - Start] Finish 4" << endl;
00588 #endif
00589
00590 exit( 1 );
00591 }
00592
00593 m_sock = rc;
00594
00595
00596
00597 if( fcntl( m_sock, F_SETFD, FD_CLOEXEC ) < 0 )
00598 {
00599 Rioperror( "fcntl called failed" );
00600
00601 #ifdef RIO_DEBUG1
00602 m_log << "### [RioNeti - Start] Finish 5" << endl;
00603 #endif
00604
00605 exit( 1 );
00606 }
00607
00608 if( m_debug & DEBUG_MISC )
00609 m_log << " m_sock " << m_sock << endl;
00610
00611 memset( &wk_addr, 0, sizeof( wk_addr ) );
00612
00613 wk_addr.sin_family = AF_INET;
00614 wk_addr.sin_addr.s_addr = INADDR_ANY;
00615 wk_addr.sin_port = m_ipport;
00616 if( bind( m_sock, (struct sockaddr *) &wk_addr,sizeof( wk_addr )) < 0 )
00617 {
00618 Rioperror( "bind called failed" );
00619
00620 #ifdef RIO_DEBUG1
00621 m_log << "### [RioNeti - Start] Finish 6" << endl;
00622 #endif
00623
00624 exit( 1 );
00625 }
00626
00627
00628 #ifdef _SGI_SOURCE
00629 #define socklen_t int
00630 #endif
00631
00632 #ifdef WINDOWS
00633
00634 int wkl = sizeof( myAddress );
00635 #else
00636
00637 socklen_t wkl = sizeof( myAddress );
00638 #endif
00639
00640 if( getsockname( m_sock, (struct sockaddr *)&myAddress, &wkl ) != 0 )
00641 {
00642 Rioperror( "getsockname failed" );
00643
00644 #ifdef RIO_DEBUG1
00645 m_log << "### [RioNeti - Start] Finish 7" << endl;
00646 #endif
00647
00648 exit(1);
00649 }
00650
00651 if( m_debug & DEBUG_MISC )
00652 {
00653 m_log << " wk socket: " << inet_ntoa( myAddress.sin_addr ) << ":"
00654 << ntohs( myAddress.sin_port ) << endl;
00655 }
00656
00657 m_ipport = myAddress.sin_port;
00658
00659
00660
00661
00662
00663
00664 m_behindnat = false;
00665 m_ipaddrmap = &m_ipaddr;
00666 m_ipportmap = &m_ipport;
00667
00668
00669 mucksockopt( m_sock );
00670
00671
00672
00673 #ifdef WINDOWS
00674
00675 GetSystemTime( &pSystemTime );
00676 xm_now.tv_sec = pSystemTime.wSecond;
00677 xm_now.tv_usec = pSystemTime.wMilliseconds;
00678 #else
00679
00680 gettimeofday( &xm_now, NULL );
00681 #endif
00682
00683 FD_ZERO( &readfds );
00684 FD_ZERO( &writefds );
00685
00686
00687 writefdsp = 0;
00688
00689
00690
00691
00692
00693 i = 1;
00694
00695 #ifdef WINDOWS
00696
00697 rc = ioctlsocket(m_sock, FIONBIO, &i);
00698 if( rc )
00699 {
00700 Rioperror( "ioctlsocket FIONBIO failed");
00701
00702 #ifdef RIO_DEBUG1
00703 m_log << "### [RioNeti - Start] Finish 8" << endl;
00704 #endif
00705
00706 exit( 1 );
00707 }
00708 #else
00709
00710 rc = ioctl( m_sock, FIONBIO, &i );
00711 if( rc )
00712 {
00713 Rioperror( "ioctl FIONBIO failed" );
00714
00715 #ifdef RIO_DEBUG1
00716 m_log << "### [RioNeti - Start] Finish 9" << endl;
00717 #endif
00718
00719 exit( 1 );
00720 }
00721 #endif
00722
00723 m_threadstop = 0;
00724
00725
00726
00727 if( serverInstance )
00728 {
00729 SocketMulticastLock( "servlock" );
00730 CreateMulticastSocket( 0 );
00731 SocketMulticastUnlock( "servulck" );
00732 }
00733
00734 #ifdef WINDOWS
00735
00736 if( (m_thread = CreateThread( NULL, 0,
00737 (PTHREAD_START_ROUTINE )NetMgrThreadEp,
00738 (void *) this,0, &THID)) == NULL)
00739 {
00740 m_threadstop = 1;
00741
00742 #ifdef RIO_DEBUG1
00743 m_log << "### [RioNeti - Start] Finish 11" << endl;
00744 #endif
00745
00746 exit( 1 );
00747 }
00748 #else
00749
00750 if( pthread_create( &m_thread, &attribNetMgr, &NetMgrThreadEp,
00751 (void *) this ) )
00752 {
00753 m_threadstop = 1;
00754 Rioperror( "pthread_create failed" );
00755
00756 #ifdef RIO_DEBUG1
00757 m_log << "### [RioNeti - Start] Finish 12" << endl;
00758 #endif
00759
00760 exit( 1 );
00761 }
00762 #endif
00763
00764 #ifdef RIO_DEBUG1
00765 m_log << "### [RioNeti - Start] Finish 13" << endl;
00766 #endif
00767
00768 return 0;
00769 }
00770
00771
00772
00773
00774
00775 int RioNeti::findNetInterface()
00776 {
00777 #ifdef RIO_DEBUG1
00778 m_log << "[RioNeti - findNetInterface] Start" << endl;
00779 #endif
00780
00781 struct if_nameindex *iflist = NULL;
00782 struct if_nameindex *listsave = NULL;
00783 struct ifreq ifreq;
00784 int auxsock = 0;
00785 struct sockaddr_in sa;
00786 char localhost_ip[IP_STRING_SIZE] = "";
00787
00788 if( ( auxsock = socket( AF_INET, SOCK_STREAM, 0 ) ) < 0 )
00789 {
00790 Rioperror( "RM_initialize: socket for ioctl" );
00791
00792 #ifdef RIO_DEBUG1
00793 m_log << "[RioNeti - findNetInterface] Finish 1" << endl;
00794 #endif
00795
00796 exit(1);
00797 }
00798
00799
00800 listsave = if_nameindex();
00801 iflist = listsave;
00802
00803 for( ; *(char *)iflist != 0; iflist++ )
00804 {
00805
00806 strncpy( ifreq.ifr_name, iflist->if_name, IF_NAMESIZE );
00807
00808
00809 if( ioctl( auxsock, SIOCGIFADDR, &ifreq ) < 0 )
00810 {
00811
00812 if( errno != 99 )
00813 {
00814 Rioperror( "RioNeti_initialize: ioctl" );
00815
00816 #ifdef RIO_DEBUG1
00817 m_log << "[RioNeti - findNetInterface] Finish 2" << endl;
00818 #endif
00819
00820 exit(1);
00821 }
00822 }
00823
00824
00825 memset( &sa, 0, sizeof( struct sockaddr_in ) );
00826 memcpy( &sa, &ifreq.ifr_addr, sizeof( struct sockaddr_in ) );
00827 strcpy( localhost_ip, inet_ntoa( sa.sin_addr ) );
00828
00829 #ifdef RIO_DEBUG2
00830 m_log << "RioNeti::findNetInterface endereco IP associado a placa: "
00831 << localhost_ip << endl;
00832 #endif
00833
00834 if( strcmp( "127.0.0.1", localhost_ip ) != 0 )
00835 break;
00836 }
00837
00838 if_freenameindex( listsave );
00839 close( auxsock );
00840
00841 #ifdef RIO_DEBUG1
00842 m_log << "[RioNeti - findNetInterface] Finish 3" << endl;
00843 #endif
00844
00845 return sa.sin_addr.s_addr;
00846 }
00847
00848 void RioNeti::CreateMulticastSocket( int sockIndex )
00849 {
00850 #ifdef RIO_DEBUG1
00851 m_log << "[RioNeti - CreateMulticastSocket] Start" << endl;
00852 #endif
00853
00854 int rc;
00855
00856
00857 if( multicastInfo[sockIndex].socket )
00858 {
00859 #ifdef RIO_DEBUG2
00860 m_log << "CLOSING SOCKET" << endl;
00861 #endif
00862
00863 rc = close( multicastInfo[sockIndex].socket );
00864
00865 if( rc != 0 )
00866 Rioperror( "Failed to close socket" );
00867 }
00868
00869 multicastInfo[sockIndex].socket = socket( AF_INET, SOCK_DGRAM,
00870 IPPROTO_UDP );
00871
00872
00873
00874
00875
00876
00877 if( multicastInfo[sockIndex].socket < 0 )
00878 {
00879 m_log << " Error creating socket to transmite multicast"
00880 " traffic " << endl;
00881
00882 multicastInfo[sockIndex].isConnected = false;
00883 }
00884 else
00885 {
00886 multicastInfo[sockIndex].isConnected = true;
00887 }
00888
00889 #ifdef RIO_DEBUG2
00890 m_log << "CreateMulticastSocket: socket " << sockIndex
00891 << " criado com porta "
00892 << ntohs( multicastInfo[sockIndex].myaddr_in.sin_port ) << endl;
00893 #endif
00894
00895 #ifdef RIO_DEBUG1
00896 m_log << "[RioNeti - CreateMulticastSocket] Finish" << endl;
00897 #endif
00898 }
00899
00900 bool RioNeti::SetMulticastSocket( unsigned short multicastport,
00901 char* multicast_addr,
00902 void *callback, BufferStream *buffer_stream,
00903 int enable_join )
00904 {
00905 #ifdef RIO_DEBUG1
00906 m_log << "[RioNeti - SetMulticastSocket] Start" << endl;
00907 #endif
00908
00909
00910 int sockIndex = -1;
00911 int aux = 1;
00912 int rc;
00913
00914 SocketMulticastLock( "setsocklk" );
00915
00916 #ifdef RIO_DEBUG2
00917 m_log << "[SetMulticastSocket] status dos sockets: 0/1 = "
00918 << multicastInfo[0].isConnected << "/"
00919 << multicastInfo[1].isConnected << endl;
00920 #endif
00921
00922
00923
00924 for( int i = 0; i < MULTICAST_SOCKETS; i++ )
00925 {
00926
00927
00928
00929
00930
00931
00932
00933
00934 if( ( multicastInfo[i].isConnected == true ) &&
00935 ( multicastInfo[i].myaddr_in.sin_port == htons( multicastport ) )
00936 )
00937 {
00938 #ifdef RIO_DEBUG2
00939 m_log << "[RioNeti] SetMulticastSocket foi chamada para setar "
00940 << "porta " << multicastport << " que já estava setada. "
00941 << "Pedido será ignorado." << endl;
00942 #endif
00943
00944 SocketMulticastUnlock( "setsockulk1" );
00945
00946 #ifdef RIO_DEBUG1
00947 m_log << "[RioNeti - SetMulticastSocket] Finish 1" << endl;
00948 #endif
00949
00950 return false;
00951 }
00952 else if( ( multicastInfo[i].isConnected == false ) &&
00953 ( sockIndex == -1 )
00954 )
00955 {
00956 sockIndex = i;
00957 }
00958 }
00959
00960 if( sockIndex == -1 )
00961 {
00962 m_log << "[RioNeti] SetMulticastSocket ERROR: máximo de conexões "
00963 << "multicast alcançado. Não é possível iniciar uma nova conexão."
00964 << endl;
00965
00966 SocketMulticastUnlock( "setsockulk2" );
00967
00968 #ifdef RIO_DEBUG1
00969 m_log << "[RioNeti - SetMulticastSocket] Finish 2" << endl;
00970 #endif
00971
00972 return false;
00973 }
00974
00975 this->callback_transport = (RioCallBackTransport *) callback;
00976 this->callback = ((RioCallBackTransport *) callback)->callback;
00977 this->buffer_stream = buffer_stream;
00978
00979 multicastInfo[sockIndex].myaddr_in.sin_family = AF_INET;
00980 multicastInfo[sockIndex].myaddr_in.sin_port = htons( multicastport );
00981 multicastInfo[sockIndex].myaddr_in.sin_addr.s_addr = htonl( INADDR_ANY );
00982
00983
00984 if( enable_join )
00985 {
00986 CreateMulticastSocket( sockIndex );
00987
00988 mucksockopt( multicastInfo[sockIndex].socket );
00989
00990 if( setsockopt( multicastInfo[sockIndex].socket, SOL_SOCKET,
00991 SO_REUSEADDR, (char*)&aux, sizeof(aux)) == -1 )
00992 {
00993 m_log << " Reuseaddr error " << endl;
00994 }
00995
00996 SetTTLValue( multicastInfo[sockIndex].socket, 1 );
00997 SetLoopBack( multicastInfo[sockIndex].socket, 1 );
00998
00999 if( bind( multicastInfo[sockIndex].socket,
01000 (struct sockaddr *) &(multicastInfo[sockIndex].myaddr_in),
01001 sizeof(multicastInfo[sockIndex].myaddr_in)) == -1 )
01002 {
01003 Rioperror( "[RioNeti SetMulticastSocket Bind]" );
01004 multicastInfo[sockIndex].socket = -1;
01005 }
01006
01007 int i = 1;
01008 rc = ioctl( multicastInfo[sockIndex].socket, FIONBIO, &i );
01009 if( rc )
01010 {
01011 Rioperror( "ioctl FIONBIO failed" );
01012
01013 #ifdef RIO_DEBUG1
01014 m_log << "[RioNeti - SetMulticastSocket] Finish 3" << endl;
01015 #endif
01016
01017 exit( 1 );
01018 }
01019
01020 JoinGroup( sockIndex, multicast_addr );
01021 }
01022
01023 SocketMulticastUnlock( "setsockulk" );
01024
01025 #ifdef RIO_DEBUG1
01026 m_log << "[RioNeti - SetMulticastSocket] Finish 4" << endl;
01027 #endif
01028
01029 return true;
01030 }
01031
01032
01033
01034 int RioNeti::FreeBlock( unsigned int block_id, RioStreamType traffic )
01035 {
01036 #ifdef RIO_DEBUG1
01037 m_log << "[RioNeti - FreeBlock] Start" << endl;
01038 #endif
01039
01040 bool status = true;
01041 NetBuf *rp = NULL;
01042 int n_frag = 0;
01043 int blocksize = 0;
01044 int totalfrags = 0;
01045 RioNetiReqIdItem *current = NULL;
01046 RioNetiReqIdItem *previous = NULL;
01047
01048 #ifdef RIO_DEBUG2
01049 m_log << "[RioNeti] Entrei na FreeBlock..." << endl;
01050 #endif
01051
01052 if( traffic == MULTICASTTRAFFIC )
01053 {
01054 HashMutexLock( "freeblck1" );
01055 int result = reqid_table->Search( (int) block_id, ¤t,
01056 &previous );
01057
01058 if( result == RESULT_NETIREQT_SEARCH_FOUND )
01059 {
01060 #ifdef RIO_DEBUG2
01061 m_log << "[RioNeti] FreeBlock chamado para bloco " << block_id
01062 << ", multicast de reqid " << current->GetLreqid() << endl;
01063 #endif
01064
01065 rp = iHashFind( current->GetLreqid() );
01066
01067 if( !rp )
01068 {
01069 status = false;
01070
01071 #ifdef RIO_DEBUG2
01072 m_log << "[RioNeti] FreeBlock - Não foram encontrados "
01073 << "fragmentos do bloco multicast "
01074 << current->GetPreqid()
01075 << ". O mesmo não existe na tabela hash." << endl;
01076 #endif
01077 }
01078 else
01079 {
01080 if( rp->rq_fraghigh == 0 )
01081 {
01082
01083
01084
01085
01086
01087 status = false;
01088
01089 #ifdef RIO_DEBUG_EMUL
01090 RioRequest *Request = ( RioRequest * ) rp->nb_userparm;
01091 m_log << "[RioNeti] FreeBlock - Nao foram encontrados "
01092 << "fragmentos do bloco multicast "
01093 << current->GetPreqid() << " de reqid "
01094 << current->GetLreqid() << " na cache. Mais alto de "
01095 << "valor zero!" << endl;
01096 char sent[256];
01097 int numfrags;
01098 numfrags = 1 + (( rp->nb_blockl - rp->rq_fraglen0 +
01099 rp->rq_fraglen - 1) / rp->rq_fraglen );
01100 sprintf( sent, "= %d = %d = %u = %d%% perdido.", block_id,
01101 numfrags, Request->Block, 100 );
01102 m_log << sent << endl;
01103 #endif
01104 }
01105
01106 reqid_table->Remove( current->GetPreqid() );
01107 }
01108 }
01109 else
01110 {
01111 status = false;
01112
01113 #ifdef RIO_DEBUG2
01114 m_log << "[RioNeti] FreeBlock - Bloco multicast " << block_id
01115 << " não existe na tabela multicast." << endl;
01116 #endif
01117 }
01118 }
01119 else
01120 {
01121 HashMutexLock( "freeblck2" );
01122 rp = iHashFind( block_id );
01123 if( !rp )
01124 {
01125 status = false;
01126
01127 #ifdef RIO_DEBUG2
01128 m_log << "[RioNeti] FreeBlock - Não foram encontrados "
01129 << "fragmentos do bloco unicast de reqid " << block_id
01130 << ". O mesmo não existe na tabela hash." << endl;
01131 #endif
01132 }
01133 else if( rp->rq_fraghigh == 0 )
01134 {
01135 status = false;
01136
01137 #ifdef RIO_DEBUG_EMUL
01138 RioRequest *Request = ( RioRequest * ) rp->nb_userparm;
01139 m_log << "[RioNeti] FreeBlock - Não foram encontrados "
01140 << "fragmentos do bloco unicast de reqid " << block_id
01141 << " na cache. Mais alto de valor zero!" << endl;
01142 char sent[256];
01143 int numfrags;
01144 numfrags = 1 + (( rp->nb_blockl - rp->rq_fraglen0 +
01145 rp->rq_fraglen - 1) / rp->rq_fraglen );
01146 sprintf( sent, "Lost fragments = %d = %d = %u = %d%% perdido.",
01147 block_id, numfrags, Request->Block, 100 );
01148 m_log << sent << endl;
01149 #endif
01150 }
01151 }
01152
01153 if( status )
01154 {
01155 totalfrags = 1 + ((int) ( rp->nb_blockl/( m_maxpktl -
01156 (iphdrlen + udphdrlen) )));
01157
01158 rp->rq_needack = -1000;
01159 cnt_rcvfragack++;
01160
01161 #ifdef RIO_DEBUG_EMUL
01162 char sent[256];
01163 sprintf( sent, "[FreeBlock] Lost fragments: " );
01164 int lostfragments = 0;
01165 #endif
01166
01167
01168 for( int i = 0; i <= totalfrags - 1; i++ )
01169 {
01170 if( rp->rq_bits[i] == 1 )
01171 {
01172 n_frag++;
01173 }
01174 #ifdef RIO_DEBUG_EMUL
01175 else
01176 {
01177 sprintf( sent, "%d/", i );
01178 lostfragments++;
01179 }
01180 #endif
01181 }
01182
01183 #ifdef RIO_DEBUG_EMUL
01184 int numfrags;
01185 RioRequest *Request = ( RioRequest * ) rp->nb_userparm;
01186 numfrags = 1 + (( rp->nb_blockl - rp->rq_fraglen0 +
01187 rp->rq_fraglen - 1) / rp->rq_fraglen );
01188 sprintf( sent, "Lost fragments = %d = %d = %u = %d%% perdido.",
01189 block_id, lostfragments, Request->Block,
01190 ( int )( 100*lostfragments/numfrags ) );
01191 m_log << sent << endl;
01192 #endif
01193
01194 if( n_frag > 0 )
01195 {
01196 if( rp->rq_bits[ 0 ] == 1 )
01197 {
01198 blocksize += rp->rq_fraglen0;
01199 n_frag--;
01200 }
01201 if( rp->rq_bits[ totalfrags - 1 ] == 1 )
01202 {
01203 blocksize += ( rp->nb_blockl - ( rp->rq_fraglen0 +
01204 ( ( totalfrags - 2 ) * rp->rq_fraglen ) ) );
01205 n_frag--;
01206 }
01207 blocksize += n_frag * rp->rq_fraglen;
01208 qComplete( rp, 0 );
01209 }
01210 #ifdef RIO_DEBUG2
01211 else
01212 {
01213
01214
01215 m_log << "[FreeBlock] Erro critico!!! Nao existe nenhum "
01216 << "fragmento a ser recuperado para este bloco apesar do "
01217 << "detectado acima!!!" << endl;
01218 }
01219 #endif
01220 }
01221 #ifdef RIO_DEBUG2
01222 else
01223 {
01224 m_log << "[RioNeti] FreeBlock não pôde recuperar bloco de reqid "
01225 << block_id << "." << endl;
01226 }
01227
01228 m_log << "[RioNeti] FreeBlock recuperou " << blocksize << " bytes "<< endl;
01229 #endif
01230
01231 HashMutexUnlock( "freeblck3" );
01232
01233 #ifdef RIO_DEBUG1
01234 m_log << "[RioNeti - FreeBlock] Finish" << endl;
01235 #endif
01236
01237 return blocksize;
01238 }
01239
01240 int RioNeti::FreePendentBlocks( bool useCache, int nBuffers )
01241 {
01242 #ifdef RIO_DEBUG1
01243 m_log << "[RioNeti - FreePendentBlocks] Start" << endl;
01244 #endif
01245
01246 time_t now = time(NULL);
01247 int totalFreed = 0;
01248 RioNetiReqIdItem *current = NULL;
01249 RioNetiReqIdItem *previous = NULL;
01250 NetBuf *rp = NULL;
01251 int ix = 0;
01252 NetBuf *nbp = NULL;
01253 NetBuf *nbpNext = NULL;
01254
01255
01256
01257 #ifdef RIO_DEBUG2
01258 m_log << "Hora: " << now << endl;
01259 prtHash( "[FreePendentBlocks] Antes da varredura..." );
01260 #endif
01261
01262 for( ix = 0; ix < XM_REQNUM; ix++ )
01263 {
01264 for( nbp = reqarray[ix]; nbp != NULL; nbp = nbpNext )
01265 {
01266
01267
01268
01269 nbpNext = nbp->nb_link;
01270
01271
01272
01273
01274 if( ( now > nbp->nb_frag0arrive ) &&
01275 ( ( now - nbp->nb_frag0arrive ) >= (time_t)( 5 * nBuffers ) )
01276 )
01277 {
01278 if( nbp->nb_unicastRequested )
01279 {
01280 #ifdef RIO_DEBUG2
01281 m_log << "[FreePendentBlocks] Liberando bloco unicast "
01282 << "pendente de reqid " << nbp->nb_reqid
01283 << " preso há " << now - nbp->nb_frag0arrive
01284 << " segundos." << endl;
01285 #endif
01286 }
01287 else
01288 {
01289 #ifdef RIO_DEBUG2
01290 m_log << "[FreePendentBlocks] Liberando bloco multicast "
01291 << "pendente de reqid " << nbp->nb_reqid
01292 << " preso há " << now - nbp->nb_frag0arrive
01293 << " segundos." << endl;
01294 #endif
01295
01296
01297 int result = reqid_table->SearchL( nbp->nb_reqid, ¤t,
01298 &previous );
01299
01300 if( result == RESULT_NETIREQT_SEARCH_FOUND )
01301 {
01302 rp = iHashFind( current->GetLreqid() );
01303
01304 if( rp )
01305 reqid_table->Remove( current->GetPreqid() );
01306 }
01307 }
01308
01309 if( nbp->rq_fraghav == 0 )
01310 {
01311 #ifdef RIO_DEBUG2
01312 m_log << "[FreePendentBlocks] Bloco de reqid "
01313 << nbp->nb_reqid << " estourou o tempo limite de "
01314 << "chegada sem sequer receber seu fragmento zero. "
01315 << "Removendo-o da Hash." << endl;
01316 #endif
01317
01318
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329 HashMutexLock( "freependentblck1" );
01330 iHashRmv( nbp );
01331 iOutRmv( nbp );
01332 iTimeRmv( nbp );
01333 nbp->nb_link = 0;
01334 mbfree( nbp );
01335 HashMutexUnlock( "freependentblck1" );
01336 }
01337 else
01338 {
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348
01349
01350 totalFreed += FreeBlock( nbp->nb_reqid, UNICASTTRAFFIC );
01351 }
01352 }
01353 else
01354 {
01355 continue;
01356 }
01357 }
01358 }
01359
01360 #ifdef RIO_DEBUG2
01361 prtHash( "[FreePendentBlocks] Depois da varredura." );
01362 #endif
01363
01364 #ifdef RIO_DEBUG1
01365 m_log << "[RioNeti - FreePendentBlocks] Finish" << endl;
01366 #endif
01367
01368 return totalFreed;
01369 }
01370
01371 bool RioNeti::thereAreFragments( RioBlock block )
01372 {
01373 #ifdef RIO_DEBUG1
01374 m_log << "[RioNeti - thereAreFragments] Start" << endl;
01375 #endif
01376
01377 RioNetiReqIdItem *current = NULL;
01378 RioNetiReqIdItem *previous = NULL;
01379 int ix = 0;
01380 NetBuf *nbp = NULL;
01381 RioRequest *request = NULL;
01382
01383 for( ix = 0; ix < XM_REQNUM; ix++ )
01384 {
01385 for( nbp = reqarray[ix]; nbp != NULL; nbp = nbp->nb_link )
01386 {
01387 if( nbp->nb_unicastRequested )
01388 {
01389 request = (RioRequest*)(nbp->nb_userparm);
01390 if( request->Block == block )
01391 {
01392 #ifdef RIO_DEBUG2
01393 m_log << "[RioNeti - thereAreFragments] Encontrados "
01394 << "fragmentos do bloco unicast " << block
01395 << " de reqid " << nbp->nb_reqid << endl;
01396 #endif
01397
01398 #ifdef RIO_DEBUG1
01399 m_log << "[RioNeti - thereAreFragments] Finish 1" << endl;
01400 #endif
01401
01402 return true;
01403 }
01404 }
01405 else
01406 {
01407 int result = reqid_table->Search( (int) block, ¤t,
01408 &previous );
01409
01410 if( result == RESULT_NETIREQT_SEARCH_FOUND )
01411 {
01412 #ifdef RIO_DEBUG2
01413 m_log << "[RioNeti - thereAreFragments] Encontrados "
01414 << "fragmentos do bloco multicast " << block
01415 << " de reqid " << nbp->nb_reqid << endl;
01416 #endif
01417
01418 #ifdef RIO_DEBUG1
01419 m_log << "[RioNeti - thereAreFragments] Finish 2" << endl;
01420 #endif
01421
01422 return true;
01423 }
01424 }
01425 }
01426 }
01427
01428 #ifdef RIO_DEBUG1
01429 m_log << "[RioNeti - thereAreFragments] Finish 3" << endl;
01430 #endif
01431
01432 return false;
01433 }
01434
01435
01436 void RioNeti::SetTTLValue( int socket, char ttl )
01437 {
01438 #ifdef RIO_DEBUG1
01439 m_log << "[RioNeti - SetTTLValue] Start" << endl;
01440 #endif
01441
01442 if( setsockopt( socket, IPPROTO_IP, IP_MULTICAST_TTL,
01443 ( char*) &ttl, sizeof( ttl )) == -1 )
01444 {
01445 m_log << " SetTTLValue Error " << endl;
01446 }
01447
01448 #ifdef RIO_DEBUG1
01449 m_log << "[RioNeti - SetTTLValue] Finish" << endl;
01450 #endif
01451 }
01452
01453
01454 void RioNeti::SetLoopBack( int socket, char loop )
01455 {
01456 #ifdef RIO_DEBUG1
01457 m_log << "[RioNeti - SetLoopBack] Start" << endl;
01458 #endif
01459
01460 if( setsockopt( socket, IPPROTO_IP, IP_MULTICAST_LOOP,
01461 ( char*) &loop, sizeof( loop )) == -1 )
01462 {
01463 m_log << " SetLoopBack Error " << endl;
01464 }
01465
01466 #ifdef RIO_DEBUG1
01467 m_log << "[RioNeti - SetLoopBack] Finish" << endl;
01468 #endif
01469 }
01470
01471 void RioNeti::JoinGroup( int sockIndex, char* group_addr )
01472 {
01473 #ifdef RIO_DEBUG1
01474 m_log << "[RioNeti - JoinGroup] Start" << endl;
01475 #endif
01476
01477 #ifdef RIO_DEBUG2
01478 m_log << "Joining Group" << endl;
01479 #endif
01480
01481 struct ip_mreq mreq;
01482
01483 if( ( multicastInfo[sockIndex].myaddr_in.sin_addr.s_addr =
01484 inet_addr(group_addr)) == (unsigned) -1 )
01485 {
01486 m_log << " JoinGroup Error: inet_addr " << endl;
01487 }
01488
01489 mreq.imr_multiaddr = multicastInfo[sockIndex].myaddr_in.sin_addr;
01490 mreq.imr_interface.s_addr = INADDR_ANY;
01491
01492 if( setsockopt( multicastInfo[sockIndex].socket, IPPROTO_IP,
01493 IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof(mreq)) == -1 )
01494 {
01495 Rioperror( "[RioNeti] JoinGroup error on ADD_MEMBERSHIP" );
01496 }
01497
01498 #ifdef RIO_DEBUG1
01499 m_log << "[RioNeti - JoinGroup] Finish" << endl;
01500 #endif
01501 }
01502
01503 void RioNeti::LeaveGroup( unsigned short multicastport )
01504 {
01505 #ifdef RIO_DEBUG1
01506 m_log << "[RioNeti - LeaveGroup] Start" << endl;
01507 #endif
01508
01509 int rc;
01510 int sockIndex;
01511
01512 SocketMulticastLock( "leavelk" );
01513
01514 #ifdef RIO_DEBUG2
01515 m_log << "[LeaveGroup] Leaving Group " << multicastport << "("
01516 << htons(multicastport) << ")"<< endl;
01517 #endif
01518
01519 for( sockIndex = 0; sockIndex < MULTICAST_SOCKETS; sockIndex++ )
01520 {
01521 if( multicastInfo[sockIndex].myaddr_in.sin_port ==
01522 htons( multicastport )
01523 )
01524 break;
01525 }
01526
01527 if( sockIndex == MULTICAST_SOCKETS )
01528 {
01529 #ifdef RIO_DEBUG2
01530 m_log << "[RioNeti] LeaveGroup ERROR: porta " << multicastport
01531 << " não encontrada. O socket multicast respectivo não poderá "
01532 << "ser fechado." << endl;
01533
01534 m_log << "[RioNeti] LeaveGroup: Portas disponíveis: ";
01535 for( int socktmp = 0; socktmp < MULTICAST_SOCKETS; socktmp++ )
01536 m_log << "socket " << socktmp << "= "
01537 << multicastInfo[socktmp].myaddr_in.sin_port << " - ntohs("
01538 << ntohs( multicastInfo[socktmp].myaddr_in.sin_port)
01539 << ") - ";
01540 m_log << "" << endl;
01541 #endif
01542
01543 #ifdef RIO_DEBUG1
01544 m_log << "[RioNeti - LeaveGroup] Finish 1" << endl;
01545 #endif
01546
01547 return;
01548 }
01549
01550 if( multicastInfo[sockIndex].socket )
01551 {
01552
01553
01554
01555
01556
01557 #ifdef RIO_DEBUG2
01558 m_log << "CLOSING SOCKET" << endl;
01559 #endif
01560
01561 rc = close( multicastInfo[sockIndex].socket );
01562
01563 if( rc != 0 )
01564 Rioperror( "Failed to close socket" );
01565
01566 }
01567
01568 multicastInfo[sockIndex].socket = 0;
01569 multicastInfo[sockIndex].isConnected = false;
01570 memset( &(multicastInfo[sockIndex].myaddr_in), 0,
01571 sizeof( multicastInfo[sockIndex].myaddr_in ) );
01572 memset( &(multicastInfo[sockIndex].remoteaddr_in), 0,
01573 sizeof( multicastInfo[sockIndex].remoteaddr_in ) );
01574
01575 SocketMulticastUnlock( "leaveulk" );
01576
01577 #ifdef RIO_DEBUG1
01578 m_log << "[RioNeti - LeaveGroup] Finish 2" << endl;
01579 #endif
01580 }
01581
01582 void RioNeti::LeaveAllGroups( void )
01583 {
01584 #ifdef RIO_DEBUG1
01585 m_log << "[RioNeti - LeaveAllGroups] Start" << endl;
01586 #endif
01587
01588 int sockIndex;
01589
01590 for( sockIndex = 0; sockIndex < MULTICAST_SOCKETS; sockIndex++ )
01591 {
01592 if( multicastInfo[sockIndex].isConnected )
01593 {
01594 #ifdef RIO_DEBUG2
01595 m_log << "LeaveAllGroups chamando LeaveGroup para grupo "
01596 << ntohs( multicastInfo[sockIndex].myaddr_in.sin_port )
01597 << endl;
01598 #endif
01599
01600 LeaveGroup( ntohs( multicastInfo[sockIndex].myaddr_in.sin_port ) );
01601 }
01602 }
01603
01604 #ifdef RIO_DEBUG1
01605 m_log << "[RioNeti - LeaveAllGroups] Finish 1" << endl;
01606 #endif
01607 }
01608
01609
01610 int RioNeti::Stop()
01611 {
01612 #ifdef RIO_DEBUG1
01613 m_log << "[RioNeti - Stop] Start" << endl;
01614 #endif
01615
01616 NetBuf *nbp;
01617
01618 if( m_threadstop )
01619 {
01620 #ifdef RIO_DEBUG1
01621 m_log << "[RioNeti - Stop] Finish 1" << endl;
01622 #endif
01623
01624 return 0;
01625 }
01626
01627 m_threadstop = 1;
01628
01629
01630
01631 if( m_thread != 0 )
01632 {
01633 #ifdef RIO_DEBUG2
01634 m_log << "RioNeti::Stop enviando a mensagem para parar a thread "
01635 << "NetMgrThread" << endl;
01636 #endif
01637
01638 HashMutexLock( "stop" );
01639 nbp = mbget();
01640 nbp->nb_toipaddr = m_ipaddr;
01641 nbp->nb_toipport = m_ipport;
01642 memset( nbp->nb_buf, 0, 16 );
01643 nbp->nb_sendp = nbp->nb_buf;
01644 nbp->nb_sendl = 16;
01645 fSend( nbp );
01646 HashMutexUnlock( "stop" );
01647 }
01648
01649 #ifdef WINDOWS
01650
01651 WaitForSingleObject(m_thread,INFINITE);
01652
01653 closesocket(m_sock);
01654 #else
01655
01656
01657
01658 if( m_thread != 0 )
01659 {
01660 #ifdef RIO_DEBUG2
01661 m_log << "[RioNeti - Stop] Aguardando pthread_join..." << endl;
01662 #endif
01663
01664 pthread_join( m_thread, NULL );
01665
01666 #ifdef RIO_DEBUG2
01667 m_log << "[RioNeti - Stop] Joined!!!" << endl;
01668 #endif
01669 }
01670
01671
01672
01673
01674 if( m_threadmap != 0 )
01675 pthread_cancel( m_threadmap );
01676
01677 close( m_sock );
01678 #endif
01679
01680 if( m_debug & DEBUG_CNT )
01681 {
01682 prtcnt();
01683 }
01684
01685
01686 if( m_StreamControl != NULL )
01687 m_StreamControl->Stop();
01688
01689 #ifdef RIO_DEBUG1
01690 m_log << "[RioNeti - Stop] Finish 2" << endl;
01691 #endif
01692
01693 return 0;
01694 }
01695
01696
01697
01698 void RioNeti::getmyaddrport( int *ipaddr, int *port, int server )
01699 {
01700 #ifdef RIO_DEBUG1
01701 m_log << "[RioNeti - getmyaddrport] Start" << endl;
01702 #endif
01703
01704
01705
01706
01707
01708
01709
01710
01711
01712 *ipaddr = m_ipaddrmap[server];
01713 *port = m_ipportmap[server];
01714
01715 #ifdef RIO_DEBUG1
01716 m_log << "[RioNeti - getmyaddrport] Finish" << endl;
01717 #endif
01718 }
01719
01720
01721
01722 void RioNeti::getalladdrport( int **ipaddr, int **port )
01723 {
01724 #ifdef RIO_DEBUG1
01725 m_log << "[RioNeti - getalladdrport] Start" << endl;
01726 #endif
01727
01728
01729
01730
01731
01732
01733 *ipaddr = m_ipaddrmap;
01734 *port = m_ipportmap;
01735
01736 #ifdef RIO_DEBUG1
01737 m_log << "[RioNeti - getalladdrport] Finish" << endl;
01738 #endif
01739 }
01740
01741
01742
01743 int RioNeti::getipaddr()
01744 {
01745 #ifdef RIO_DEBUG1
01746 m_log << "[RioNeti - getipaddr] Single" << endl;
01747 #endif
01748
01749
01750
01751
01752 return( m_ipaddrmap[0] );
01753 }
01754
01755
01756
01757
01758
01759
01760
01761 void RioNeti::SetCmdProc( cmdcallback_t cmdproc, void *cmdparm )
01762 {
01763 #ifdef RIO_DEBUG1
01764 m_log << "[RioNeti - SetCmdProc] Start" << endl;
01765 #endif
01766
01767 m_cmdproc = cmdproc;
01768 m_cmdparm = cmdparm;
01769
01770 #ifdef RIO_DEBUG1
01771 m_log << "[RioNeti - SetCmdProc] Finish" << endl;
01772 #endif
01773 }
01774
01775
01776
01777
01778
01779 void RioNeti::iOutAdd( NetBuf *nbp, void(*outrtn)(NetBuf *) )
01780 {
01781 #ifdef RIO_DEBUG1
01782 m_log << "[RioNeti - iOutAdd] Start" << endl;
01783 #endif
01784
01785 if( nbp->nb_outrtn )
01786 {
01787 #ifdef RIO_DEBUG1
01788 m_log << "[RioNeti - iOutAdd] Finish 1" << endl;
01789 #endif
01790
01791 return;
01792 }
01793
01794 nbp->nb_outrtn = outrtn;
01795
01796 nbp->nb_outnext = 0;
01797 m_outql->nb_outnext = nbp;
01798 m_outql = nbp;
01799
01800 #ifdef RIO_DEBUG1
01801 m_log << "[RioNeti - iOutAdd] Finish 2" << endl;
01802 #endif
01803 }
01804
01805
01806
01807
01808
01809
01810
01811
01812
01813 void RioNeti::iOutAddTop( NetBuf *nbp, void(*outrtn)(NetBuf *) )
01814 {
01815 #ifdef RIO_DEBUG1
01816 m_log << "[RioNeti - iOutAddTop] Start" << endl;
01817 #endif
01818
01819 if( nbp->nb_outrtn )
01820 {
01821 #ifdef RIO_DEBUG1
01822 m_log << "[RioNeti - iOutAddTop] Finish 1" << endl;
01823 #endif
01824
01825 return;
01826 }
01827
01828 nbp->nb_outrtn = outrtn;
01829
01830 nbp->nb_outnext = m_outqf;
01831
01832 if( !m_outqf )
01833 {
01834 m_outql = nbp;
01835 }
01836
01837 m_outqf = nbp;
01838
01839 #ifdef RIO_DEBUG1
01840 m_log << "[RioNeti - iOutAddTop] Finish 2" << endl;
01841 #endif
01842 }
01843
01844
01845
01846
01847
01848 void RioNeti::iOutRmv( NetBuf *nbp )
01849 {
01850 #ifdef RIO_DEBUG1
01851 m_log << "[RioNeti - iOutRmv] Start" << endl;
01852 #endif
01853
01854 NetBuf *p0, *p1;
01855
01856 if( !nbp->nb_outrtn )
01857 {
01858 #ifdef RIO_DEBUG1
01859 m_log << "[RioNeti - iOutRmv] Finish 1" << endl;
01860 #endif
01861
01862 return;
01863 }
01864
01865 p0 = VIRTORG(m_outqf, NetBuf, nb_outnext);
01866 for( p1 = p0->nb_outnext; p1 != NULL; p0 = p1, p1 = p0->nb_outnext)
01867 {
01868 if( p1 != nbp )
01869 continue;
01870
01871 p0->nb_outnext = nbp->nb_outnext;
01872 if( m_outql == nbp )
01873 {
01874 m_outql = p0;
01875 }
01876 nbp->nb_outrtn = 0;
01877
01878 #ifdef RIO_DEBUG1
01879 m_log << "[RioNeti - iOutRmv] Finish 2" << endl;
01880 #endif
01881
01882 return;
01883 }
01884 }
01885
01886
01887 void *RioNeti::NetMgrThreadEp( void *parm )
01888 {
01889 #ifdef RIO_DEBUG1
01890 m_log << "[RioNeti - NetMgrThreadEp] Start" << endl;
01891 #endif
01892
01893 #ifdef RIO_DEBUG2
01894 m_log << "[RioNeti - NetMgrThreadEp] Eu sou a NetMgrThread." << endl;
01895 #endif
01896
01897 RioNeti *netiPtr = (RioNeti *) parm;
01898
01899 netiPtr->NetMgrThread();
01900
01901 #ifdef RIO_DEBUG1
01902 m_log << "[RioNeti - NetMgrThreadEp] Finish" << endl;
01903 #endif
01904
01905 return 0;
01906 }
01907
01908
01909 void RioNeti::CleanupNetMgrThread( void *arg )
01910 {
01911 #ifdef RIO_DEBUG1
01912 m_log << "[RioNeti - CleanupNetMgrThread] Single" << endl;
01913 #endif
01914 }
01915
01916
01917
01918
01919
// Body of the network manager thread.
//
// The function is one big loop built with labels and goto. Each pass:
//   1. with the hash mutex held, fires the callback of every expired entry
//      at the front of xm_timeq and turns the first non-expired entry into
//      the select() timeout (no entry -> pxtimeout stays NULL);
//   2. releases the hash mutex and drains the completion list (m_cmpltf),
//      calling each user callback and freeing the buffer;
//   3. returns when m_threadstop is set;
//   4. builds readfds/writefds from m_sock and every connected multicast
//      socket and waits in select();
//   5. reads from the multicast sockets, then from m_sock, handing every
//      datagram to procpkt();
//   6. re-takes the hash mutex and, if m_sock is flagged in writefds, runs
//      queued output routines until the queue is empty or writefdsp is set.
//
// readfds, writefds, writefdsp and max_in_set are not declared here; they
// come from outside this function.
01920 void RioNeti::NetMgrThread( void )
01921 {
01922 #ifdef RIO_DEBUG1
01923 m_log << "[RioNeti - NetMgrThread] Start" << endl;
01924 #endif
01925
01926 int rc;
01927 NetBuf *rcvbuf = 0;
01928 NetBuf *nbp;
01929 int bufl = 0;
01930 char *bufp = 0;
01931 struct timeval xtimeout, auxtimeout, *pxtimeout;
01932 bool anyMulticastSocket;
01933 bool IsSocketsSet;
01934
// Allow Stop()-style cancellation at any point and register the cleanup
// handler. The matching pthread_cleanup_pop() is at the very end of the
// function.
01935 pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
01936 pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
01937 pthread_cleanup_push( RioNeti::CleanupNetMgrThread, NULL );
01938
01939 if( serverInstance )
01940 RioErr << "NETMGRTHREADID " << syscall( SYS_gettid ) << endl;
01941
// The code right after `loop:` expects the hash mutex to be held; it is
// released again just before the completion list is processed.
01942 HashMutexLock();
01943 loop:
01944
01945 xtimeout.tv_sec = 0;
01946 xtimeout.tv_usec = 0;
01947 pxtimeout = NULL;
01948
// Timer pass. DQUEUE_TEST presumably reports an empty queue -- confirm
// against the macro. xm_now is not refreshed here; it holds the value
// stored after the previous select() (or by iTimeNew()).
01949 while( !DQUEUE_TEST( xm_timeq, NetBuf, nb_tidq ) )
01950 {
01951
01952 nbp = xm_timeq.linkf;
// Expired: xm_now is later than the entry's due time.
01953 if( ( ( xm_now.tv_usec > nbp->nb_tiwhen.tv_usec )&&
01954 ( xm_now.tv_sec == nbp->nb_tiwhen.tv_sec ) ) ||
01955 ( xm_now.tv_sec > nbp->nb_tiwhen.tv_sec ) )
01956 {
01957
// Unlink the entry, leave it self-linked, then notify its owner.
01958 DQUEUE_RMV(nbp, nb_tidq);
01959 nbp->nb_tidq.linkf = nbp->nb_tidq.linkb = nbp;
01960
01961 (*nbp->nb_callback)(nbp, TYPE_TIMEOUT, 0, 0);
01962 continue;
01963 }
01964
// First entry still pending: time left until it is due, with a borrow from
// the seconds field when the microsecond difference is negative.
01965 auxtimeout.tv_sec = nbp->nb_tiwhen.tv_sec - xm_now.tv_sec;
01966 auxtimeout.tv_usec = nbp->nb_tiwhen.tv_usec - xm_now.tv_usec;
01967
01968 if( auxtimeout.tv_usec < 0 )
01969 {
01970 auxtimeout.tv_sec--;
01971 auxtimeout.tv_usec += USEC;
01972 }
01973
// xtimeout was zeroed above and is assigned nowhere else before this point,
// so the first clause is always true and auxtimeout is always taken.
01974 if( ( ( xtimeout.tv_sec == 0 ) && ( xtimeout.tv_usec == 0 ) ) ||
01975 ( ( ( xtimeout.tv_usec > auxtimeout.tv_usec ) &&
01976 ( xtimeout.tv_sec == auxtimeout.tv_sec ) ) ||
01977 ( xtimeout.tv_sec > auxtimeout.tv_sec ) ) )
01978 {
01979 xtimeout.tv_sec = auxtimeout.tv_sec;
01980 xtimeout.tv_usec = auxtimeout.tv_usec;
01981 pxtimeout = &xtimeout;
01982 }
01983 break;
01984 }
01985
01986 HashMutexUnlock();
01987
01988
// Completion pass, run without the hash mutex: pop each finished buffer,
// call the user's callback with its result and free the buffer.
01989 while( m_cmpltf )
01990 {
01991 nbp = m_cmpltf;
01992 m_cmpltf = nbp->nb_link;
01993 if( m_cmpltf == 0 )
01994 {
01995
01996
// List became empty: reset the tail to the pseudo-node derived from the
// head pointer.
01997 m_cmpltl = VIRTORG(m_cmpltf, NetBuf, nb_link);
01998 }
01999
02000
02001 (*nbp->nb_usercall)(nbp->nb_userparm, nbp->nb_result);
02002 mbfree( nbp );
02003 }
02004
02005 if( m_debug & DEBUG_TIM )
02006 {
02007 char print[256];
02008 sprintf(print, " run now: %8.8lx %8.8lx" ,
02009 xm_now.tv_sec, xm_now.tv_usec);
02010 m_log << print << endl;
02011 if( m_debug & DEBUG_MORE )
02012 prttim( " before select" );
02013 }
02014
// Exit point of the thread. Any receive buffer still held is freed first.
// NOTE(review): this return sits between pthread_cleanup_push() and
// pthread_cleanup_pop(); POSIX appears to leave that undefined -- confirm.
02015 if( m_threadstop )
02016 {
02017 if( rcvbuf != NULL )
02018 mbfree( rcvbuf );
02019
02020 #ifdef RIO_DEBUG1
02021 m_log << "[RioNeti - NetMgrThread] Finish 1" << endl;
02022 #endif
02023
02024 return;
02025 }
02026
02027
02028
02029
// Rebuild both descriptor sets from scratch: the unicast socket first...
02030 FD_ZERO( &readfds );
02031 FD_ZERO( &writefds );
02032
02033 FD_SET( m_sock, &readfds );
02034 FD_SET( m_sock, &writefds );
02035
02036 max_in_set = m_sock;
02037
// ...then every connected multicast socket, tracking the highest
// descriptor for select().
02038 SocketMulticastLock();
02039 anyMulticastSocket = false;
02040 for( int socki = 0; socki < MULTICAST_SOCKETS; socki++ )
02041 {
02042 if( multicastInfo[socki].isConnected )
02043 {
02044 anyMulticastSocket = true;
02045 FD_SET( multicastInfo[socki].socket, &readfds );
02046
02047
02048 FD_SET( multicastInfo[socki].socket, &writefds );
02049
02050 if( max_in_set < multicastInfo[socki].socket )
02051 max_in_set = multicastInfo[socki].socket;
02052 }
02053 }
02054
02055 max_in_set++;
02056
02057
02058
02059
02060
// NOTE(review): in the non-Windows build SocketMulticastUnlock() is also
// called unconditionally a few lines below, so without multicast sockets
// the unlock runs twice; in the Windows build it never runs when a
// multicast socket exists. Confirm which behaviour is intended.
02061 if( !anyMulticastSocket )
02062 SocketMulticastUnlock();
02063
02064 #ifdef WINDOWS
02065
// Windows: only the read set is passed; a timeout (rc == 0) aborts as well.
02066 rc = select(max_in_set, &readfds, NULL,(fd_set *)0, pxtimeout);
02067 if( rc <= 0 )
02068 {
02069 Rioperror( "[RioNeti] select Error" );
02070 abort();
02071 }
02072 GetSystemTime(&pSystemTime);
02073 xm_now.tv_sec = pSystemTime.wSecond;
02074 xm_now.tv_usec = pSystemTime.wMilliseconds;
02075 #else
02076
02077
02078
02079
02080
02081
02082
02083 SocketMulticastUnlock();
02084
// The write set is passed through writefdsp, not &writefds. When writefdsp
// is null, select() never sees writefds and the bits set above stay set.
02085 rc = select( max_in_set, &readfds, writefdsp, (fd_set *)0, pxtimeout );
02086
02087 if( rc == -1 )
02088 {
// errno 4 is presumably EINTR (the trace message below talks about an
// interrupted system call) -- confirm for the target platform.
// NOTE(review): both `goto loop` jumps in this error handling happen after
// HashMutexUnlock() and without a new HashMutexLock(), unlike the jump at
// the end of the function; the timer pass then runs unlocked and unlocks
// again. Confirm whether the mutex tolerates that.
02089 if( errno == 4 )
02090 {
02091 #ifdef RIO_DEBUG2
02092 m_log << "RioNeti::NetMgrThread recebi o erro que a chamada "
02093 << "ao sistena foi interrompida. Saltando para o loop"
02094 << endl;
02095 #endif
02096
02097 goto loop;
02098 }
02099 else if( errno == EBADF )
02100 {
02101
02102
02103
02104
02105
02106 #ifdef RIO_DEBUG2
02107 m_log << "RioNeti::NetMgrThread recebi o erro EBADF. "
02108 << "Saltando para o loop" << endl;
02109 #endif
02110
02111 goto loop;
02112 }
02113 else
02114 {
02115 #ifdef RIO_DEBUG2
02116 m_log << "[NetMgrThread] Erro " << errno << ". Sockets values:"
02117 << " unicast: " << m_sock << ", multicasts: "
02118 << multicastInfo[0].socket << "/"
02119 << multicastInfo[1].socket << endl;
02120 #endif
02121
// Any other select() failure is fatal for the whole process.
02122 Rioperror( "Unable to Select Socket" );
02123 abort();
02124 }
02125 }
// Refresh the clock used by the timer pass of the next iteration.
02126 gettimeofday(&xm_now, NULL);
02127 #endif
02128
// Multicast receive pass. For each readable multicast socket, datagrams are
// read repeatedly (via the receivemcast label) until recvfrom() fails.
// multicastInfo is read here without SocketMulticastLock() being held.
02129 for( int socki = 0; socki < MULTICAST_SOCKETS; socki++ )
02130 {
02131 if( multicastInfo[socki].isConnected &&
02132 FD_ISSET( multicastInfo[socki].socket, &readfds )
02133 )
02134 {
02135 receivemcast:
// A receive buffer is fetched only when none is held. procpkt() gets the
// address of rcvbuf, so presumably it clears it when it keeps the buffer
// -- confirm.
02136 if( !rcvbuf )
02137 {
02138 rcvbuf = mbget();
02139 bufp = rcvbuf->nb_buf;
02140 bufl = sizeof( rcvbuf->nb_buf );
02141 }
02142 m_fraddrl = sizeof( m_fraddr );
02143
02144 int groupl = sizeof(multicastInfo[socki].remoteaddr_in);
02145
02146 #ifdef WINDOWS
02147
02148 rc = recvfrom( multicastInfo[socki].socket, bufp, bufl, 0,
02149 (struct sockaddr *)&(multicastInfo[socki].remoteaddr_in),
02150 (int FAR*) &groupl);
02151
02152 #else
02153
02154 rc = recvfrom( multicastInfo[socki].socket, bufp, bufl, MSG_WAITALL,
02155 (struct sockaddr *) &multicastInfo[socki].remoteaddr_in,
02156 (socklen_t *) &groupl );
02157 #endif
02158
02159 #ifdef WINDOWS
02160
// NOTE(review): inside this branch rc equals SOCKET_ERROR, so the tests
// below compare rc or SOCKET_ERROR itself against error codes rather than
// the last socket error; WSAGetLastError() was probably intended -- confirm.
02161 if( rc == SOCKET_ERROR )
02162 {
02163 if( rc == WSAEWOULDBLOCK )
02164 {
02165 #ifdef RIO_DEBUG2
02166 m_log << "RioNeti - erro socket WSAEWOULDBLOCK " << endl;
02167 #endif
02168
02169 goto unicastreceive;
02170 }
02171
02172 if( SOCKET_ERROR == WSAECONNRESET )
02173 {
02174 #ifdef RIO_DEBUG2
02175 m_log << "RioNeti - erro socket WSAECONNRESET " << endl;
02176 #endif
02177
02178 goto unicastreceive;
02179 }
02180 if( SOCKET_ERROR == -1 )
02181 {
02182 goto unicastreceive;
02183 }
02184 if( rc == 10004 )
02185 {
02186 goto receivemcast;
02187 }
02188 goto unicastreceive;
02189 }
02190 #else
02191
// Every failure except EINTR jumps to unicastreceive, which also leaves the
// for loop: multicast sockets with a higher index are not read in this pass.
02192 if( rc == -1 )
02193 {
02194 if( errno == EAGAIN )
02195 {
02196 goto unicastreceive;
02197 }
02198
02199 if( errno == ECONNREFUSED )
02200 {
02201 #ifdef RIO_DEBUG2
02202 m_log << "RioNeti - erro socket ECONNREFUSED " << endl;
02203 #endif
02204
02205 goto unicastreceive;
02206 }
02207
02208 #ifdef RIO_DEBUG2
02209 struct in_addr clientip;
02210 clientip.s_addr =
02211 multicastInfo[socki].remoteaddr_in.sin_addr.s_addr;
02212 m_log << "RioNeti: Error Received from " << inet_ntoa(clientip)
02213 << " port " << multicastInfo[socki].remoteaddr_in.sin_port
02214 << "(" << rc << ")" << endl;
02215 Rioperror( "recvfrom" );
02216 #endif
02217
// NOTE(review): in RIO_DEBUG2 builds the logging above may overwrite errno
// before it is tested here -- confirm.
02218 if( errno == EINTR )
02219 {
02220 #ifdef RIO_DEBUG2
02221 m_log <<" Thread " << pthread_self() << " error EINTR "
02222 << endl;
02223 #endif
02224
02225 goto receivemcast;
02226 }
02227
02228 #ifdef RIO_DEBUG2
02229 m_log << ( unsigned int ) pthread_self()
02230 << " NetMgrThread receive error: errno "
02231 << errno << " [" << strerror(errno) << " ]" << endl;
02232 #endif
02233
02234 goto unicastreceive;
02235 }
02236 #endif
02237
// A datagram arrived: record its length, process it, then try to read the
// next one from the same socket.
02238 rcvbuf->nb_recvl = rc;
02239 if( m_debug & DEBUG_DATA )
02240 {
02241 m_log << " received len " << rc << endl;
02242 }
02243
02244
02245 procpkt( &rcvbuf, MULTICASTTRAFFIC );
02246 goto receivemcast;
02247 }
02248 }
02249
02250 unicastreceive:
02251
// Unicast receive pass: same read-until-failure pattern on m_sock, with the
// sender address stored in m_fraddr. Failures continue at the send label.
02252 if( FD_ISSET( m_sock, &readfds ) )
02253 {
02254 receive:
02255 if( !rcvbuf )
02256 {
02257 rcvbuf = mbget();
02258 bufp = rcvbuf->nb_buf;
02259 bufl = sizeof( rcvbuf->nb_buf );
02260 }
02261 m_fraddrl = sizeof( m_fraddr );
02262
02263
02264 #ifdef WINDOWS
02265
02266 rc = recvfrom(m_sock, bufp, bufl, 0,(struct sockaddr *)&m_fraddr,
02267 (int FAR*) &m_fraddrl);
02268
// NOTE(review): same questionable error-code comparisons as in the
// multicast Windows branch above. An error matching none of the tests ends
// the process with exit(1).
02269 if( rc == SOCKET_ERROR )
02270 {
02271 if( rc == WSAEWOULDBLOCK )
02272 {
02273 #ifdef RIO_DEBUG2
02274 m_log << "RioNeti - erro socket unicast WSAEWOULDBLOCK"
02275 << endl;
02276 #endif
02277
02278 goto send;
02279 }
02280
02281 if( SOCKET_ERROR == WSAECONNRESET )
02282 {
02283 #ifdef RIO_DEBUG2
02284 m_log << "RioNeti - erro socket unicast WSAECONNRESET" << endl;
02285 #endif
02286
02287 goto send;
02288 }
02289 if( SOCKET_ERROR == -1 )
02290 {
02291 #ifdef RIO_DEBUG2
02292 m_log << "rioneti - socket error -1 " << strerror(errno)
02293 << endl;
02294 #endif
02295
02296 goto send;
02297 }
02298 if( rc == 10004 )
02299 {
02300 #ifdef RIO_DEBUG2
02301 m_log << "rioneti11 - socket error: " << strerror(errno)
02302 << endl;
02303 #endif
02304
02305 goto receive;
02306 }
02307
02308 #ifdef RIO_DEBUG1
02309 m_log << "[RioNeti - NetMgrThread] Finish 2" << endl;
02310 #endif
02311
02312 exit(1);
02313 }
02314 #else
02315
02316 rc = recvfrom( m_sock, bufp, bufl, MSG_WAITALL,
02317 (struct sockaddr *)&m_fraddr,
02318 (socklen_t *) &m_fraddrl );
02319
02320 if( rc == -1 )
02321 {
02322 if( errno == EAGAIN )
02323 {
02324 goto send;
02325 }
02326
02327 if( errno == ECONNREFUSED )
02328 {
02329 #ifdef RIO_DEBUG2
02330 m_log << "RioNeti - erro socket unicast ECONNREFUSED" << endl;
02331 #endif
02332
02333 goto send;
02334 }
02335
02336 #ifdef RIO_DEBUG2
02337 struct in_addr clientip;
02338 clientip.s_addr = m_fraddr.sin_addr.s_addr;
02339 m_log << "RioNeti: Error Received UDP from " << inet_ntoa(clientip)
02340 << " port " << m_fraddr.sin_port
02341 << "(" << m_fraddrl << ")" << "(" << rc << ")" << endl;
02342 Rioperror( "recvfrom" );
02343 #endif
02344
// Interrupted read: retry. Anything else gives up reading for this pass.
02345 if( errno == EINTR )
02346 {
02347 #ifdef RIO_DEBUG2
02348 m_log <<" Thread " << pthread_self() << " error EINTR "
02349 << endl;
02350 #endif
02351
02352 goto receive;
02353 }
02354
02355 #ifdef RIO_DEBUG2
02356 m_log << ( unsigned int )pthread_self() << " NetMgrThread receive error: errno "
02357 << errno
02358 << " [" << strerror(errno) << " ]" << endl;
02359 #endif
02360
02361 goto send;
02362 }
02363 #endif
02364
02365 rcvbuf->nb_recvl = rc;
02366 if( m_debug & DEBUG_DATA )
02367 {
02368 m_log << " received len " << rc << endl;
02369 }
02370
02371
02372 procpkt( &rcvbuf, UNICASTTRAFFIC );
02373 goto receive;
02374 }
02375
02376 send:
02377
02378
02379
02380
// Send pass. The hash mutex is taken here and stays held across the jump
// back to `loop`, where the timer pass releases it.
02381 HashMutexLock();
02382
02383 IsSocketsSet = FD_ISSET( m_sock, &writefds );
02384
02385 if( IsSocketsSet )
02386 {
02387 writefdsp = 0;
02388
02389
02390
// Pop queued buffers from the front and run their output routine. The loop
// stops early once writefdsp becomes non-null; presumably a send routine
// sets it when a send would block (the Windows branch of nbSend assigns
// &writefds) -- confirm.
02391 while( m_outqf && !writefdsp )
02392 {
02393 nbp = m_outqf;
02394 void(*rtn)(NetBuf *) = nbp->nb_outrtn;
02395 m_outqf = nbp->nb_outnext;
02396 if( nbp == m_outql )
02397 {
02398 m_outql = VIRTORG(m_outqf, NetBuf, nb_outnext);
02399 }
// Clearing nb_outrtn marks the buffer as no longer queued, so the routine
// may queue it again.
02400 nbp->nb_outrtn = 0;
02401
02402 if( m_debug & DEBUG_MISC )
02403 {
02404 char print[256];
02405 sprintf(print, " out-disp - %p" , nbp);
02406 m_log << print << endl;
02407 }
02408 (*rtn)(nbp);
02409 }
02410 }
02411
02412 goto loop;
02413
02414
02415
02416
02417
02418
02419
02420
// Never executed (preceded by an unconditional goto); it closes the
// pthread_cleanup_push() block opened at the top of the function.
02421 pthread_cleanup_pop( 1 );
02422 }
02423
02424
02425
02426
02427
02428
02429
02430
02431 void RioNeti::iHashNew( NetBuf *rp )
02432 {
02433 #ifdef RIO_DEBUG1
02434 m_log << "[RioNeti - iHashNew] Start" << endl;
02435 #endif
02436
02437 #ifdef RIO_DEBUG2
02438 if( rp->nb_unicastRequested )
02439 m_log << "[RioNeti] iHashNew - Adicionando bloco unicast de reqid "
02440 << rp->nb_reqid << " da tabela Hash." << endl;
02441 else
02442 m_log << "[RioNeti] iHashNew - Adicionando bloco multicast de reqid "
02443 << rp->nb_reqid << " da tabela Hash." << endl;
02444 #endif
02445
02446 rp->nb_hisreqid = 0;
02447 retry:
02448 rp->nb_reqid = ++reqseq;
02449
02450 if( rp->nb_reqid == 0 )
02451 goto retry;
02452 if( m_debug & DEBUG_DATA )
02453 {
02454 m_log << " iHashNew reqid " << rp->nb_reqid << endl;
02455 }
02456 int ix = XM_REQMASK & rp->nb_reqid;
02457 rp->nb_link = reqarray[ix];
02458 reqarray[ix] = rp;
02459
02460 #ifdef RIO_DEBUG1
02461 m_log << "[RioNeti - iHashNew] Finish" << endl;
02462 #endif
02463 }
02464
02465
02466
02467
02468 void RioNeti::iHashRmv( NetBuf *rp )
02469 {
02470 #ifdef RIO_DEBUG1
02471 m_log << "[RioNeti - iHashRmv] Start" << endl;
02472 #endif
02473
02474 #ifdef RIO_DEBUG2
02475 if( rp->nb_unicastRequested )
02476 m_log << "[RioNeti] iHashRmv - Removendo bloco unicast de reqid "
02477 << rp->nb_reqid << " da tabela Hash." << endl;
02478 else
02479 m_log << "[RioNeti] iHashRmv - Removendo bloco multicast de reqid "
02480 << rp->nb_reqid << " da tabela Hash." << endl;
02481 #endif
02482
02483 NetBuf *p0, *p1;
02484 int ix;
02485
02486 if( m_debug & DEBUG_DATA )
02487 {
02488 m_log << " iHashRmv reqid " << rp->nb_reqid << endl;
02489 }
02490 ix = XM_REQMASK & rp->nb_reqid;
02491
02492
02493
02494 p0 = VIRTORG(reqarray[ix], NetBuf, nb_link);
02495
02496 for( p1 = p0->nb_link; p1 != NULL; p0 = p1, p1 = p0->nb_link)
02497 {
02498 if( p1 != rp ) continue;
02499
02500 p0->nb_link = rp->nb_link;
02501
02502 #ifdef RIO_DEBUG1
02503 m_log << "[RioNeti - iHashRmv] Finish 1" << endl;
02504 #endif
02505
02506 return;
02507 }
02508
02509
02510 #ifdef RIO_DEBUG2
02511 char print[256];
02512 sprintf(print, " iHashRmv - reqid %d not found [%px]" ,
02513 rp->nb_reqid, rp);
02514 m_log << print << endl;
02515 m_log << "RioNeti- chamando raise " << endl;
02516 #endif
02517
02518 raise( SIGFPE );
02519
02520 #ifdef RIO_DEBUG1
02521 m_log << "[RioNeti - iHashRmv] Finish 2" << endl;
02522 #endif
02523 }
02524
02525
02526
02527
02528 NetBuf *RioNeti::iHashFind( int reqid )
02529 {
02530 #ifdef RIO_DEBUG1
02531 m_log << "[RioNeti - iHashFind] Start" << endl;
02532 #endif
02533
02534 int ix = XM_REQMASK & reqid;
02535
02536 for(NetBuf *nbp = reqarray[ix]; nbp != 0; nbp = nbp->nb_link)
02537 {
02538 if( nbp->nb_reqid == reqid )
02539 {
02540 #ifdef RIO_DEBUG1
02541 m_log << "[RioNeti - iHashFind] Finish 1" << endl;
02542 #endif
02543
02544 return nbp;
02545 }
02546 }
02547
02548 #ifdef RIO_DEBUG1
02549 m_log << "[RioNeti - iHashFind] Finish 2" << endl;
02550 #endif
02551
02552 return 0;
02553 }
02554
02555
02556
02557
02558
02559
02560 NetBuf *RioNeti::iHashFindRst( int ip, int port, int reqid )
02561 {
02562 #ifdef RIO_DEBUG1
02563 m_log << "[RioNeti - iHashFindRst] Start" << endl;
02564 #endif
02565
02566 if( m_debug & DEBUG_DATA )
02567 {
02568 char print[256];
02569 sprintf(print, " iHashFindRst %8.8x %8.8x %8.8x" ,
02570 (unsigned)ip, (unsigned)port, (unsigned)reqid);
02571 m_log << print << endl;
02572 }
02573 for( int i = 0; i < XM_REQNUM; i++ )
02574 {
02575 for( NetBuf *nbp = reqarray[i]; nbp != 0; nbp = nbp->nb_link)
02576 {
02577 if( m_debug & DEBUG_DATA )
02578 {
02579 char print[256];
02580 sprintf(print, " \t%d-\t%p %8.8x %8.8x %8.8x" ,
02581 i, nbp, (unsigned)nbp->nb_toipaddr,
02582 (unsigned)nbp->nb_toipport,
02583 (unsigned)nbp->nb_hisreqid );
02584 m_log << print << endl;
02585 }
02586 if( (nbp->nb_hisreqid == reqid ) && ( nbp->nb_toipport == port )
02587 && ( nbp->nb_toipaddr == ip ))
02588 {
02589 #ifdef RIO_DEBUG1
02590 m_log << "[RioNeti - iHashFindRst] Finish 1" << endl;
02591 #endif
02592
02593 return nbp;
02594 }
02595 }
02596 }
02597
02598 #ifdef RIO_DEBUG1
02599 m_log << "[RioNeti - iHashFindRst] Finish 2" << endl;
02600 #endif
02601
02602 return 0;
02603 }
02604
02605
02606
02607
02608 void RioNeti::iTimeNew( NetBuf *nbp, int interval )
02609 {
02610 #ifdef RIO_DEBUG1
02611 m_log << "[RioNeti - iTimeNew] Start" << endl;
02612 #endif
02613
02614 NetBuf *p1, *p2, *p3;
02615
02616
02617
02618
02619 #ifdef WINDOWS
02620
02621 GetSystemTime(&pSystemTime);
02622 xm_now.tv_sec = pSystemTime.wSecond;
02623 xm_now.tv_usec = pSystemTime.wMilliseconds;
02624 #else
02625
02626 gettimeofday(&xm_now, NULL);
02627 #endif
02628
02629 DQUEUE_RMV(nbp, nb_tidq);
02630
02631 nbp->nb_tiwhen.tv_sec = xm_now.tv_sec + (interval >> TimeShift);
02632 nbp->nb_tiwhen.tv_usec = xm_now.tv_usec
02633 + ((interval & ((1 << TimeShift) - 1)) << (USECBITS-TimeShift));
02634 if( nbp->nb_tiwhen.tv_usec > USEC )
02635 {
02636 nbp->nb_tiwhen.tv_usec -= USEC;
02637 nbp->nb_tiwhen.tv_sec++;
02638 }
02639
02640 #ifdef RIO_DEBUG2
02641 m_log << " iTimeNew " << nbp->nb_tiwhen.tv_sec << " "
02642 << nbp->nb_tiwhen.tv_usec << " " << interval << endl;
02643 #endif
02644
02645
02646 p1 = VIRTORG( xm_timeq, NetBuf, nb_tidq );
02647
02648 for( p2 = p1->nb_tidq.linkf; p2 != p1; p2 = p2->nb_tidq.linkf )
02649 {
02650 if( ( nbp->nb_tiwhen.tv_usec < p2->nb_tiwhen.tv_usec )
02651 && ( nbp->nb_tiwhen.tv_sec == p2->nb_tiwhen.tv_sec ))
02652 break;
02653 if( nbp->nb_tiwhen.tv_sec < p2->nb_tiwhen.tv_sec )
02654 break;
02655
02656
02657 if( p2 == p2->nb_tidq.linkf )
02658 {
02659 m_log << " Thread " << pthread_self()
02660 << " Error at the xm_timeq list" << endl;
02661 p2->nb_tidq.linkf = p1;
02662 }
02663 }
02664
02665 p3 = p2->nb_tidq.linkb;
02666
02667 DQUEUE_ADD(nbp, p3, nb_tidq);
02668
02669 #ifdef DEBUGTIM
02670 prttim(" after iTimeNew" );
02671 #endif
02672
02673 #ifdef RIO_DEBUG1
02674 m_log << "[RioNeti - iTimeNew] Finish" << endl;
02675 #endif
02676 }
02677
02678
02679
02680
02681
02682
02683 void RioNeti::iTimeRmv( NetBuf *nbp )
02684 {
02685 #ifdef RIO_DEBUG1
02686 m_log << "[RioNeti - iTimeRmv] Start" << endl;
02687 #endif
02688
02689 DQUEUE_RMV(nbp, nb_tidq);
02690 nbp->nb_tidq.linkf = nbp->nb_tidq.linkb = nbp;
02691
02692 #ifdef RIO_DEBUG1
02693 m_log << "[RioNeti - iTimeRmv] Finish" << endl;
02694 #endif
02695 }
02696
02697
02698 void RioNeti::prttim( const char *s )
02699 {
02700 #ifdef RIO_DEBUG1
02701 m_log << "[RioNeti - prttim] Start" << endl;
02702 #endif
02703
02704 NetBuf *tep, *p0;
02705
02706 p0 = VIRTORG(xm_timeq, NetBuf, nb_tidq);
02707 char print[256];
02708 sprintf(print, " prttim: @ xm_timeq %p, f/l %p %p %s" ,
02709 p0, xm_timeq.linkf, xm_timeq.linkb, s);
02710 m_log << print << endl;
02711
02712 for(tep = p0->nb_tidq.linkf; tep != p0; tep = tep->nb_tidq.linkf)
02713 {
02714 char print2[256];
02715 sprintf( print2,
02716 " %p te f/l %p %p sec/usec %8.8x %8.8x "
02717 " rtn %p" ,
02718 tep,
02719 tep->nb_tidq.linkf,
02720 tep->nb_tidq.linkb,
02721 (unsigned int) tep->nb_tiwhen.tv_sec,
02722 (unsigned int) tep->nb_tiwhen.tv_usec,
02723 tep->nb_callback);
02724 m_log << print2 << endl;
02725 }
02726
02727 #ifdef RIO_DEBUG1
02728 m_log << "[RioNeti - prttim] Finish" << endl;
02729 #endif
02730 }
02731
02732
02733 int RioNeti::nbSend( NetBuf *rp, char *FragmentData, int FragmentSize, int IP,
02734 int Port )
02735 {
02736 #ifdef RIO_DEBUG1
02737 m_log << "[RioNeti - nbSend] Start" << endl;
02738 #endif
02739
02740 int rc;
02741 int ip;
02742
02743
02744 char *sendp;
02745 int sendl;
02746 int DestIP;
02747 int DestPort;
02748
02749
02750
02751 if( rp != NULL )
02752 {
02753
02754 sendp = rp->nb_sendp;
02755 sendl = rp->nb_sendl;
02756 DestIP = rp->nb_toipaddr;
02757 DestPort = rp->nb_toipport;
02758 #ifdef RIO_DEBUG2
02759 m_log << "[RioNeti - nbSend] NetBuf: "
02760 << "nb_reqid " << rp->nb_reqid
02761 << " nb_tiwhen " << rp->nb_tiwhen.tv_sec
02762 << " " << rp->nb_tiwhen.tv_usec
02763 << " rq_fragsh " << rp->rq_fragsh
02764 << " rq_fragburst " << rp->rq_fragburst
02765 << " rq_burstcnt " << rp->rq_burstcnt
02766 << " rq_fraglistn " << rp->rq_fraglistn
02767 << " nb_sendack " << rp->nb_sendack
02768 << " nb_hisreqid " << rp->nb_hisreqid
02769 << " nb_bufp type " << (int)((RioPkt::pktp *) rp->nb_bufp)->type
02770 << endl;
02771 #endif
02772 }
02773 else
02774 {
02775 #ifdef RIO_DEBUG2
02776 m_log << "[RioNeti - nbSend] rp = NULL" << endl;
02777 #endif
02778
02779 sendp = FragmentData;
02780 sendl = FragmentSize;
02781 DestIP = IP;
02782 DestPort = Port;
02783 }
02784
02785 #ifdef RIO_DEBUG2
02786 in_addr dest;
02787 dest.s_addr = DestIP;
02788 m_log << "[RioNeti - nbSend] DestIP = " << inet_ntoa( dest )
02789 << " , DestPort = " << htons( DestPort ) << endl;
02790 #endif
02791
02792 cnt_sndpkt++;
02793 m_toaddr.sin_addr.s_addr = DestIP;
02794 m_toaddr.sin_port = DestPort;
02795 ip = htonl( DestIP );
02796
02797 sendto:
02798 if( ( ip & htonl( inet_addr( "224.0.0.0" ) ) ) ==
02799 htonl( inet_addr( "224.0.0.0" ) )
02800 )
02801 {
02802
02803 rc = sendto( multicastInfo[0].socket, sendp, sendl,
02804 MSG_WAITALL, (struct sockaddr *)&m_toaddr, m_toaddrl );
02805 if( rc < 0 )
02806 {
02807 Rioperror( "nbsend[1]: " );
02808 }
02809 }
02810 else
02811 {
02812 #ifdef RIO_DEBUG2
02813 m_log << "SendTo DEBUG UDP (nbSend):"
02814 << " m_sock " << (int) m_sock
02815 << " sendp " << (void *) sendp
02816 << " sendl " << (int) sendl;
02817 if( rp != NULL )
02818 m_log << " reqid " << rp->nb_reqid << endl;
02819 else
02820 m_log << endl;
02821 m_log << "RioNeti::nbSend o UDP sera usado para enviar um pacote"
02822 << endl;
02823 #endif
02824
02825 rc = sendto( m_sock, sendp, sendl, MSG_WAITALL,
02826 (struct sockaddr *)&m_toaddr, m_toaddrl);
02827
02828 #ifdef RIO_DEBUG2
02829 m_log << "SendTo RESLT (nbSend):"
02830 << " rc " << rc
02831 << endl;
02832 #endif
02833
02834 if( rc < 0 )
02835 {
02836 m_log << "nbsend[2]: " << "IP = "
02837 << inet_ntoa( m_toaddr.sin_addr ) << ", port = "
02838 << ntohs( m_toaddr.sin_port ) << endl;
02839 Rioperror( "nbsend[2]: " );
02840 }
02841 }
02842
02843 #ifdef WINDOWS
02844
02845 if( rc == SOCKET_ERROR )
02846 {
02847
02848
02849
02850
02851
02852
02853 if( SOCKET_ERROR == WSAECONNRESET )
02854 {
02855 #ifdef RIO_DEBUG2
02856 m_log << "RioNeti - Error WSAECONNRESET " << endl;
02857 #endif
02858
02859 goto sendto;
02860 }
02861
02862 if( SOCKET_ERROR == WSAEWOULDBLOCK )
02863 {
02864 writefdsp = &writefds;
02865
02866 #ifdef RIO_DEBUG2
02867 m_log << "RioNeti - Error WSAEWOULDBLOCK " << endl;
02868 #endif
02869
02870 #ifdef RIO_DEBUG1
02871 m_log << "[RioNeti - nbSend] Finish 4" << endl;
02872 #endif
02873
02874 return 1;
02875 }
02876 }
02877 #else
02878
02879 if( rc < 0 )
02880 {
02881
02882
02883
02884
02885
02886 if( errno == ECONNREFUSED )
02887 {
02888 #ifdef RIO_DEBUG2
02889 m_log << "RioNeti - Error ECONNREFUSED " << endl;
02890 #endif
02891
02892 goto sendto;
02893 }
02894
02895 if( errno == EAGAIN )
02896 {
02897 #ifdef RIO_DEBUG2
02898 m_log << "RioNeti - Error EAGAIN. Failed!! " << endl;
02899 #endif
02900
02901 goto sendto;
02902 }
02903
02904 if( errno == ENOBUFS )
02905 {
02906 #ifdef RIO_DEBUG2
02907 m_log << "RioNeti - Error ENOBUFS. Failed!! " << endl;
02908 #endif
02909
02910 goto sendto;
02911 }
02912
02913 #ifdef RIO_DEBUG2
02914 struct in_addr clientip;
02915 clientip.s_addr = DestIP;
02916 if( rp != NULL )
02917 {
02918 m_log << "RioNeti: Error nbSend to " << inet_ntoa( clientip )
02919 << " port " << DestPort
02920 << " frag " << rp->rq_fragsh
02921 << " reqid " << rp->nb_reqid
02922 << " hisreqid " << rp->nb_hisreqid
02923 << " size " << sendl << endl;
02924 char print[256];
02925 sprintf(print, " sendto error rc %d, errno %d [%s]" ,
02926 rc, errno, strerror(errno));
02927 char print2[256];
02928 sprintf(print, " m_sock %d sendp %p sendl %d" ,
02929 m_sock, sendp, sendl);
02930 m_log << print << endl << print2 << endl;
02931 }
02932 else
02933 {
02934 m_log << "RioNeti: Error nbSend to " << inet_ntoa( clientip )
02935 << " port " << DestPort
02936 << " size " << sendl << endl;
02937 char print[256];
02938 sprintf(print, " sendto error rc %d, errno %d [%s]" ,
02939 rc, errno, strerror(errno));
02940 char print2[256];
02941 sprintf(print, " m_sock %d sendp %p sendl %d" ,
02942 m_sock, sendp, sendl);
02943 m_log << print << endl << print2 << endl;
02944 }
02945 if( m_debug & DEBUG_FRAG )
02946 NetMgr::dumppkt( " -- m_toaddr --\n", (char *)&m_toaddr,
02947 sizeof(m_toaddr));
02948 Rioperror( "sendto" );
02949 #endif
02950 }
02951 #endif
02952
02953 if( m_debug & DEBUG_FRAG )
02954 {
02955 NetMgr::dumppkt(" -- sendpkt --\n", sendp, sendl);
02956 }
02957
02958
02959
02960 if( m_LogRotation != NULL )
02961 {
02962 char StrPacketSize[ 5 ];
02963 sprintf( StrPacketSize, "%d", sendl );
02964 if( m_LogRotation->NewLogLine( time( NULL ), StrPacketSize ) != S_OK )
02965 m_log << "RioNeti::nbSend erro ao escrever no log" << endl;
02966 }
02967
02968
02969
02970
02971
02972
02973 #ifdef RIO_DEBUG1
02974 m_log << "[RioNeti - nbSend] Finish 5" << endl;
02975 #endif
02976
02977 return 0;
02978 }
02979
02980
02981
02982
02983 int RioNeti::iSend( NetBuf *nbp )
02984 {
02985 int result;
02986
02987 #ifdef RIO_DEBUG1
02988 m_log << "[RioNeti - iSend] Start" << endl;
02989 #endif
02990
02991 #ifdef RIO_DEBUG2
02992 m_log << "RioNeti::iSend - sending a packet of type "
02993 << (int)((RioPkt::pktp *) nbp->nb_bufp)->type << endl;
02994 #endif
02995
02996 if( ( nbp->nb_sendack != 0 ) ||
02997 ( (int)((RioPkt::pktp *) nbp->nb_bufp)->type != TYPE_FRAG ) ||
02998 ( m_StreamControl == NULL ) )
02999 {
03000 #ifdef RIO_DEBUG2
03001 m_log << "RioNeti::iSend usando a funcao nbsend, nb_sendack="
03002 << nbp->nb_sendack << endl;
03003 #endif
03004
03005 result = nbSend( nbp );
03006 }
03007 else
03008 {
03009 #ifdef RIO_DEBUG2
03010 m_log << "RioNeti::iSend usando a funcao Put, nbp = " << nbp
03011 << ", nbp->nb_sendack=" << nbp->nb_sendack
03012 << ", nbp->rq_fragmax = " << nbp->rq_fragmax
03013 << ", nbp->rq_fragsh = " << nbp->rq_fragsh << endl;
03014 #endif
03015 result = m_StreamControl->Put( nbp );
03016 }
03017
03018 #ifdef RIO_DEBUG2
03019 m_log << "RioNeti::iSend result = " << result << endl;
03020 #endif
03021
03022 #ifdef RIO_DEBUG1
03023 m_log << "[RioNeti - iSend] Finish" << endl;
03024 #endif
03025
03026 return result;
03027 }
03028
03029
03030
03031
03032
03033
03034
03035
03036 NetBuf *RioNeti::pSend( int ipadr, int port, int reqid, call_type type )
03037 {
03038 #ifdef RIO_DEBUG1
03039 m_log << "[RioNeti - pSend] Start" << endl;
03040 #endif
03041
03042 NetBuf *nbp = mbget();
03043 nbp->nb_toipaddr = ipadr;
03044 nbp->nb_toipport = port;
03045 nbp->rq_retry = 5;
03046 nbp->nb_sendp = nbp->nb_bufp;
03047 nbp->nb_datalm = nbp->nb_bufp + nbp->nb_bufl;
03048
03049 RioPkt::pktp *p = (RioPkt::pktp *) nbp->nb_bufp;
03050
03051 p->type = type;
03052 p->x1 = 0;
03053 p->x2 = 0;
03054 p->reqid = reqid;
03055
03056
03057
03058 nbp->nb_datap = nbp->nb_bufp + sizeof(RioPkt::pktp);
03059
03060 iHashNew( nbp );
03061
03062 *((u32 *) nbp->nb_datap) = nbp->nb_reqid;
03063 nbp->nb_datap += sizeof(u32);
03064
03065 #ifdef RIO_DEBUG1
03066 m_log << "[RioNeti - pSend] Finish" << endl;
03067 #endif
03068
03069 return nbp;
03070 }
03071
03072
03073
03074
03075 void RioNeti::qSend( NetBuf *nbp )
03076 {
03077 #ifdef RIO_DEBUG1
03078 m_log << "[RioNeti - qSend] Start" << endl;
03079 #endif
03080
03081 nbp->nb_sendl = nbp->nb_datap - nbp->nb_sendp;
03082
03083 RioPkt::pktp *p = (RioPkt::pktp *) nbp->nb_sendp;
03084
03085 p->auth.makeauth(nbp->nb_sendp, nbp->nb_sendl, nbp);
03086
03087 gSend( nbp );
03088
03089 #ifdef RIO_DEBUG1
03090 m_log << "[RioNeti - qSend] Finish" << endl;
03091 #endif
03092 }
03093
03094
03095
03096 void RioNeti::gSend( NetBuf *nbp )
03097 {
03098 #ifdef RIO_DEBUG1
03099 m_log << "[RioNeti - gSend] Start" << endl;
03100 #endif
03101
03102 RioNeti *netiPtr = nbp->nb_rioneti;
03103
03104 if( netiPtr->writefdsp || netiPtr->iSend(nbp) )
03105 {
03106
03107 netiPtr->iOutAdd( nbp, &gSend );
03108 }
03109
03110 #ifdef RIO_DEBUG1
03111 m_log << "[RioNeti - gSend] Finish" << endl;
03112 #endif
03113 }
03114
03115
03116
03117
03118
03119 void RioNeti::fSend( NetBuf *nbp )
03120 {
03121 #ifdef RIO_DEBUG1
03122 m_log << "[RioNeti - fSend] Start" << endl;
03123 #endif
03124
03125 RioNeti *netiPtr = nbp->nb_rioneti;
03126
03127 if( netiPtr->writefdsp || netiPtr->iSend(nbp) )
03128 {
03129 netiPtr->iOutAdd( nbp, &fSend );
03130 }
03131 else
03132 {
03133 netiPtr->mbfree( nbp );
03134 }
03135
03136 #ifdef RIO_DEBUG1
03137 m_log << "[RioNeti - fSend] Finish" << endl;
03138 #endif
03139 }
03140
03141 void RioNeti::qComplete( NetBuf *nbp, int result )
03142 {
03143 #ifdef RIO_DEBUG1
03144 m_log << "[RioNeti - qComplete] Start" << endl;
03145 #endif
03146
03147
03148 if( nbp->nb_result == ERROR_RIONETI )
03149 {
03150 nbp->nb_result = result;
03151 }
03152
03153
03154
03155 iHashRmv(nbp);
03156 iOutRmv(nbp);
03157 iTimeRmv(nbp);
03158
03159
03160 nbp->nb_link = 0;
03161 m_cmpltl->nb_link = nbp;
03162 m_cmpltl = nbp;
03163
03164 #ifdef RIO_DEBUG1
03165 m_log << "[RioNeti - qComplete] Finish" << endl;
03166 #endif
03167 }
03168
03169
03170
03171
03172
03173 int RioNeti::iRetry( NetBuf *nbp )
03174 {
03175 #ifdef RIO_DEBUG1
03176 m_log << "[RioNeti - iRetry] Start" << endl;
03177 #endif
03178
03179 if( nbp->rq_retry <= 0 )
03180 {
03181 #ifdef RIO_DEBUG2
03182 m_log << "iRetry nbp->nb_result " << nbp->nb_result << endl;
03183 #endif
03184
03185 qComplete(nbp, ERROR_RIONETI+0xe0);
03186 if( m_debug & (1 << 9) )
03187 {
03188 abort();
03189 }
03190
03191 #ifdef RIO_DEBUG1
03192 m_log << "[RioNeti - iRetry] Finish 1" << endl;
03193 #endif
03194
03195 return 0;
03196 }
03197
03198 nbp->rq_retry--;
03199 cnt_retry++;
03200
03201 #ifdef RIO_DEBUG2
03202 m_log << "iRetry tentanto enviar novamente o pacote. Restam "
03203 << nbp->rq_retry << " tentativas adiconais. Foram feitas, ate o "
03204 << "momento " << cnt_retry << " tentativas de reenvio no total "
03205 << "(para todos os fragmentos enviados pelo servidor)" << endl;
03206 #endif
03207
03208 #ifdef RIO_DEBUG1
03209 m_log << "[RioNeti - iRetry] Finish 2" << endl;
03210 #endif
03211
03212 return 1;
03213 }
03214
03215
03216
03217
03218 void RioNeti::iSendFrag( NetBuf *nbp )
03219 {
03220 #ifdef RIO_DEBUG1
03221 m_log << "[RioNeti - iSendFrag] Start" << endl;
03222 #endif
03223
03224 RioNeti *netiPtr = nbp->nb_rioneti;
03225
03226 do
03227 {
03228
03229
03230
03231
03232
03233
03234 if( netiPtr->writefdsp || netiPtr->iSend( nbp ) )
03235 {
03236 if( nbp->nb_sendack != 0 )
03237 {
03238 netiPtr->iOutAdd(nbp, &iSendFrag);
03239 }
03240
03241
03242
03243 if( nbp->rq_fragburst > NetBuf::RQ_BURSTMAX )
03244 {
03245 nbp->rq_fragburst = NetBuf::RQ_BURSTMAX;
03246
03247 if( nbp->rq_burstcnt > nbp->rq_fragburst )
03248 {
03249 nbp->rq_burstcnt = nbp->rq_fragburst;
03250 }
03251 }
03252
03253 #ifdef RIO_DEBUG2
03254 m_log << "iSendFrag reqid " << nbp->nb_reqid
03255 << " fragburst = "<< nbp->rq_fragburst
03256 << " burst = "<< nbp->rq_burstcnt <<endl;
03257 #endif
03258
03259 #ifdef RIO_DEBUG1
03260 m_log << "[RioNeti - iSendFrag] Finish 1" << endl;
03261 #endif
03262
03263 return;
03264 }
03265 } while(!netiPtr->iSendFragNext(nbp));
03266
03267 #ifdef RIO_DEBUG1
03268 m_log << "[RioNeti - iSendFrag] Finish 2" << endl;
03269 #endif
03270 }
03271
03272
03273
03274
03275
03276 void RioNeti::iSendFragBuild( NetBuf *nbp, int fragnum )
03277 {
03278 #ifdef RIO_DEBUG1
03279 m_log << "[RioNeti - iSendFragBuild] Start" << endl;
03280 #endif
03281
03282 char *dp, *pp;
03283 int dl;
03284
03285 int maxsize = 0;
03286
03287 nbp->rq_bits[fragnum] = nbp->rq_fragackseq;
03288 pp = nbp->nb_bufp;
03289 nbp->nb_sendp = pp;
03290
03291 RioPkt::pktf0 *p = (RioPkt::pktf0 *) pp;
03292 p->pfx.type = TYPE_FRAG;
03293 p->pfx.x1 = 0;
03294 p->pfx.fragnum = htons(fragnum);
03295 p->pfx.reqid = nbp->nb_hisreqid;
03296
03297 if( fragnum == 0 )
03298 {
03299 dp = nbp->nb_blockp;
03300 dl = nbp->rq_fraglen0;
03301 p->ackreqid = nbp->nb_reqid;
03302 p->fragmax = htons(nbp->rq_fragmax);
03303 p->maxpktl = htons(nbp->rq_maxpktl);
03304 p->auth.makeauth(nbp->nb_blockp, nbp->nb_blockl, nbp);
03305 pp += sizeof(RioPkt::pktf0);
03306 maxsize = maxsize + sizeof(RioPkt::pktf0);
03307 }
03308 else
03309 {
03310 dp = nbp->nb_blockp +
03311 nbp->rq_fraglen0 + (fragnum-1)*(nbp->rq_fraglen);
03312 dl = nbp->rq_fraglen;
03313 pp += sizeof(RioPkt::pktfx);
03314 maxsize = maxsize + sizeof(RioPkt::pktfx);
03315 }
03316
03317 if( dp+dl > nbp->nb_blockp+nbp->nb_blockl)
03318 {
03319 dl = nbp->nb_blockp+nbp->nb_blockl - dp;
03320 if( dl <= 0)
03321 {
03322 dl = 0;
03323 }
03324 }
03325
03326 maxsize = maxsize + dl;
03327
03328 if( maxsize > FRAGMENTSIZE )
03329 RioErr << "RioNeti::iSendFragBuild maxsize = " << maxsize << " > "
03330 << FRAGMENTSIZE << endl;
03331
03332 memcpy(pp, dp, dl);
03333 nbp->nb_sendl = pp + dl - nbp->nb_sendp;
03334
03335 #ifdef RIO_DEBUG1
03336 m_log << "[RioNeti - iSendFragBuild] Finish" << endl;
03337 #endif
03338 }
03339
03340
03341
03342
03343
03344
03345
03346
03347
03348 int RioNeti::iSendFragNext( NetBuf *nbp )
03349 {
03350 #ifdef RIO_DEBUG1
03351 m_log << "[RioNeti - iSendFragNext] Start" << endl;
03352 #endif
03353
03354 int i, frag;
03355
03356 if( m_debug & DEBUG_FRAG)
03357 {
03358 m_log << " iSendFragNext - rq_burstcnt " << nbp->rq_burstcnt
03359 << " rq_fraglistn " << nbp->rq_fraglistn << " rq_fragackseq "
03360 << nbp->rq_fragackseq << endl;
03361 m_log << " rq_fraglist: ";
03362 for(i = 0; i < NetBuf::RQ_MAXACKLIST; i++)
03363 {
03364 m_log << " " << nbp->rq_fraglist[i] << " ["
03365 << nbp->rq_bits[nbp->rq_fraglist[i]] << "]";
03366 }
03367 m_log << " " << endl;
03368 }
03369
03370 if( nbp->nb_sendack != 0 )
03371 {
03372 if( !nbp->rq_burstcnt)
03373 {
03374 #ifdef RIO_DEBUG1
03375 m_log << "[RioNeti - iSendFragNext] Finish 1" << endl;
03376 #endif
03377
03378 return 1;
03379 }
03380
03381
03382
03383 for(i = 0; i < nbp->rq_fraglistn; i++)
03384 {
03385 frag = nbp->rq_fraglist[i];
03386 if( nbp->rq_fragackseq - nbp->rq_bits[frag] > 6)
03387 {
03388 nbp->rq_burstcnt--;
03389 iSendFragBuild(nbp, frag);
03390
03391 #ifdef RIO_DEBUG1
03392 m_log << "[RioNeti - iSendFragNext] Finish 2" << endl;
03393 #endif
03394
03395 return 0;
03396 }
03397 }
03398 }
03399
03400
03401 if( nbp->rq_fragsh < nbp->rq_fragmax )
03402 {
03403 nbp->rq_burstcnt--;
03404 iSendFragBuild(nbp, nbp->rq_fragsh++);
03405
03406 #ifdef RIO_DEBUG1
03407 m_log << "[RioNeti - iSendFragNext] Finish 3" << endl;
03408 #endif
03409
03410 return 0;
03411 }
03412
03413 if( nbp->nb_sendack != 0 )
03414 {
03415
03416
03417 if( nbp->rq_burstcnt > 3) nbp->rq_burstcnt = 3;
03418 for(i = 0; i < nbp->rq_fraglistn; i++)
03419 {
03420 frag = nbp->rq_fraglist[i];
03421 if( nbp->rq_fragackseq - nbp->rq_bits[frag] > 0 )
03422 {
03423 nbp->rq_burstcnt--;
03424 iSendFragBuild(nbp, frag);
03425
03426 #ifdef RIO_DEBUG1
03427 m_log << "[RioNeti - iSendFragNext] Finish 4" << endl;
03428 #endif
03429
03430 return 0;
03431 }
03432 }
03433
03434
03435
03436 if( nbp->rq_fraglistn )
03437 {
03438 nbp->rq_burstcnt = 0;
03439 iSendFragBuild(nbp, nbp->rq_fraglist[0]);
03440
03441 #ifdef RIO_DEBUG1
03442 m_log << "[RioNeti - iSendFragNext] Finish 5" << endl;
03443 #endif
03444
03445 return 0;
03446 }
03447 }
03448
03449
03450 #ifdef RIO_DEBUG1
03451 m_log << "[RioNeti - iSendFragNext] Finish 6" << endl;
03452 #endif
03453
03454 return 1;
03455 }
03456
03457
03458
03459
03460
03461 void RioNeti::iSendFragAck( NetBuf *nbp )
03462 {
03463 #ifdef RIO_DEBUG1
03464 m_log << "[RioNeti - iSendFragAck] Start" << endl;
03465 #endif
03466
03467 RioNeti *netiPtr = nbp->nb_rioneti;
03468
03469
03470 if( !nbp->rq_bits[0] )
03471 {
03472 #ifdef RIO_DEBUG1
03473 m_log << "[RioNeti - iSendFragAck] Finish 1" << endl;
03474 #endif
03475
03476 return;
03477 }
03478
03479 nbp->nb_sendp = nbp->nb_bufp;
03480
03481 RioPkt::pktfa *p = (RioPkt::pktfa *) nbp->nb_sendp;
03482 p->type = TYPE_FRAGACK;
03483 p->x1 = 0;
03484 p->fragackseq = htons(++nbp->rq_fragackseq);
03485 p->reqid = nbp->nb_hisreqid;
03486 p->fragnum = htons(nbp->rq_fraghav);
03487 p->fraghigh = htons(nbp->rq_fraghigh);
03488
03489 #ifdef RIO_DEBUG2
03490 m_log << " isendfragack reqid " << nbp->nb_reqid
03491 << " ackseq " << nbp->rq_fragackseq
03492 << " fragnum " << nbp->rq_fraghav
03493 << " high " << nbp->rq_fraghigh << endl;
03494 #endif
03495
03496 u16 *pl = (u16 *) (nbp->nb_sendp + sizeof(RioPkt::pktfa));
03497 int i = NetBuf::RQ_MAXACKLIST;
03498 for(u16 x = nbp->rq_fraglow+1; x < nbp->rq_fraghigh; x++)
03499 {
03500 if( !nbp->rq_bits[x] )
03501 {
03502 if( i-- > 0 )
03503 *pl++ = htons(x);
03504 else break;
03505 }
03506 }
03507
03508 nbp->nb_sendl = ((char *) pl) - nbp->nb_sendp;
03509 p->auth.makeauth(nbp->nb_sendp, nbp->nb_sendl, nbp);
03510
03511 if( netiPtr->iSend(nbp) )
03512 {
03513 netiPtr->iOutAddTop(nbp, &iSendFragAck);
03514
03515 #ifdef RIO_DEBUG1
03516 m_log << "[RioNeti - iSendFragAck] Finish 2" << endl;
03517 #endif
03518
03519 return;
03520 }
03521 else
03522 {
03523 if( nbp->rq_needack <= -1000 )
03524 {
03525
03526 netiPtr->qComplete(nbp, 0);
03527
03528 #ifdef RIO_DEBUG1
03529 m_log << "[RioNeti - iSendFragAck] Finish 3" << endl;
03530 #endif
03531
03532 return;
03533 }
03534 }
03535 nbp->rq_needack = NetBuf::RQ_ACKEVERY;
03536
03537 #ifdef RIO_DEBUG1
03538 m_log << "[RioNeti - iSendFragAck] Finish 4" << endl;
03539 #endif
03540
03541 }
03542
03543
03544
03545 void RioNeti::iSendAck( int ipaddr, int port, int reqid )
03546 {
03547 #ifdef RIO_DEBUG1
03548 m_log << "[RioNeti - iSendAck] Start" << endl;
03549 #endif
03550
03551 NetBuf *nbp;
03552 RioPkt::pktp *p;
03553
03554 if( m_debug & DEBUG_CMDS)
03555 {
03556 char print[256];
03557 sprintf( print, " SendAck: %8.8x %8.8x %8.8x" ,
03558 (unsigned)ipaddr, (unsigned)port, (unsigned)reqid );
03559 m_log << print << endl;
03560 }
03561
03562 nbp = mbget();
03563 nbp->nb_toipaddr = ipaddr;
03564 nbp->nb_toipport = port;
03565 nbp->nb_sendp = nbp->nb_buf;
03566 p = (RioPkt::pktp *) nbp->nb_sendp;
03567 p->type = TYPE_ACK;
03568 p->x1 = 0;
03569 p->x2 = 0;
03570 p->reqid = reqid;
03571 nbp->nb_sendl = sizeof( RioPkt::pktp );
03572 p->auth.makeauth( nbp->nb_sendp, nbp->nb_sendl, nbp );
03573 fSend( nbp );
03574
03575 #ifdef RIO_DEBUG1
03576 m_log << "[RioNeti - iSendAck] Finish" << endl;
03577 #endif
03578 }
03579
03580
03581
03582 void RioNeti::SendRst( int ipaddr, int port, int reqid, int result )
03583 {
03584 #ifdef RIO_DEBUG1
03585 m_log << "[RioNeti - SendRst] Start" << endl;
03586 #endif
03587
03588 NetBuf *nbp;
03589 RioPkt::pktp *p;
03590
03591 HashMutexLock( "sendrst1" );
03592 cnt_rst1++;
03593
03594 if( m_debug & DEBUG_CMDS)
03595 {
03596 char print[256];
03597 sprintf( print, " SendRst: %8.8x %8.8x %8.8x %8.8x" ,
03598 (unsigned)ipaddr, (unsigned)port, (unsigned)reqid,
03599 (unsigned)result );
03600 m_log << print << endl;
03601 }
03602
03603
03604 if( xm_now.tv_sec == xm_rstsec)
03605 {
03606 if( xm_rstip == ipaddr && xm_rstport == port
03607 && xm_rstreqid == reqid)
03608 {
03609 HashMutexUnlock( "sendrst2" );
03610
03611 #ifdef RIO_DEBUG1
03612 m_log << "[RioNeti - SendRst] Finish 1" << endl;
03613 #endif
03614
03615 return;
03616 }
03617 }
03618
03619 xm_rstsec = xm_now.tv_sec;
03620 xm_rstip = ipaddr;
03621 xm_rstport = port;
03622 xm_rstreqid = reqid;
03623
03624 nbp = mbget();
03625 nbp->nb_toipaddr = ipaddr;
03626 nbp->nb_toipport = port;
03627 nbp->nb_sendp = nbp->nb_buf;
03628 p = (RioPkt::pktp *) nbp->nb_sendp;
03629 p->type = TYPE_RST;
03630 p->x1 = 0;
03631 p->x2 = htons((u16) result);
03632 p->reqid = reqid;
03633 nbp->nb_sendl = sizeof(RioPkt::pktp);
03634 p->auth.makeauth(nbp->nb_sendp, nbp->nb_sendl, nbp);
03635
03636 fSend(nbp);
03637 cnt_rst2++;
03638 HashMutexUnlock( "sendrst3" );
03639
03640 #ifdef RIO_DEBUG1
03641 m_log << "[RioNeti - SendRst] Finish 2" << endl;
03642 #endif
03643 }
03644
03645
03646
03647
03648
03649
03650
03651
03652
03653 void RioNeti::procpktrst( char *pktp, int pktl )
03654 {
03655 #ifdef RIO_DEBUG1
03656 m_log << "[RioNeti - procpktrst] Start" << endl;
03657 #endif
03658
03659 NetBuf *rp;
03660
03661 RioPkt::pktp *p = (RioPkt::pktp *)pktp;
03662
03663 if( (unsigned)pktl != sizeof( RioPkt::pktp ) )
03664 {
03665 err_pktsize++;
03666
03667 #ifdef RIO_DEBUG1
03668 m_log << "[RioNeti - procpktrst] Finish 1" << endl;
03669 #endif
03670
03671 return;
03672 }
03673
03674 HashMutexLock( "prpktrst" );
03675 rp = iHashFindRst( m_fraddr.sin_addr.s_addr, m_fraddr.sin_port, p->reqid );
03676
03677 if( rp == 0 )
03678 goto unlockret;
03679
03680 if( !p->auth.checkauth( pktp, pktl, rp ) )
03681 goto unlockret;
03682
03683
03684 if( rp->rq_fragsh >= rp->rq_fragmax )
03685 {
03686 qComplete( rp, 0 );
03687 }
03688 else
03689 {
03690
03691
03692 qComplete( rp, ntohs( p->x2 ) );
03693 }
03694
03695 unlockret:
03696 HashMutexUnlock( "prpktrst" );
03697
03698 #ifdef RIO_DEBUG1
03699 m_log << "[RioNeti - procpktrst] Finish 2" << endl;
03700 #endif
03701
03702 return;
03703 }
03704
03705
03706
03707
03708
03709
03710
03711 void RioNeti::procpktfrag( char *pktp, int pktl, RioStreamType traffic )
03712 {
03713 #ifdef RIO_DEBUG1
03714 m_log << "[RioNeti - procpktfrag] Start" << endl;
03715 #endif
03716
03717 int num;
03718 RioNetiReqIdItem *current = NULL;
03719 RioNetiReqIdItem *previous = NULL;
03720 RioPkt::pktfx *p = (RioPkt::pktfx *)pktp;
03721
03722 HashMutexLock();
03723
03724
03725 num = ntohs( p->fragnum );
03726
03727 #ifdef RIO_DEBUG2
03728 m_log << "[RioNeti - procpktfrag] fragnum = " << num << ", reqid = "
03729 << p->reqid << endl;
03730 #endif
03731
03732
03733
03734
03735
03736
03737
03738 if( ( traffic == MULTICASTTRAFFIC ) && callback_transport )
03739 {
03740 callback_t client_callback = callback_transport->callback;
03741
03742 callback_transport->block = p->reqid;
03743
03744 int result = reqid_table->Search( p->reqid, ¤t, &previous );
03745
03746 if( ( result == RESULT_NETIREQT_SEARCH_NOT_FOUND ) ||
03747 ( result == RESULT_NETIREQT_SEARCH_EMPTY_LIST )
03748 )
03749 {
03750 RioCallBackTransport *multicast_callback;
03751 multicast_callback = new RioCallBackTransport;
03752 multicast_callback->block = p->reqid;
03753 multicast_callback->callback = this->callback;
03754 multicast_callback->stream = callback_transport->stream;
03755 multicast_callback->object = callback_transport->object;
03756 multicast_callback->main_callback =
03757 callback_transport->main_callback;
03758
03759 RioNetiReqIdItem *item = new RioNetiReqIdItem( p->reqid );
03760
03761
03762
03763
03764 reqid_table->Insert( item );
03765
03766 char *auxBuf = new char[ buffer_stream->getBlockSize() ];
03767 item->SetBuf( auxBuf );
03768
03769
03770
03771
03772 callback_transport->block = p->reqid;
03773
03774 HashMutexUnlock();
03775
03776 #ifdef RIO_DEBUG2
03777 m_log << "[procpktfrag] Chamando ExpectData. "
03778 << "callback_transport->block = "
03779 << callback_transport->block << endl;
03780 #endif
03781
03782 int localreqid = ExpectData( auxBuf,
03783 buffer_stream->getBlockSize(),
03784 client_callback, multicast_callback,
03785 MULTICASTTRAFFIC, 0 );
03786
03787 HashMutexLock();
03788
03789 item->SetLreqid( localreqid );
03790 current = item;
03791 p->reqid = current->GetLreqid();
03792
03793 #ifdef RIO_DEBUG2
03794 m_log << "[RioNeti] reqid " << p->reqid << " é relativo ao bloco "
03795 << callback_transport->block << endl;
03796 #endif
03797
03798
03799 multicast_callback->data = current->GetBuf();
03800
03801 #ifdef RIO_DEBUG2
03802 m_log << "[1]recebido fragmento " << num << " do bloco de reqid "
03803 << current->GetPreqid() << ", traffic = " << traffic
03804 << endl;
03805 #endif
03806 }
03807 else
03808 {
03809 #ifdef RIO_DEBUG2
03810 m_log << "[2]recebido fragmento " << num << " do bloco de reqid "
03811 << p->reqid << ", traffic = MULTICASTTRAFFIC" << endl;
03812 #endif
03813
03814 if( result == RESULT_NETIREQT_SEARCH_FOUND )
03815 {
03816 p->reqid = current->GetLreqid();
03817 }
03818 else
03819 {
03820 #ifdef RIO_DEBUG2
03821 m_log << "RioNeti - retornando da procpkt " << endl;
03822 #endif
03823
03824 HashMutexUnlock();
03825
03826 #ifdef RIO_DEBUG1
03827 m_log << "[RioNeti - procpktfrag] Finish 1" << endl;
03828 #endif
03829
03830 return;
03831 }
03832 }
03833 }
03834 #ifdef RIO_DEBUG2
03835 else
03836 {
03837 m_log << "[3]recebido fragmento " << num << " do bloco de reqid "
03838 << p->reqid << ", traffic = " << traffic << endl;
03839 }
03840 #endif
03841
03842 NetBuf *rp = iHashFind( p->reqid );
03843 if( !rp )
03844 {
03845
03846
03847
03848 #ifdef RIO_DEBUG2
03849 if( traffic == MULTICASTTRAFFIC && callback_transport )
03850 {
03851 m_log << "Nao encontrado na tabela Hash o bloco multicast "
03852 << current->GetPreqid() << " para este fragmento de "
03853 << "numero " << num << endl;
03854 }
03855 else
03856 {
03857 m_log << "Nao encontrado na tabela Hash o bloco unicast "
03858 << p->reqid << " para este fragmento de numero " << num
03859 << endl;
03860 }
03861 #endif
03862
03863
03864
03865 #ifdef RIO_DEBUG2
03866 m_log << "DELAYEDPACKAGE " << (int)time(NULL) << " 1456.4" << endl;
03867 #endif
03868
03869 HashMutexUnlock();
03870
03871 SendRst( m_fraddr.sin_addr.s_addr, m_fraddr.sin_port, p->reqid,
03872 0xe11f );
03873
03874 #ifdef RIO_DEBUG1
03875 m_log << "[RioNeti - procpktfrag] Finish 2" << endl;
03876 #endif
03877
03878 return;
03879 }
03880
03881 #ifdef RIO_DEBUG2
03882 m_log << "[RioNeti - procpktfrag] NetBuf: "
03883 << "nb_reqid " << rp->nb_reqid
03884 << " nb_tiwhen " << rp->nb_tiwhen.tv_sec
03885 << " " << rp->nb_tiwhen.tv_usec
03886 << " rq_fragsh " << rp->rq_fragsh
03887 << " rq_fragburst " << rp->rq_fragburst
03888 << " rq_burstcnt " << rp->rq_burstcnt
03889 << " rq_fraglistn " << rp->rq_fraglistn
03890 << " nb_sendack " << rp->nb_sendack
03891 << " nb_hisreqid " << rp->nb_hisreqid
03892 << " nb_bufp type " << (int)((RioPkt::pktp *) rp->nb_bufp)->type
03893 << endl;
03894 #endif
03895
03896 if( (unsigned)pktl <= sizeof( RioPkt::pktfx ) )
03897 {
03898 err_pktsize++;
03899
03900 #ifdef RIO_DEBUG2
03901 if( traffic == MULTICASTTRAFFIC && callback_transport )
03902 {
03903 m_log << "RioNeti - Erro multicast - " << num << "/"
03904 << rp->rq_fraghav << " bloco: " << current->GetPreqid()
03905 << endl;
03906 }
03907 else
03908 {
03909 m_log << "RioNeti - Erro unicast - " << num << "/"
03910 << rp->rq_fraghav << " reqid: " << p->reqid << endl;
03911 }
03912 #endif
03913
03914 goto unlockret;
03915 }
03916
03917 if( m_debug & DEBUG_FRAG )
03918 m_log << " procpkt frag " << num << endl;
03919
03920 if( ( unsigned )num > sizeof( rp->rq_bits ) )
03921 {
03922
03923 err_pktfmt++;
03924
03925 #ifdef RIO_DEBUG2
03926 if( traffic == MULTICASTTRAFFIC && callback_transport )
03927 {
03928 m_log << "RioNeti1 - multicast - " << num << "/" << rp->rq_fraghav
03929 << " bloco: " << current->GetPreqid() << endl;
03930 }
03931 else
03932 {
03933 m_log << "RioNeti1 - unicast - " << num << "/" << rp->rq_fraghav <<
03934 " reqid: " << p->reqid << endl;
03935 }
03936 #endif
03937
03938 goto unlockret;
03939 }
03940
03941
03942
03943 #ifdef DEBUGFRAGACK //drop fragments
03944 if( ( num == 20 ) || ( num == 40 ) )
03945 {
03946
03947 m_log << "[PROCPKTFRAG ] Descartando fragmento " << num
03948 << " do bloco de reqid " << p->reqid << ", traffic = " << traffic
03949 << endl;
03950
03951 goto unlockret;
03952 }
03953
03954
03955 #endif
03956
03957 --rp->rq_needack;
03958
03959 if( rp->rq_bits[ num ] )
03960 {
03961 err_dupfrag++;
03962 if( rp->rq_needack > 0 )
03963 rp->rq_needack = 0;
03964
03965 #ifdef RIO_DEBUG2
03966 if( traffic == MULTICASTTRAFFIC && callback_transport )
03967 {
03968 m_log << "ack1 multicast - " << num << "/" << rp->rq_fraghav
03969 << " bloco: " << current->GetPreqid() << endl;
03970 }
03971 else
03972 {
03973 m_log << "ack1 unicast - " << num << "/" << rp->rq_fraghav <<
03974 " reqid: " << p->reqid << endl;
03975 }
03976 #endif
03977
03978 goto checkack;
03979 }
03980
03981 char *dp;
03982 if( num == 0 )
03983 {
03984 #ifdef RIO_DEBUG2
03985 if( rp->nb_unicastRequested )
03986 m_log << "[procpktfrag] Recebendo fragmento zero do bloco unicast "
03987 << "de reqid " << rp->nb_reqid << endl;
03988 else
03989 m_log << "[procpktfrag] Recebendo fragmento zero do bloco "
03990 << "multicast de reqid " << rp->nb_reqid << endl;
03991 #endif
03992
03993 rp->rq_needack = 0;
03994 rp->nb_hisreqid = ( ( RioPkt::pktf0 * )pktp )->ackreqid;
03995 rp->nb_frag0arrive = time( NULL );
03996 rp->rq_fragmax = ntohs( ( ( RioPkt::pktf0 * ) pktp )->fragmax) ;
03997 rp->rq_maxpktl = ntohs( ( ( RioPkt::pktf0 * )pktp )->maxpktl );
03998 rp->rq_fraglen = rp->rq_maxpktl
03999 - (iphdrlen + udphdrlen + sizeof( RioPkt::pktfx ));
04000 rp->rq_fraglen0 = rp->rq_maxpktl
04001 - (iphdrlen + udphdrlen + sizeof( RioPkt::pktf0 ));
04002 rp->nb_toipaddr = m_fraddr.sin_addr.s_addr;
04003 rp->nb_toipport = m_fraddr.sin_port;
04004 dp = rp->nb_blockp;
04005 pktp += sizeof( RioPkt::pktf0 );
04006 pktl -= sizeof( RioPkt::pktf0 );
04007 if( m_debug & DEBUG_FRAG )
04008 {
04009 char print[256];
04010 sprintf( print, " ---frag 0: ipaddr %8.8x port %8.8x",
04011 (unsigned int)rp->nb_toipaddr,
04012 (unsigned int)rp->nb_toipport );
04013 m_log << print << endl;
04014 NetMgr::dumppkt( " -- m_fraddr --\n",
04015 (char *)&m_fraddr, sizeof( m_fraddr ) );
04016 }
04017 }
04018 else
04019 {
04020
04021 if( rp->rq_fraghav == 0 )
04022 {
04023
04024
04025 rp->rq_fragDiscarded++;
04026
04027 #ifdef RIO_DEBUG2
04028 if( rp->nb_unicastRequested )
04029 m_log << "[procpktfrag] Descartando fragmento " << num
04030 << "(bloco unicast de reqid " << p->reqid
04031 << ") pois o fragmento 0 ainda não chegou!" << endl;
04032 else
04033 m_log << "[procpktfrag] Descartando fragmento " << num
04034 << "(bloco multicast de reqid " << p->reqid
04035 << ") pois o fragmento 0 ainda não chegou!" << endl;
04036 #endif
04037
04038
04039
04040 #ifdef RIO_DEBUG2
04041 m_log << "DISCARTEDPACKAGE " << (int)time(NULL) << " "
04042 << sizeof( RioPkt::pktfx ) << endl;
04043 #endif
04044
04045 goto unlockret;
04046 }
04047 dp = rp->nb_blockp + rp->rq_fraglen0 + (num - 1) * rp->rq_fraglen;
04048 pktp += sizeof( RioPkt::pktfx );
04049 pktl -= sizeof( RioPkt::pktfx );
04050 }
04051
04052 rp->rq_bits[ num ] = 1;
04053 if( rp->rq_fraghigh < num )
04054 rp->rq_fraghigh = num;
04055
04056 if( dp + pktl > rp->nb_blockp + rp->nb_blockl )
04057 {
04058
04059 err_overfrag++;
04060
04061 #ifdef RIO_DEBUG2
04062 if( traffic == MULTICASTTRAFFIC && callback_transport )
04063 {
04064 m_log << "RioNeti - ack2 multicast - " << num << "/"
04065 << rp->rq_fraghav << " bloco: " << current->GetPreqid()
04066 << endl;
04067 }
04068 else
04069 {
04070 m_log << "RioNeti - ack2 unicast - " << num << "/"
04071 << rp->rq_fraghav << " reqid: " << p->reqid << endl;
04072 }
04073 #endif
04074
04075 goto checkack;
04076 }
04077
04078 memcpy( dp, pktp, pktl );
04079 rp->rq_fraghav++;
04080
04081 if( m_debug & DEBUG_FRAG )
04082 {
04083 m_log << "procpkt: hav " << rp->rq_fraghav << " max "
04084 << rp->rq_fragmax << endl;
04085 }
04086
04087 if( rp->rq_fraghav >= rp->rq_fragmax )
04088 {
04089
04090
04091
04092
04093
04094 #ifdef RIO_DEBUG_EMUL
04095 RioRequest *Request = ( RioRequest * ) rp->nb_userparm;
04096 m_log << "[RioNeti] Whole Block (reqid " << rp->nb_reqid
04097 << ") " << Request->Block << " arrived and assembled" << endl;
04098 #endif
04099
04100 rp->rq_needack = -1000;
04101 if( !rp->nb_sendack )
04102 {
04103 if( traffic == MULTICASTTRAFFIC && callback_transport )
04104 {
04105
04106
04107 reqid_table->Remove( current->GetPreqid() );
04108 }
04109
04110 cnt_rcvfragack++;
04111 qComplete( rp, 0 );
04112 }
04113 }
04114
04115 checkack:
04116
04117 if( rp->rq_needack <= 0 && !rp->nb_outrtn && rp->nb_sendack )
04118 iOutAddTop( rp, &iSendFragAck );
04119
04120 unlockret:
04121 HashMutexUnlock();
04122
04123 #ifdef RIO_DEBUG1
04124 m_log << "[RioNeti - procpktfrag] Finish 3" << endl;
04125 #endif
04126
04127 return;
04128 }
04129
04130
04131
04132
04133
04134
04135 void RioNeti::procpktfragack( char *pktp, int pktl )
04136 {
04137 #ifdef RIO_DEBUG1
04138 m_log << "[RioNeti - procpktfragack] Start" << endl;
04139 #endif
04140
04141 int seq;
04142 int numhave;
04143 int numhigh;
04144 int numrec;
04145 u16 *dl;
04146 u16 *dx;
04147 int listn;
04148 int list0;
04149
04150 RioPkt::pktfa *p = (RioPkt::pktfa *) pktp;
04151 HashMutexLock( "prcpkfrgack1" );
04152 NetBuf *rp = iHashFind( p->reqid );
04153 if( !rp )
04154 {
04155 HashMutexUnlock( "prcpkfrgack2" );
04156 SendRst( m_fraddr.sin_addr.s_addr, m_fraddr.sin_port, p->reqid, 0xe21f);
04157
04158 #ifdef RIO_DEBUG1
04159 m_log << "[RioNeti - procpktfragack] Finish 1" << endl;
04160 #endif
04161
04162 return;
04163 }
04164
04165 #ifdef RIO_DEBUG2
04166 m_log << "[RioNeti - procpktack] NetBuf: "
04167 << "nb_reqid " << rp->nb_reqid
04168 << " nb_tiwhen " << rp->nb_tiwhen.tv_sec
04169 << " " << rp->nb_tiwhen.tv_usec
04170 << " rq_fragsh " << rp->rq_fragsh
04171 << " rq_fragburst " << rp->rq_fragburst
04172 << " rq_burstcnt " << rp->rq_burstcnt
04173 << " rq_fraglistn " << rp->rq_fraglistn
04174 << " nb_sendack " << rp->nb_sendack
04175 << " nb_hisreqid " << rp->nb_hisreqid
04176 << " nb_bufp type " << (int)((RioPkt::pktp *) rp->nb_bufp)->type
04177 << endl;
04178 #endif
04179
04180 if( (unsigned)pktl < sizeof( RioPkt::pktfa ) )
04181 {
04182 err_pktsize++;
04183 goto unlockret;
04184 }
04185
04186
04187 seq = ntohs( p->fragackseq );
04188 numhave = ntohs( p->fragnum );
04189 numhigh = ntohs( p->fraghigh );
04190
04191 #ifdef RIO_DEBUG2
04192 m_log << " procpktfragack nb_reqid " << rp->nb_reqid
04193 << " seq " << seq
04194 << " qtde " << numhave
04195 << " high " << numhigh << endl;
04196 #endif
04197
04198 if( m_debug & DEBUG_FRAG )
04199 {
04200 m_log << " procpktfragack ackseq " << seq << " ackhave " << numhave
04201 << " ackhigh " << numhigh << endl;
04202 m_log << "\trq_fragackseq " << rp->rq_fragackseq << " rq_fraghav "
04203 << rp->rq_fraghav << " rq_fragburst " << rp->rq_fragburst << endl;
04204 }
04205
04206 if( seq <= rp->rq_fragackseq )
04207 goto unlockret;
04208
04209 rp->rq_fragackseq = seq;
04210 cnt_rcvfragack++;
04211
04212 if( rp->rq_fragmax <= numhave )
04213 {
04214 if( m_debug & DEBUG_FRAG )
04215 {
04216 m_log << " fragack - complete! max " << rp->rq_fragmax << " have "
04217 << numhave << endl;
04218 }
04219 qComplete( rp, 0 );
04220 goto unlockret;
04221 }
04222
04223 numrec = numhave - rp->rq_fraghav;
04224
04225 rp->rq_burstcnt += numrec + 1;
04226 if( rp->rq_burstcnt > NetBuf::RQ_BURSTMAX )
04227 rp->rq_burstcnt = NetBuf::RQ_BURSTMAX;
04228
04229
04230 rp->rq_fraghav = numhave;
04231
04232 listn = rp->rq_fraglistn;
04233 list0 = rp->rq_fraglist[ 0 ];
04234
04235
04236 dl = (u16 *) (pktp + sizeof(RioPkt::pktfa));
04237 dx = (u16 *) (pktp + pktl);
04238 for( rp->rq_fraglistn = 0; dl < dx; dl++ )
04239 {
04240 u16 xfrag = ntohs( *dl );
04241 if( rp->rq_fragackseq - rp->rq_bits[ xfrag ] > 2 )
04242 {
04243 if( rp->rq_fraglistn >= NetBuf::RQ_MAXACKLIST )
04244 break;
04245 rp->rq_fraglist[ rp->rq_fraglistn++ ] = xfrag;
04246 }
04247 }
04248
04249
04250 if( rp->rq_fraglistn != listn ||
04251 (listn > 0 && rp->rq_fraglist[0] != list0)
04252 )
04253 {
04254 cnt_rcvfragacknc++;
04255 rp->rq_burstcnt >>= 1;
04256 if( rp->rq_burstcnt < NetBuf::RQ_BURSTMIN )
04257 rp->rq_burstcnt = NetBuf::RQ_BURSTMIN;
04258 }
04259
04260
04261 if( !rp->nb_outrtn )
04262 {
04263 if( !iSendFragNext( rp ) )
04264 iSendFrag( rp );
04265 }
04266
04267 unlockret:
04268 HashMutexUnlock( "prcpkfrgack3" );
04269
04270 #ifdef RIO_DEBUG1
04271 m_log << "[RioNeti - procpktfragack] Finish 2" << endl;
04272 #endif
04273
04274 return;
04275 }
04276
04277
04278
04279
04280
04281 void RioNeti::procpkt( NetBuf **rcvbuf, RioStreamType traffic )
04282 {
04283 #ifdef RIO_DEBUG1
04284 m_log << "[RioNeti - procpkt] Start" << endl;
04285 #endif
04286
04287 char *pktp;
04288 int pktl;
04289 RioPkt::pktp *p;
04290 RioPkt::pktpmap *pmap;
04291 NetBuf *rp;
04292 call_type type;
04293 int reqid;
04294 int hisreqid;
04295 int serverid;
04296
04297 #ifdef RIO_DEBUG2
04298
04299 struct sockaddr_in mymapip;
04300 #endif
04301
04302 NetBuf *nbp = *rcvbuf;
04303 pktp = nbp->nb_bufp;
04304 pktl = nbp->nb_recvl;
04305
04306 cnt_rcvpkt++;
04307
04308 if( m_debug & DEBUG_DATA )
04309 NetMgr::dumppkt( " -- procpkt --\n", pktp, pktl );
04310
04311
04312 type = (call_type) ( ((RioPkt::pktp *)pktp)->type );
04313 reqid = ( ( RioPkt::pktp * ) pktp )->reqid;
04314
04315 #ifdef RIO_DEBUG2
04316 m_log << "[RioNeti - procpkt] Received packet: type " << type
04317 << ", reqid = " << reqid << ", ip = "
04318 << inet_ntoa( m_fraddr.sin_addr ) << ", porta = "
04319 << ntohs( m_fraddr.sin_port ) << endl;
04320 #endif
04321
04322 switch( type )
04323 {
04324 case TYPE_FRAG:
04325 procpktfrag( pktp, pktl, traffic );
04326 break;
04327
04328 case TYPE_FRAGACK:
04329 procpktfragack( pktp, pktl );
04330 break;
04331
04332 case TYPE_RST:
04333 procpktrst( pktp, pktl );
04334 break;
04335
04336 case TYPE_ACK:
04337 reqid = ((RioPkt::pktp *)pktp)->reqid;
04338 HashMutexLock( "prcpkt1" );
04339 rp = iHashFind( reqid );
04340 if( !rp )
04341 {
04342 #ifdef RIO_DEBUG2
04343 m_log << "RioNeti: did not find reqid "<< reqid << endl;
04344 #endif
04345
04346 HashMutexUnlock( "prcpkt2" );
04347
04348 #ifdef RIO_DEBUG1
04349 m_log << "[RioNeti - procpkt] Finish 1" << endl;
04350 #endif
04351
04352 return;
04353 }
04354
04355 #ifdef RIO_DEBUG2
04356 m_log << "[RioNeti - procpkt TYPE_ACK] NetBuf: "
04357 << "nb_reqid " << rp->nb_reqid
04358 << " nb_tiwhen " << rp->nb_tiwhen.tv_sec
04359 << " " << rp->nb_tiwhen.tv_usec
04360 << " rq_fragsh " << rp->rq_fragsh
04361 << " rq_fragburst " << rp->rq_fragburst
04362 << " rq_burstcnt " << rp->rq_burstcnt
04363 << " rq_fraglistn " << rp->rq_fraglistn
04364 << " nb_sendack " << rp->nb_sendack
04365 << " nb_hisreqid " << rp->nb_hisreqid
04366 << " nb_bufp type " << (int)((RioPkt::pktp *) rp->nb_bufp)->type
04367 << endl;
04368 #endif
04369
04370 p = (RioPkt::pktp *)pktp;
04371 if( !p->auth.checkauth(pktp, pktl, rp) )
04372 {
04373 HashMutexUnlock( "prcpkt3" );
04374
04375 #ifdef RIO_DEBUG1
04376 m_log << "[RioNeti - procpkt] Finish 2" << endl;
04377 #endif
04378
04379 return;
04380 }
04381 pktp += sizeof( RioPkt::pktp );
04382 pktl -= sizeof( RioPkt::pktp );
04383
04384
04385 (rp->nb_callback)( rp, type, pktp, pktl );
04386 HashMutexUnlock( "prcpkt4" );
04387
04388 #ifdef RIO_DEBUG1
04389 m_log << "[RioNeti - procpkt] Finish 3" << endl;
04390 #endif
04391
04392 return;
04393
04394 case TYPE_RESULT:
04395 p = (RioPkt::pktp *)pktp;
04396 reqid = p->reqid;
04397 hisreqid = *((u32 *)( pktp + sizeof( RioPkt::pktp ) ));
04398 HashMutexLock( "prcpkt5" );
04399 rp = iHashFind( reqid );
04400 if( !rp )
04401 {
04402 HashMutexUnlock( "prcpkt6" );
04403
04404 #ifdef RIO_DEBUG2
04405 m_log << "RioNeti: did not find reqid "<< reqid << endl;
04406 #endif
04407
04408 SendRst( m_fraddr.sin_addr.s_addr,
04409 m_fraddr.sin_port,
04410 p->reqid, 0xe11e );
04411
04412 #ifdef RIO_DEBUG1
04413 m_log << "[RioNeti - procpkt] Finish 4" << endl;
04414 #endif
04415
04416 return;
04417 }
04418
04419 #ifdef RIO_DEBUG2
04420 m_log << "[RioNeti - procpkt TYPE_RESULT] NetBuf: "
04421 << "nb_reqid " << rp->nb_reqid
04422 << " nb_tiwhen " << rp->nb_tiwhen.tv_sec
04423 << " " << rp->nb_tiwhen.tv_usec
04424 << " rq_fragsh " << rp->rq_fragsh
04425 << " rq_fragburst " << rp->rq_fragburst
04426 << " rq_burstcnt " << rp->rq_burstcnt
04427 << " rq_fraglistn " << rp->rq_fraglistn
04428 << " nb_sendack " << rp->nb_sendack
04429 << " nb_hisreqid " << rp->nb_hisreqid
04430 << " nb_bufp type " << (int)((RioPkt::pktp *) rp->nb_bufp)->type
04431 << endl;
04432 #endif
04433
04434 p = (RioPkt::pktp *)pktp;
04435 if( !p->auth.checkauth( pktp, pktl, rp ) )
04436 {
04437 HashMutexUnlock( "prcpkt7" );
04438
04439 #ifdef RIO_DEBUG1
04440 m_log << "[RioNeti - procpkt] Finish 5" << endl;
04441 #endif
04442
04443 return;
04444 }
04445
04446 iSendAck( m_fraddr.sin_addr.s_addr, m_fraddr.sin_port, hisreqid );
04447
04448 pktp += sizeof( RioPkt::pktp ) + sizeof( u32 );
04449 pktl -= sizeof( RioPkt::pktp ) + sizeof( u32 );
04450
04451
04452 (rp->nb_callback)( rp, type, pktp, pktl );
04453 HashMutexUnlock( "prcpkt8" );
04454
04455 #ifdef RIO_DEBUG1
04456 m_log << "[RioNeti - procpkt] Finish 6" << endl;
04457 #endif
04458
04459 return;
04460
04461 case TYPE_CMD:
04462 p = (RioPkt::pktp *)pktp;
04463 reqid = p->reqid;
04464
04465
04466 HashMutexLock( "prcpkt9" );
04467 if( reqid == 0 )
04468 {
04469 nbp->nb_usercmd = m_cmdproc;
04470 nbp->nb_userparm = m_cmdparm;
04471 rp = 0;
04472 }
04473 else
04474 {
04475 rp = iHashFind( reqid );
04476 if( rp )
04477 {
04478
04479 nbp->nb_usercmd = rp->nb_usercmd;
04480 nbp->nb_userparm = rp->nb_userparm;
04481 }
04482 else
04483 {
04484
04485
04486
04487 HashMutexUnlock( "prcpkt10" );
04488
04489 #ifdef RIO_DEBUG2
04490 m_log << "RioNeti: did not find reqid "<< reqid << endl;
04491 #endif
04492
04493 SendRst( m_fraddr.sin_addr.s_addr,
04494 m_fraddr.sin_port,
04495 p->reqid, 0xe11e );
04496
04497 #ifdef RIO_DEBUG1
04498 m_log << "[RioNeti - procpkt] Finish 7" << endl;
04499 #endif
04500
04501 return;
04502 }
04503 }
04504 if( !p->auth.checkauth( pktp, pktl, rp ) )
04505 {
04506 HashMutexUnlock( "prcpkt11" );
04507
04508 #ifdef RIO_DEBUG1
04509 m_log << "[RioNeti - procpkt] Finish 8" << endl;
04510 #endif
04511
04512 return;
04513 }
04514
04515 hisreqid = *((u32 *)( pktp + sizeof( RioPkt::pktp ) ));
04516 pktp += sizeof( RioPkt::pktp ) + sizeof( u32 );
04517 pktl -= sizeof( RioPkt::pktp ) + sizeof( u32 );
04518
04519 iSendAck( m_fraddr.sin_addr.s_addr, m_fraddr.sin_port, hisreqid );
04520
04521 if( nbp->nb_usercmd == 0 )
04522 {
04523 HashMutexUnlock( "prcpkt12" );
04524 SendRst( m_fraddr.sin_addr.s_addr, m_fraddr.sin_port,
04525 p->reqid, 0xe00e );
04526
04527 #ifdef RIO_DEBUG1
04528 m_log << "[RioNeti - procpkt] Finish 9" << endl;
04529 #endif
04530
04531 return;
04532 }
04533 HashMutexUnlock( "prcpkt13" );
04534 (nbp->nb_usercmd)( nbp->nb_userparm, pktp, pktl );
04535
04536 #ifdef RIO_DEBUG1
04537 m_log << "[RioNeti - procpkt] Finish 10" << endl;
04538 #endif
04539
04540 return;
04541
04542
04543
04544
04545
04546 case TYPE_SENDMAP:
04547 p = (RioPkt::pktp *)pktp;
04548 hisreqid = *((u32 *)( pktp + sizeof( RioPkt::pktp ) ));
04549
04550 HashMutexLock( "prcpkt14" );
04551
04552
04553
04554
04555
04556
04557
04558
04559
04560
04561
04562
04563
04564
04565
04566
04567 #ifdef RIO_DEBUG2
04568 m_log << "RioNeti::procpkt SENDMAP enviando ACK IP = "
04569 << inet_ntoa( m_fraddr.sin_addr )
04570 << ", porta = " << ntohs( m_fraddr.sin_port )
04571 << ", reqid = " << p->reqid << endl;
04572 #endif
04573
04574
04575
04576 iSendMapAck( m_fraddr.sin_addr.s_addr, m_fraddr.sin_port,
04577 hisreqid );
04578
04579 HashMutexUnlock( "prcpkt16" );
04580
04581 #ifdef RIO_DEBUG1
04582 m_log << "[RioNeti - procpkt] Finish 12" << endl;
04583 #endif
04584 return;
04585
04586
04587
04588
04589
04590
04591
04592
04593
04594
04595
04596
04597 case TYPE_MAPACK:
04598 reqid = ((RioPkt::pktpmap *)pktp)->reqid;
04599
04600
04601
04602 serverid = -1;
04603 for (unsigned int i = 0; i < m_MappingInfoSize; i++ )
04604 {
04605 if( m_MappingInfo[ i ].reqid == reqid )
04606 {
04607 serverid = i;
04608 break;
04609 }
04610 }
04611
04612 if( serverid == -1 )
04613 {
04614 #ifdef RIO_DEBUG2
04615 m_log << "RioNeti: did not find reqid in m_MappingInfo" << reqid
04616 << endl;
04617 #endif
04618
04619 #ifdef RIO_DEBUG1
04620 m_log << "[RioNeti - procpkt] Finish 13" << endl;
04621 #endif
04622
04623 return;
04624 }
04625
04626 HashMutexLock( "prcpkt17" );
04627 rp = iHashFind( reqid );
04628
04629 if( !rp )
04630 {
04631 #ifdef RIO_DEBUG2
04632 m_log << "RioNeti: did not find reqid "<< reqid << endl;
04633 #endif
04634
04635 HashMutexUnlock( "prcpkt18" );
04636
04637 #ifdef RIO_DEBUG1
04638 m_log << "[RioNeti - procpkt] Finish 13" << endl;
04639 #endif
04640
04641 return;
04642 }
04643
04644 #ifdef RIO_DEBUG2
04645 m_log << "[RioNeti - procpkt TYPE_MAPACK] NetBuf: "
04646 << "nb_reqid " << rp->nb_reqid
04647 << " nb_tiwhen " << rp->nb_tiwhen.tv_sec
04648 << " " << rp->nb_tiwhen.tv_usec
04649 << " rq_fragsh " << rp->rq_fragsh
04650 << " rq_fragburst " << rp->rq_fragburst
04651 << " rq_burstcnt " << rp->rq_burstcnt
04652 << " rq_fraglistn " << rp->rq_fraglistn
04653 << " nb_sendack " << rp->nb_sendack
04654 << " nb_hisreqid " << rp->nb_hisreqid
04655 << " nb_bufp type " << (int)((RioPkt::pktp *) rp->nb_bufp)->type
04656 << endl;
04657 #endif
04658
04659 pmap = (RioPkt::pktpmap *)pktp;
04660
04661 #ifdef RIO_DEBUG2
04662 struct in_addr ip;
04663 ip.s_addr = pmap->IP;
04664 m_log << "[RioNeti - procpkt] Received mapping #" << serverid
04665 << endl;
04666 m_log << "[RioNeti - procpkt] " << inet_ntoa( ip )
04667 << " , " << htons( pmap->port ) << endl;
04668 #endif
04669
04670
04671
04672
04673 if( pmap->IP != (unsigned int)m_ipaddrmap[ serverid ] )
04674 {
04675 m_ipaddrmap[serverid] = pmap->IP;
04676 m_ipportmap[serverid] = pmap->port;
04677
04678
04679
04680 m_behindnat = ( m_ipaddrmap[serverid] !=
04681 m_ipaddr );
04682 }
04683
04684
04685 m_MappingInfo[ serverid ].IsEnabled = true;
04686
04687 if( !pmap->auth.checkauth(pktp, pktl, rp) )
04688 {
04689 HashMutexUnlock( "prcpkt19" );
04690
04691 #ifdef RIO_DEBUG1
04692 m_log << "[RioNeti - procpkt] Finish 14" << endl;
04693 #endif
04694
04695 return;
04696 }
04697
04698 pktp += sizeof( RioPkt::pktpmap );
04699 pktl -= sizeof( RioPkt::pktpmap );
04700
04701 #ifdef RIO_DEBUG2
04702 mymapip.sin_addr.s_addr = rp->nb_toipaddr;
04703 m_log << "RioNeti::procpkt recebendo MAPACK IP = "
04704 << inet_ntoa( mymapip.sin_addr )
04705 << ", porta = " << ntohs( rp->nb_toipport )
04706 << ", reqid = " << reqid << endl;
04707 #endif
04708
04709
04710 (rp->nb_callback)( rp, type, pktp, pktl );
04711
04712 HashMutexUnlock( "prcpkt20" );
04713
04714 #ifdef RIO_DEBUG1
04715 m_log << "[RioNeti - procpkt] Finish 15" << endl;
04716 #endif
04717 return;
04718
04719 default:
04720 err_pktcmd++;
04721
04722 #ifdef RIO_DEBUG2
04723 m_log << "[RioNeti - procpkt default] type "
04724 << (int)((RioPkt::pktp *) pktp)->type
04725 << endl;
04726 #endif
04727
04728 #ifdef RIO_DEBUG1
04729 m_log << "[RioNeti - procpkt] Finish 16" << endl;
04730 #endif
04731
04732 return;
04733
04734 }
04735
04736 #ifdef RIO_DEBUG1
04737 m_log << "[RioNeti - procpkt] Finish 17" << endl;
04738 #endif
04739 }
04740
04741 void RioNeti::SendCmd( int ipadr, int port, int reqid, char *cmdp, int cmdl,
04742 callback_t callback, void *callbackparm )
04743 {
04744 #ifdef RIO_DEBUG1
04745 m_log << "[RioNeti - SendCmd] Start" << endl;
04746 #endif
04747
04748 char *endp;
04749
04750 HashMutexLock( "sndcmd" );
04751
04752 NetBuf *nbp = pSend( ipadr, port, reqid, RioNeti::TYPE_CMD);
04753 nbp->nb_usercall = callback;
04754 nbp->nb_userparm = callbackparm;
04755
04756 endp = nbp->nb_datap + cmdl;
04757
04758
04759 if( endp > nbp->nb_datalm )
04760 abort();
04761
04762 memcpy( nbp->nb_datap, cmdp, cmdl );
04763 nbp->nb_datap += cmdl;
04764
04765 nbp->nb_callback = &sendcmdcb;
04766 nbp->rq_retry = 5;
04767 nbp->rq_retrytim = 1 << (TimeShift - 3);
04768
04769 iTimeNew( nbp, nbp->rq_retrytim );
04770 qSend( nbp );
04771 HashMutexUnlock( "sndcmd" );
04772
04773 #ifdef RIO_DEBUG1
04774 m_log << "[RioNeti - SendCmd] Finish" << endl;
04775 #endif
04776 }
04777
04778
04779
04780
04781
04782
04783
04784 void RioNeti::sendcmdcb( NetBuf *nbp, int type, char *cmdp, int cmdl )
04785 {
04786 #ifdef RIO_DEBUG1
04787 m_log << "[RioNeti - sendcmdcb] Start" << endl;
04788 #endif
04789
04790 RioNeti *netiPtr = nbp->nb_rioneti;
04791
04792 if( netiPtr->m_debug & DEBUG_CMDS )
04793 m_log << " sendcmdcb " << type << " reqid " << nbp->nb_reqid << endl;
04794
04795 switch( (call_type)type )
04796 {
04797 case TYPE_ACK:
04798 case TYPE_MAPACK:
04799
04800
04801 netiPtr->qComplete( nbp, 0 );
04802 break;
04803
04804 case TYPE_RESULT:
04805
04806 #ifdef RIO_DEBUG2
04807 m_log << "sendcmdcb1 nbp->nb_result " << nbp->nb_result << endl;
04808 #endif
04809
04810 netiPtr->qComplete( nbp, ntohl( *(u32 *)cmdp ) );
04811 break;
04812
04813 case TYPE_TIMEOUT:
04814 if( !netiPtr->iRetry( nbp ) )
04815 {
04816 #ifdef RIO_DEBUG1
04817 m_log << "[RioNeti - sendcmdcb] Finish 1" << endl;
04818 #endif
04819
04820 return;
04821 }
04822
04823 nbp->rq_retrytim <<= 1;
04824 netiPtr->iTimeNew( nbp, nbp->rq_retrytim );
04825 netiPtr->qSend( nbp );
04826 break;
04827
04828 default:
04829
04830 #ifdef RIO_DEBUG2
04831 m_log << "sendcmdcb2 nbp->nb_result " << nbp->nb_result << endl;
04832 #endif
04833
04834 netiPtr->qComplete( nbp, ERROR_RIONETI + 3 );
04835 break;
04836 }
04837
04838 #ifdef RIO_DEBUG1
04839 m_log << "[RioNeti - sendcmdcb] Finish 2" << endl;
04840 #endif
04841 }
04842
04843
04844
04845
04846
04847
04848
04849
04850
04851
04852
04853 void RioNeti::SendResult( int ipadr, int port, int reqid, int result,
04854 callback_t callback, void *callbackparm )
04855 {
04856 #ifdef RIO_DEBUG1
04857 m_log << "[RioNeti - SendResult] Start" << endl;
04858 #endif
04859
04860 HashMutexLock( "sndresul" );
04861
04862 NetBuf *nbp = pSend( ipadr, port, reqid, RioNeti::TYPE_RESULT );
04863 nbp->nb_usercall = callback;
04864 nbp->nb_userparm = callbackparm;
04865
04866 *((u32 *)nbp->nb_datap) = htonl( result );
04867 nbp->nb_datap += sizeof( u32 );
04868
04869 nbp->nb_callback = &sendcmdcb;
04870
04871 nbp->rq_retry = 5;
04872 nbp->rq_retrytim = 1 << (TimeShift - 3);
04873 iTimeNew( nbp, nbp->rq_retrytim );
04874
04875 #ifdef RIO_DEBUG2
04876 struct in_addr clientip;
04877 clientip.s_addr = ipadr;
04878 m_log << " RioNeti : SendResult ip addr " << inet_ntoa( clientip )
04879 << " port " << ntohs( port ) << " reqid " << reqid << endl;
04880 #endif
04881
04882 qSend( nbp );
04883 HashMutexUnlock( "sndresul" );
04884
04885 #ifdef RIO_DEBUG1
04886 m_log << "[RioNeti - SendResult] Finish" << endl;
04887 #endif
04888 }
04889
04890 int RioNeti::ExpectData( char *bufadr, int buflen, callback_t callback,
04891 void *callbackparm, RioStreamType traffic,
04892 int sendack )
04893 {
04894 #ifdef RIO_DEBUG1
04895 m_log << "[RioNeti - ExpectData] Start" << endl;
04896 #endif
04897
04898 NetBuf *nbp;
04899 int reqid;
04900
04901 nbp = mbget();
04902
04903
04904 nbp->nb_blockp = bufadr;
04905 nbp->nb_blockl = buflen;
04906
04907 nbp->nb_usercall = callback;
04908 nbp->nb_userparm = callbackparm;
04909 nbp->nb_callback = &expectblockcb;
04910
04911 if( traffic == UNICASTTRAFFIC )
04912 nbp->nb_unicastRequested = true;
04913 else
04914 nbp->nb_unicastRequested = false;
04915
04916
04917
04918
04919
04920
04921
04922
04923
04924
04925 nbp->nb_frag0arrive = time( NULL ) + 10;
04926
04927
04928 if( sendack != 0 )
04929 nbp->nb_sendack = 1;
04930 else
04931 nbp->nb_sendack = 0;
04932
04933 nbp->rq_needack = NetBuf::RQ_ACKEVERY;
04934 nbp->rq_fragmax = 0;
04935 nbp->rq_fraghav = 0;
04936 nbp->rq_fraghigh = 0;
04937 nbp->rq_fraglow = 0;
04938 nbp->rq_fragackseq = 0;
04939 nbp->rq_fraglist[0] = 0;
04940 memset( nbp->rq_bits, 0, sizeof( nbp->rq_bits ) );
04941
04942 HashMutexLock( "expectdata" );
04943
04944
04945
04946 iHashNew( nbp );
04947
04948 #ifdef RIO_DEBUG2
04949 if( nbp->nb_unicastRequested )
04950 m_log << " [RioNeti] expectdata - Inserido bloco unicast de reqid "
04951 << nbp->nb_reqid << " na tabela Hash." << endl;
04952 else
04953 m_log << " [RioNeti] expectdata - Inserido bloco multicast de"
04954 << " reqid " << nbp->nb_reqid << " na tabela Hash." << endl;
04955 #endif
04956
04957 reqid = nbp->nb_reqid;
04958
04959 HashMutexUnlock( "expectdata" );
04960
04961 #ifdef RIO_DEBUG1
04962 m_log << "[RioNeti - ExpectData] Finish" << endl;
04963 #endif
04964
04965 return reqid;
04966 }
04967
04968
04969
04970
04971
04972
04973 void RioNeti::expectblockcb( NetBuf *nbp, int type, char *cmdp, int cmdl )
04974 {
04975 #ifdef RIO_DEBUG1
04976 m_log << "[RioNeti - expectblockcb] Start" << endl;
04977 #endif
04978
04979 RioNeti *netiPtr = nbp->nb_rioneti;
04980 switch( (call_type) type )
04981 {
04982 case TYPE_RESULT:
04983
04984 #ifdef RIO_DEBUG2
04985 m_log << "expectblockcb1 nbp->nb_result " << nbp->nb_result
04986 << endl;
04987 #endif
04988
04989 netiPtr->qComplete( nbp, ntohl( *(u32 *)cmdp ) );
04990 break;
04991
04992 default:
04993
04994 #ifdef RIO_DEBUG2
04995 m_log << "expectblockcb2 nbp->nb_result " << nbp->nb_result
04996 << endl;
04997 #endif
04998
04999
05000 netiPtr->qComplete( nbp, ERROR_RIONETI + 1 );
05001 break;
05002 }
05003
05004 #ifdef RIO_DEBUG1
05005 m_log << "[RioNeti - expectblockcb] Finish" << endl;
05006 #endif
05007 }
05008
05009
05010
05011
05012
05013
05014
05015
05016
05017
05018
05019
05020
05021
05022 int RioNeti::ExpectCmd( callback_t callback, void *callbackparm,
05023 cmdcallback_t cmdcallback )
05024 {
05025 #ifdef RIO_DEBUG1
05026 m_log << "[RioNeti - Constructor] Start" << endl;
05027 #endif
05028
05029 NetBuf *nbp;
05030 int reqid;
05031
05032 nbp = mbget();
05033 nbp->nb_usercall = callback;
05034 nbp->nb_userparm = callbackparm;
05035 nbp->nb_usercmd = cmdcallback;
05036 nbp->nb_callback = &expectblockcb;
05037
05038 HashMutexLock( "expctcmd" );
05039
05040 iHashNew( nbp );
05041
05042 reqid = nbp->nb_reqid;
05043
05044 HashMutexUnlock( "expctcmd" );
05045
05046 #ifdef RIO_DEBUG1
05047 m_log << "[RioNeti - Constructor] Finish" << endl;
05048 #endif
05049
05050 return reqid;
05051 }
05052
05053
05054
05055
05056
05057
05058
05059
05060
05061
05062
05063
05064
05065
05066
05067 int RioNeti::CancelExpect( int reqid, int result )
05068 {
05069 #ifdef RIO_DEBUG1
05070 m_log << "[RioNeti - CancelExpect] Start" << endl;
05071 #endif
05072
05073 NetBuf *nbp;
05074
05075 HashMutexLock( "cancelexpct" );
05076
05077 nbp = iHashFind( reqid );
05078
05079
05080 if( nbp == 0 )
05081 {
05082 #ifdef RIO_DEBUG2
05083 m_log << "[RioNeti] CancelExpect - Tentativa de remoção do bloco de reqid "
05084 << reqid << " da tabela Hash falhou: bloco não encontrado." << endl;
05085 #endif
05086
05087 goto unlockret;
05088 }
05089
05090 iHashRmv(nbp);
05091 iOutRmv(nbp);
05092 iTimeRmv(nbp);
05093 mbfree( nbp );
05094
05095 unlockret:
05096
05097 HashMutexUnlock( "cancelexpct" );
05098
05099 #ifdef RIO_DEBUG1
05100 m_log << "[RioNeti - CancelExpect] Finish" << endl;
05101 #endif
05102
05103 return 1;
05104
05105
05106
05107
05108 }
05109
05110
05111
05112
05113
05114
05115
05116
05117
05118
05119
05120
05121
05122 void RioNeti::SendData( int ip, int port, int reqid, char *bufadr, int buflen,
05123 callback_t callback, void *callbackparm, int StreamTraffic,
05124 unsigned int VideoRate, unsigned int *sendreqid )
05125 {
05126 #ifdef RIO_DEBUG1
05127 m_log << "[RioNeti - SendData] Start" << endl;
05128 #endif
05129
05130 NetBuf *nbp;
05131
05132 #ifdef RIO_DEBUG2
05133 m_log << "RioNeti - SendData:"
05134 << " port " << ntohs( port )
05135 << " reqid " << reqid
05136 << " buflen " << buflen
05137 << " StreamTraffic " << StreamTraffic << endl;
05138 #endif
05139
05140 nbp = mbget();
05141 nbp->nb_blockp = bufadr;
05142 nbp->nb_blockl = buflen;
05143 nbp->nb_toipaddr = ip;
05144 nbp->nb_toipport = port;
05145 nbp->nb_usercall = callback;
05146 nbp->nb_userparm = callbackparm;
05147 nbp->rq_fragackseq = 0;
05148 nbp->rq_fraghav = 0;
05149 memset( nbp->rq_bits, 0, sizeof( nbp->rq_bits ) );
05150
05151 nbp->rq_fraglistn = 0;
05152
05153
05154
05155
05156
05157 if( ( m_StreamControl != NULL ) && ( VideoRate != 0ull ) )
05158 {
05159
05160
05161
05162
05163 unsigned long long int VideoRateFS;
05164
05165
05166 VideoRateFS = (( unsigned long long int ) VideoRate ) * 128ull;
05167
05168 VideoRateFS = VideoRateFS / ( ( unsigned long long int ) FRAGMENTSIZE );
05169
05170
05171 nbp->nb_timebetweencredits = 1000000ull / VideoRateFS;
05172 }
05173
05174
05175
05176 HashMutexLock( "snddata1" );
05177
05178 iHashNew( nbp );
05179
05180
05181
05182
05183 if( sendreqid != NULL )
05184 *sendreqid = nbp->nb_reqid;
05185
05186 nbp->nb_hisreqid = reqid;
05187 nbp->nb_callback = &sendblockcb;
05188 nbp->rq_fragburst = NetBuf::RQ_BURSTSTART;
05189 nbp->rq_fragsh = 0;
05190
05191 nbp->rq_maxpktl = m_maxpktl;
05192
05193 nbp->rq_fraglen = nbp->rq_maxpktl
05194 - (iphdrlen + udphdrlen + sizeof( RioPkt::pktfx ));
05195 nbp->rq_fraglen0 = nbp->rq_maxpktl
05196 - (iphdrlen + udphdrlen + sizeof( RioPkt::pktf0 ));
05197
05198 nbp->rq_fragmax = 1 +
05199 ((nbp->nb_blockl - nbp->rq_fraglen0 + nbp->rq_fraglen - 1)
05200 / nbp->rq_fraglen);
05201
05202 #ifdef RIO_DEBUG2
05203 struct in_addr clientip;
05204 clientip.s_addr = ip;
05205 m_log << " RioNeti : SendData ip addr " << inet_ntoa(clientip)
05206 << " port " << htons( port )
05207 << " buflen " << buflen
05208 << " clientreqid " << reqid
05209 << " reqid " << nbp->nb_reqid<< endl;
05210 m_log << " nbp->rq_fragmax "<< nbp->rq_fragmax
05211 << " nbp->rq_fraglen0 " << nbp->rq_fraglen0
05212 << " nbp->rq_fraglen " << nbp->rq_fraglen << endl;
05213 #endif
05214
05215 if( StreamTraffic != 3 )
05216 nbp->nb_sendack = 1;
05217 else
05218 nbp->nb_sendack = 0;
05219
05220
05221 if( nbp->nb_sendack != 0 )
05222 {
05223 nbp->rq_retryfrag = 0;
05224 nbp->rq_retry = 5;
05225 nbp->rq_retrytim = 1 << (TimeShift - 1);
05226 }
05227 else
05228 {
05229 nbp->rq_retry = 0;
05230 }
05231
05232 nbp->rq_burstcnt = nbp->rq_fragburst;
05233 if( !iSendFragNext( nbp ) )
05234 iSendFrag( nbp );
05235
05236
05237 if( nbp->nb_sendack != 0 )
05238 {
05239 iTimeNew( nbp, nbp->rq_retrytim );
05240 }
05241 else
05242 {
05243
05244 qComplete( nbp, 0 );
05245 }
05246
05247 HashMutexUnlock( "snddata1" );
05248
05249 #ifdef RIO_DEBUG1
05250 m_log << "[RioNeti - SendData] Finish" << endl;
05251 #endif
05252 }
05253
05254
05255
05256 void RioNeti::sendblockcb( NetBuf *nbp, int type, char *cmdp, int cmdl )
05257 {
05258 #ifdef RIO_DEBUG1
05259 m_log << "[RioNeti - sendblockcb] Start" << endl;
05260 #endif
05261
05262 static int dbgcnt = 0;
05263 RioNeti *netiPtr = nbp->nb_rioneti;
05264
05265 if( netiPtr->m_debug & DEBUG_DATA )
05266 {
05267 m_log << " sendblockcb: " << type << " reqid " << nbp->nb_reqid
05268 << " hisreqid " << nbp->nb_hisreqid << endl;
05269 }
05270
05271 switch( (RioNeti::call_type)type )
05272 {
05273 case TYPE_RESULT:
05274 #ifdef RIO_DEBUG2
05275 m_log << "sendblockcb1 nbp->nb_result " << nbp->nb_result << endl;
05276 #endif
05277
05278 netiPtr->qComplete( nbp, ntohl(*(u32 *)cmdp) );
05279 break;
05280
05281 case TYPE_TIMEOUT:
05282 if( netiPtr->m_debug & DEBUG_TIMEOUT )
05283 {
05284 #ifdef RIO_DEBUG2
05285 m_log << " TIMEOUT: " << (int *) nbp
05286 << " reqid " << nbp->nb_reqid
05287 << " rq_fragsh " << nbp->rq_fragsh
05288 << " rq_fragburst " << nbp->rq_fragburst
05289 << " rq_burstcnt " << nbp->rq_burstcnt
05290 << " \n\trq_retryfrag " << nbp->rq_retryfrag
05291 << " rq_fraghav " << nbp->rq_fraghav
05292 << " rq_retry " << nbp->rq_retry
05293 << " rq_retrytim " << nbp->rq_retrytim
05294 << " rq_fragmax " << nbp->rq_fragmax
05295 << " \n\trq_fraglistn " << nbp->rq_fraglistn
05296 << " list " << nbp->rq_fraglist[0]
05297 << " " << nbp->rq_fraglist[1]
05298 << " " << nbp->rq_fraglist[2]
05299 << " " << nbp->rq_fraglist[3]
05300 << endl;
05301 #endif
05302
05303 if( ++dbgcnt > 30 )
05304 {
05305 dbgcnt = -1000000;
05306 }
05307 }
05308 if( nbp->rq_retryfrag == nbp->rq_fraghav )
05309 {
05310
05311 if( !netiPtr->iRetry( nbp ) )
05312 {
05313 #ifdef RIO_DEBUG1
05314 m_log << "[RioNeti - sendblockcb] Finish 1" << endl;
05315 #endif
05316
05317 return;
05318 }
05319
05320
05321 if( nbp->rq_fraglistn == 0 &&
05322 nbp->rq_fragsh == nbp->rq_fragmax
05323 )
05324 {
05325 nbp->rq_fragsh--;
05326 }
05327 }
05328 else
05329 {
05330
05331 nbp->rq_retryfrag = nbp->rq_fraghav;
05332 }
05333
05334
05335
05336 if( nbp->rq_fraghav == 0 )
05337 {
05338 nbp->rq_fragsh = 0;
05339 nbp->rq_fraglistn = 0;
05340 memset( nbp->rq_bits, 0, sizeof( nbp->rq_bits ) );
05341 }
05342
05343 nbp->rq_fragburst >>= 1;
05344 if( nbp->rq_fragburst < 4 )
05345 nbp->rq_fragburst = 4;
05346
05347 nbp->rq_burstcnt = nbp->rq_fragburst;
05348
05349 #ifdef RIO_DEBUG2
05350 m_log << "sendblockcb fragburst = burstcnt = "<< nbp->rq_fragburst
05351 << endl;
05352 #endif
05353
05354 if( !netiPtr->iSendFragNext( nbp ) )
05355 netiPtr->iSendFrag( nbp );
05356
05357 nbp->rq_retrytim <<= 1;
05358 netiPtr->iTimeNew( nbp, nbp->rq_retrytim );
05359
05360 #ifdef RIO_DEBUG1
05361 m_log << "[RioNeti - sendblockcb] Finish 2" << endl;
05362 #endif
05363
05364 return;
05365
05366 default:
05367 #ifdef RIO_DEBUG2
05368 m_log << "sendblockcb2 nbp->nb_result " << nbp->nb_result << endl;
05369 #endif
05370
05371
05372 netiPtr->qComplete( nbp, ERROR_RIONETI + 2 );
05373
05374 #ifdef RIO_DEBUG1
05375 m_log << "[RioNeti - sendblockcb] Finish 3" << endl;
05376 #endif
05377
05378 return;
05379 }
05380
05381 #ifdef RIO_DEBUG1
05382 m_log << "[RioNeti - sendblockcb] Finish 4" << endl;
05383 #endif
05384 }
05385
05386
05387 void RioNeti::prtHash( const char *s )
05388 {
05389 #ifdef RIO_DEBUG1
05390 m_log << "[RioNeti - prtHash] Start" << endl;
05391 #endif
05392
05393 int ix;
05394 NetBuf *rp;
05395
05396 m_log << " prtHash: " << s << endl;
05397 for( ix = 0; ix < XM_REQNUM; ix++ )
05398 {
05399 rp = reqarray[ ix ];
05400 while( rp != NULL )
05401 {
05402 char print[256];
05403 sprintf( print,
05404 "Hora: %d\t seq %5d\t%p nb_link %p reqid %d",
05405 (unsigned int)rp->nb_frag0arrive, ix, rp,
05406 rp->nb_link, (unsigned int)rp->nb_reqid );
05407 m_log << print << endl;
05408 rp = rp->nb_link;
05409 }
05410 }
05411
05412 #ifdef RIO_DEBUG1
05413 m_log << "[RioNeti - prtHash] Finish" << endl;
05414 #endif
05415 }
05416
05417
05418 void RioNeti::prtcnt( void )
05419 {
05420 #ifdef RIO_DEBUG1
05421 m_log << "[RioNeti - prtcnt] Start" << endl;
05422 #endif
05423
05424 m_log << " RioNeti counts --" << endl
05425 << " cnt_sndpkt " << cnt_sndpkt << ", cnt_rcvpkt " << cnt_rcvpkt
05426 << ", cnt_retry " << cnt_retry << endl
05427 << " cnt_rst1 " << cnt_rst1 << ", cnt_rst2 " << cnt_rst2 << endl
05428 << " cnt_rcvfragack " << cnt_rcvfragack << ", cnt_rcvfragknc "
05429 << cnt_rcvfragacknc << endl
05430 << " err_dupfrag " << err_dupfrag << ", err_overfrag "
05431 << err_overfrag << endl
05432 << " err_pktsize " << err_pktsize << ", err_pktmagic "
05433 << err_pktmagic << ", err_pktnoreq " << err_pktnoreq << endl
05434 << " err_pktcmd " << err_pktcmd << ", err_pktfmt " << err_pktfmt
05435 << endl;
05436
05437 #ifdef RIO_DEBUG1
05438 m_log << "[RioNeti - prtcnt] Finish" << endl;
05439 #endif
05440 }
05441
05442
05443
05444
05445
05446
05447 void RioNeti::mbinit( int num )
05448 {
05449 #ifdef RIO_DEBUG1
05450 m_log << "[RioNeti - mbinit] Single" << endl;
05451 #endif
05452
05453 mbc_num = num;
05454 }
05455
05456
05457
05458 NetBuf *RioNeti::mbget()
05459 {
05460 #ifdef RIO_DEBUG1
05461 m_log << "[RioNeti - mbget] Start" << endl;
05462 #endif
05463
05464 NetBuf *fp;
05465
05466 struct sockaddr_in Server;
05467
05468
05469 const char *UnblockSelectMsg = "UnblockSelect";
05470 int UnblockSelectSize = 13;
05471
05472 #ifdef WINDOWS
05473
05474 WaitForSingleObject( m_buf_lock, INFINITE );
05475 #else
05476
05477 pthread_mutex_lock( &m_buf_lock );
05478 #endif
05479
05480 fp = mbc_free;
05481 if( fp != NULL )
05482 {
05483 mbc_free = fp->nb_link;
05484 goto init;
05485 }
05486
05487 fp = new NetBuf;
05488
05489 if( fp != NULL )
05490 goto init;
05491
05492 m_log << " RioNeti::mbget: malloc failed, len " << sizeof( NetBuf )
05493 << endl;
05494
05495 abort();
05496
05497 #ifdef RIO_DEBUG1
05498 m_log << "[RioNeti - mbget] Finish 1" << endl;
05499 #endif
05500
05501 return 0;
05502
05503
05504 init:
05505
05506 #ifdef WINDOWS
05507
05508 ReleaseMutex( m_buf_lock );
05509 #else
05510
05511 pthread_mutex_unlock( &m_buf_lock );
05512 #endif
05513
05514 fp->nb_tidq.linkf = fp;
05515 fp->nb_tidq.linkb = fp;
05516 fp->nb_outrtn = 0;
05517 fp->nb_result = ERROR_RIONETI;
05518 fp->nb_callback = 0;
05519 fp->nb_blockp = 0;
05520 fp->nb_bufl = sizeof( fp->nb_buf );
05521 fp->nb_bufp = fp->nb_buf;
05522 fp->nb_sendp = fp->nb_buf;
05523 fp->nb_datap = fp->nb_buf;
05524 fp->nb_rioneti = this;
05525 fp->nb_unicastRequested = true;
05526
05527 fp->nb_sendack = 1;
05528
05529
05530 Server.sin_family = AF_INET;
05531 Server.sin_port = m_ipport;
05532 Server.sin_addr.s_addr = m_ipaddr;
05533 sendto( m_sock, UnblockSelectMsg, UnblockSelectSize, 0,
05534 (const struct sockaddr *) &Server, sizeof( Server ) );
05535
05536 #ifdef RIO_DEBUG1
05537 m_log << "[RioNeti - mbget] Finish 2" << endl;
05538 #endif
05539
05540 return fp;
05541 }
05542
05543 void RioNeti::mbfree( NetBuf *block )
05544 {
05545 #ifdef RIO_DEBUG1
05546 m_log << "[RioNeti - mbfree] Start" << endl;
05547 #endif
05548
05549 #ifdef WINDOWS
05550
05551 WaitForSingleObject( m_buf_lock, INFINITE );
05552 #else
05553
05554 pthread_mutex_lock( &m_buf_lock );
05555 #endif
05556
05557 block->nb_link = mbc_free;
05558 mbc_free = block;
05559
05560 #ifdef WINDOWS
05561
05562 ReleaseMutex( m_buf_lock );
05563 #else
05564
05565 pthread_mutex_unlock( &m_buf_lock );
05566 #endif
05567
05568 #ifdef RIO_DEBUG1
05569 m_log << "[RioNeti - mbfree] Finish" << endl;
05570 #endif
05571 }
05572
05573
05574
05575
05576 void RioNeti::mbterm()
05577 {
05578 #ifdef RIO_DEBUG1
05579 m_log << "[RioNeti - mbterm] Start" << endl;
05580 #endif
05581
05582 NetBuf *fp;
05583 while( mbc_free != NULL )
05584 {
05585 fp = mbc_free;
05586 mbc_free = fp->nb_link;
05587 delete fp;
05588 }
05589
05590 #ifdef RIO_DEBUG1
05591 m_log << "[RioNeti - mbterm] Finish" << endl;
05592 #endif
05593 }
05594
05595
05596 void RioNeti::dispsockopt( int fd )
05597 {
05598 #ifdef RIO_DEBUG1
05599 m_log << "[RioNeti - dispsockopt] Start" << endl;
05600 #endif
05601
05602 int opt;
05603
05604 #ifdef WINDOWS
05605
05606 int optlen = sizeof(opt);
05607 if( getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char FAR*)&opt, &optlen) )
05608 {
05609 #ifdef RIO_DEBUG1
05610 m_log << "[RioNeti - dispsockopt] Finish 1" << endl;
05611 #endif
05612
05613 return;
05614 }
05615 optlen = sizeof(opt);
05616 if( getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char FAR*)&opt, &optlen) )
05617 {
05618 #ifdef RIO_DEBUG1
05619 m_log << "[RioNeti - dispsockopt] Finish 2" << endl;
05620 #endif
05621
05622 return;
05623 }
05624 #else
05625
05626 socklen_t optlen = sizeof(opt);
05627 if( getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen) )
05628 {
05629 Rioperror( "getsockopt SO_SNDBUF" );
05630
05631 #ifdef RIO_DEBUG1
05632 m_log << "[RioNeti - dispsockopt] Finish 3" << endl;
05633 #endif
05634
05635 return;
05636 }
05637 m_log << " \tSO_SNDBUF " << opt << endl;
05638 optlen = sizeof(opt);
05639 if( getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, &optlen) )
05640 {
05641 Rioperror( "getsockopt SO_RCVBUF" );
05642
05643 #ifdef RIO_DEBUG1
05644 m_log << "[RioNeti - dispsockopt] Finish 4" << endl;
05645 #endif
05646
05647 return;
05648 }
05649 m_log << " \tSO_RCVBUF " << opt << endl;
05650 #endif
05651
05652 #ifdef RIO_DEBUG1
05653 m_log << "[RioNeti - dispsockopt] Finish 5" << endl;
05654 #endif
05655 }
05656
05657
05658 void RioNeti::mucksockopt( int fd )
05659 {
05660 #ifdef RIO_DEBUG1
05661 m_log << "[RioNeti - mucksockopt] Start" << endl;
05662 #endif
05663
05664 int opt;
05665
05666 if( m_debug & DEBUG_MISC )
05667 {
05668 m_log << " sock opt before:";
05669 dispsockopt( fd );
05670 m_log << " " << endl;
05671 }
05672
05673 #ifdef WINDOWS
05674
05675
05676 opt = 262144;
05677 if( setsockopt( fd, SOL_SOCKET, SO_SNDBUF, (char*)&opt, sizeof( opt ) ) )
05678 Rioperror( "setsockopt SO_SNDBUF" );
05679
05680 if( setsockopt( fd, SOL_SOCKET, SO_RCVBUF, (char*)&opt, sizeof( opt ) ) )
05681 Rioperror( "setsockopt SO_RCVBUF" );
05682 #else
05683
05684
05685
05686 opt = 655360;
05687 if( setsockopt( fd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof( opt ) ) )
05688 Rioperror( "setsockopt SO_SNDBUF" );
05689
05690 if( setsockopt( fd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof( opt ) ) )
05691 Rioperror( "setsockopt SO_RCVBUF" );
05692 #endif
05693
05694 if( m_debug & DEBUG_MISC )
05695 {
05696 m_log << " sock opt after: ";
05697 dispsockopt( fd );
05698 m_log << " " << endl;
05699 }
05700
05701 #ifdef RIO_DEBUG1
05702 m_log << "[RioNeti - mucksockopt] Finish" << endl;
05703 #endif
05704 }
05705
05706 void RioNeti::SocketMulticastLock( string msg )
05707 {
05708 #ifdef RIO_DEBUG2
05709 if( msg.length() )
05710 m_log << "[SocketMulticastLock] " << msg << "... " << endl;
05711 #endif
05712
05713 pthread_mutex_lock( &socketMulticastMutex );
05714
05715 #ifdef RIO_DEBUG2
05716 if( msg.length() )
05717 m_log << "..." << msg << " OK!" << endl;
05718 #endif
05719 }
05720
05721 void RioNeti::SocketMulticastUnlock( string msg )
05722 {
05723 #ifdef RIO_DEBUG2
05724 if( msg.length() )
05725 m_log << "[SocketMulticastUnlock] " << msg << endl;
05726 #endif
05727
05728 pthread_mutex_unlock( &socketMulticastMutex );
05729 }
05730
05731
05732 void RioNeti::HashMutexLock( string msg )
05733 {
05734 #ifdef RIO_DEBUG2
05735 if( msg.length() )
05736 m_log << "[HashMutexLock] " << msg << "... " << endl;
05737 #endif
05738
05739 #ifdef WINDOWS
05740
05741 WaitForSingleObject( HashMutex,INFINITE ); };
05742 #else
05743
05744 pthread_mutex_lock( &HashMutex );
05745 #endif
05746
05747 #ifdef RIO_DEBUG2
05748 if( msg.length() )
05749 m_log << "..." << msg << " OK!" << endl;
05750 #endif
05751 }
05752
05753 void RioNeti::HashMutexUnlock( string msg )
05754 {
05755 #ifdef RIO_DEBUG2
05756 if( msg.length() )
05757 m_log << "[HashMutexUnlock] " << msg << endl;
05758 #endif
05759
05760 #ifdef WINDOWS
05761
05762 ReleaseMutex( HashMutex ); };
05763 #else
05764
05765 pthread_mutex_unlock( &HashMutex );
05766 #endif
05767 }
05768
05769
#ifdef RIO_DEBUG2
// Debug helper, compiled only in RIO_DEBUG2 builds.
//
// debugCode: selects what is dumped. Code 1 lists the port of every
//            connected entry of multicastInfo; any other code is reported
//            as having no specific handling.
// message:   free text appended to the log in both cases.
void RioNeti::printDebug( short debugCode, string message )
{
#ifdef RIO_DEBUG1
    m_log << "[RioNeti - printDebug] Start" << endl;
#endif

    if( debugCode == 1 )
    {
        m_log << "[RioNeti] Grupos a que faço parte: ";
        for( int slot = 0; slot < MULTICAST_SOCKETS; slot++ )
        {
            if( multicastInfo[slot].isConnected )
            {
                m_log << ntohs( multicastInfo[slot].myaddr_in.sin_port )
                      << " - ";
            }
        }
        m_log << endl;
        m_log << message.c_str() << endl;
    }
    else
    {
        m_log << "debugCode = " << debugCode << endl;
        m_log << "valor para debugCode sem tratamento particular" << endl;
        m_log << message.c_str() << endl;
    }

#ifdef RIO_DEBUG1
    m_log << "[RioNeti - printDebug] Finish" << endl;
#endif
}
#endif
05802
05803
05804
05805
05806
05807 void *RioNeti::KeepaliveMapNAT( void *Param )
05808 {
05809 RioNeti *PRioNeti;
05810 struct sockaddr_in Server;
05811
05812
05813 const char *FakeMsg = "f";
05814 int FakeMsgSize = 1;
05815 unsigned int i;
05816
05817 #ifdef RIO_DEBUG1
05818 m_log << "[RioNeti - KeepaliveMapNAT] Start" << endl;
05819 #endif
05820
05821 #ifdef RIO_DEBUG2
05822 struct in_addr ip;
05823 #endif
05824 PRioNeti = (RioNeti *) Param;
05825
05826 if( ( PRioNeti->m_MappingInfo == NULL ) ||
05827 ( PRioNeti->m_MappingInfoSize == 0 ) )
05828 {
05829
05830 m_log << "RioNeti::KeepaliveMapNAT Erro interno na thread "
05831 << (int)PRioNeti->m_threadmap << ": m_MappingInfo = " << hex
05832 << PRioNeti->m_MappingInfo << dec
05833 << ", m_MappingInfoSize = " << PRioNeti->m_MappingInfoSize
05834 << endl;
05835
05836 #ifdef RIO_DEBUG1
05837 m_log << "[RioNeti - KeepaliveMapNAT] Finish1" << endl;
05838 #endif
05839
05840 pthread_exit( NULL );
05841 }
05842 else
05843 {
05844
05845
05846
05847 while( 1 )
05848 {
05849
05850
05851 for( i = 0; i < PRioNeti->m_MappingInfoSize; i++ )
05852 {
05853
05854 if( PRioNeti->m_MappingInfo[ i ].IsEnabled )
05855 {
05856 #ifdef RIO_DEBUG2
05857 ip.s_addr = PRioNeti->m_MappingInfo[ i ].IP;
05858 m_log << "KeepaliveMapNAT - Sending FAKE message to "
05859 << inet_ntoa( ip )<< ":"
05860 << ntohs(PRioNeti->m_MappingInfo[ i ].port) << "("
05861 << i << "/" << PRioNeti->m_MappingInfoSize << ")"
05862 << endl;
05863 #endif
05864 Server.sin_family = AF_INET;
05865 Server.sin_port = PRioNeti->m_MappingInfo[ i ].port;
05866 Server.sin_addr.s_addr = PRioNeti->m_MappingInfo[ i ].IP;
05867 sendto( PRioNeti->m_sock, FakeMsg, FakeMsgSize, 0,
05868 (const struct sockaddr *)&Server, sizeof( Server ) );
05869 }
05870 }
05871
05872
05873
05874 usleep( FAKEMSGINTERVAL );
05875 }
05876 }
05877
05878
05879 #ifdef RIO_DEBUG1
05880 m_log << "[RioNeti - KeepaliveMapNAT] Finish2" << endl;
05881 #endif
05882
05883 return NULL;
05884 }
05885
05886 int RioNeti::CreateKeepaliveThread( void )
05887 {
05888 pthread_attr_t attribKeepalive;
05889 int erro;
05890 unsigned int i;
05891 struct sockaddr_in Server;
05892
05893
05894 const char *FakeMsg = "f";
05895 int FakeMsgSize = 1;
05896
05897 #ifdef RIO_DEBUG1
05898 m_log << "[RioNeti - CreateKeepaliveThread] Start" << endl;
05899 #endif
05900
05901
05902 for( i = 0; i < m_MappingInfoSize; i++ )
05903 {
05904 if( m_MappingInfo[ i ].IsEnabled )
05905 {
05906 Server.sin_family = AF_INET;
05907 Server.sin_port = m_MappingInfo[ i ].port;
05908 Server.sin_addr.s_addr = m_MappingInfo[ i ].IP;
05909 sendto( m_sock, FakeMsg, FakeMsgSize, 0,
05910 (const struct sockaddr *)&Server, sizeof( Server ) );
05911 }
05912 }
05913
05914 erro = pthread_attr_init( &attribKeepalive );
05915 if( erro )
05916 {
05917 m_log << "RioNeti::CreateKeepaliveThread erro ao executar a funcao "
05918 << "pthread_attr_init, erro = " << erro << endl;
05919
05920 #ifdef RIO_DEBUG1
05921 m_log << "[RioNeti - CreateKeepaliveThread] Finish1" << endl;
05922 #endif
05923
05924 return erro;
05925 }
05926
05927 erro = pthread_attr_setstacksize( &attribKeepalive, 2*PTHREAD_STACK_MIN );
05928 if( erro )
05929 {
05930 m_log << "RioNeti::CreateKeepaliveThread erro ao executar a funcao "
05931 << "pthread_attr_setstacksize, erro = " << erro << endl;
05932
05933 #ifdef RIO_DEBUG1
05934 m_log << "[RioNeti - CreateKeepaliveThread] Finish2" << endl;
05935 #endif
05936
05937 return erro;
05938 }
05939
05940
05941 erro = pthread_create( &m_threadmap, &attribKeepalive,
05942 KeepaliveMapNAT, (void *)this );
05943 if( erro )
05944 {
05945 m_log << "RioNeti::CreateKeepaliveThread erro ao executar a funcao "
05946 << "pthread_create para criar a thread, erro = " << erro
05947 << endl;
05948
05949 #ifdef RIO_DEBUG1
05950 m_log << "[RioNeti - CreateKeepaliveThread] Finish3" << endl;
05951 #endif
05952
05953 return erro;
05954 }
05955
05956 #ifdef RIO_DEBUG1
05957 m_log << "[RioNeti - CreateKeepaliveThread] Finish4" << endl;
05958 #endif
05959
05960 return 0;
05961 }
05962
05963 bool RioNeti::IsBehindNAT()
05964 {
05965 #ifdef RIO_DEBUG1
05966 m_log << "[RioNeti - IsBehindNAT] Single" << endl;
05967 #endif
05968
05969 return( m_behindnat );
05970 }
05971
05972 void RioNeti::RequestMapping( int ipaddr, int port, int server,
05973 callback_t callback,
05974 void *callbackparm )
05975 {
05976 #ifdef RIO_DEBUG1
05977 m_log << "[RioNeti - RequestMapping] Start" << endl;
05978 #endif
05979
05980 HashMutexLock( "requestmapping" );
05981
05982
05983
05984
05985 NetBuf *nbp = pSend( ipaddr, port, server, RioNeti::TYPE_SENDMAP );
05986
05987 nbp->nb_usercall = callback;
05988 nbp->nb_userparm = callbackparm;
05989
05990 nbp->nb_callback = &sendcmdcb;
05991 nbp->rq_retry = 3;
05992 nbp->rq_retrytim = 1 << (TimeShift - 1);
05993 iTimeNew( nbp, nbp->rq_retrytim );
05994
05995
05996
05997
05998
05999
06000 m_MappingInfo[ server ].reqid = nbp->nb_reqid;
06001 m_MappingInfo[ server ].IP = ipaddr;
06002 m_MappingInfo[ server ].port = port;
06003
06004 m_MappingInfo[ server ].IsEnabled = false;
06005
06006 #ifdef RIO_DEBUG2
06007 struct in_addr clientip;
06008 clientip.s_addr = ipaddr;
06009 m_log << " RioNeti : RequestMapping ip addr " << inet_ntoa( clientip )
06010 << " port " << ntohs( port ) << " reqid " << nbp->nb_reqid << endl;
06011 #endif
06012
06013 qSend( nbp );
06014 HashMutexUnlock( "requestmapping" );
06015
06016 #ifdef RIO_DEBUG1
06017 m_log << "[RioNeti - RequestMapping] Finish" << endl;
06018 #endif
06019 }
06020
06021 void RioNeti::iSendMapAck( int ipaddr, int port, int reqid )
06022 {
06023 #ifdef RIO_DEBUG1
06024 m_log << "[RioNeti - iSendMapAck] Start" << endl;
06025 #endif
06026
06027 NetBuf *nbp;
06028 RioPkt::pktpmap *p;
06029
06030 if( m_debug & DEBUG_CMDS)
06031 {
06032 char print[256];
06033 sprintf( print, " SendMapAck: %8.8x %8.8x %8.8x" ,
06034 (unsigned)ipaddr, (unsigned)port, (unsigned)reqid);
06035 m_log << print << endl;
06036 }
06037
06038 nbp = mbget();
06039
06040 nbp->nb_toipaddr = ipaddr;
06041 nbp->nb_toipport = port;
06042 nbp->nb_sendp = nbp->nb_buf;
06043
06044
06045
06046
06047 p = (RioPkt::pktpmap *) nbp->nb_sendp;
06048 p->type = TYPE_MAPACK;
06049 p->IP = ipaddr;
06050 p->port = port;
06051 p->reqid = reqid;
06052 nbp->nb_sendl = sizeof( RioPkt::pktpmap );
06053 p->auth.makeauth( nbp->nb_sendp, nbp->nb_sendl, nbp );
06054 fSend( nbp );
06055
06056 #ifdef RIO_DEBUG1
06057 m_log << "[RioNeti - iSendMapAck] Finish" << endl;
06058 #endif
06059 }
06060
06061
06062
06063
06064
06065
06066
06067 void RioNeti::setLogRotation( CLogRotation *LogRotation )
06068 {
06069 #ifdef RIO_DEBUG1
06070 m_log << "[RioNeti - setLogRotation] Single" << endl;
06071 #endif
06072
06073
06074
06075 m_LogRotation = LogRotation;
06076 }
06077
06078
06079
06080
06081
06082
06083
06084 bool RioNeti::CancelCopyBlock( int reqid, int result )
06085 {
06086 #ifdef RIO_DEBUG1
06087 m_log << "[RioNeti - CancelCopyBlock] Start" << endl;
06088 #endif
06089
06090 NetBuf *nbp;
06091 bool status;
06092
06093 HashMutexLock( "cancelcopyblock" );
06094
06095
06096 nbp = iHashFind( reqid );
06097
06098 if( nbp != NULL )
06099 {
06100
06101 qComplete( nbp, result );
06102 status = true;
06103 }
06104 else
06105 status = false;
06106
06107 HashMutexUnlock( "cancelcopyblock" );
06108
06109 #ifdef RIO_DEBUG1
06110 m_log << "[RioNeti - CancelCopyBlock] Finish" << endl;
06111 #endif
06112
06113 return status;
06114 }
06115
06116
06117
06118
06119
06120
06121 RioNetiReqIdItemData::RioNetiReqIdItemData( int p_reqid, int l_reqid )
06122 {
06123 #ifdef RIO_DEBUG1
06124 RioErr << "[RioNetiReqIdItemData - Constructor] Start" << endl;
06125 #endif
06126
06127 buf = NULL;
06128 SetPreqid( p_reqid );
06129 SetLreqid( l_reqid );
06130
06131 #ifdef RIO_DEBUG1
06132 RioErr << "[RioNetiReqIdItemData - Constructor] Finish" << endl;
06133 #endif
06134 }
06135
06136
06137 void RioNetiReqIdItemData::SetPreqid( int p_reqid )
06138 {
06139 #ifdef RIO_DEBUG1
06140 RioErr << "[RioNetiReqIdItemData - SetPreqid] Single" << endl;
06141 #endif
06142
06143 this->p_reqid = p_reqid;
06144 }
06145
06146
06147 void RioNetiReqIdItemData::SetLreqid( int l_reqid )
06148 {
06149 #ifdef RIO_DEBUG1
06150 RioErr << "[RioNetiReqIdItemData - SetLreqid] Single" << endl;
06151 #endif
06152
06153 this->l_reqid = l_reqid;
06154 }
06155
06156
06157 void RioNetiReqIdItemData::SetBuf( char *buf )
06158 {
06159 #ifdef RIO_DEBUG1
06160 RioErr << "[RioNetiReqIdItemData - SetBuf] Single" << endl;
06161 #endif
06162
06163 this->buf = buf;
06164 }
06165
06166
06167 int RioNetiReqIdItemData::GetPreqid()
06168 {
06169
06170
06171
06172
06173 return this->p_reqid;
06174 }
06175
06176
06177 int RioNetiReqIdItemData::GetLreqid()
06178 {
06179 #ifdef RIO_DEBUG1
06180 RioErr << "[RioNetiReqIdItemData - GetLreqid] Single" << endl;
06181 #endif
06182
06183 return this->l_reqid;
06184 }
06185
06186
06187 char *RioNetiReqIdItemData::GetBuf()
06188 {
06189 #ifdef RIO_DEBUG1
06190 RioErr << "[RioNetiReqIdItemData - GetBuf] Single" << endl;
06191 #endif
06192
06193 return this->buf;
06194 }
06195
06196
06197 RioNetiReqIdItem::RioNetiReqIdItem( int p_reqid, int l_reqid )
06198 {
06199 #ifdef RIO_DEBUG1
06200 RioErr << "[RioNetiReqIdItem - Constructor] Start" << endl;
06201 #endif
06202
06203 SetNext( NULL );
06204 SetPreqid( p_reqid );
06205 SetLreqid( l_reqid );
06206
06207 #ifdef RIO_DEBUG1
06208 RioErr << "[RioNetiReqIdItem - Constructor] Finish" << endl;
06209 #endif
06210 }
06211
06212
06213 void RioNetiReqIdItem::SetNext( RioNetiReqIdItem *next )
06214 {
06215 #ifdef RIO_DEBUG1
06216 RioErr << "[RioNetiReqIdItem - SetNext] Single" << endl;
06217 #endif
06218
06219 this->next = next;
06220 }
06221
06222
06223 RioNetiReqIdItem *RioNetiReqIdItem::GetNext()
06224 {
06225
06226
06227
06228
06229 return next;
06230 }
06231
06232
06233
06234
06235 RioNetiReqIdTable::RioNetiReqIdTable()
06236 {
06237 #ifdef RIO_DEBUG1
06238 RioErr << "[RioNetiReqIdTable - Constructor] Single" << endl;
06239 #endif
06240
06241 SetFirst( NULL );
06242 }
06243
06244
06245 int RioNetiReqIdTable::Insert( RioNetiReqIdItem *item )
06246 {
06247 #ifdef RIO_DEBUG1
06248 RioErr << "[RioNetiReqIdTable - Insert] Start" << endl;
06249 #endif
06250
06251 if( item == NULL )
06252 {
06253 #ifdef RIO_DEBUG1
06254 RioErr << "[RioNetiReqIdTable - Insert] Finish 1" << endl;
06255 #endif
06256
06257 return RESULT_NETIREQT_INSERT_ERROR;
06258 }
06259
06260 RioNetiReqIdItem *current = NULL,
06261 *previous = NULL;
06262
06263 int result = Search( item->GetPreqid(), ¤t, &previous );
06264
06265 if( result == RESULT_NETIREQT_SEARCH_EMPTY_LIST )
06266 {
06267 SetFirst( item );
06268
06269 #ifdef RIO_DEBUG1
06270 RioErr << "[RioNetiReqIdTable - Insert] Finish 2" << endl;
06271 #endif
06272
06273 return RESULT_NETIREQT_INSERT_OK;
06274 }
06275
06276 if( result == RESULT_NETIREQT_SEARCH_NOT_FOUND )
06277 {
06278 item->SetNext( current );
06279 if( previous )
06280 previous->SetNext( item );
06281 else
06282 SetFirst( item );
06283
06284 #ifdef RIO_DEBUG1
06285 RioErr << "[RioNetiReqIdTable - Insert] Finish 3" << endl;
06286 #endif
06287
06288 return RESULT_NETIREQT_INSERT_OK;
06289 }
06290
06291 if( result == RESULT_NETIREQT_SEARCH_FOUND )
06292 {
06293 #ifdef RIO_DEBUG1
06294 RioErr << "[RioNetiReqIdTable - Insert] Finish 4" << endl;
06295 #endif
06296
06297 return RESULT_NETIREQT_INSERT_ALREADY_EXIST;
06298 }
06299
06300 #ifdef RIO_DEBUG1
06301 RioErr << "[RioNetiReqIdTable - Insert] Finish 5" << endl;
06302 #endif
06303
06304 return RESULT_NETIREQT_INSERT_ERROR;
06305 }
06306
06307
06308 int RioNetiReqIdTable::Remove( int p_reqid )
06309 {
06310 #ifdef RIO_DEBUG1
06311 RioErr << "[RioNetiReqIdTable - Remove] Start" << endl;
06312 #endif
06313
06314 RioNetiReqIdItem *current = NULL,
06315 *previous = NULL;
06316
06317 int result = Search( p_reqid, ¤t, &previous );
06318
06319 if( result == RESULT_NETIREQT_SEARCH_NOT_FOUND )
06320 {
06321 #ifdef RIO_DEBUG1
06322 RioErr << "[RioNetiReqIdTable - Remove] Finish 1" << endl;
06323 #endif
06324
06325 return RESULT_NETIREQT_REMOVE_NOT_FOUND;
06326 }
06327 else if( result == RESULT_NETIREQT_SEARCH_FOUND )
06328 {
06329 if( previous == NULL )
06330 SetFirst( current->GetNext() );
06331 else
06332 previous->SetNext( current->GetNext() );
06333
06334 delete current;
06335
06336 #ifdef RIO_DEBUG1
06337 RioErr << "[RioNetiReqIdTable - Remove] Finish 2" << endl;
06338 #endif
06339
06340 return RESULT_NETIREQT_REMOVE_OK;
06341 }
06342
06343 #ifdef RIO_DEBUG1
06344 RioErr << "[RioNetiReqIdTable - Remove] Finish 333n";
06345 #endif
06346
06347 return RESULT_NETIREQT_REMOVE_ERROR;
06348 }
06349
06350
06351 int RioNetiReqIdTable::Search( int p_reqid, RioNetiReqIdItem **current,
06352 RioNetiReqIdItem **previous )
06353 {
06354 #ifdef RIO_DEBUG1
06355 RioErr << "[RioNetiReqIdTable - Search] Start" << endl;
06356 #endif
06357
06358 if( current == NULL || previous == NULL )
06359 {
06360 #ifdef RIO_DEBUG1
06361 RioErr << "[RioNetiReqIdTable - Search] Finish 1" << endl;
06362 #endif
06363
06364 return RESULT_NETIREQT_SEARCH_NULL_REFERENCE;
06365 }
06366
06367 bool found = false;
06368
06369 RioNetiReqIdItem *aux = this->first;
06370 RioNetiReqIdItem *auxPrevious = NULL;
06371
06372 if( aux == NULL )
06373 {
06374 #ifdef RIO_DEBUG1
06375 RioErr << "[RioNetiReqIdTable - Search] Finish 2" << endl;
06376 #endif
06377
06378 return RESULT_NETIREQT_SEARCH_EMPTY_LIST;
06379 }
06380
06381 while( aux != NULL && !found )
06382 {
06383 if( p_reqid == aux->GetPreqid() )
06384 {
06385 found = true;
06386 }
06387 else
06388 {
06389 auxPrevious = aux;
06390 aux = aux->GetNext();
06391 }
06392 }
06393
06394 *previous = auxPrevious;
06395 *current = aux;
06396
06397 if( aux == NULL )
06398 {
06399 #ifdef RIO_DEBUG1
06400 RioErr << "[RioNetiReqIdTable - Search] Finish 3" << endl;
06401 #endif
06402
06403 return RESULT_NETIREQT_SEARCH_NOT_FOUND;
06404 }
06405
06406 #ifdef RIO_DEBUG1
06407 RioErr << "[RioNetiReqIdTable - Search] Finish 4" << endl;
06408 #endif
06409
06410 return RESULT_NETIREQT_SEARCH_FOUND;
06411 }
06412
06413 int RioNetiReqIdTable::SearchL( int l_reqid, RioNetiReqIdItem **current,
06414 RioNetiReqIdItem **previous )
06415 {
06416 #ifdef RIO_DEBUG1
06417 RioErr << "[RioNetiReqIdTable - Search] Start" << endl;
06418 #endif
06419
06420 if( current == NULL || previous == NULL )
06421 {
06422 #ifdef RIO_DEBUG1
06423 RioErr << "[RioNetiReqIdTable - Search] Finish 1" << endl;
06424 #endif
06425
06426 return RESULT_NETIREQT_SEARCH_NULL_REFERENCE;
06427 }
06428
06429 bool found = false;
06430
06431 RioNetiReqIdItem *aux = this->first;
06432 RioNetiReqIdItem *auxPrevious = NULL;
06433
06434 if( aux == NULL )
06435 {
06436 #ifdef RIO_DEBUG1
06437 RioErr << "[RioNetiReqIdTable - Search] Finish 2" << endl;
06438 #endif
06439
06440 return RESULT_NETIREQT_SEARCH_EMPTY_LIST;
06441 }
06442
06443 while( aux != NULL && !found )
06444 {
06445 if( l_reqid == aux->GetLreqid() )
06446 {
06447 found = true;
06448 }
06449 else
06450 {
06451 auxPrevious = aux;
06452 aux = aux->GetNext();
06453 }
06454 }
06455
06456 *previous = auxPrevious;
06457 *current = aux;
06458
06459 if( aux == NULL )
06460 {
06461 #ifdef RIO_DEBUG1
06462 RioErr << "[RioNetiReqIdTable - Search] Finish 3" << endl;
06463 #endif
06464
06465 return RESULT_NETIREQT_SEARCH_NOT_FOUND;
06466 }
06467
06468 #ifdef RIO_DEBUG1
06469 RioErr << "[RioNetiReqIdTable - Search] Finish 4" << endl;
06470 #endif
06471
06472 return RESULT_NETIREQT_SEARCH_FOUND;
06473 }
06474
06475
06476 void RioNetiReqIdTable::SetFirst( RioNetiReqIdItem *item )
06477 {
06478 #ifdef RIO_DEBUG1
06479 RioErr << "[RioNetiReqIdTable - SetFirst] Single" << endl;
06480 #endif
06481
06482 this->first = item;
06483 }
06484
06485
06486 RioNetiReqIdItem *RioNetiReqIdTable::GetFirst()
06487 {
06488 #ifdef RIO_DEBUG1
06489 RioErr << "[RioNetiReqIdTable - GetFirst] Single" << endl;
06490 #endif
06491
06492 return this->first;
06493 }
06494
06495
06496 NetBuf::NetBuf()
06497 {
06498 #ifdef RIO_DEBUG1
06499 RioErr << "[NetBuf - Constructor] Start" << endl;
06500 #endif
06501
06502 nb_link = NULL;
06503 nb_reqid = 0;
06504 rq_fragsh = 0;
06505 rq_fragburst = 0;
06506 rq_fragburst = 0;
06507 nb_outnext = NULL;
06508 rq_fraglistn = 0;
06509 nb_sendack = 1;
06510 nb_hisreqid = 0;
06511 nb_frag0arrive = 0;
06512 rq_maxpktl = 0;
06513 rq_fraglen0 = 0;
06514 rq_fraglen = 0;
06515 rq_fragmax = 0;
06516 rq_fraghav = 0;
06517 rq_fragDiscarded = 0;
06518 rq_fraghigh = 0;
06519 rq_fraglow = 0;
06520 rq_needack = 0;
06521 rq_fragackseq = 0;
06522 rq_retry = 0;
06523 rq_retryfrag = 0;
06524 rq_retrytim = 0;
06525 nb_result = 0;
06526 nb_rioneti = NULL;
06527 nb_blockp = NULL;
06528 nb_blockl = 0;
06529 nb_toipaddr = 0;
06530 nb_toipport = 0;
06531 nb_bufp = NULL;
06532 nb_bufl = 0;
06533 nb_recvl = 0;
06534 nb_datap = NULL;
06535 nb_datalm = NULL;
06536 nb_sendp = NULL;
06537 nb_sendl = 0;
06538 nb_userparm = NULL;
06539 memset( &rq_bits, 0, sizeof( rq_bits ) );
06540 memset( &nb_buf, 0, sizeof( nb_buf ) );
06541
06542 #ifdef RIO_DEBUG1
06543 RioErr << "[NetBuf - Constructor] Finish" << endl;
06544 #endif
06545
06546
06547 nb_timebetweencredits = ~0;
06548 }
06549
06550
06551
06552
06553
06554 void NetBuf::GetIPAndPort( int *ip, int *port )
06555 {
06556 *ip = nb_toipaddr;
06557 *port = nb_toipport;
06558 }
06559
06560 unsigned long long int NetBuf::GetTimeBetweenCredits()
06561 {
06562 return nb_timebetweencredits;
06563 }
06564
06565 int NetBuf::GetFragmentInfo( char **FragmentData, int *FragmentSize )
06566 {
06567 *FragmentData = new char[ nb_sendl ];
06568 if( *FragmentData == NULL )
06569 return ERROR_RIONETI + ERROR_MEMORY;
06570 memcpy( *FragmentData, nb_sendp, nb_sendl );
06571 *FragmentSize = nb_sendl;
06572 return S_OK;
06573 }