00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "DiskMgr.h"
00036 #include "token.h"
00037 #include "Random.h"
00038 #include "RioError.h"
00039 #include "td4list.h"
00040 #include "Event.h"
00041
00042 #include "SystemManager.h"
00043 #include "MonitorTable.h"
00044
00045 #include <stdlib.h>
00046 #include <pthread.h>
00047 #include <netinet/in.h>
00048 #include <string.h>
00049 #include <errno.h>
00050 #include <unistd.h>
00051 #include <math.h>
00052
00053 #include <sys/socket.h>
00054 #include <netinet/in.h>
00055 #include <arpa/inet.h>
00056
00057 #include <sys/types.h>
00058 #include <sys/stat.h>
00059 #include <fcntl.h>
00060
00061
00062 #include <sys/syscall.h>
00063
00064 const char* const LOGFILE = "RIODiskMgr.log";
00065
00066 const char* const CONFIGNAME = "RIOdisk.cfg";
00067
00068 extern CEventManager EventManager;
00069
00070
00071 DiskMgr::DiskMgr()
00072 {
00073 m_disks = 0;
00074
00075 m_BlockSize = 0;
00076 m_MaxDisks = 0;
00077 m_MetaRoot = NULL;
00078 m_Router = NULL;
00079 m_SystemManager = NULL;
00080 m_thread = 0;
00081 m_termflag = 0;
00082
00083
00084
00085 m_numDisks = 1;
00086 m_MaxReps = 0;
00087
00088 m_cntAllocMult = m_cntAllocRetry = m_cntAllocFail = 0;
00089
00090 pthread_mutex_init( &m_mutex, NULL );
00091
00092 m_diska = 0;
00093 m_diskanum = 0;
00094
00095 m_disklist = 0;
00096 m_nodelist = 0;
00097
00098
00099 m_NumberOfActiveDisks = 0;
00100 m_NumberOfStorageNodes = 0;
00101
00102 }
00103
00104 DiskMgr::~DiskMgr()
00105 {
00106 int i;
00107 SNode *np;
00108
00109
00110 if( m_disks )
00111 {
00112 for( i = 0; i <= m_MaxDisks; i++ )
00113 {
00114 if( m_disks[i] )
00115 {
00116 delete m_disks[i];
00117 m_disks[i] = 0;
00118 }
00119 }
00120 delete [] m_disks;
00121 m_disks = 0;
00122 }
00123 if( m_diska )
00124 {
00125 delete [] m_diska;
00126 m_diska = 0;
00127 }
00128
00129
00130 while( m_nodelist != NULL )
00131 {
00132 np = m_nodelist->sn_link;
00133 delete m_nodelist;
00134 m_nodelist = np;
00135 }
00136
00137 pthread_mutex_destroy( &m_mutex );
00138
00139 if( m_log.is_open() ) m_log.close();
00140
00141 }
00142
00143 int DiskMgr::CleanUp()
00144 {
00145 return 0;
00146 }
00147
00148
00149
00150
00151 int DiskMgr::Initialize( DiskMgrConfig *Config )
00152 {
00153 int i;
00154 int rc;
00155
00156 if( Config->GenerateLogs )
00157 {
00158
00159 char LogFileName[ MaxPathSize ];
00160
00161 strcpy( LogFileName, Config->LogsDirectory );
00162 strcat( LogFileName, LOGFILE );
00163 m_log.open( LogFileName );
00164 if( !m_log.is_open() )
00165 {
00166 return ERROR_OBJECTMANAGER + ERROR_LOGFILE;
00167 }
00168 }
00169
00170 if( m_log.is_open() )
00171 m_log << " --- DiskMgr.Initialize" << endl;
00172
00173 m_BlockSize = Config->BlockSize;
00174 m_MaxDisks = Config->MaxDisks;
00175 m_MetaRoot = Config->MetaRoot;
00176 m_Router = Config->router;
00177 m_SystemManager = Config->manager;
00178 m_MaxReps = Config->MaxReps;
00179
00180
00181
00182
00183 m_ConfigsDirectory = Config->ConfigsDirectory;
00184
00185
00186
00187 m_MaxAttempts = Config->MaxAttempts;
00188 m_TimeBetweenAttempts = Config->TimeBetweenAttempts;
00189
00190
00191 m_IsFormatting = Config->IsFormatting;
00192
00193 m_disks = new Disk*[m_MaxDisks + 1];
00194 if( m_disks == NULL )
00195 {
00196 if( m_log.is_open() )
00197 m_log << "new failed " << endl;
00198
00199 abort();
00200 }
00201
00202 m_diska = new Disk*[m_MaxDisks + 1];
00203 if( m_diska == NULL )
00204 {
00205 if( m_log.is_open() )
00206 m_log << "new failed " << endl;
00207
00208 abort();
00209 }
00210 for( i = 0; i <= m_MaxDisks; i++ )
00211 {
00212 m_disks[i] = 0;
00213 m_diska[i] = 0;
00214 }
00215
00216 rc = ReadConfiguration();
00217
00218
00219 if( rc )
00220 {
00221 m_log << "Error reading configuration." << endl;
00222 return rc;
00223 }
00224
00225
00226
00227
00228
00229
00230
00231 int numactivedisks, prevdisks;
00232 Disk *dp;
00233 numactivedisks = 0;
00234 do {
00235 prevdisks = numactivedisks;
00236 sleep(1);
00237 numactivedisks = 0;
00238 for( dp = m_disklist; dp != 0; dp = dp->d_link )
00239 {
00240 if( dp->d_status & Disk::STAT_ALLOC )
00241 {
00242 numactivedisks++;
00243 }
00244 }
00245 if( m_log.is_open() ) m_log << "diskmgr.init delaying - disks so far: "
00246 << numactivedisks << " previous " << prevdisks
00247 << endl;
00248
00249 } while( numactivedisks != prevdisks );
00250 RioErr << " Number of active disks: " << numactivedisks << "."
00251 << endl;
00252 m_NumberOfActiveDisks = numactivedisks;
00253
00254
00255 return rc;
00256 }
00257
00258
00259
00260
00261
00262
00263
00264
00265 int DiskMgr::GetMaxNumberOfDisks()
00266 {
00267 return m_numDisks;
00268 }
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291 enum keys {k_disk=1, k_node };
00292
00293 static struct token_kw mykeys[] = {
00294 {"disk", k_disk},
00295 {"node", k_node},
00296 {NULL, 0},
00297 };
00298
00299 int DiskMgr::ReadConfiguration()
00300 {
00301 int rc = 0;
00302 char *nodename = 0;
00303
00304
00305 char ConfigFileName[ MaxPathSize ];
00306
00307
00308 int rc2;
00309 unsigned int Attempts;
00310
00311
00312
00313
00314
00315 unsigned int NumStorages;
00316
00317 cout << endl << " Reading DiskMgr configuration "
00318 << CONFIGNAME << " ... ";
00319
00320 token tok;
00321
00322
00323
00324
00325
00326 strcpy( ConfigFileName, m_ConfigsDirectory );
00327 strcat( ConfigFileName, CONFIGNAME );
00328
00329 if( tok.openfile( ConfigFileName ) )
00330 {
00331 m_log << "erro no tok.openfile " << CONFIGNAME << endl;
00332 return 1;
00333 }
00334
00335 NumStorages = 0;
00336
00337 int word;
00338 while( ( word = tok.parseline( mykeys ) ) >= 0 )
00339 {
00340 switch( word )
00341 {
00342 case k_disk:
00343 m_log << "erro k_disk." << endl;
00344 tok.emsg("not yet implemented");
00345 break;
00346
00347 case k_node:
00348 if( tok.getstrnew( &nodename ) )
00349 {
00350 tok.emsg("missing storage node hostname");
00351 break;
00352 }
00353
00354
00355 Attempts = 0;
00356 do
00357 {
00358 Attempts++;
00359 RioErr << "Trying to connect for " << Attempts + 1 << " time. Start" << endl;
00360 rc2 = BuildNode( nodename, true );
00361 RioErr << "Trying to connect for " << Attempts + 1 << " time. End" << endl;
00362 if( ( Attempts < m_MaxAttempts ) && ( rc2 ) )
00363 {
00364 RioErr << "Error connecting to storage server \""
00365 << nodename << "\" (attempt " << Attempts + 1
00366 << " of " << m_MaxAttempts << ")." << endl;
00367 RioErr << "Waiting for " << m_TimeBetweenAttempts << " seconds." << endl;
00368 sleep( m_TimeBetweenAttempts );
00369 }
00370 } while ( ( Attempts < m_MaxAttempts ) && ( rc2 ) );
00371
00372
00373 if( rc2 )
00374 {
00375 if( m_IsFormatting )
00376 RioErr << "Failed to connect to storage server \""
00377 << nodename << "\" when formatting file system. "
00378 << "Aborting. " << endl;
00379 else
00380 {
00381 RioErr << "Failed to connect to storage server \""
00382 << nodename << "\". Creating only the node for "
00383 << "this server" << endl;
00384 rc2 = BuildNode( nodename, false );
00385 }
00386 }
00387 rc |= rc2;
00388 m_log << "erro k_node. rc = " << rc << endl;
00389 NumStorages++;
00390 if( NumStorages > MAXNUMSTORAGES )
00391 {
00392 m_log << "Too many storage servers (max = "
00393 << MAXNUMSTORAGES << endl;
00394 return 1;
00395 }
00396
00397 delete[] nodename;
00398 break;
00399
00400 default:
00401 m_log << "erro invalid word." << endl;
00402 tok.emsg("{default} invalid word");
00403 continue;
00404 }
00405 tok.ckend();
00406 }
00407 tok.closefile();
00408
00409 if( rc == 0 )
00410 {
00411 cout << " OK. " << endl;
00412 }
00413 else
00414 cout << " Error! " << endl;
00415 return rc;
00416 }
00417
00418
00419
00420
00421 int DiskMgr::BuildNode( char *hostname, bool Connect )
00422 {
00423 SNode *np;
00424 int rc;
00425 EventStorageRequest *event;
00426 pthread_attr_t attribtx;
00427 pthread_attr_t attribrx;
00428
00429 #ifdef RIO_DEBUG1
00430 RioErr << "### [DiskMgr - BuildNode] Start" << endl;
00431 #endif
00432
00433
00434
00435 for( np = m_nodelist; np != 0; np = np->sn_link )
00436 {
00437 if( strcmp( np->sn_hostname, hostname ) == 0 )
00438 {
00439 if( m_log.is_open() )
00440 m_log <<"DiskMgr.BuildNode duplicate node for "
00441 << hostname << endl;
00442
00443
00444 m_log << "### [DiskMgr - BuildNode] Finish1" << endl;
00445
00446
00447 return -1;
00448 }
00449 }
00450
00451 np = new SNode( this, hostname );
00452
00453 if( ( np == 0 ) || ( np->sn_hostname == 0 ) )
00454 {
00455 RioErr << " error memory (could not create object)." << endl;
00456 delete np;
00457
00458 #ifdef RIO_DEBUG1
00459 RioErr << "### [DiskMgr - BuildNode] Finish2" << endl;
00460 #endif
00461
00462 return ERROR_MEMORY;
00463 }
00464
00465 np->sn_IPaddress.Host = vsiGetIPaddress( hostname );
00466 np->sn_IPaddress.Port = htons( 5101 );
00467
00468
00469
00470 if( Connect )
00471 {
00472
00473 rc = np->sn_socket.Connect( &np->sn_IPaddress );
00474
00475 if( rc )
00476 {
00477 RioErr << " connect failed for " << np->sn_hostname << ": "
00478 << GetErrorDescription(rc) << endl;
00479
00480 delete np;
00481
00482 RioErr << "### [DiskMgr - BuildNode] Depois do delete" << endl;
00483
00484 #ifdef RIO_DEBUG1
00485 RioErr << "### [DiskMgr - BuildNode] Finish3" << endl;
00486 #endif
00487
00488 return -1;
00489 }
00490
00491
00492 np->sn_isenabled = true;
00493
00494 np->sn_sendevent = false;
00495
00496 }
00497 else
00498 {
00499
00500 np->sn_isenabled = false;
00501
00502 np->sn_sendevent = true;
00503
00504
00505
00506
00507 EventStorageDown* event;
00508 event = ( EventStorageDown * ) EventManager.New( EventTypeStorageDown );
00509
00510 event->StorageId = ( m_numDisks - 1 ) / SNode::sn_maxdisks;
00511
00512
00513 event->EmptyStorageQueue = false;
00514
00515 RioErr << "Failed to connect to the storage " << np->sn_hostname
00516 << endl;
00517
00518 #ifdef RIO_DEBUG2
00519 RioErr << "DiskMgr::BuildNode enviando a mensagem "
00520 << "EventStorageDown ao Router do servidor de "
00521 << "armazenamento com a ID " << event->StorageId << endl;
00522 #endif
00523
00524 m_Router->Put( ( Event * ) event );
00525 }
00526
00527
00528
00529
00530 np->sn_diskidorg = m_numDisks;
00531 m_numDisks += SNode::sn_maxdisks;
00532
00533 m_NumberOfStorageNodes++;
00534
00535 pthread_attr_init( &attribtx );
00536 pthread_attr_init( &attribrx );
00537 pthread_attr_setstacksize( &attribtx, 2*PTHREAD_STACK_MIN );
00538 pthread_attr_setstacksize( &attribrx, 2*PTHREAD_STACK_MIN );
00539
00540 if( pthread_create( &np->sn_txthread, &attribtx,
00541 &SNode::txthreadep, (void *)np)
00542 )
00543 {
00544 if( m_log.is_open() )
00545 m_log << "tx pthread_create failed for " << hostname
00546 << " error: " << strerror(errno) << endl;
00547
00548 RioErr << " tx pthread_create failed for " << hostname << endl;
00549
00550 #ifdef RIO_DEBUG1
00551 RioErr << "### [DiskMgr - BuildNode] Finish4" << endl;
00552 #endif
00553
00554 return -1;
00555 }
00556
00557 if( pthread_create( &np->sn_rxthread, &attribrx,
00558 &SNode::rxthreadep, (void *)np)
00559 )
00560 {
00561 if( m_log.is_open() )
00562 m_log << "rx pthread_create failed for " << hostname
00563 << " error: " << strerror(errno) << endl;
00564
00565 RioErr << "rx pthread_create failed for " << hostname << endl;
00566
00567 #ifdef RIO_DEBUG1
00568 RioErr << "### [DiskMgr - BuildNode] Finish5" << endl;
00569 #endif
00570
00571 return -1;
00572 }
00573
00574 if( Connect )
00575 {
00576
00577
00578
00579
00580 event = (EventStorageRequest*) EventManager.New(
00581 EventTypeStorageRequest );
00582 event->StorageRequest.Header.Type = MSG_RSS_INITIALIZATION_REQ;
00583 event->StorageRequest.Header.Size = SizeMsgRSSInitializationReq;
00584 event->StorageRequest.Header.Token = RSS_TOKEN_ROUTER;
00585 MsgRSSInitializationReq* msg = & ( event->StorageRequest.
00586 Initialization );
00587 msg->BlockSize = m_BlockSize;
00588 np->sn_sendqueue.Put( ( Event* ) event );
00589
00590 }
00591
00592 struct in_addr address;
00593 address.s_addr = np->sn_IPaddress.Host;
00594
00595 AddNodeEvent *it_event = new AddNodeEvent( np->sn_hostname, inet_ntoa( address ) );
00596 m_SystemManager->PostITEvent( (MonitorEvent *)it_event );
00597
00598 #ifdef RIO_DEBUG1
00599 RioErr << "### [DiskMgr - BuildNode] Finish6" << endl;
00600 #endif
00601
00602 return 0;
00603 }
00604
00605
00606 int DiskMgr::Format()
00607 {
00608 Disk *dp;
00609 int rc = 0;
00610 int rc2;
00611 unsigned int junk;
00612
00613 for( dp = m_disklist; dp != 0; dp = dp->d_link )
00614 {
00615 rc2 = dp->d_BitMap.Format();
00616
00617 if( !rc2 )
00618 {
00619 rc2 = dp->d_BitMap.AllocSeq( 0, &junk );
00620 }
00621 if( !rc2 )
00622 {
00623 rc2 = dp->d_BitMap.AllocSeq( 1, &junk );
00624 }
00625 if( rc2 && !rc )
00626 rc = rc2;
00627 }
00628
00629 if( !rc )
00630 {
00631 Confirm();
00632 BuildAlloc();
00633 }
00634 else
00635 {
00636 RioErr << "\n\n\n***************ERRO NO FORMAT!!!!*********" << endl;
00637 }
00638
00639 return rc;
00640 }
00641
00642
00643 int DiskMgr::Start()
00644 {
00645 Disk *dp;
00646 int rc = 0;
00647 int rc2;
00648
00649 for( dp = m_disklist; dp != 0; dp = dp->d_link )
00650 {
00651 rc2 = dp->d_BitMap.Load();
00652 if( rc2 && !rc )
00653 rc = rc2;
00654 }
00655
00656 if( rc )
00657 {
00658 return rc;
00659 }
00660
00661 BuildAlloc();
00662 return 0;
00663 }
00664
00665
00666
00667
00668 int DiskMgr::Reset()
00669 {
00670 Disk *dp;
00671 int rc = 0;
00672 int rc2;
00673
00674 for( dp = m_disklist; dp != 0; dp = dp->d_link )
00675 {
00676 rc2 = dp->d_BitMap.Reset();
00677 if( rc2 && !rc )
00678 rc = rc2;
00679 }
00680 return rc;
00681 }
00682
00683
00684
00685 int DiskMgr::Confirm()
00686 {
00687 Disk *dp;
00688 int rc = 0;
00689 int rc2;
00690
00691 for( dp = m_disklist; dp != 0; dp = dp->d_link )
00692 {
00693 rc2 = dp->d_BitMap.Confirm();
00694 if( rc2 && !rc )
00695 rc = rc2;
00696 }
00697 return rc;
00698 }
00699
00700
00701
00702 void DiskMgr::BuildAlloc()
00703 {
00704 Disk *dp;
00705 int i = 0;
00706
00707
00708
00709 m_storagesnotfull = 0;
00710
00711 for( dp = m_disklist; dp != 0; dp = dp->d_link )
00712 {
00713 if( dp->d_status & Disk::STAT_ALLOC && dp->d_BitMap.nFree() > 0 )
00714 {
00715 m_diska[i] = dp;
00716 i++;
00717 m_storagesnotfull = m_storagesnotfull |
00718 (1ull << ( ( dp->d_diskid - 1 ) /
00719 SNode::sn_maxdisks ) );
00720 }
00721 }
00722 m_diskanum = i;
00723
00724 #ifdef __GENERATE_DISKLOG
00725
00726 if( m_log.is_open() )
00727 m_log << "DiskMgr::BuildAlloc m_storagesnotfull = "
00728 for( i = MAXNUMSTORAGES - 1; i >= 0; i-- )
00729 {
00730 unsigned int q = ( m_storagesnotfull >> i ) & 1ull;
00731 if( m_log.is_open() )
00732 m_log << q;
00733 }
00734 if( m_log.is_open() )
00735 m_log << endl;
00736 #endif
00737
00738 }
00739
00740
00741
00742 bool DiskMgr::CheckDiskAlloc( int num, RioDiskBlock *rep, int choice )
00743 {
00744 Disk *dp, *rd;
00745 int j;
00746 bool isEnabled;
00747
00748 if( ( choice > m_MaxDisks ) || ( num > m_MaxReps ) )
00749 {
00750 #ifdef __GENERATE_DISKLOG
00751 if( m_log.is_open() )
00752 m_log << "DiskMgr::CheckDiskAlloc invalid input, choice "
00753 << choice << " num " << num << endl;
00754 #endif
00755 return false;
00756 }
00757
00758 dp = m_disks[choice];
00759
00760 if( dp == 0 )
00761 {
00762 #ifdef __GENERATE_DISKLOG
00763 if( m_log.is_open() )
00764 m_log << "DiskMgr::CheckDiskAlloc dp = 0" << endl;
00765 #endif
00766 return false;
00767 }
00768 if( ( dp->d_status & Disk::STAT_ALLOC ) == 0 )
00769 {
00770 #ifdef __GENERATE_DISKLOG
00771 if( m_log.is_open() )
00772 m_log << "DiskMgr.CheckDiskAlloc ( dp->d_status & "
00773 << "Disk::STAT_ALLOC ) == 0 (" << dp->d_status << ")" << endl;
00774 #endif
00775 return false;
00776 }
00777 if( dp->d_BitMap.nFree() <= 0 )
00778 {
00779 #ifdef __GENERATE_DISKLOG
00780 if( m_log.is_open() )
00781 m_log << "DiskMgr.CheckDiskAlloc dp->d_Bitmap.nFree() <= 0"
00782 << "(" << dp->d_BitMap.nFree() << ")" << endl;
00783 #endif
00784 return false;
00785 }
00786
00787
00788
00789
00790 pthread_mutex_lock( &dp->d_node->sn_mutex );
00791
00792 isEnabled = dp->d_node->sn_isenabled;
00793
00794 pthread_mutex_unlock( &dp->d_node->sn_mutex );
00795 if( !isEnabled )
00796 {
00797 #ifdef __GENERATE_DISKLOG
00798 if( m_log.is_open() )
00799 m_log << "DiskMgr.CheckDiskAlloc dp->d_node->sn_isenabled = false"
00800 << endl;
00801 #endif
00802 RioErr << "DiskMgr.CheckDiskAlloc dp->d_node->sn_isenabled = false"
00803 << endl;
00804 return false;
00805 }
00806
00807
00808
00809
00810
00811 for( j = 0; j < num; j++ )
00812 {
00813 rd = m_disks[rep[j].disk];
00814 if( rd == 0 )
00815 continue;
00816 if( rd->d_node == dp->d_node )
00817 {
00818 #ifdef __GENERATE_DISKLOG
00819 if( m_log.is_open() )
00820 m_log << "DiskMgr.CheckDiskAlloc rd->d_node == dp->d_node"
00821 << "(0x" << hex << ( unsigned int ) rd->d_node << ", 0x"
00822 << ( unsigned int ) dp->d_node << dec << ")" << endl;
00823 #endif
00824 return false;
00825 }
00826 }
00827 return true;
00828 }
00829
00830
00831
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841 int DiskMgr::AllocMult( int numReplicas, RioDiskBlock *rep,
00842 unsigned long long int ExcludeStorages )
00843 {
00844 int i, x, rc;
00845 Disk *dp;
00846 int cntretry = 0;
00847 int maxretry = ( SNode::sn_maxdisks + 1 ) * numReplicas;
00848 unsigned long long int storages_disp;
00849
00850
00851 int numStorages = ( m_numDisks - 1 ) / SNode::sn_maxdisks;
00852
00853
00854 if( numReplicas > numStorages ) numReplicas = numStorages;
00855
00856
00857 for( i = 0; i < numReplicas; i++ )
00858 {
00859 memset(&rep[i], 0, sizeof(RioDiskBlock));
00860 }
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883 storages_disp = ~ ( ExcludeStorages | ( ~m_storagesnotfull ) );
00884
00885 for( i = 0; i < numReplicas; i++ )
00886 {
00887 m_cntAllocMult++;
00888 retry:
00889
00890
00891 #ifdef __GENERATE_DISKLOG
00892 if( m_log.is_open() )
00893 {
00894 m_log << "Tentando obter um disco de um storage " << endl;
00895 }
00896 #endif
00897
00898 #ifdef RIO_DEBUG2
00899 RioErr << "DiskMgr::AllocMult storages que nao podem ser usados "
00900 << "(ExcludeStorages = ";
00901 for( int p = MAXNUMSTORAGES - 1; p >= 0 ; p--)
00902 {
00903 unsigned int q = ( ExcludeStorages >> p ) & 1ull;
00904 RioErr << q;
00905 }
00906 RioErr << ")" << endl;
00907 RioErr << "DiskMgr::AllocMult storages cujos discos nao estao cheios "
00908 << "(m_storagesnotfull = ";
00909 for( int p = MAXNUMSTORAGES - 1; p >= 0 ; p--)
00910 {
00911 unsigned int q = ( m_storagesnotfull >> p ) & 1ull;
00912 RioErr << q;
00913 }
00914 RioErr << ")" << endl;
00915 RioErr << "DiskMgr::AllocMult storages que pode ser usados "
00916 << "(storages_disp = ";
00917 for( int p = MAXNUMSTORAGES - 1; p >= 0 ; p--)
00918 {
00919 unsigned int q = ( storages_disp >> p ) & 1ull;
00920 RioErr << q;
00921 }
00922 RioErr << ")" << endl;
00923 #endif
00924
00925
00926
00927
00928
00929
00930
00931 if( storages_disp == 0 )
00932 return ERROR_DISKMGR + ERROR_DISKFULL;
00933
00934 do
00935 {
00936
00937 x = RandomNumber( m_diskanum );
00938 dp = m_diska[x];
00939
00940 #ifdef __GENERATE_DISKLOG
00941 if( m_log.is_open() )
00942 {
00943 m_log << "DiskMgr::AllocMult replicacao " << i+1 << ", x = "
00944 << x << ", dp->d_diskid = " << dp->d_diskid
00945 << ", numero de blocos (do disco) = "
00946 << dp->d_BitMap.nFree() << ", storages_disp = ";
00947
00948 for(int p = MAXNUMSTORAGES - 1; p >= 0 ; p--)
00949 {
00950 unsigned int q = ( storages_disp >> p ) & 1ull;
00951 m_log << q;
00952 }
00953
00954 m_log << endl;
00955 }
00956 #endif
00957
00958 }
00959
00960 while( !( storages_disp &
00961 (1ull << ( ( dp->d_diskid - 1 ) / SNode::sn_maxdisks ) ) ) );
00962 if( CheckDiskAlloc( i, rep, dp->d_diskid ) )
00963 {
00964
00965 rc = dp->d_BitMap.AllocSeq(RandomNumber(dp->d_ntot), &rep[i].block);
00966 if( rc != 0 )
00967 return rc;
00968 rep[ i ].disk = dp->d_diskid;
00969
00970 #ifdef __GENERATE_DISKLOG
00971 if( m_log.is_open() )
00972 m_log << " AllocMult:: Chose Disk " << rep[ i ].disk
00973 << " block " << rep[ i ].block << endl;
00974 #endif
00975
00976 storages_disp -= ( 1ull << ( ( dp->d_diskid - 1 ) /
00977 SNode::sn_maxdisks ) );
00978 continue;
00979 }
00980 else
00981 {
00982 m_cntAllocRetry++;
00983 cntretry++;
00984
00985 #ifdef __GENERATE_DISKLOG
00986 if( m_log.is_open() )
00987 {
00988 m_log << "DiskMgr::AllocMult disco " << dp->d_diskid
00989 << " incorreto. Possivel erro de implementacao."
00990 << endl;
00991 m_log << "DiskMgr::AllocMult Failed for disk " << dp->d_diskid
00992 << " try " << cntretry << " from " << maxretry << endl;
00993 }
00994 #endif
00995
00996 if( cntretry < maxretry )
00997 {
00998
00999 BuildAlloc();
01000
01001
01002
01003
01004 storages_disp = storages_disp & m_storagesnotfull;
01005 goto retry;
01006 }
01007 m_cntAllocFail++;
01008
01009 return ERROR_DISKMGR + ERROR_UNEXPECTED;
01010 }
01011 }
01012 return 0;
01013 }
01014
01015
01016 int DiskMgr::Alloc( RioDiskBlock *rep, unsigned long long int ExcludeStorages )
01017 {
01018 return AllocMult( 1, rep, ExcludeStorages );
01019 }
01020
01021
01022 int DiskMgr::Free( RioDiskBlock *rep )
01023 {
01024 Disk *dp;
01025
01026
01027 if( ( rep->disk == 0 ) && ( rep->block == 0 ) )
01028 {
01029 return 0;
01030 }
01031 if( ( rep->disk < 0 ) || ( rep->disk > m_MaxDisks ) )
01032 {
01033
01034 if( m_log.is_open() ) m_log << "DiskMgr.Free - invalid disk number "
01035 << rep->disk << endl;
01036 return ERROR_DISKMGR + ERROR_INVALID_DISK;
01037 }
01038 dp = m_disks[rep->disk];
01039 if( dp == 0 )
01040 {
01041
01042
01043
01044
01045
01046
01047
01048
01049
01050
01051
01052
01053
01054
01055 SNode *sp, *spsb;
01056 bool SaveBlock;
01057 SaveBlock = false;
01058 spsb = NULL;
01059 for( sp = m_nodelist; sp != 0; sp = sp->sn_link )
01060 {
01061
01062
01063
01064 if( ( rep->disk >= sp->sn_diskidorg ) &&
01065 ( rep->disk < ( sp->sn_diskidorg + SNode::sn_maxdisks ) ) )
01066 {
01067 if( ( !sp->sn_restart ) && ( !sp->sn_isenabled ) )
01068 {
01069 SaveBlock = true;
01070 spsb = sp;
01071 }
01072 break;
01073 }
01074
01075 }
01076
01077
01078
01079 if( SaveBlock )
01080 {
01081 char FreeBlocksFileName[ 2 * MaxPathSize + 1 ];
01082 int FreeBlocksFile;
01083 int BytesWroten;
01084 int error;
01085
01086
01087 spsb->sn_readfreeblocksfile = true;
01088
01089
01090
01091
01092 strcpy( FreeBlocksFileName, m_MetaRoot );
01093
01094 if( m_MetaRoot[ strlen( m_MetaRoot ) - 1 ] != '/' )
01095 strcat( FreeBlocksFileName, "/" );
01096
01097 strcat( FreeBlocksFileName, "FreeBlocks-" );
01098
01099 strcat( FreeBlocksFileName, spsb->sn_hostname );
01100 #ifdef RIO_DEBUG2
01101 RioErr << "DiskMgr::Free detectado que o servidor "
01102 << spsb->sn_hostname << " nao foi ainda inicializado. "
01103 << "Salvando o bloco " << rep->block << " associado ao "
01104 << "disco " << rep->disk << " no arquivo "
01105 << FreeBlocksFileName << " com os blocos a serem liberados "
01106 << " deste servidor" << endl;
01107 #endif
01108
01109 FreeBlocksFile = open( FreeBlocksFileName, O_WRONLY | O_APPEND |
01110 O_CREAT, 0700 );
01111 if( FreeBlocksFile < 0 )
01112 {
01113 #ifdef RIO_DEBUG2
01114 RioErr << "DiskMgr::Free erro " << errno << " ("
01115 << strerror( errno ) << ") ao abrir o arquivo "
01116 << FreeBlocksFileName << " com os blocos a serem salvos "
01117 << "do servidor de armazenamento " << spsb->sn_hostname
01118 << endl;
01119 #endif
01120 return ERROR_DISKMGR + ERROR_OPEN_FREEBLOCKS_FILE;
01121 }
01122
01123 BytesWroten = write( FreeBlocksFile, rep, sizeof( RioDiskBlock ) );
01124 error = errno;
01125 close( FreeBlocksFile );
01126 if( BytesWroten < 0 )
01127 {
01128 #ifdef RIO_DEBUG2
01129 RioErr << "DiskMgr::Free erro " << error << " ("
01130 << strerror( error ) << " ao gravar no arquivo "
01131 << FreeBlocksFileName << " com os blocos a serem salvos "
01132 << "do servidor de armazenamento " << spsb->sn_hostname
01133 << endl;
01134 #endif
01135 return ERROR_DISKMGR + ERROR_OPEN_FREEBLOCKS_FILE;
01136 }
01137 else if( BytesWroten != sizeof( RioDiskBlock ) )
01138
01139
01140 {
01141 #ifdef RIO_DEBUG2
01142 RioErr << "DiskMgr::Free foram gravados " << BytesWroten
01143 << " bytes ao inves de " << sizeof( RioDiskBlock )
01144 << " bytes no arquivo " << FreeBlocksFileName
01145 << " com os blocos a serem salvos do servidor de "
01146 << "armazenamento " << spsb->sn_hostname << endl;
01147 #endif
01148 return ERROR_DISKMGR + ERROR_OPEN_FREEBLOCKS_FILE;
01149 }
01150 else
01151
01152 return S_OK;
01153
01154 }
01155 else
01156 {
01157 #ifdef RIO_DEBUG2
01158 RioErr << "DiskMgr::Free Foi detectado que o bloco " << rep->block
01159 << " associado ao disco " << rep->disk << " e invalido"
01160 << endl;
01161 #endif
01162 return ERROR_DISKMGR + ERROR_INVALID_DISK;
01163 }
01164 }
01165
01166 dp->d_BitMap.Free( rep->block );
01167
01168
01169
01170 if( dp->d_BitMap.nFree() == 1 )
01171 {
01172 BuildAlloc();
01173 }
01174 memset(rep, 0, sizeof(RioDiskBlock));
01175 return 0;
01176 }
01177
01178
01179 void DiskMgr::SendStorageNode( EventStorageRequest* event, u16 DiskId )
01180 {
01181 if( DiskId > m_MaxDisks )
01182 {
01183 RioErr << "DiskMgr ERROR: StorageNode request to invalid disk: "
01184 << DiskId << endl;
01185 EventManager.Free((Event*) event);
01186 return;
01187 }
01188
01189 if( m_disks[DiskId] == 0 )
01190 {
01191 RioErr << "DiskMgr ERROR: StorageNode request to inactive disk: "
01192 << DiskId << endl;
01193 EventManager.Free((Event*)event);
01194 return;
01195 }
01196 m_disks[DiskId]->SendStorageNode( event );
01197 return;
01198 }
01199
01200
01201 int DiskMgr::GetDiskServiceTime( EventStorageRequest* event, u16 DiskId )
01202 {
01203 if( DiskId > m_MaxDisks )
01204 {
01205 #ifdef RIO_DEBUG2
01206 RioErr << "DiskMgr ERROR: GetDiskServiceTime request to invalid disk: "
01207 << DiskId << endl;
01208 #endif
01209 EventManager.Free((Event*) event);
01210 return -1;
01211 }
01212
01213 if( m_disks[DiskId] == 0 )
01214 {
01215 #ifdef RIO_DEBUG2
01216 RioErr << "DiskMgr ERROR: GetDiskServiceTime request to inactive disk: "
01217 << DiskId << endl;
01218 #endif
01219 EventManager.Free((Event*)event);
01220 return -1;
01221 }
01222 m_disks[DiskId]->GetServiceTime( event );
01223 return 0;
01224 }
01225
01226
01227
01228
01229
01230 int DiskMgr::GetNumberOfActiveDisks( unsigned int *NumberOfDisks )
01231 {
01232 *NumberOfDisks = m_NumberOfActiveDisks;
01233 #ifdef RIO_DEBUG2
01234 if( m_log.is_open() ) m_log << "DiskMgr: GetNumberOfActiveDisks = "
01235 << m_NumberOfActiveDisks << endl;
01236 #endif
01237 return 0;
01238 }
01239
01240
01241 int DiskMgr::GetNumberOfStorageNodes( unsigned int *NumberOfStorageNodes )
01242 {
01243 *NumberOfStorageNodes = m_NumberOfStorageNodes;
01244 #ifdef RIO_DEBUG2
01245 if( m_log.is_open() )
01246 m_log << "DiskMgr: GetNumberOfStorageNodes = " << m_NumberOfStorageNodes
01247 << endl;
01248 #endif
01249 return 0;
01250 }
01251
01252
01253
01254 unsigned int DiskMgr::GetDiskNumberOfFreeBlocks( unsigned int DiskId )
01255 {
01256 #ifdef RIO_DEBUG2
01257 if( m_log.is_open() )
01258 m_log << "DiskMgr: GetDiskNumberOfFreeBlocks disk " << DiskId <<endl;
01259 #endif
01260
01261 if( m_disks[DiskId] != NULL )
01262 return m_disks[DiskId]->GetNumberOfFreeBlocks();
01263 else
01264 return 0;
01265 }
01266
01267
01268
01269 int DiskMgr::GetDiskName( unsigned int DiskId, char *DiskName )
01270 {
01271 #ifdef RIO_DEBUG2
01272 if( m_log.is_open() ) m_log << "DiskMgr: GetDiskName " << DiskId <<endl;
01273 #endif
01274 if( m_disks[DiskId] != NULL )
01275 return m_disks[DiskId]->GetName( DiskName );
01276 else
01277 {
01278
01279
01280 DiskName[ 0 ] = 0;
01281 return 0;
01282 }
01283 }
01284
01285
01286 int DiskMgr::GetStorageNodeInfo( unsigned int StorageNodeIndex,
01287 RioStorageNodeInfo* StorageNodeInfo )
01288 {
01289 SNode *np;
01290 unsigned int i = 0;
01291
01292 #ifdef RIO_DEBUG2
01293 if( m_log.is_open() ) m_log << "DiskMgr: GetStorageNodeInfo StorageIndex "
01294 << StorageNodeIndex << endl;
01295 #endif
01296
01297 for(i = 0, np = m_nodelist;(( np != 0 ) && ( i != StorageNodeIndex )); i++)
01298 {
01299 np = np->sn_link;
01300 }
01301 strcpy( StorageNodeInfo->Hostname, np->sn_hostname );
01302 StorageNodeInfo->NumberOfDisks = np->sn_disks;
01303
01304 int disk_index;
01305
01306 for( i = 0; i < ( unsigned int ) np->sn_disks; i++ ) {
01307
01308
01309
01310
01311 disk_index = StorageNodeIndex * SNode::sn_maxdisks + 1 + i;
01312
01313 np->sn_mgr->GetDiskName( disk_index,
01314 &StorageNodeInfo->Disks[i].DiskName[0] );
01315 StorageNodeInfo->Disks[i].Size = np->sn_disksize[i];
01316 StorageNodeInfo->Disks[i].NumberOfFreeBlocks =
01317 np->sn_mgr->GetDiskNumberOfFreeBlocks( disk_index );
01318 }
01319
01320 #ifdef RIO_DEBUG2
01321 if( m_log.is_open() ) m_log << "Host "<< np->sn_hostname << endl;
01322 #endif
01323 return 0;
01324 }
01325
01326
01327
01328 int DiskMgr::GetDiskStorageNodeInfo( unsigned int DiskId,
01329 RioStorageNodeInfo* StorageNodeInfo )
01330 {
01331 unsigned int SNId;
01332
01333 #ifdef RIO_DEBUG2
01334 if( m_log.is_open() ) m_log << "DiskMgr: GetDiskStorageNodeInfo DiskId "
01335 << DiskId << endl;
01336 #endif
01337
01338
01339
01340
01341
01342
01343 SNId = ( DiskId - 1 ) / SNode::sn_maxdisks;
01344
01345 GetStorageNodeInfo( SNId, StorageNodeInfo );
01346 return 0;
01347 }
01348
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358 int DiskMgr::GetStorageServersAddress( SOCKADDR_IN *ServerAddress )
01359 {
01360 SNode *np;
01361 unsigned int i = 0;
01362
01363
01364
01365 np = m_nodelist;
01366
01367
01368 while ( np != 0 ) {
01369
01370
01371
01372
01373 ServerAddress[ i ].sin_family = AF_INET;
01374 ServerAddress[ i ].sin_port = htons( STORAGESERVERUDPPORT );
01375 ServerAddress[ i ].sin_addr.s_addr = np->sn_IPaddress.Host;
01376 i++;
01377 np = np->sn_link;
01378 }
01379
01380 return 0;
01381 }
01382
01383
01384
01385
01386
01387
01388
01389
01390
01391 int DiskMgr::SendStorageNodeByIP( EventStorageRequest* event, u32 StorageIP )
01392 {
01393 SNode *np;
01394
01395
01396 np = m_nodelist;
01397 while( ( np != NULL ) && ( np->sn_IPaddress.Host != StorageIP ) )
01398 {
01399 np = np->sn_link;
01400 }
01401
01402
01403 if( np != NULL )
01404 {
01405 np->sn_sendqueue.Put( ( Event* ) event );
01406 return ERROR_DISKMGR + S_OK;
01407 }
01408 else
01409 {
01410
01411 EventManager.Free( ( Event* )event );
01412 #ifdef RIO_DEBUG2
01413 struct in_addr ip;
01414 ip.s_addr = StorageIP;
01415 RioErr << "DiskMgr::SendStorageNodeByIP erro ao tentar enviar um "
01416 << "evento ao servidor de armazenamento com o IP "
01417 << inet_ntoa( ip ) << endl;
01418 #endif
01419 return ERROR_DISKMGR + ERROR_UNEXPECTED;
01420 }
01421 }
01422
01423
01424
01425
01426
01427
01428 int DiskMgr::GetNumberOfReplications( void )
01429 {
01430 return m_MaxReps;
01431 }
01432
01433
01434
01435
01436
01437
01438 int DiskMgr::LoadDisksBitMaps( unsigned int StorageId )
01439 {
01440 Disk *dp;
01441 int rc = 0;
01442 int rc2;
01443 unsigned int DiskStorageId;
01444
01445
01446 pthread_mutex_lock( &m_mutex );
01447
01448 for( dp = m_disklist; dp != 0; dp = dp->d_link )
01449 {
01450 DiskStorageId = ( dp->d_node->sn_diskidorg - 1 ) / SNode::sn_maxdisks;
01451 if( StorageId == DiskStorageId )
01452 {
01453
01454 #ifdef RIO_DEBUG2
01455 RioErr << "DiskMgr::LoadDisksBitMaps tentando carregar o BitMap "
01456 << " do disco com a ID " << dp->d_diskid << " associado ao "
01457 << "servidor de armazenamento com a ID " << DiskStorageId
01458 << endl;
01459 #endif
01460
01461 rc2 = dp->d_BitMap.Load();
01462 if( rc2 && !rc )
01463 rc = rc2;
01464
01465 #ifdef RIO_DEBUG2
01466 RioErr << "DiskMgr::LoadDisksBitMaps a carga do BitMap executou "
01467 << "com o erro rc2 = " << rc2 << endl;
01468 #endif
01469 }
01470 }
01471
01472 if( rc )
01473 {
01474
01475 pthread_mutex_unlock( &m_mutex );
01476
01477 return rc;
01478 }
01479
01480 BuildAlloc();
01481
01482
01483 pthread_mutex_unlock( &m_mutex );
01484
01485 return 0;
01486 }
01487
01488
01489
01490
01491
01492 int DiskMgr::FreeSalvedBlocks( const char *StorageName )
01493 {
01494 char FreeBlocksFileName[ 2 * MaxPathSize + 1 ];
01495 int FreeBlocksFile;
01496 int BytesRead;
01497 int error;
01498 RioDiskBlock Block;
01499
01500
01501
01502
01503 strcpy( FreeBlocksFileName, m_MetaRoot );
01504
01505 if( m_MetaRoot[ strlen( m_MetaRoot ) - 1 ] != '/' )
01506 strcat( FreeBlocksFileName, "/" );
01507
01508 strcat( FreeBlocksFileName, "FreeBlocks-" );
01509
01510 strcat( FreeBlocksFileName, StorageName );
01511 #ifdef RIO_DEBUG2
01512 RioErr << "DiskMgr::FreeSalvedBlocks liberando todos os blocos salvos no "
01513 << "arquivo " << FreeBlocksFileName << " associados aos discos do "
01514 << "servidor " << StorageName << endl;
01515 #endif
01516
01517 FreeBlocksFile = open( FreeBlocksFileName, O_RDONLY );
01518 if( FreeBlocksFile < 0 )
01519 {
01520 #ifdef RIO_DEBUG2
01521 RioErr << "DiskMgr::FreeSalvedBlocks erro " << errno << " ("
01522 << strerror( errno ) << " ao abrir o arquivo "
01523 << FreeBlocksFileName << " com os blocos a serem salvos do "
01524 << "do servidor de armazenamento " << StorageName << endl;
01525 #endif
01526 RioErr << "Reading blocks file " << FreeBlocksFileName << ": Failed"
01527 << endl;
01528 return ERROR_DISKMGR + ERROR_OPEN_FREEBLOCKS_FILE;
01529 }
01530
01531
01532 while( ( BytesRead = read( FreeBlocksFile, &Block,
01533 sizeof( RioDiskBlock ) ) ) == sizeof( RioDiskBlock ) )
01534 {
01535
01536 #ifdef RIO_DEBUG2
01537 RioErr << "DiskMgr::FreeSalvedBlocks removendo o bloco "
01538 << Block.block << " associado ao disco " << Block.disk
01539 << " lido do arquivo com os blocos " << FreeBlocksFileName
01540 << ": ";
01541 #endif
01542 if( m_disks[ Block.disk ] != NULL )
01543 {
01544 #ifdef RIO_DEBUG2
01545 RioErr << "OK" << endl;
01546 #endif
01547
01548 m_disks[ Block.disk ]->d_BitMap.Free( Block.block );
01549 }
01550 else
01551 {
01552
01553
01554 #ifdef RIO_DEBUG2
01555 RioErr << "Falhou. O disco " << Block.disk << " nao foi "
01556 << " inicializado." << endl;
01557 #endif
01558 RioErr << "Warning in read blocks file " << FreeBlocksFileName
01559 << ": disk " << Block.disk << " of fisical block "
01560 << Block.block << " is invalid" << endl;
01561 }
01562 }
01563 error = errno;
01564 close( FreeBlocksFile );
01565 if( BytesRead < 0 )
01566 {
01567 #ifdef RIO_DEBUG2
01568 RioErr << "DiskMgr::FreeSalvedBlocks erro " << error << " ("
01569 << strerror( error ) << " ao ler do arquivo "
01570 << FreeBlocksFileName << " com os blocos a serem salvos do "
01571 << "servidor de armazenamento " << StorageName << endl;
01572 #endif
01573 RioErr << "Reading blocks file " << FreeBlocksFileName << ": Failed"
01574 << endl;
01575 return ERROR_DISKMGR + ERROR_OPEN_FREEBLOCKS_FILE;
01576 }
01577 else if( BytesRead > 0 )
01578 {
01579 #ifdef RIO_DEBUG2
01580 RioErr << "DiskMgr::FreeSalvedBlocks foram lidos " << BytesRead
01581 << " bytes ao inves de " << sizeof( RioDiskBlock )
01582 << " bytes do arquivo " << FreeBlocksFileName
01583 << " com os blocos a serem salvos do servidor de "
01584 << "armazenamento " << StorageName << endl;
01585 #endif
01586 RioErr << "Reading blocks file " << FreeBlocksFileName << ": Failed"
01587 << endl;
01588 return ERROR_DISKMGR + ERROR_OPEN_FREEBLOCKS_FILE;
01589 }
01590 else
01591 {
01592 #ifdef RIO_DEBUG2
01593 RioErr << "DiskMgr::FreeSalvedBlocks removendo o arquivo "
01594 << FreeBlocksFileName << " com os blocos do disco " << endl;
01595 #endif
01596 RioErr << "Reading blocks file " << FreeBlocksFileName << ": Ok"
01597 << endl;
01598
01599 Confirm();
01600
01601 BuildAlloc();
01602
01603 remove( FreeBlocksFileName );
01604 return S_OK;
01605 }
01606 }
01607
01608
01609
01610 int DiskMgr::GetNumberOfNeededDisks( unsigned int *NumberOfDisks )
01611 {
01612 SNode *np;
01613 unsigned int NeededDisks;
01614 bool IsEnabled;
01615
01616 NeededDisks = 0;
01617
01618
01619
01620
01621
01622 for( np = m_nodelist; ( np != 0 ); np = np->sn_link )
01623 {
01624
01625 pthread_mutex_lock( &np->sn_mutex );
01626
01627 IsEnabled = np->sn_isenabled;
01628
01629 pthread_mutex_unlock( &np->sn_mutex );
01630 if( IsEnabled )
01631 NeededDisks = NeededDisks + np->sn_disks;
01632 else
01633 NeededDisks = NeededDisks + SNode::sn_maxdisks;
01634 }
01635
01636 *NumberOfDisks = NeededDisks;
01637
01638 return S_OK;
01639 }
01640
01641
01642
01643 SNode::SNode( DiskMgr *mgr, char *hostname )
01644 {
01645 int i;
01646
01647 sn_mgr = mgr;
01648 pthread_mutex_lock( &mgr->m_mutex );
01649 i = strlen( hostname ) + 1;
01650 sn_hostname = new char[i];
01651 if( sn_hostname )
01652 {
01653 memcpy( sn_hostname, hostname, i );
01654 }
01655
01656 sn_txthread = 0;
01657 sn_rxthread = 0;
01658 sn_termflag = false;
01659
01660 sn_link = NULL;
01661 memset( (char *)&sn_IPaddress, 0, sizeof( vsiIPaddress ) );
01662
01663
01664 sn_diskidorg = 0;
01665
01666 SNode *nodelast = VIRTORG( mgr->m_nodelist, SNode, sn_link );
01667
01668 while( nodelast->sn_link != 0 )
01669 nodelast = nodelast->sn_link;
01670
01671 nodelast->sn_link = this;
01672
01673 sn_disks = 0;
01674 for( i = 0; i < sn_maxdisks; i++ )
01675 {
01676 sn_disksize[i] = 0;
01677 }
01678
01679
01680
01681 sn_restart = false;
01682 sn_isenabled = false;
01683 sn_sendevent = false;
01684 sn_readfreeblocksfile = false;
01685
01686 pthread_mutex_unlock( &mgr->m_mutex );
01687
01688
01689
01690 pthread_mutex_init( &sn_mutex, NULL );
01691 pthread_cond_init( &sn_waitstorage, NULL );
01692 }
01693
01694 SNode::~SNode()
01695 {
01696 Event *event;
01697
01698
01699 sn_termflag = true;
01700
01701
01702
01703
01704 sn_socket.Close();
01705
01706
01707 if( sn_txthread != 0 )
01708 {
01709 event = EventManager.New( EventTypeFinalizeThread );
01710 sn_sendqueue.Put( event );
01711
01712
01713 pthread_cond_signal( &sn_waitstorage );
01714
01715 pthread_join( sn_txthread, NULL );
01716 }
01717 if( sn_rxthread != 0 )
01718 pthread_join( sn_rxthread, NULL );
01719
01720
01721 pthread_mutex_lock( &sn_mgr->m_mutex );
01722 SNode *node = VIRTORG( sn_mgr->m_nodelist, SNode, sn_link );
01723
01724 while( node->sn_link != 0 )
01725 {
01726 if( node->sn_link == this )
01727 {
01728 node->sn_link = this->sn_link;
01729 break;
01730 }
01731
01732
01733 node = node->sn_link;
01734
01735 }
01736
01737 pthread_mutex_unlock( &sn_mgr->m_mutex );
01738
01739 delete [] sn_hostname;
01740
01741
01742
01743 pthread_mutex_destroy( &sn_mutex );
01744 pthread_cond_destroy( &sn_waitstorage );
01745 }
01746
01747 void *SNode::rxthreadep( void *parm )
01748 {
01749 ((SNode *)parm)->rxthread();
01750 return 0;
01751 }
01752
01753
01754
01755
01756 int SNode::rxthread()
01757 {
01758 int rc;
01759 EventStorageReply *event;
01760
01761
01762 const unsigned long pfxsize = (unsigned long) &event->StorageReply.Header.Token -
01763 (unsigned long) &event->StorageReply.Header;
01764 bool isenabled;
01765
01766 RioErr << "RXTHREADID " << syscall( SYS_gettid ) << endl;
01767
01768 while( 1 )
01769 {
01770 if( sn_termflag )
01771 {
01772 RioErr << " DiskMgr rxthread finished. " << endl;
01773 pthread_exit( NULL );
01774 }
01775
01776
01777
01778
01779
01780 pthread_mutex_lock( &sn_mutex );
01781
01782 isenabled = sn_isenabled;
01783
01784 pthread_mutex_unlock( &sn_mutex );
01785
01786
01787
01788 if( isenabled )
01789 {
01790
01791 event = ( EventStorageReply* ) EventManager.New(
01792 EventTypeStorageReply );
01793 rc = sn_socket.Receive( ( char * ) &event->StorageReply.Header.Type,
01794 pfxsize );
01795
01796 if( sn_termflag )
01797 {
01798 EventManager.Free( ( Event* ) event );
01799 RioErr << " DiskMgr rxthread finished. " << endl;
01800 pthread_exit( NULL );
01801 }
01802
01803 if( rc )
01804 {
01805 unsigned int storageid;
01806
01807 EventManager.Free( ( Event* ) event );
01808
01809 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
01810
01811 #ifdef RIO_DEBUG2
01812 RioErr << "SNode::rxthread receive retornou o erro " << errno
01813 << " (" << strerror( errno ) << ") para o storage na "
01814 << "maquina " << sn_hostname << endl;
01815 #endif
01816
01817 if( sn_mgr->m_log.is_open() )
01818 sn_mgr->m_log << "DiskMgr.rxthread receive1 failed."
01819 << endl;
01820
01821 RioErr << " DiskMgr rxthread error. See log file. Storage "
01822 << "server " << sn_hostname << " is down" << endl;
01823
01824
01825
01826
01827 pthread_mutex_lock( &sn_mutex );
01828
01829 sn_isenabled = false;
01830
01831 pthread_mutex_unlock( &sn_mutex );
01832
01833
01834 EventStorageDown* event;
01835 event = ( EventStorageDown * ) EventManager.New(
01836 EventTypeStorageDown );
01837
01838 event->StorageId = storageid;
01839 event->EmptyStorageQueue = true;
01840
01841 #ifdef RIO_DEBUG2
01842 RioErr << "SNode::rxthread enviando a mensagem "
01843 << "EventStorageDown ao Router do servidor de "
01844 << "armazenamento com a ID " << event->StorageId << endl;
01845 #endif
01846
01847 sn_mgr->m_Router->Put( ( Event * ) event );
01848
01849 }
01850 else
01851 {
01852 if( event->StorageReply.Header.Size >
01853 sizeof( event->StorageReply ) )
01854 {
01855 if( sn_mgr->m_log.is_open() )
01856 sn_mgr->m_log << "DiskMgr.rxthread msg too large "
01857 << event->StorageReply.Header.Size
01858 << endl;
01859
01860 RioErr << " DiskMgr rxthread error. See log file. " << endl;
01861 }
01862 else
01863 {
01864 rc = sn_socket.Receive( (char *) &event->StorageReply.
01865 Header.Token,
01866 event->StorageReply.Header.Size -
01867 pfxsize );
01868
01869 if( sn_termflag )
01870 {
01871 EventManager.Free( ( Event* ) event );
01872 RioErr << " DiskMgr rxthread finished. " << endl;
01873 pthread_exit( NULL );
01874 }
01875
01876 if( rc )
01877 {
01878 unsigned int storageid;
01879
01880 EventManager.Free( ( Event* ) event );
01881
01882 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
01883
01884 if( sn_mgr->m_log.is_open() )
01885 sn_mgr->m_log << "DiskMgr.rxthread receive2 failed."
01886 << endl;
01887
01888 RioErr << " DiskMgr rxthread error. See log file. "
01889 << "Storage server " << sn_hostname
01890 << " is down" << endl;
01891
01892
01893 pthread_mutex_lock( &sn_mutex );
01894
01895
01896 sn_isenabled = false;
01897
01898
01899 pthread_mutex_unlock( &sn_mutex );
01900
01901
01902
01903 EventStorageDown* event;
01904 event = ( EventStorageDown * ) EventManager.New(
01905 EventTypeStorageDown );
01906
01907 event->StorageId = storageid;
01908 event->EmptyStorageQueue = true;
01909
01910 #ifdef RIO_DEBUG2
01911 RioErr << "SNode::rxthread enviando a mensagem "
01912 << "EventStorageDown ao Router do servidor de "
01913 << "armazenamento com a ID " << event->StorageId
01914 << endl;
01915 #endif
01916
01917 sn_mgr->m_Router->Put( ( Event * ) event );
01918 }
01919 else
01920 {
01921
01922
01923
01924
01925
01926
01927
01928
01929
01930
01931
01932
01933
01934 if( sn_termflag )
01935 {
01936 EventManager.Free(( Event* )event );
01937 RioErr << " DiskMgr rxthread finished. " << endl;
01938 return 0;
01939 }
01940
01941 switch( event->StorageReply.Header.Type )
01942 {
01943 case MSG_RSS_NODEINFO:
01944 rxthread_nodeinfo( event );
01945 break;
01946
01947 case MSG_RSS_DISKINFO:
01948 rxthread_diskinfo( event );
01949 break;
01950
01951 case MSG_RSS_DISKSERVICETIMEINFO:
01952 rxthread_diskservicetimeinfo( event );
01953 break;
01954
01955
01956
01957
01958 case MSG_RSS_CONFIRMATION:
01959 rxthread_confirmation( event );
01960 break;
01961
01962 #ifdef RIO_DEGUG2
01963
01964
01965
01966
01967 case MSG_RSS_SEARCHLOGS_STATUS:
01968 rxthread_searchlogsstatus( event );
01969 break;
01970 #endif
01971
01972
01973 default:
01974 sn_mgr->m_Router->Put( ( Event* )event );
01975 break;
01976 }
01977 }
01978 }
01979 }
01980 }
01981 else
01982 {
01983
01984
01985
01986
01987 sn_socket.Close();
01988
01989 rc = sn_socket.Connect( &sn_IPaddress );
01990 if( rc )
01991 {
01992
01993 RioErr << " connect failed for " << sn_hostname << ": "
01994 << GetErrorDescription( rc ) << ". Trying again!"
01995 << endl;
01996
01997 sleep( sn_mgr->m_TimeBetweenAttempts );
01998 }
01999 else
02000 {
02001
02002 Event *oldevent;
02003 do
02004 {
02005 oldevent = ( Event * ) sn_sendqueue.Remove();
02006 if( oldevent != NULL )
02007 EventManager.Free( oldevent );
02008 } while( oldevent != NULL );
02009
02010 RioErr << "SNode::rxthread conexao executada com sucesso para "
02011 << "o servidor de armazenamento " << sn_hostname
02012 << ". Enviando a mensagem MSG_RSS_INITIALIZATION_REQ "
02013 << "para reiniciar o servidor de armazenamento "
02014 << endl;
02015
02016 pthread_mutex_lock( &sn_mutex );
02017
02018
02019 sn_isenabled = true;
02020
02021
02022 sn_sendevent = true;
02023
02024
02025 pthread_cond_signal( &sn_waitstorage );
02026
02027 EventStorageRequest *event;
02028 event = ( EventStorageRequest* ) EventManager.New(
02029 EventTypeStorageRequest );
02030 event->StorageRequest.Header.Type = MSG_RSS_INITIALIZATION_REQ;
02031 event->StorageRequest.Header.Size = SizeMsgRSSInitializationReq;
02032 event->StorageRequest.Header.Token = RSS_TOKEN_ROUTER;
02033 MsgRSSInitializationReq* msg = & ( event->StorageRequest.
02034 Initialization );
02035 msg->BlockSize = sn_mgr->m_BlockSize;
02036 sn_sendqueue.Put( ( Event* ) event );
02037
02038 pthread_mutex_unlock( &sn_mutex );
02039
02040 }
02041 }
02042 }
02043 }
02044
02045
02046
02047
02048 void SNode::rxthread_confirmation( EventStorageReply *event )
02049 {
02050 EventStorageRequest *TxEvent;
02051 bool isenabled;
02052
02053 if( ( event->StorageReply.NodeInfo.Size != SizeMsgRSSConfirmation )
02054 || ( event->StorageReply.NodeInfo.Token != RSS_TOKEN_STORAGE ))
02055 {
02056 RioErr << "event->StorageReply.NodeInfo.Size = "
02057 << event->StorageReply.NodeInfo.Size << endl;
02058 RioErr << "SizeMsgRSSConfirmation = " << SizeMsgRSSConfirmation
02059 << endl;
02060 RioErr << "event->StorageReply.NodeInfo.Token = " << hex
02061 << event->StorageReply.NodeInfo.Token << dec << endl;
02062 RioErr << "RSS_TOKEN_STORAGE = " << hex << RSS_TOKEN_STORAGE << dec
02063 << endl;
02064 if( sn_mgr->m_log.is_open() )
02065 sn_mgr->m_log << "DiskMgr.rxthread invalid confirmation msg "
02066 << "ignored for " << sn_hostname << endl;
02067
02068 EventManager.Free(( Event* ) event );
02069 return;
02070 }
02071
02072 EventManager.Free(( Event* ) event );
02073
02074
02075
02076
02077 pthread_mutex_lock( &sn_mutex );
02078
02079 isenabled = sn_isenabled;
02080
02081 pthread_mutex_unlock( &sn_mutex );
02082
02083
02084
02085
02086
02087
02088
02089
02090 if( ( sn_sendevent ) && ( sn_restart ) )
02091 {
02092 unsigned int storageid;
02093
02094 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
02095
02096
02097
02098
02099 EventStorageUp* event;
02100
02101
02102
02103 event = ( EventStorageUp * ) EventManager.New( EventTypeStorageUp );
02104 event->StorageId = storageid;
02105
02106 #ifdef RIO_DEBUG2
02107 RioErr << "SNode::rxthread_confirmation enviando a mensagem "
02108 << "EventStorageUp ao Router do servidor de armazenamento com "
02109 << "a ID " << event->StorageId << endl;
02110 #endif
02111
02112 sn_mgr->m_Router->Put( ( Event * ) event );
02113
02114
02115 sn_sendevent = false;
02116 }
02117
02118 if( !sn_restart )
02119 {
02120
02121
02122
02123
02124 TxEvent = ( EventStorageRequest* ) EventManager.New(
02125 EventTypeStorageRequest );
02126 TxEvent->StorageRequest.Header.Type = MSG_RSS_NODEINFO_REQ;
02127 TxEvent->StorageRequest.Header.Size = SizeMsgRSSnodeInfoReq;
02128 TxEvent->StorageRequest.Header.Token = RSS_TOKEN_ROUTER;
02129
02130 sn_sendqueue.Put( (Event*) TxEvent );
02131
02132
02133
02134 sn_restart = true;
02135 }
02136 }
02137
#ifdef RIO_DEGUG2
/**
 * Debug-only handler that reports the status carried by a
 * MSG_RSS_SEARCHLOGS_STATUS reply and then releases the event.
 *
 * For every status type other than MSG_RSS_REMOVE_SEARCH_FILE_REQ the report
 * also contains the client IP address, port and id found in the reply.
 *
 * NOTE(review): the guard is spelled RIO_DEGUG2 (not RIO_DEBUG2) and the body
 * streams an identifier named Error that is not declared in this function -
 * confirm both before enabling this code.
 *
 * @param event reply being reported; consumed by this function.
 */
