Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Class Members | File Members

DCDT_Agora.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 dEVICE cOMMUNITIES dEVELOPMENT tOOLKIT 
00003 
00004 DCDT_Agora.cpp
00005 
00006 COPYRIGHT (C) 2002   Paolo Meriggi (meriggi@ing.unibs.it)
00007                      Alessandro Mazzini (mazzini@airlab.elet.polimi.it)
00008 
00009 
00010 This library is free software; you can redistribute it and/or
00011 modify it under the terms of the GNU Lesser General Public
00012 License as published by the Free Software Foundation; either
00013 version 2 of the License, or (at your option) any later version.
00014 
00015 This library is distributed in the hope that it will be useful,
00016 but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 Lesser General Public License for more details.
00019 
00020 You should have received a copy of the GNU Lesser General Public
00021 License along with this library; if not, write to the Free Software
00022 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
00023 
00024 ****************************************************************************/
00025 
00026 
00027 #ifndef DCDT_AGORA_CPP
00028 #define DCDT_AGORA_CPP
00029 
00030 using namespace std;
00031 
00032 #include <DCDT_Agora.h>
00033 #include <DCDT_Finder.h>
00034 #include <DCDT_PostOffice.h>
00035 #include <DCDT_Member.h>
00036 #include <DCDT_MsgManager.h>
00037 #include <DCDT_InnerLinkManager.h>
00038 #include <DCDT_ctrace.h>
00039 
00043 DCDT_Agora::DCDT_Agora( int PO_Type ) {
00044 
00045   // Enable tracing
00046   TRC_INIT ( "agora.log", TRC_ENABLED, TRC_ON, TRC_ALL, UNIT_MAX, 0 );
00047   TRC_ADD_THREAD ( "Agora", 0 );
00048   
00049   // TODO: Associare signalling in order to have a smooth closing
00050   //  signal( SIGINT, endme);
00051 
00052   // Initialization of System variables
00053   Status = DCDT_ZERO;
00054   //LastMemberID = 1;
00055   AgoraID = 0;
00056   MaxIDMember = 0;
00057 
00058   strcpy( IPStrAddr, "");
00059 
00060 #ifdef SEQUENCER_VERSION
00061   Sequencer = new DCDT_Sequencer;
00062 #endif
00063 
00064   // Initialize MemberTable
00065   for( int i = 0; i<MAX_MEMBERS; i++) {
00066     DeleteMemberTableEntry(i);
00067    }
00068 
00069   // Create a brand new postoffice
00070   PostOffice = new DCDT_PostOffice( this, PO_Type ) ;
00071 
00072   // Setup InnerLink
00073   InnerLinkManager = new DCDT_InnerLinkManager( this );
00074   if ( InnerLinkManager->LookForExistingAgora(AgoraID) ) {
00075     // There is an existing agora
00076     FirstAgora = false;
00077     
00078     // Let's create and run the MsgManager
00079     MsgManager = new DCDT_MsgManager( this ) ;
00080     AddSysMember( MsgManager, MPM_SYSTEM );
00081     ActivateSysMember( MsgManager, "MsgManager" );
00082     
00083     Delay( 20000 );
00084     
00085     cout << "\nRunning in standalone secondary mode\n";
00086   }
00087   else {
00088     FirstAgora = true;
00089     Finder = NULL;
00090 
00091     // Let's activate and run the InnerLinkManager
00092     AddSysMember (InnerLinkManager, MPM_SYSTEM );
00093     ActivateSysMember( InnerLinkManager, "InnerLinkManager" );
00094     
00095     // Let's create and run the MsgManager
00096     MsgManager = new DCDT_MsgManager( this ) ;
00097     AddSysMember( MsgManager, MPM_SYSTEM );
00098     ActivateSysMember( MsgManager, "MsgManager" );
00099 
00100     Delay( 20000 );
00101     
00102     cout << "\nRunning in standalone primary mode\n";
00103   }
00104 
00105 };
00106 
00110 DCDT_Agora::DCDT_Agora( char *filename, int PO_Type ) {
00111   // Enable tracing
00112   TRC_INIT ( "agora.log", TRC_ENABLED, TRC_ON, TRC_ALL, UNIT_MAX, 0 );
00113   TRC_ADD_THREAD ( "Agora", 0 );
00114   TRC_PRINT ( DCDT_TRC_AGORA, TRC1, ("Config: %s", filename));
00115 
00116   // Initialization of System variables
00117   Status = DCDT_ZERO;
00118   AgoraID = 0;
00119   MaxIDMember = 0;
00120 
00121 #ifdef SEQUENCER_VERSION
00122   Sequencer = new DCDT_Sequencer;
00123 #endif
00124   
00125   // Initialize MemberTable
00126   for( int i = 0; i<MAX_MEMBERS; i++) {
00127     DeleteMemberTableEntry(i);
00128   }
00129   
00130   commflag = CommConfig(filename);
00131   
00132   // PostOffice type read from the configuration file
00133   if ( finder_data != NULL ) {
00134     if ( finder_data->fp != NULL ) {
00135       char cdmy=10;
00136       while ( cdmy == 0x0a )
00137         cdmy=fgetc( finder_data->fp );
00138       
00139       //printf ( "Read %c (%d)\n", cdmy, cdmy );
00140       if ( cdmy == 'P' ) {
00141         char str1[80],str2[80];
00142         str1[0]=str2[0]=0x00;
00143         
00144         fscanf( finder_data->fp, "%s%s", str1, str2 );
00145         
00146         if ( strcasecmp( str2, "SLWB" ) == 0 )
00147           PO_Type = POSTOFFICE_SLWB;
00148         
00149         if ( strcasecmp( str2, "SLWDWCV" ) == 0 )
00150           PO_Type = POSTOFFICE_SLWDC;
00151         
00152         if ( strcasecmp( str2, "SLWSM" ) == 0 )
00153           PO_Type = POSTOFFICE_SLWSM;
00154       }
00155       else
00156         ungetc ( cdmy, finder_data->fp );
00157     }
00158   }
00159   
00160   PostOffice = new DCDT_PostOffice( this, PO_Type ) ;
00161   
00162   // Setup InnerLink
00163   InnerLinkManager = new DCDT_InnerLinkManager( this );
00164   if ( InnerLinkManager->LookForExistingAgora(AgoraID) ) {
00165     // There is an existing agora
00166     FirstAgora = false;
00167     
00168     // Let's create and run the MsgManager
00169     MsgManager = new DCDT_MsgManager( this ) ;
00170     AddSysMember( MsgManager, MPM_SYSTEM );
00171     ActivateSysMember( MsgManager, "MsgManager" );
00172     
00173     Delay( 20000 );
00174     
00175     cout << "\nRunning in normal secondary mode\n\nPress a key\n";
00176   }
00177   else {
00178     FirstAgora = true;
00179     
00180     // Let's activate and run the InnerLinkManager
00181     AddSysMember (InnerLinkManager, MPM_SYSTEM );
00182     ActivateSysMember( InnerLinkManager, "InnerLink" );
00183     
00184     // Let's create and run the MsgManager
00185     MsgManager = new DCDT_MsgManager( this) ;
00186     AddSysMember( MsgManager, MPM_SYSTEM );
00187     ActivateSysMember( MsgManager, "MsgManager" );
00188     
00189     Delay( 20000 );
00190     
00191     // Let's start a Finder
00192     if (commflag) {
00193       Finder = new DCDT_Finder(this, finder_data);
00194       cout << endl << "Finder " << Finder << endl;
00195 
00196       AddSysMember(Finder, MPM_SYSTEM );
00197       ActivateSysMember(Finder, "Finder" );
00198       
00199       cout << "\nRunning in normal primary mode\n\nPress a key\n";
00200     }
00201     else
00202       cout << "\nRunning in standalone primary mode\n\nPress a key\n";
00203   }
00204 };
00205 
00207 DCDT_Agora::~DCDT_Agora() {
00208   
00209   // TODO: Check that all Members was previously deleted
00210   delete PostOffice;
00211   delete MsgManager;
00212   delete InnerLinkManager; 
00213   
00214   // If we are the frist Agorà, terminate the Finder thread,
00215   // it will free finder_data
00216   // otherwise we have to free it manually
00217   if( FirstAgora )  {
00218     if ( Finder ) {
00219       delete Finder;
00220     }
00221   }
00222   else
00223   if ( finder_data ) {
00224     if ( finder_data->fp )
00225       fclose ( finder_data->fp );
00226     delete finder_data;
00227   }
00228 
00229   /* ??? Manca di terminare alcuni membri questo aiuta ma ne rimane ancora uno attivo e se non si usa valgrind porta a un segment fault */
00230   /*
00231   for(int i = MAX_MEMBERS; i>0; i--) {
00232         if ( MemberTable[i].empty == false &&
00233              MemberTable[i].MemberPtr) {
00234            if ( MemberTable[i].active == true )
00235               pthread_cancel( MemberTable[i].MemberPtr->myThread );
00236         }
00237   }
00238   */
00239 
00240   TRC_REMOVE_THREAD ( pthread_self() );
00241   TRC_END();
00242 };
00243 
00245 void DCDT_Agora::AddMember(DCDT_Member * MemberPtr) {
00246   int newMemberID;
00247 
00248   // Create a new entry for the DCDT_Member in the DCDT_MemberList
00249   newMemberID = GetFirstFreeMemberTableSlot();
00250   
00251   // Set the top ID yet assigned
00252   if (newMemberID > MaxIDMember )
00253     MaxIDMember = newMemberID;
00254   
00255   MemberPtr->Profile = 0;
00256   MemberPtr->myID = newMemberID;
00257   MemberPtr->myAgoraID = AgoraID;
00258   MemberPtr->myPostOffice = PostOffice;
00259   MemberPtr->myFinder = NULL;
00260   strcpy(MemberPtr->myIPAddr, IPStrAddr);// serve per testing di rapida identificazione
00261   
00262 #ifdef SEQUENCER_VERSION
00263   MemberPtr->mySequencer = Sequencer;
00264 #endif
00265  
00266   MemberTable[newMemberID].active = false; 
00267   MemberTable[newMemberID].empty = false; 
00268   MemberTable[newMemberID].system = false; 
00269   MemberTable[newMemberID].MemberPtr = MemberPtr;
00270   MemberTable[newMemberID].Profile = MPM_USER;
00271   MemberTable[newMemberID].status = EMPTYSLOT;
00272   PostOffice->AddMember(newMemberID);
00273 
00274   TRC_PRINT ( DCDT_TRC_AGORA, TRC1, ("AddMember ID=%d", newMemberID));
00275 
00276 };
00277 
00279 void DCDT_Agora::AddSysMember(DCDT_Member * MemberPtr, unsigned int profile) {
00280   int newMemberID;
00281   
00282   // Create a new entry for the DCDT_Member in the DCDT_MemberList
00283   newMemberID = GetFirstFreeMemberTableSlot();
00284   
00285   // Set the top ID yet assigned
00286   if (newMemberID > MaxIDMember )
00287     MaxIDMember = newMemberID;
00288   
00289   MemberPtr->Profile = profile;
00290   MemberPtr->myID = newMemberID;
00291   MemberPtr->myAgoraID = AgoraID;
00292   MemberPtr->myPostOffice = PostOffice;
00293   MemberPtr->myFinder = Finder;
00294   
00295 #ifdef SEQUENCER_VERSION
00296   MemberPtr->mySequencer = Sequencer;
00297 #endif
00298   
00299   MemberTable[newMemberID].active = false; 
00300   MemberTable[newMemberID].empty = false; 
00301   MemberTable[newMemberID].system = true; 
00302   MemberTable[newMemberID].MemberPtr = MemberPtr;
00303   MemberTable[newMemberID].Profile = profile;
00304   MemberTable[newMemberID].status = EMPTYSLOT;
00305   PostOffice->AddSysMember(newMemberID);
00306   
00307   TRC_PRINT( DCDT_TRC_AGORA, TRC1, ("AddSysMember ID=%d Profile=%x", newMemberID, MemberTable[newMemberID].Profile));
00308   
00309 };
00310 
00311 
00313 void DCDT_Agora::RemoveMember(DCDT_Member * MemberPtr) {
00314   
00315   // Wait that the thread ends or is cancelled
00316   int ret;
00317   ret = pthread_join( (MemberPtr->myThread), NULL );
00318   if ( ret!= 0 ) {
00319     TRC_PRINT ( DCDT_TRC_AGORA, TRC1, ("pthread_join ID=%d Error=%d", MemberPtr->myID, ret));
00320     printf ( "pthread_join %p ( MemberID %d ) = %d\n", MemberPtr, MemberPtr->myID, ret );
00321   }
00322   
00323   TRC_PRINT ( DCDT_TRC_AGORA, TRC1, ("RemoveMember ID=%d", MemberPtr->myID));
00324   
00325   MemberPtr->Status = TERMINATING;
00326   
00327   // TODO: check that the member wasn't working
00328   Delay(20000);
00329   
00330   // Delete all the references to the deleted member from PostOffice and Agora members table
00331   PostOffice->DeleteMemberTableEntry(MemberPtr->myID);
00332   DeleteMemberTableEntry(MemberPtr->myID);
00333 
00334   delete(MemberPtr);
00335 };
00336 
00338 int DCDT_Agora::ActivateMember(DCDT_Member * Memb, char *name) {
00339   
00340   return Memb->Activate( name );
00341 };
00342 
00344 int DCDT_Agora::ActivateSysMember(DCDT_Member * Memb, char *name) {
00345 
00346   return Memb->Activate( name );
00347 };
00348 
00349 
00353 void DCDT_Agora::LetsWork() {
00354 
00355   // TODO: set the process priority
00356   //  nice( -18);
00357 
00358   Status = DCDT_LETSWORK;
00359   // da sostituire con un segnale ( ??? )
00360   // Delay(500000);
00361 
00362 #ifdef SEQUENCER_VERSION
00363   while( Status == DCDT_LETSWORK){
00364     Sequencer->CheckNextDeadline();
00365     pthread_testcancel();
00366     //    Delay(100);
00367   };
00368 #else
00369   
00370   bool vote_for_termination;
00371   int existing_members,terminating_votes;
00372   DCDT_MemberTableElem *member;
00373 
00374   while( Status == DCDT_LETSWORK){
00375     vote_for_termination = true;
00376     terminating_votes = 0;
00377     existing_members = 0;
00378 
00379     // if all user Members set TERMINATING, lets shutdown
00380     for(int i = 0; i< MAX_MEMBERS; i++) {
00381       member = &(MemberTable[i]);
00382       if (  member->empty == false && 
00383             member->MemberPtr ) {
00384 
00385         existing_members++;
00386 
00387         if ( member->system == false) { 
00388           if ( member->MemberPtr->Status == TERMINATING ) {
00389             // ??? Questa parte si puo' togliere perche' basta uscire dal ciclo con
00390             // vote_for_termination == true per indicare che bisogna chiudere
00391             vote_for_termination &= true;
00392             terminating_votes ++;
00393           }
00394           else {
00395             vote_for_termination = false;
00396             break;
00397           }
00398         }
00399       }
00400     }
00401 
00402     /* If all user member are in TERMINATING status the agora must terminate */
00403     if (vote_for_termination) { 
00404       Status = DCDT_TERMINATING;
00405       printf("AGORA: Vote for TERMINATION by %d user Members over %d existing\n"
00406              , terminating_votes, existing_members );
00407       break;
00408     }
00409 
00410     Delay( 10000 );
00411     //     printf("No vote for termination, Members = %d\n", MaxIDMember);
00412      
00413   } // while ( Status == DCDT_LETSWORK )
00414  
00415 
00416   if ( Status == DCDT_TERMINATING ) { 
00417     bool all_members_terminated = false;
00418     
00419     TRC_PRINT ( DCDT_TRC_AGORA, TRC1, ("Waiting for the termination of all members"));
00420 
00421     while ( !all_members_terminated ){ 
00422 
00423       all_members_terminated = true; 
00424       for(int i = MAX_MEMBERS; i>1; i--) {
00425         if ( MemberTable[i].empty == false && 
00426              MemberTable[i].MemberPtr) {
00427           if ( MemberTable[i].active == false )
00428             all_members_terminated |= true;
00429           else
00430             all_members_terminated = false;
00431         }
00432       }
00433 
00434       Status = DCDT_SHUTDOWN;
00435       Delay (10000);
00436     }
00437    }
00438    
00439   TRC_PRINT ( DCDT_TRC_AGORA, TRC1, ("Returning from DoYourJob"));
00440 #endif
00441 };
00442 
00457 bool DCDT_Agora::CommConfig(char *filename)
00458 {
00459 // SI FA RIFERIMENTO ALLA STESSA STRUTTURA DATI channels, VA FATTA UNA MODIFICA CHE AL MOMENTO
00460 // DELLA PARTENZA DEL LINK DISATTIVA IL CANALE
00461 
00462 // TODO: Check if the parameters are valid, if not write a message to the user
00463   FILE *fp;
00464   int retval, port;
00465 
00466   cout << "\nReading configuration file...\n";
00467   if ((fp = fopen(filename, "r"))) {
00468     finder_data = new FinderData();
00469     finder_data->fp = fp;
00470 
00471     retval = fscanf(fp, "%s%s%i", finder_data->IfStrAddr, finder_data->McStrAddr, &port);
00472     cout << "\n" << finder_data->IfStrAddr << " " << finder_data->McStrAddr << " " << port << "\n";
00473     if ((retval == 0) || (retval == EOF) || (inet_pton(AF_INET, finder_data->IfStrAddr, &finder_data->IfNumAddr) <= 0) || (inet_pton(AF_INET, finder_data->McStrAddr, &finder_data->McNumAddr) <= 0)) {
00474       cout << "\nError while reading configuration file, running in 'standalone mode'\n\nPress a key\n";
00475       getchar();
00476       return false;
00477     }
00478     finder_data->McPort = htons((short)port);
00479     cout << "\nAgoraID: " << (unsigned int)(AgoraID = finder_data->IfNumAddr) << "\n";
00480     strcpy(IPStrAddr, finder_data->IfStrAddr);
00481     return true;
00482   }
00483   else {
00484     cout << "\nCan't find configuration file, running in 'standalone mode'\n\nPress a key\n";
00485     getchar();
00486     strcpy ( IPStrAddr, "" );
00487     return false;
00488   }
00489 };
00490 
00492 void DCDT_Agora::SetStatus(int NewStatus) {
00493   Status = NewStatus;
00494 };
00495 
00496 
00497 inline int DCDT_Agora::CheckIfAllMemberReady() {
00498       bool all_mebers_ready = true;
00499 
00500     // If all user Members are in READY status
00501     for(int i = 0; i<= MaxIDMember; i++) {
00502       if ( MemberTable[i].system == false &&
00503            MemberTable[i].empty == false && 
00504            MemberTable[i].MemberPtr) { 
00505         if ( MemberTable[i].MemberPtr->Status == READY )
00506           all_mebers_ready |= true;
00507         else
00508           all_mebers_ready = false;
00509       }
00510     }
00511     
00512     return all_mebers_ready;
00513 };
00514 #endif // define

Generated on Sun Jun 19 10:35:49 2005 for dcdt by  doxygen 1.3.9.1