00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <routerinterface.h>
00026 #include <../interface/RioError.h>
00027
00028 #include <sserror.h>
00029 #include <extern.h>
00030
00031 #include <iostream>
00032 using namespace std;
00033
00034 #include <string.h>
00035
00036 #include <netinet/in.h>
00037 #include <arpa/inet.h>
00038 #include <sys/socket.h>
00039 #include <stdlib.h>
00040
00041
00042 #include <unistd.h>
00043 #include <sys/syscall.h>
00044
00045 const int InterfaceInactive = 0;
00046 const int InterfaceActive = 1;
00047
00048
00049
00050
00051
00052
00053
00054 CRouterInterface::CRouterInterface()
00055 {
00056 m_Status = InterfaceInactive;
00057 m_Socket = NULL;
00058 m_ConnectionActive = 0;
00059 m_DisconnectFlag = 0;
00060
00061
00062 m_InitializedRequestManager = false;
00063
00064 m_LogsDirectory = NULL;
00065 }
00066
00067
00068 CRouterInterface::~CRouterInterface()
00069 {
00070
00071 if( m_LogsDirectory != NULL )
00072 delete[] m_LogsDirectory;
00073
00074 }
00075
00076
00077
00078
00079
00080 int CRouterInterface::Initialize( u16 LocalPort, char *LogsDirectory )
00081 {
00082 if(m_Status == InterfaceActive)
00083 {
00084 return ERROR_SS_PROGRAM_ERROR;
00085 }
00086
00087 vsiIPaddress MyAddress;
00088
00089 MyAddress.Host = VSI_SOCKET_UNDEFINED_IP_ADDRESS;
00090 MyAddress.Port = LocalPort;
00091
00092 int stat;
00093
00094 stat = m_Connection.Initialize(&MyAddress,1);
00095 if(stat < 0)
00096 {
00097 RioErr << "CRouterInterface.Initialize - "
00098 << "Failed to initialize TCP socket:"
00099 << stat
00100 << " HOST: " << MyAddress.Host
00101 << " PORT: " << MyAddress.Port
00102 << endl;
00103 Stop();
00104 return ERROR_SS_TCP_INITIALIZATION;
00105 }
00106
00107 m_Status = InterfaceActive;
00108
00109 stat = m_WaitThread.Create(WaitThread,(void*) this);
00110 if(stat < 0)
00111 {
00112 RioErr << "Failed to create Wait Thread: "
00113 << stat << endl;
00114 return ERROR_SS_THREAD_CREATE;
00115 }
00116
00117
00118 m_LogsDirectory = new char[ strlen( LogsDirectory ) + 1];
00119 if( m_LogsDirectory == NULL )
00120 return ERROR_SS_MEMORY;
00121
00122 strcpy( m_LogsDirectory, LogsDirectory );
00123
00124 return (0);
00125 }
00126
00127
00128
00129 void CRouterInterface::Stop()
00130 {
00131
00132 m_Status = InterfaceInactive;
00133
00134
00135 m_Connection.Close();
00136
00137 m_DisconnectCondition.Lock();
00138 if(m_ConnectionActive)
00139 {
00140 m_ConnectionActive = 0;
00141 m_Socket->Close();
00142 }
00143 m_DisconnectCondition.Unlock();
00144
00145
00146
00147
00148 m_TxThread.Join(0);
00149 m_RxThread.Join(0);
00150 }
00151
00152
00153
00154
00155
00156 void CRouterInterface::Send(StrMsg* Elem)
00157 {
00158 m_TxQueue.Put(Elem);
00159 }
00160
00161
00162
00163 void* CRouterInterface::WaitThread(void* Param)
00164 {
00165
00166 CRouterInterface* Interface = (CRouterInterface*) Param;
00167 CvsiThread &m_TxThread = Interface->m_TxThread;
00168 CvsiThread &m_RxThread = Interface->m_RxThread;
00169 int &m_Status = Interface->m_Status;
00170 int &m_ConnectionActive = Interface->m_ConnectionActive;
00171 int &m_DisconnectFlag = Interface->m_DisconnectFlag;
00172 CvsiTCPwait &m_Connection = Interface->m_Connection;
00173 CvsiTCPsocket* &m_Socket = Interface->m_Socket;
00174 CvsiCondition &m_ConnectCondition = Interface->m_ConnectCondition;
00175 CvsiCondition &m_DisconnectCondition = Interface->m_DisconnectCondition;
00176
00177 CvsiTCPsocket* sock;
00178
00179 int stat;
00180 int errorcount = 0;
00181
00182 m_ConnectionActive = 0;
00183 m_DisconnectFlag = 1;
00184
00185 RioErr << "WAITTHREADID " << syscall( SYS_gettid ) << endl;
00186
00187 stat = m_TxThread.Create(TxThread,Param);
00188 if(stat < 0)
00189 {
00190 RioErr << "ERROR on Wait thread: Failed to create Tx thread: "
00191 << stat << endl;
00192 RioErr << "Aborting thread ..." << endl;
00193 Interface->Stop();
00194 return (void*) 0;
00195 }
00196
00197 stat = m_RxThread.Create(RxThread,(void*) Param);
00198 if(stat < 0)
00199 {
00200 RioErr << "ERROR on Wait thread: Failed to create Rx thread: " << stat
00201 << endl;
00202 RioErr << "Aborting thread ..." << endl;
00203 Interface->Stop();
00204 return (void*) 0;
00205 }
00206
00207 while(1)
00208 {
00209 sock = new CvsiTCPsocket;
00210
00211
00212 stat = m_Connection.Accept(sock,NULL);
00213
00214 RioErr << "Connection.Accept(): " << stat << endl;
00215
00216
00217 if(m_Status != InterfaceActive)
00218 {
00219 Interface->Stop();
00220 RioErr << "Exiting Wait thread ..." << endl;
00221 return (void*) 0;
00222 }
00223
00224
00225 if( stat < 0 )
00226 {
00227 errorcount++;
00228 RioErr << "Unexpected Error on TCP connection Accept()" << endl;
00229 if( errorcount > 4 )
00230 {
00231 RioErr << "Too many TCP connection errors: Aborting wait Thread"
00232 << endl;
00233 Interface->Stop();
00234 return (void *)0;
00235 }
00236 }
00237 else
00238 {
00239
00240
00241 errorcount = 0;
00242
00243
00244 m_DisconnectCondition.Lock();
00245
00246 if( m_ConnectionActive )
00247 {
00248 m_ConnectionActive = 0;
00249 m_Socket->Close();
00250 }
00251 while(m_DisconnectFlag == 0)
00252 {
00253 m_DisconnectCondition.Wait();
00254 }
00255 m_Socket = sock;
00256 m_DisconnectCondition.Unlock();
00257
00258
00259 m_ConnectCondition.Lock();
00260 m_ConnectionActive = 1;
00261 m_ConnectCondition.Signal();
00262 m_ConnectCondition.Unlock();
00263 }
00264 }
00265 return 0;
00266 }
00267
00268
00269
00270 void* CRouterInterface::TxThread(void* Param)
00271 {
00272 int stat;
00273
00274
00275 StrMsg* Msg;
00276
00277
00278 CRouterInterface* Interface = (CRouterInterface*) Param;
00279
00280
00281 CMsgQueue &m_TxQueue = Interface->m_TxQueue;
00282 int &m_Status = Interface->m_Status;
00283 CvsiTCPsocket* &m_Socket = Interface->m_Socket;
00284
00285 RioErr << "TXTHREADID " << syscall( SYS_gettid ) << endl;
00286
00287
00288 while(1)
00289 {
00290
00291 Msg = m_TxQueue.Get();
00292
00293
00294
00295 if(m_Status != InterfaceActive)
00296 {
00297 RioErr << "TxThread finished." << endl;
00298 return ( (void*) 0 );
00299 }
00300
00301 #ifdef RIO_DEBUG2
00302
00303 RioErr << "CRouterInterface::TxThread informacao sobre a mensagem com "
00304 << " o tipo " << Msg->Msg.Header.Type << " e com o tamanho de "
00305 << Msg->Msg.Header.Size << endl;
00306 #endif
00307
00308
00309 Msg->Msg.Header.Token = RSS_TOKEN_STORAGE;
00310
00311
00312 stat = m_Socket->Send((char*)& (Msg->Msg), Msg->Msg.Header.Size);
00313 if(stat < 0)
00314 {
00315 RioErr << "Unexpected Error on TCP socket Send()" << endl;
00316 }
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 MsgManager.Free (Msg);
00328 }
00329
00330 }
00331
00332
00333
00334 void* CRouterInterface::RxThread(void* Param)
00335 {
00336 int res;
00337
00338 StrMsg* Elem;
00339 MsgRSSrouter* Msg;
00340
00341
00342 CRouterInterface* Interface = (CRouterInterface*) Param;
00343
00344
00345 CvsiTCPsocket* &m_Socket = Interface->m_Socket;
00346 int &m_ConnectionActive = Interface->m_ConnectionActive;
00347 int &m_DisconnectFlag = Interface->m_DisconnectFlag;
00348 CvsiCondition &m_ConnectCondition = Interface->m_ConnectCondition;
00349 CvsiCondition &m_DisconnectCondition = Interface->m_DisconnectCondition;
00350
00351 int& m_Status = Interface->m_Status;
00352
00353 RioErr << "RXTHREADID " << syscall( SYS_gettid ) << endl;
00354
00355 m_ConnectCondition.Lock();
00356 while( m_ConnectionActive == 0)
00357 {
00358 m_ConnectCondition.Wait();
00359 }
00360 m_DisconnectFlag = 0;
00361 m_ConnectCondition.Unlock();
00362
00363 while(1)
00364 {
00365
00366 Elem = MsgManager.New();
00367 Msg = &(Elem->Msg.Router);
00368
00369
00370 res = m_Socket->Receive ( (char*)Msg, 4 );
00371
00372
00373 if(m_Status != InterfaceActive)
00374 {
00375
00376 MsgManager.Free(Elem);
00377 RioErr << "Exiting Rx thread ..." << endl;
00378 return (void*) 0;
00379 }
00380
00381
00382 if( m_ConnectionActive == 0 )
00383 {
00384
00385 MsgManager.Free(Elem);
00386
00387
00388 m_DisconnectCondition.Lock();
00389 m_DisconnectFlag = 1;
00390 m_DisconnectCondition.Signal();
00391 m_DisconnectCondition.Unlock();
00392
00393 m_ConnectCondition.Lock();
00394 while( m_ConnectionActive == 0)
00395 {
00396 m_ConnectCondition.Wait();
00397 }
00398 m_DisconnectFlag = 0;
00399 m_ConnectCondition.Unlock();
00400 }
00401 else
00402 {
00403 if( res < 0 )
00404 {
00405
00406 MsgManager.Free(Elem);
00407 RioErr << "Unexpected error on TCP receive(): " << res << endl;
00408
00409
00410 m_DisconnectCondition.Lock();
00411 m_ConnectionActive = 0;
00412 m_DisconnectFlag = 1;
00413 m_DisconnectCondition.Signal();
00414 m_DisconnectCondition.Unlock();
00415 }
00416 else
00417 {
00418 #ifdef RIO_DEBUG2
00419
00420 RioErr << "CRouterInterface::RxThread informacao sobre a "
00421 << "mensagem com o tipo " << Msg->Header.Type
00422 << " e com o tamanho de " << Msg->Header.Size << endl;
00423 #endif
00424
00425 u16 Size = Msg->Header.Size;
00426
00427
00428 if( (Size > SizeMsgRSS) || (Size < 4) )
00429 {
00430 int n,nr;
00431 RioErr << "Received message with invalid size: Type="
00432 << Msg->Header.Type << " Size:" << Msg->Header.Size
00433 << endl;
00434 nr = 4;
00435 while( nr < Size )
00436 {
00437 n = Size - nr;
00438 if( n > SizeMsgRSS )
00439 n = SizeMsgRSS;
00440 res = m_Socket->Receive( (char*)Msg,n );
00441
00442
00443 if(res < 0)
00444 {
00445 RioErr << "Unexpected Error on TCP "
00446 << "connection Receive()"
00447 << endl;
00448 nr = Size;
00449 }
00450 else
00451 {
00452 nr += n;
00453 }
00454 }
00455 MsgManager.Free(Elem);
00456 }
00457
00458 else
00459 {
00460
00461 if(Size > 4)
00462 {
00463 m_Socket->Receive (((char*)Msg)+4,Size-4);
00464 }
00465
00466 #ifdef RIO_DEBUG2
00467
00468 #endif
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479 Interface->ProcessMsg(Elem);
00480 }
00481 }
00482 }
00483 }
00484 }
00485
00486
00487
00488
00489
00490 void CRouterInterface::ProcessMsg(StrMsg* Elem)
00491 {
00492 MsgRSSrouter* Msg = &(Elem->Msg.Router);
00493
00494
00495 u16 Type = Msg->Header.Type & 0xff0f;
00496
00497 #ifdef RIO_DEBUG2
00498
00499
00500
00501 #endif
00502
00503
00504
00505
00506
00507
00508 if( m_InitializedRequestManager )
00509 {
00510
00511
00512 switch( Type )
00513 {
00514 case MSG_RSS_READ:
00515 case MSG_RSS_WRITE:
00516 case MSG_RSS_FETCH:
00517 case MSG_RSS_RECEIVE:
00518 ProcessNew ( Elem );
00519 break;
00520 case MSG_RSS_SEND:
00521 case MSG_RSS_FLUSH:
00522 #ifdef RIO_DEBUG2
00523 RioErr << "processpending" << endl;
00524 #endif
00525
00526 ProcessPending ( Elem );
00527 break;
00528 case MSG_RSS_CANCEL:
00529
00530 #ifdef RIO_DEBUG2
00531 RioErr << "processcancel" << endl;
00532 #endif
00533
00534 ProcessCancel ( Elem );
00535 break;
00536
00537
00538 case MSG_RSS_NODEINFO_REQ:
00539 #ifdef RIO_DEBUG2
00540 RioErr << "sendnodeinfo" << endl;
00541 #endif
00542
00543 SendNodeInfo( Elem );
00544 break;
00545
00546 case MSG_RSS_DISKINFO_REQ:
00547 #ifdef RIO_DEBUG2
00548 RioErr << "sendiskinfo" << endl;
00549 #endif
00550
00551 SendDiskInfo(Elem, Msg->DiskInfo.Disk);
00552 break;
00553 case MSG_RSS_DISKSERVICETIMEINFO_REQ:
00554
00555 #ifdef RIO_DEBUG2
00556 RioErr << "routerinterface - sendiskservicetimeinfo" << endl;
00557 #endif
00558 SendDiskServiceTimeInfo(Elem, Msg->DiskServiceTimeInfo.Disk,
00559 Msg->DiskServiceTimeInfo.DiskId );
00560 break;
00561 case MSG_RSS_INITIALIZATION_REQ:
00562
00563
00564
00565
00566
00567 ProcessInitialization( Elem, true );
00568 break;
00569
00570
00571
00572
00573 case MSG_RSS_START_SEARCH_IN_LOGS_REQ:
00574 #ifdef RIO_DEBUG2
00575 RioErr << "startsearchinlogs" << endl;
00576 #endif
00577
00578 ProcessStartSearchInLogs( Elem );
00579 break;
00580
00581
00582
00583 case MSG_RSS_BLOCK_SEARCH_FILE_REQ:
00584 #ifdef RIO_DEBUG2
00585 RioErr << "blocksearchfile" << endl;
00586 #endif
00587
00588 ProcessBlockSearchFile( Elem );
00589 break;
00590
00591
00592
00593 case MSG_RSS_REMOVE_SEARCH_FILE_REQ:
00594 #ifdef RIO_DEBUG2
00595 RioErr << "removesearchfile" << endl;
00596 #endif
00597
00598 ProcessRemoveSearchFile( Elem );
00599 break;
00600
00601
00602
00603 default:
00604 MsgManager.Free (Elem);
00605 RioErr << "Received invalid message type from router: "
00606 << Msg->Header.Type << " " << Type << endl;
00607 }
00608 }
00609 else
00610 {
00611 if( Type == MSG_RSS_INITIALIZATION_REQ ) {
00612 #ifdef RIO_DEBUG2
00613 RioErr << "routerinterface - initialization" << endl;
00614 #endif
00615
00616
00617
00618
00619 ProcessInitialization( Elem, false );
00620 }
00621 else
00622 {
00623 MsgManager.Free( Elem );
00624 RioErr << "Received a message type from router: "
00625 << Msg->Header.Type << " " << Type << " after initialization"
00626 << endl;
00627 }
00628 }
00629 }
00630
00631
00632
00633 void CRouterInterface::ProcessNew(StrMsg* Elem)
00634 {
00635
00636
00637 MsgRSSnewRequest* Msg = &(Elem->Msg.Router.Request.New);
00638
00639
00640 u16 Command = Msg->Type &0xf;
00641 u16 Reply;
00642 switch (Command)
00643 {
00644 case MSG_RSS_WRITE:
00645 Reply = MSG_RSS_WRITECOMPLETE;
00646 break;
00647 case MSG_RSS_RECEIVE:
00648 Reply = MSG_RSS_RECEIVECOMPLETE;
00649 break;
00650 case MSG_RSS_READ:
00651 Reply = MSG_RSS_SENDCOMPLETE;
00652 break;
00653 case MSG_RSS_FETCH:
00654 Reply = MSG_RSS_READCOMPLETE;
00655 break;
00656 default:
00657 MsgManager.Free(Elem);
00658 #ifdef RIO_DEBUG2
00659 RioErr << "UNEXPECTED ERROR: Processing new request. "
00660 << "Request has unexpected type:" << Command << endl;
00661 #endif
00662 return;
00663 }
00664
00665
00666 unsigned int disk = Msg->Disk;
00667 if(disk >= Config.nDisks)
00668 {
00669 #ifdef RIO_DEBUG2
00670 RioErr << "ProcessNew:Request invalid disk!!!"<<endl;
00671 #endif
00672 SendStatus( Elem,
00673 Reply,
00674 Msg->RouterId,
00675 ERROR_RSS_INVALID_DISK );
00676 return;
00677 }
00678
00679
00680 if( ( Msg->Pos + Msg->DataSize) > Config.Disk[disk].Size )
00681 {
00682 #ifdef RIO_DEBUG2
00683 RioErr << "ProcessNew:Request invalid position!!!"<<endl;
00684 #endif
00685 SendStatus( Elem,
00686 Reply,
00687 Msg->RouterId,
00688 ERROR_RSS_INVALID_POSITION );
00689 return;
00690 }
00691
00692
00693 if( ( Msg->DataSize > (Config.MaxReqSize * Config.FragSize )) ||
00694 ( Msg->DataSize == 0 ))
00695 {
00696 #ifdef RIO_DEBUG2
00697 RioErr << "ProcessNew:Request invalid data size!!!"<<endl;
00698 #endif
00699 SendStatus( Elem,
00700 Reply,
00701 Msg->RouterId,
00702 ERROR_RSS_INVALID_DATASIZE );
00703 return;
00704 }
00705
00706 StrRequest* Request = RequestManager.New();
00707
00708 if( Request == 0 )
00709 {
00710 #ifdef RIO_DEBUG2
00711 RioErr << "ProcessNew:Request not available!!!"<<endl;
00712 #endif
00713 SendStatus( Elem,
00714 Reply,
00715 Msg->RouterId,
00716 ERROR_RSS_MAX_REQUEST );
00717 return;
00718 }
00719
00720
00721 Request->Command = Msg->Type;
00722 Request->Status = RequestStatusAccepted;
00723 Request->ClientId = Msg->ClientId;
00724 Request->RouterId = Msg->RouterId;
00725 Request->IPaddress = Msg->IPaddr;
00726 Request->Port = Msg->Port;
00727 Request->Disk = Msg->Disk;
00728 Request->Pos = Msg->Pos;
00729 Request->Size = Msg->DataSize;
00730
00731 Request->StreamTraffic = Msg->StreamTraffic;
00732
00733 Request->VideoRate = Msg->VideoRate;
00734
00735 #ifdef RIO_DEBUG2
00736 struct in_addr client_IP;
00737 client_IP.s_addr = Request->IPaddress;
00738 struct sockaddr_in client_PORT;
00739 client_PORT.sin_port = Request->Port;
00740
00741 RioErr << " ------------------------------- " << endl;
00742 RioErr << " ClientId: " << Request->ClientId << endl;
00743 RioErr << " IPaddress " << inet_ntoa( client_IP ) << endl;
00744 RioErr << " Port " << htons(client_PORT.sin_port)<< endl;
00745 RioErr << " Request->Disk " << Request->Disk << endl;
00746 #endif
00747
00748
00749
00750 MsgManager.Free(Elem);
00751
00752 switch (Command)
00753 {
00754 case MSG_RSS_READ:
00755 case MSG_RSS_FETCH:
00756 #ifdef RIO_DEBUG2
00757 RioErr << " RouterInterface: Device : ProcessRequest " << endl;
00758 #endif
00759 Device[Request->Disk].ProcessRequest( Request );
00760 break;
00761 case MSG_RSS_WRITE:
00762 case MSG_RSS_RECEIVE:
00763 #ifdef RIO_DEBUG2
00764 RioErr << " RouterInterface: sending to client " << endl;
00765 #endif
00766 Client.Receive(Request);
00767 break;
00768 default:
00769 RioErr << "UNEXPECTED ERROR: Router interface processing a "
00770 << "new request and found invalid request type: "
00771 << Command << endl;
00772 }
00773 }
00774
00775
00776
00777 void CRouterInterface::ProcessPending(StrMsg* Elem)
00778 {
00779 MsgRSSpendRequest* Msg = & (Elem->Msg.Router.Request.Pending);
00780
00781
00782 u16 Command = Msg->Type &0xf;
00783 u16 Reply;
00784 switch (Command)
00785 {
00786 case MSG_RSS_FLUSH:
00787 Reply = MSG_RSS_WRITECOMPLETE;
00788 break;
00789 case MSG_RSS_SEND:
00790 Reply = MSG_RSS_SENDCOMPLETE;
00791 break;
00792 default:
00793 MsgManager.Free(Elem);
00794 RioErr << "UNEXPECTED ERROR: Processing pending request. "
00795 << "Request has unexpected type:" << Command << endl;
00796 return;
00797 }
00798
00799
00800
00801
00802 StrRequest* Request = RequestManager.Get(Msg->StorageId);
00803
00804
00805 if( Request == 0 )
00806 {
00807 RioErr << "ProcessPending:Request " << Msg->StorageId
00808 << " not found !!!"<<endl;
00809 SendStatus( Elem,
00810 Reply,
00811 Msg->RouterId,
00812 ERROR_RSS_INVALID_REQUESTID );
00813 return;
00814 }
00815
00816
00817 if( Request->RouterId != Msg->RouterId)
00818 {
00819 RioErr << "ProcessPending:Request " << Msg->RouterId
00820 << " != of " << Request->RouterId << "!!!"<<endl;
00821 SendStatus( Elem,
00822 Reply,
00823 Msg->RouterId,
00824 ERROR_RSS_INVALID_REQUESTID );
00825 return;
00826 }
00827
00828
00829
00830 if( (( Command == MSG_RSS_FLUSH ) &&
00831 ( Request->Status != RequestStatusWaitFlushCommand ))
00832 || (( Command == MSG_RSS_SEND ) &&
00833 ( Request->Status != RequestStatusWaitSendCommand )) )
00834 {
00835 RioErr << "ProcessPending:Request invalid !!!"<<endl;
00836 SendStatus( Elem,
00837 Reply,
00838 Msg->RouterId,
00839 ERROR_RSS_INVALID_REQUESTID );
00840 return;
00841 }
00842
00843
00844
00845 Request->Command = Msg->Type;
00846
00847
00848
00849
00850
00851
00852 Request->ClientId = Msg->ClientId;
00853 Request->IPaddress = Msg->IPaddr;
00854 Request->Port = Msg->Port;
00855
00856 Request->StreamTraffic = Msg->StreamTraffic;
00857
00858 Request->VideoRate = Msg->VideoRate;
00859
00860
00861
00862
00863 MsgManager.Free(Elem);
00864
00865
00866 switch (Command)
00867 {
00868 case MSG_RSS_FLUSH:
00869 Device[Request->Disk].ProcessRequest(Request);
00870 break;
00871 case MSG_RSS_SEND:
00872 RioErr << "YJYJYJYJYJYJYJYJY" <<endl;
00873 Client.Send(Request);
00874 break;
00875 default:
00876 RioErr << "UNEXPECTED ERROR: Router interface processing a "
00877 << "pending request "
00878 << "and found invalid request type "<< Command << endl;
00879 }
00880
00881 }
00882
00883
00884 void CRouterInterface::ProcessCancel(StrMsg* Elem)
00885 {
00886 MsgRSSpendRequest* Msg = & (Elem->Msg.Router.Request.Pending);
00887
00888
00889 u16 Command = Msg->Type &0xf;
00890 u16 Reply;
00891 switch (Command)
00892 {
00893 case MSG_RSS_CANCEL:
00894 Reply = MSG_RSS_CANCELCOMPLETE;
00895 break;
00896 default:
00897 MsgManager.Free(Elem);
00898 RioErr << "UNEXPECTED ERROR: Processing cancel request. "
00899 << "Request has unexpected type:" << Command << endl;
00900 return;
00901 }
00902
00903
00904
00905 StrRequest* Request = RequestManager.Get(Msg->StorageId);
00906
00907
00908 if( Request == 0 )
00909 {
00910 RioErr << "ProcessCancel:Request " << Msg->StorageId
00911 << " not found !!!"<<endl;
00912 SendStatus( Elem,
00913 Reply,
00914 Msg->RouterId,
00915 ERROR_RSS_INVALID_REQUESTID );
00916 return;
00917 }
00918
00919
00920 if( Request->RouterId != Msg->RouterId)
00921 {
00922 RioErr << "ProcessCancel:Request " << Msg->RouterId
00923 << " != of " << Request->RouterId << "!!!"<<endl;
00924 SendStatus( Elem,
00925 Reply,
00926 Msg->RouterId,
00927 ERROR_RSS_INVALID_REQUESTID );
00928 return;
00929 }
00930
00931
00932
00933 if( ( Command == MSG_RSS_CANCEL ) &&
00934 ( Request->Status != RequestStatusWaitSendCommand ))
00935 {
00936 RioErr << "ProcessCancel:Request invalid !!!"<<endl;
00937 SendStatus( Elem,
00938 Reply,
00939 Msg->RouterId,
00940 ERROR_RSS_INVALID_REQUESTID );
00941 return;
00942 }
00943
00944 #ifdef RIO_DEBUG2
00945 RioErr << "ProcessCancel:Request canceled ( " << Msg->StorageId << " )!!!"
00946 <<endl;
00947 #endif
00948
00949 RequestManager.Free(Request);
00950
00951
00952 SendStatus( Elem, Reply, Msg->RouterId, 0 );
00953 }
00954
00955
00956
00957 void CRouterInterface::SendStatus( StrMsg* Elem,
00958 u16 Type,
00959 u32 RouterId,
00960 s32 Error)
00961 {
00962 MsgRSSstatus* Msg = &(Elem->Msg.Storage.Status);
00963 Msg->Type = Type;
00964 Msg->Size = SizeMsgRSSstatus;
00965 Msg->RouterId = RouterId;
00966 Msg->StorageId = 0;
00967 Msg->Error = Error;
00968 Send(Elem);
00969 }
00970
00971
00972
00973 void CRouterInterface::SendNodeInfo(StrMsg* Elem)
00974 {
00975 MsgRSSnodeInfo* Msg = &(Elem->Msg.Storage.NodeInfo);
00976
00977 Msg->Type = MSG_RSS_NODEINFO;
00978 Msg->Size = SizeMsgRSSnodeInfo;
00979 Msg->nDisks = Config.nDisks;
00980 Msg->FragSize = Config.FragSize;
00981 Msg->BufferSize = Config.BufferSize;
00982 Msg->MaxReqSize = Config.MaxReqSize;
00983 Msg->MaxReq = Config.MaxReq;
00984
00985 Send (Elem);
00986 }
00987
00988
00989
00990 void CRouterInterface::SendDiskInfo(StrMsg* Elem, int Disk)
00991 {
00992 MsgRSSdiskInfo* Msg = &(Elem->Msg.Storage.DiskInfo);
00993 if( Disk >= ((int)Config.nDisks ))
00994 {
00995 RioErr << "Unexpected ERROR on CRouterInterface::SendDiskInfo : "
00996 << "Invalid Disk: " << Disk << endl;
00997 MsgManager.Free (Elem);
00998 }
00999
01000 Msg->Type = MSG_RSS_DISKINFO;
01001 Msg->Size = SizeMsgRSSdiskInfo;
01002 Msg->Disk = Disk;
01003 Msg->DiskSize = Config.Disk[Disk].Size;
01004 Msg->SectorSize = Config.Disk[Disk].SectorSize;
01005
01006 strcpy( Msg->DiskName, Config.Disk[Disk].Name );
01007
01008 Send (Elem);
01009 }
01010
01011
01012
01013 void CRouterInterface::SendDiskServiceTimeInfo( StrMsg* Elem, int Disk,
01014 int DiskId )
01015 {
01016 MsgRSSdiskServiceTimeInfo* Msg = &(Elem->Msg.Storage.DiskServiceTimeInfo);
01017 double etaccthreads[301];
01018 double estimatedtime;
01019
01020 if( Disk >= ((int)Config.nDisks ))
01021 {
01022 RioErr << "Unexpected ERROR on CRouterInterface::"
01023 << "SendDiskServiceTimeInfo."
01024 << "Invalid Disk: " << Disk << endl;
01025 MsgManager.Free (Elem);
01026 return;
01027 }
01028
01029 memset(etaccthreads,0,sizeof(etaccthreads));
01030 Msg->Type = MSG_RSS_DISKSERVICETIMEINFO;
01031 Msg->Size = SizeMsgRSSdiskServiceTimeInfo;
01032 Msg->Disk = Disk;
01033 Msg->DiskId = DiskId;
01034 Device[ Disk ].GetDiskServiceTime( etaccthreads );
01035 estimatedtime = Device[ Disk ].GetEstimatedTime();
01036 Msg->EstimatedDiskServTime = estimatedtime;
01037
01038
01039 memcpy( Msg->EstimatedServTimeAccT,etaccthreads,sizeof(etaccthreads));
01040
01041
01042 #ifdef RIO_DEBUG2
01043 RioErr << "Sending disk service time info of " << Disk
01044 << " DiskId " << DiskId << endl;
01045 RioErr << "EstimatedTime " << Msg->EstimatedDiskServTime << " msec"<< endl;
01046 #endif
01047 Send (Elem);
01048 }
01049
01050
01051
01052 void CRouterInterface::ProcessInitialization( StrMsg* Elem, bool Restart )
01053 {
01054
01055 MsgRSSInitializationReq* Msg = &( Elem->Msg.Router.Initialization );
01056 if( Restart )
01057 {
01058
01059
01060
01061
01062
01063
01064 RioErr << "Received a duplicated initialization message. Block size = "
01065 << Msg->BlockSize << endl;
01066 if( Msg->BlockSize != Config.FragSize )
01067 {
01068 RioErr << "Fatal error: new block size " << Msg->BlockSize
01069 << " is different of actual block size " << Config.FragSize
01070 << endl;
01071 exit( -1 );
01072
01073 }
01074 }
01075 else
01076 {
01077
01078
01079 Config.FragSize = Msg->BlockSize;
01080
01081 int stat = RequestManager.Initialize( Config.MaxReq, Config.FragSize );
01082 if( stat < 0 )
01083 {
01084 RioErr << "Error when initializing Request Manager: " << stat << endl;
01085 exit( 1 );
01086 }
01087
01088 m_InitializedRequestManager = true;
01089
01090 RioErr << "Request Manager started. Max number of requests: "
01091 << Config.MaxReq << " (array created). Block size: "
01092 << Config.FragSize << endl;
01093
01094
01095 stat = SearchPackagesLogs.Initialize( &SentPackagesLog, &NetManager,
01096 m_LogsDirectory, "Storage-",
01097 Config.FragSize, &NetInterface );
01098 if( stat != S_OK )
01099 {
01100 RioErr << "Error when initializing SearchPackagesLogs: " << stat
01101 << endl;
01102 exit( 1 );
01103 }
01104 }
01105
01106
01107
01108 SendConfirmation( Elem );
01109 }
01110
01111
01112
01113 void CRouterInterface::SendConfirmation( StrMsg* Elem )
01114 {
01115 MsgRSSConfirmation* Msg = &( Elem->Msg.Storage.Confirmation );
01116
01117 Msg->Type = MSG_RSS_CONFIRMATION;
01118 Msg->Size = SizeMsgRSSConfirmation;
01119
01120 Send( Elem );
01121 }
01122
01123 void CRouterInterface::ProcessStartSearchInLogs( StrMsg* Elem )
01124 {
01125 int status;
01126 MsgRSSStartSearchInLogsReq* Msg = &( Elem->Msg.Router.StartSearchInLogs );
01127 #ifdef RIO_DEBUG2
01128 struct in_addr ip;
01129 ip.s_addr = Msg->IPaddr;
01130 RioErr << "CRouterInterface::ProcessStartSearchInLogs recebida uma "
01131 << "requisicao de busca do tipo " << Msg->SearchType
01132 << ", do tempo " << Msg->StartTime << " ate o tempo "
01133 << Msg->EndTime << " para o cliente com o IP " << inet_ntoa( ip )
01134 << " e a porta " << ntohs( Msg->Port ) << ", usando a Id "
01135 << Msg->ClientId << endl;
01136 #endif
01137 status = SearchPackagesLogs.SearchLogsRequest( Msg->StartTime, Msg->EndTime,
01138 Msg->IPaddr, Msg->Port,
01139 Msg->ClientId );
01140 #ifdef RIO_DEBUG2
01141 if( status != S_OK )
01142 {
01143 RioErr << "CRouterInterface::ProcessStartSearchInLogs erro " << status
01144 << "(" << GetErrorDescription( status ) << ") ao processar a "
01145 << " solicitacao de busca." << endl;
01146 SendSearchLogsStatus( Elem, Msg->IPaddr, Msg->Port, Msg->ClientId,
01147 MSG_RSS_START_SEARCH_IN_LOGS_REQ,
01148 ERROR_SS_SEARCH_IN_LOGS );
01149 }
01150 else
01151 SendSearchLogsStatus( Elem, Msg->IPaddr, Msg->Port, Msg->ClientId,
01152 MSG_RSS_START_SEARCH_IN_LOGS_REQ, S_OK );
01153 #endif
01154 }
01155
01156 void CRouterInterface::ProcessBlockSearchFile( StrMsg* Elem )
01157 {
01158 int status;
01159 MsgRSSBlockSearchFileReq* Msg = &( Elem->Msg.Router.BlockSearchFile );
01160 #ifdef RIO_DEBUG2
01161 struct in_addr ip;
01162 ip.s_addr = Msg->IPaddr;
01163 RioErr << "CRouterInterface::ProcessBlockSearchFile recebida uma "
01164 << "requisicao para o pedido do bloco " << Msg->Block
01165 << " do arquivo com a ID " << Msg->FileId << " do cliente com o IP "
01166 << inet_ntoa( ip ) << " e a porta " << ntohs( Msg->Port )
01167 << ", usando a Id " << Msg->ClientId << endl;
01168 #endif
01169 status = SearchPackagesLogs.SearchResultDataRequest( Msg->IPaddr, Msg->Port,
01170 Msg->FileId,
01171 Msg->Block,
01172 Msg->ClientId );
01173
01174 #ifdef RIO_DEBUG2
01175 if( status != S_OK )
01176 {
01177 RioErr << "CRouterInterface::ProcessRemoveSearchFile erro " << status
01178 << "(" << GetErrorDescription( status ) << ") ao remover o "
01179 << "arquivo com o resultado da busca." << endl;
01180 SendSearchLogsStatus( Elem, Msg->IPaddr, Msg->Port, Msg->ClientId,
01181 MSG_RSS_BLOCK_SEARCH_FILE_REQ,
01182 ERROR_SS_REMOVE_SEARCH_FILE );
01183 }
01184 else
01185 SendSearchLogsStatus( Elem, Msg->IPaddr, Msg->Port, Msg->ClientId,
01186 MSG_RSS_BLOCK_SEARCH_FILE_REQ, S_OK );
01187 #endif
01188 }
01189
01190 void CRouterInterface::ProcessRemoveSearchFile( StrMsg* Elem )
01191 {
01192 int status;
01193 MsgRSSRemoveSearchFileReq* Msg = &( Elem->Msg.Router.RemoveSearchFile );
01194 #ifdef RIO_DEBUG2
01195 RioErr << "CRouterInterface::ProcessRemoveSearchFile recebida uma "
01196 << "requisicao para remover o arquivo com a ID " << Msg->FileId
01197 << endl;
01198 #endif
01199 status = SearchPackagesLogs.RemoveSearchResultFile( Msg->FileId );
01200 #ifdef RIO_DEBUG2
01201 if( status != S_OK )
01202 {
01203 RioErr << "CRouterInterface::ProcessRemoveSearchFile erro " << status
01204 << "(" << GetErrorDescription( status ) << ") ao remover o "
01205 << "arquivo com o resultado da busca." << endl;
01206 SendSearchLogsStatus( Elem, 0, 0, 0, MSG_RSS_REMOVE_SEARCH_FILE_REQ,
01207 ERROR_SS_REMOVE_SEARCH_FILE );
01208 }
01209 else
01210 SendSearchLogsStatus( Elem, 0, 0, 0, MSG_RSS_REMOVE_SEARCH_FILE_REQ,
01211 S_OK );
01212 #endif
01213 }
01214
01215 #ifdef RIO_DEBUG2
01216 void CRouterInterface::SendSearchLogsStatus( StrMsg* Elem, u32 ClientIP,
01217 u16 ClientPort, u32 ClientId,
01218 u16 ResultType, s32 Status )
01219 {
01220 MsgRSSSearchLogsStatus* Msg = &( Elem->Msg.Storage.SearchLogsStatus );
01221
01222 RioErr << "CRouterInterface::SendSearchLogsStatus enviando a mensagem de "
01223 << "status de um dos pedidos relacionados a busca nos logs: ";
01224 if( ResultType != MSG_RSS_REMOVE_SEARCH_FILE_REQ )
01225 {
01226 struct in_addr ip;
01227 ip.s_addr = ClientIP;
01228 RioErr << "IP do cliente = " << inet_ntoa( ip ) << ", porta do "
01229 << "cliente = " << ntohs( ClientPort ) << ", ";
01230 }
01231 RioErr << "ResultType = " << ResultType << ", Error = " << Status << endl;
01232
01233 Msg->Type = MSG_RSS_SEARCHLOGS_STATUS;
01234 Msg->Size = SizeMsgRSSSearchLogsStatus;
01235 Msg->IPaddr = ClientIP;
01236 Msg->Port = ClientPort;
01237 Msg->ClientId = ClientId;
01238 Msg->StatusType = ResultType;
01239 Msg->Error = Status;
01240
01241 Send( Elem );
01242 }
01243 #endif
01244