void SNode::rxthread_searchlogsstatus( EventStorageReply *event )
{
    const bool hasclientinfo =
        ( event->StorageReply.SearchLogsStatus.StatusType !=
          MSG_RSS_REMOVE_SEARCH_FILE_REQ );

    RioErr << "SNode::rxthread_searchlogsstatus a mensagem de controle "
           << event->StorageReply.SearchLogsStatus.StatusType << " executou "
           << " e retornou o erro " << Error;

    if( hasclientinfo )
    {
        struct in_addr ip;
        ip.s_addr = event->StorageReply.SearchLogsStatus.IPaddr;
        RioErr << ". A mensagem veio do cliente com o IP " << inet_ntoa( ip )
               << " e a porta "
               << ntohs( event->StorageReply.SearchLogsStatus.Port )
               << ", e estava associada a ID "
               << event->StorageReply.SearchLogsStatus.ClientId << endl;
    }
    else
    {
        RioErr << endl;
    }

    EventManager.Free(( Event* ) event );
}
#endif
02165
02166
02167
02168 void SNode::rxthread_nodeinfo( EventStorageReply *event )
02169 {
02170 EventStorageRequest *TxEvent;
02171 int i;
02172
02173 if( ( event->StorageReply.NodeInfo.Size != SizeMsgRSSnodeInfo )
02174 || ( event->StorageReply.NodeInfo.Token != RSS_TOKEN_STORAGE ))
02175 {
02176 if( sn_mgr->m_log.is_open() )
02177 sn_mgr->m_log << "DiskMgr.rxthread invalid nodeinfo msg "
02178 << "ignored for " << sn_hostname << endl;
02179
02180 EventManager.Free(( Event* ) event );
02181 return;
02182 }
02183
02184 if( sn_disks != 0 )
02185 {
02186 if( sn_mgr->m_log.is_open() )
02187 sn_mgr->m_log << "DiskMgr.rxthread duplicate nodeinfo "
02188 << "ignored for " << sn_hostname << endl;
02189
02190 EventManager.Free(( Event* ) event );
02191 return;
02192 }
02193
02194 i = event->StorageReply.NodeInfo.nDisks;
02195
02196 if( i > sn_maxdisks )
02197 {
02198 if( sn_mgr->m_log.is_open() )
02199 sn_mgr->m_log << "DiskMgr.rxthread nDisks " << i
02200 << " > sn_maxdisks " << sn_maxdisks
02201 << " ignored for " << sn_hostname << endl;
02202
02203 EventManager.Free((Event*) event);
02204 return;
02205 }
02206
02207 sn_disks = i;
02208 EventManager.Free(( Event* ) event );
02209
02210
02211 for( i = 0; i < sn_disks; i++ )
02212 {
02213 TxEvent = ( EventStorageRequest* )
02214 EventManager.New(EventTypeStorageRequest);
02215 TxEvent->StorageRequest.DiskInfo.Type = MSG_RSS_DISKINFO_REQ;
02216 TxEvent->StorageRequest.DiskInfo.Size = SizeMsgRSSdiskInfoReq;
02217 TxEvent->StorageRequest.DiskInfo.Token = RSS_TOKEN_ROUTER;
02218 TxEvent->StorageRequest.DiskInfo.Disk = i;
02219 sn_sendqueue.Put(( Event* )TxEvent );
02220 }
02221 }
02222
02223
02224
02225 void SNode::rxthread_diskinfo( EventStorageReply *event )
02226 {
02227 int diskid;
02228 Disk *dp;
02229 bool sendevent;
02230
02231 #ifdef RIO_DEBUG1
02232 RioErr << "### [SNode - rxthread_diskinfo] Start" << endl;
02233 #endif
02234
02235 if( ( event->StorageReply.DiskInfo.Size != SizeMsgRSSdiskInfo )
02236 || ( event->StorageReply.DiskInfo.Token != RSS_TOKEN_STORAGE ))
02237 {
02238 if( sn_mgr->m_log.is_open() )
02239 sn_mgr->m_log << "DiskMgr.rxthread invalid diskinfo msg "
02240 << "ignored for " << sn_hostname << endl;
02241
02242 EventManager.Free((Event*) event);
02243 return;
02244 }
02245
02246 if( sn_mgr->m_log.is_open() )
02247 sn_mgr->m_log << " DiskMgr.rxthread_diskinfo: node " << sn_hostname
02248 << " Disk " << event->StorageReply.DiskInfo.Disk
02249 << " DiskSize " << event->StorageReply.DiskInfo.DiskSize
02250 << " DiskName " << event->StorageReply.DiskInfo.DiskName;
02251
02252
02253 char diskname[256] = "", diskdevice[256] = "";
02254 strcpy( diskname, event->StorageReply.DiskInfo.DiskName );
02255 for( int aux = 0 ; diskname[aux] != '\0'; aux++ )
02256 if( diskname[aux] == '/' )
02257 diskname[aux] = '.';
02258 sprintf( diskdevice, "%s/%s.%s", sn_mgr->m_MetaRoot, sn_hostname, diskname);
02259
02260 if( sn_mgr->m_log.is_open() )
02261 sn_mgr->m_log << " DiskBitsName " << diskdevice << endl;
02262
02263
02264 if( event->StorageReply.DiskInfo.Disk > sn_maxdisks )
02265 {
02266 if( sn_mgr->m_log.is_open() )
02267 sn_mgr->m_log << "DiskMgr.rxthread Disk > maxdisks "
02268 << "ignored for " << sn_hostname << endl;
02269 EventManager.Free((Event*) event);
02270 return;
02271 }
02272
02273 sn_disksize[event->StorageReply.DiskInfo.Disk] =
02274 event->StorageReply.DiskInfo.DiskSize;
02275
02276 diskid = sn_diskidorg + event->StorageReply.DiskInfo.Disk;
02277
02278 if( sn_mgr->m_disks[diskid] )
02279 {
02280 if( sn_mgr->m_log.is_open() )
02281 sn_mgr->m_log << "Diskmgr.rxthread duplicate disk info for disk "
02282 << diskid << " from " << sn_hostname
02283 << " ignored " << endl;
02284
02285 EventManager.Free((Event*) event);
02286 return;
02287 }
02288
02289
02290 pthread_mutex_lock( &sn_mgr->m_mutex );
02291 dp = new Disk( sn_mgr, this );
02292 dp->d_diskid = diskid;
02293 dp->d_diskseq = event->StorageReply.DiskInfo.Disk;
02294 dp->d_ntot = sn_disksize[dp->d_diskseq] / sn_mgr->m_BlockSize;
02295
02296
02297
02298
02299
02300 dp->d_BitMap.Initialize( diskdevice, dp->d_ntot, &cerr );
02301
02302
02303 strcpy( dp->d_DiskName, event->StorageReply.DiskInfo.DiskName );
02304
02305
02306
02307
02308 dp->d_status = Disk::STAT_READ | Disk::STAT_WRITE | Disk::STAT_ALLOC;
02309 sn_mgr->m_disks[diskid] = dp;
02310
02311
02312
02313 sendevent = true;
02314 for( int d = 0; d < sn_disks; d++ )
02315 sendevent = sendevent && ( sn_mgr->m_disks[ d + sn_diskidorg ] != 0 );
02316
02317 pthread_mutex_unlock( &sn_mgr->m_mutex );
02318
02319
02320 EventManager.Free(( Event* ) event );
02321
02322
02323 EventAddDisk* eventdisk;
02324 eventdisk = ( EventAddDisk* ) EventManager.New( EventTypeAddDisk );
02325 eventdisk->Disk = diskid;
02326 sn_mgr->m_Router->Put(( Event* )eventdisk );
02327
02328
02329
02330
02331
02332
02333
02334
02335
02336
02337 if( ( sn_sendevent ) && ( sendevent ) )
02338 {
02339 unsigned int storageid;
02340
02341 storageid = ( sn_diskidorg - 1 ) / SNode::sn_maxdisks;
02342
02343
02344
02345
02346 sn_mgr->LoadDisksBitMaps( storageid );
02347
02348
02349
02350 if( sn_readfreeblocksfile )
02351 {
02352 sn_mgr->FreeSalvedBlocks( sn_hostname );
02353 sn_readfreeblocksfile = false;
02354 }
02355
02356
02357 sn_mgr->m_NumberOfActiveDisks = sn_mgr->m_NumberOfActiveDisks +
02358 sn_disks;
02359
02360
02361
02362
02363 EventStorageUp* event;
02364
02365
02366
02367 event = ( EventStorageUp * ) EventManager.New( EventTypeStorageUp );
02368 event->StorageId = storageid;
02369
02370 #ifdef RIO_DEBUG2
02371 RioErr << "SNode::rxthread_confirmation enviando a mensagem "
02372 << "EventStorageUp ao Router do servidor de armazenamento com "
02373 << "a ID " << event->StorageId << endl;
02374 #endif
02375
02376 sn_mgr->m_Router->Put( ( Event * ) event );
02377
02378
02379 sn_sendevent = false;
02380 }
02381
02382 #ifdef RIO_DEBUG1
02383 RioErr << "### [SNode - rxthread_diskinfo] End" << endl;
02384 #endif
02385 }
02386
02387
02388
02389 void SNode::rxthread_diskservicetimeinfo( EventStorageReply *event )
02390 {
02391 if( ( event->StorageReply.DiskServiceTimeInfo.Size !=
02392 SizeMsgRSSdiskServiceTimeInfo ) ||
02393 ( event->StorageReply.DiskServiceTimeInfo.Token != RSS_TOKEN_STORAGE ))
02394 {
02395 if( sn_mgr->m_log.is_open() )
02396 sn_mgr->m_log << "DiskMgr.rxthread invalid diskServiceTimeinfo msg "
02397 << "ignored for " << sn_hostname << endl;
02398 EventManager.Free((Event*) event);
02399 return;
02400 }
02401
02402 if( event->StorageReply.DiskServiceTimeInfo.Disk > sn_maxdisks )
02403 {
02404 if( sn_mgr->m_log.is_open() )
02405 sn_mgr->m_log << "DiskMgr.rxthread Disk > maxdisks "
02406 << "ignored for " << sn_hostname
02407 << endl;
02408
02409 EventManager.Free((Event*) event);
02410 return;
02411 }
02412
02413 #ifdef RIO_DEBUG2
02414 if( sn_mgr->m_log.is_open() )
02415 sn_mgr->m_log << "DiskMgr.rxthread received servicetimeinfo of disk "
02416 << event->StorageReply.DiskServiceTimeInfo.Disk << ":"
02417 << event->StorageReply.DiskServiceTimeInfo.DiskId
02418 << endl;
02419 #endif
02420
02421
02422 sn_mgr->m_Router->SetDiskServiceTime(
02423 event->StorageReply.DiskServiceTimeInfo.DiskId,
02424 event->StorageReply.DiskServiceTimeInfo.EstimatedDiskServTime,
02425 event->StorageReply.DiskServiceTimeInfo.EstimatedServTimeAccT);
02426 EventManager.Free((Event*)event);
02427 }
02428
02429
02430
02431 void *SNode::txthreadep(void *parm)
02432 {
02433 (( SNode * )parm )->txthread();
02434 return 0;
02435 }
02436
02437
02438
02439 int SNode::txthread()
02440 {
02441 EventStorageRequest *event = NULL;
02442 Event *auxevent;
02443 int rc;
02444
02445 RioErr << "TXTHREADID " << syscall( SYS_gettid ) << endl;
02446
02447 while( true )
02448 {
02449
02450
02451
02452 pthread_mutex_lock( &sn_mutex );
02453
02454 if( !sn_isenabled )
02455
02456 pthread_cond_wait( &sn_waitstorage, &sn_mutex );
02457
02458 pthread_mutex_unlock( &sn_mutex );
02459
02460
02461
02462 auxevent = sn_sendqueue.Get();
02463
02464 if( auxevent->Type == EventTypeFinalizeThread )
02465 {
02466 EventManager.Free( (Event*) auxevent );
02467 RioErr << " DiskMgr txthread finished. " << endl;
02468
02469
02470
02471 return 0;
02472 }
02473
02474 event = ( EventStorageRequest* ) auxevent;
02475 #ifdef RIO_DEBUG2
02476 DataRequest& Request = (( EventDataRequest* )( event ))->Request;
02477 struct in_addr clientip;
02478
02479 clientip.s_addr = Request.Target.IPaddress;
02480 RioErr << "Enviando ao storage agora!: "
02481 << "IP " << inet_ntoa(clientip)
02482 << " port " << Request.Target.Port
02483 << " reqid " << Request.Reqid
02484 << " block " << Request.Block
02485 << endl;
02486 #endif
02487
02488
02489
02490
02491
02492
02493
02494 rc = sn_socket.Send( (char *) &(event->StorageRequest.Header.Type),
02495 event->StorageRequest.Header.Size );
02496 if( rc )
02497 {
02498 if( sn_mgr->m_log.is_open() )
02499 sn_mgr->m_log << "SNode.txthread send error " << rc << endl;
02500 RioErr << " DiskMgr txthread error. See log file. " << endl;
02501 }
02502
02503
02504
02505
02506
02507
02508
02509
02510
02511
02512
02513
02514
02515 EventManager.Free((Event*)event);
02516 }
02517 }
02518
02519
02520
02521 void SNode::SendStorageNode( EventStorageRequest* event, u16 DiskSeq )
02522 {
02523 if( event->Header.Type != EventTypeStorageRequest )
02524 {
02525 if( sn_mgr->m_log.is_open() )
02526 sn_mgr->m_log << "SNode::SendStorageNode ERROR: Event has "
02527 << "invalid type:" << (int) (event->Header.Type)
02528 << endl;
02529
02530 EventManager.Free((Event*)event);
02531 return;
02532 }
02533 u16 type = event->StorageRequest.Header.Type & 0xf;
02534
02535 if( ( type == MSG_RSS_READ ) || ( type == MSG_RSS_WRITE ) ||
02536 ( type == MSG_RSS_FETCH ) || ( type == MSG_RSS_RECEIVE ))
02537 {
02538 event->StorageRequest.Request.New.Disk = DiskSeq;
02539 }
02540
02541 else if( ( type != MSG_RSS_SEND ) &&
02542 ( type != MSG_RSS_FLUSH ) &&
02543 ( type != MSG_RSS_CANCEL ) )
02544 {
02545 if( sn_mgr->m_log.is_open() )
02546 sn_mgr->m_log << "SNode::SendStorageNode ERROR: Storage request"
02547 << " has invalid type: "
02548 << event->StorageRequest.Header.Type << endl;
02549
02550 EventManager.Free((Event*)event);
02551 return;
02552 }
02553 sn_sendqueue.Put((Event*) event);
02554
02555 return;
02556 }
02557
02558
02559 void SNode::GetDiskServiceTime( EventStorageRequest* event, u16 DiskSeq )
02560 {
02561 if( event->Header.Type != EventTypeStorageRequest )
02562 {
02563 if( sn_mgr->m_log.is_open() )
02564 sn_mgr->m_log << "SNode::GetDiskServiceTime ERROR: "
02565 << "Event has invalid type:"
02566 << (int) (event->Header.Type) << endl;
02567
02568 EventManager.Free((Event*)event);
02569 return;
02570 }
02571 u16 type = event->StorageRequest.Header.Type;
02572 if( type == MSG_RSS_DISKSERVICETIMEINFO_REQ )
02573 {
02574 event->StorageRequest.DiskServiceTimeInfo.Disk = DiskSeq;
02575 }
02576 else
02577 {
02578 if( sn_mgr->m_log.is_open() )
02579 sn_mgr->m_log << "SNode::GetDiskServiceTime ERROR: Storage request"
02580 << " has invalid type: "
02581 << event->StorageRequest.Header.Type << endl;
02582
02583 EventManager.Free((Event*)event);
02584 return;
02585 }
02586
02587 #ifdef RIO_DEBUG2
02588 if( sn_mgr->m_log.is_open() )
02589 sn_mgr->m_log << "SNode::GetDiskServiceTime getting service time for "
02590 << "disk "
02591 << event->StorageRequest.DiskServiceTimeInfo.Disk << " : "
02592 << event->StorageRequest.DiskServiceTimeInfo.DiskId
02593 << endl;
02594 #endif
02595
02596 sn_sendqueue.Put((Event*) event);
02597
02598 return;
02599 }
02600
02601
02602 Disk::Disk( DiskMgr *mgr, SNode *node )
02603 {
02604
02605 d_mgr = mgr;
02606 d_diskid = 0;
02607 d_ntot = 0;
02608 d_link = 0;
02609 d_status = 0;
02610 d_node = node;
02611 d_diskseq = 0;
02612
02613 memset( d_DiskName, 0, MaxPathSize );
02614
02615
02616 Disk *disklast = VIRTORG( mgr->m_disklist, Disk, d_link );
02617 while( disklast->d_link != 0 )
02618 {
02619 disklast = disklast->d_link;
02620 }
02621 disklast->d_link = this;
02622 }
02623
02624 Disk::~Disk()
02625 {
02626
02627 Disk *disk = VIRTORG( d_mgr->m_disklist, Disk, d_link );
02628
02629 while( disk->d_link != 0 )
02630 {
02631 if( disk->d_link == this )
02632 {
02633 disk->d_link = this->d_link;
02634 break;
02635 }
02636 }
02637 }
02638
02639
02640
02641 void Disk::SendStorageNode( EventStorageRequest* event )
02642 {
02643 d_node->SendStorageNode( event, d_diskseq );
02644 }
02645
02646
02647
02648 void Disk::GetServiceTime( EventStorageRequest* event )
02649 {
02650 #ifdef RIO_DEBUG2
02651 RioErr << "Disk:GetDiskServiceTime "<< d_diskseq <<endl;
02652 #endif
02653
02654 d_node->GetDiskServiceTime( event, d_diskseq );
02655 }
02656
02657
02658
02659 unsigned int Disk::GetNumberOfFreeBlocks()
02660 {
02661 return d_BitMap.nFree();
02662 }
02663
02664
02665 int Disk::GetName( char *DiskName )
02666 {
02667 strcpy( DiskName, d_DiskName );
02668 return 0;
02669 }
02670