Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Class Members | File Members

DCDT_Finder.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 dEVICE cOMMUNITIES dEVELOPMENT tOOLKIT 
00003 
00004 DCDT_Finder.cpp
00005 
00006 COPYRIGHT (C) 2002   Alessandro Mazzini (mazzini@airlab.elet.polimi.it)
00007 
00008 
00009 This library is free software; you can redistribute it and/or
00010 modify it under the terms of the GNU Lesser General Public
00011 License as published by the Free Software Foundation; either
00012 version 2 of the License, or (at your option) any later version.
00013 
00014 This library is distributed in the hope that it will be useful,
00015 but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 Lesser General Public License for more details.
00018 
00019 You should have received a copy of the GNU Lesser General Public
00020 License along with this library; if not, write to the Free Software
00021 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
00022 
00023 ****************************************************************************/
00024 
00025 #ifndef FINDER_CPP
00026 #define FINDER_CPP
00027 
00028 using namespace std;
00029 
00030 #include <string.h> // for strerror
00031 
00032 #include <DCDT_Finder.h>
00033 #include <DCDT_Agora.h>
00034 
00035 DCDT_Finder::DCDT_Finder(DCDT_Agora* agora, FinderData *fd) : DCDT_Member(agora)
00036 {
00037   for(int i=1; i<MAXLINKS; i++) {
00038     DeleteLinkTableSlot(i);
00039   }
00040 
00041   for (int i=0; i<MAXCHANNELS; i++) {
00042     dynamic_channels[i].active = false;
00043     dynamic_channels[i].channel = NULL;
00044   }
00045 
00046   hs_header = NULL;
00047   localCD = remoteCD = NULL;
00048   finder_data = fd;
00049   starting_timer = false;
00050   passed=GetTime()-GetTime();
00051 }
00052 
00053 
00054 DCDT_Finder::~DCDT_Finder()
00055 {
00056   if ( finder_data ) {
00057     if ( finder_data->fp )
00058       fclose ( finder_data->fp );
00059 
00060     delete finder_data;
00061   }
00062 
00063   for (int idx=0; idx<MAXCHANNELS; idx++ ) {
00064     if ( dynamic_channels[idx].channel ) {
00065       delete dynamic_channels[idx].channel;
00066     }
00067   }
00068 }
00069 
00070 
00076 void DCDT_Finder::Init()
00077 {
00078   // Open the channel used for dynamic communications on the multicast address
00079   try {
00080     dynamic_channels[0].active = true;
00081     dynamic_channels[0].dynamic = true;
00082     dynamic_channels[0].channel = new DCDT_Channel_IP(myAgoraID);
00083     ((DCDT_Channel_IP*)dynamic_channels[0].channel)->DynamicPrepare(finder_data->IfNumAddr, finder_data->McNumAddr, finder_data->McPort);
00084   } catch (ChannelError e) {
00085     cout << "\nError while preparing channel 0 (" << strerror(e.errval) << ")\n";
00086     exit(1);
00087   }
00088 
00089   // Create static links data structures
00090   try {
00091     StaticLinks();
00092   } catch (Exception e) {
00093     cout << "\nError while preparing static links\n";
00094   }
00095 
00096   if (starting_timer) {
00097     cout << "\ndeactivating ip channel!!!!!!\n";
00098     dynamic_channels[0].active = false;
00099     start_ts = GetTime();
00100   }
00101 
00102   // Send an HS_NOTIFY message throught the active channels.
00103   for (i=0; i<MAXCHANNELS; i++) {
00104     if (dynamic_channels[i].active) {
00105       try {
00106         dynamic_channels[i].channel->SendNotify();
00107       } catch(TimeOut e) {
00108         cout << "\nCan't send notify on channel " << i << "( Timeout )" << "\n";
00109       } catch(ChannelError e) {
00110         cout << "\nError while sending notify on channel " << i << "(" << strerror(e.errval) << ")\n";
00111       }
00112     }
00113   }
00114   old_ts = notify_ts = GetTime();
00115 }
00116 
00117 
00118 /* \brief Manages links and system messages.
00119  *
00120  * This member is logically composed of two parts:\n
00121  * -  a cyclic part that manages HS_NOTIFY and HS_ANSWER messages on all the active channels, if necessary this part can create or close a channel.\n
00122  * - a periodical part that becomes active every NOTIFY_PERIOD usec and sends HS_NOTIFY throught all the active channels.\n
00123  */
00124 void DCDT_Finder::DoYourJob(int par)
00125 {
00126   // spedizione e ricezione di notify e answer sono interne, cosė come la gestione delle CommData. Non si tratta pių quindi
00127   // di DCDT_Msg. La localCD di inizializzazione del canale č ricavata ogni volta con GetStartingCD(), ed č un puntatore
00128   // per riferimento, in modo che possa risalire dalla preparazione del Link in caso sia modificata.
00129   // Il timer dei notify č gestito allo stesso modo, le SysReceive sono senza timer e non
00130   // bloccanti: i canali sono scandagliati uno dopo l'altro e se avanza del tempo il Finder si addormenta per il tempo
00131   // rimanente
00132   // localCD is taken from GetStartingCD() so if the an because is a pointer the initial
00133 
00134   ChannelTableElem *channel;
00135   int idx;
00136 
00137   // Receive and answer to system messages
00138   linked = received = false;
00139   
00140   try {
00141     for (idx=0; idx<MAXCHANNELS; idx++) {
00142       channel=&(dynamic_channels[idx]);
00143       if (channel->active) {
00144         try {
00145           channel->channel->ReceiveHS(hs_header, remoteCD);
00146         } catch (TimeOut t) {
00147         } catch (ChannelError e) {
00148           cout << "\nError while receiving on channel " << i;
00149         }
00150         
00151         if (hs_header) {
00152           received = true;
00153           switch (hs_header->type) {
00154             
00155           case HS_NOTIFY:
00156             //server
00157             TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Notify received from AgoraId %i", hs_header->AgoraID));
00158 
00159             linknum = IsPresent(hs_header->AgoraID);
00160             TRC_PRINT( DCDT_TRC_COMM, TRC1, ("IsPresent: %i", linknum));
00161             
00162             if (!linknum) {
00163               channel->channel->ChooseChannel(hs_header->channel);
00164               localCD = CreateLink(hs_header->AgoraID, L_WAITING, channel->channel->GetStartingCD(), NULL, channel->channel->ReadStartingProfile());
00165               channel->channel->SendAnswer(remoteCD, localCD);
00166               linked = true;
00167             }
00168             else {
00169               switch (LinkTable[linknum].LinkPtr->ReadStatus()) {
00170                 
00171               case L_WAITING:
00172                 channel->channel->SendAnswer(remoteCD, LinkTable[linknum].LinkPtr->GetCommData());
00173                 break;
00174                 
00175               case L_LOST:
00176                 channel->channel->ChooseChannel(hs_header->channel);
00177                 localCD = LinkTable[linknum].LinkPtr->Restart(L_WAITING, 
00178                                                               channel->channel->GetStartingCD(),
00179                                                               NULL);
00180                 channel->channel->SendAnswer(remoteCD, localCD);
00181                 linked = true;
00182                 
00183               }
00184             }
00185             delete remoteCD;
00186             remoteCD = NULL;
00187             break;
00188             
00189           case HS_ANSWER:
00190             //client
00191             TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Answer received from AgoraId %i", hs_header->AgoraID));
00192 
00193             linknum = IsPresent(hs_header->AgoraID);
00194             TRC_PRINT( DCDT_TRC_COMM, TRC1, ("IsPresent: %i", linknum));
00195 
00196             if (!linknum) {
00197               channel->channel->ChooseChannel(hs_header->channel);
00198               CreateLink(hs_header->AgoraID, L_CONNECTING, channel->channel->GetStartingCD(), remoteCD, channel->channel->ReadStartingProfile());
00199               remoteCD = NULL;
00200               linked = true;
00201             }
00202             else {
00203               switch (LinkTable[linknum].LinkPtr->ReadStatus()) {
00204                 
00205               case L_WAITING:
00206                 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("RemoteAgoraID=%i MyAgoraID=%i", hs_header->AgoraID, myAgoraID));
00207 
00208                 if (hs_header->AgoraID > myAgoraID) {
00209                   LinkTable[linknum].LinkPtr->ResetToConnecting(channel->channel->GetStartingCD(), remoteCD);
00210                   remoteCD = NULL;
00211                 }
00212                 else {
00213                   delete remoteCD;
00214                   remoteCD = NULL;
00215                 }
00216                 break;
00217                 
00218               case L_CONNECTING:
00219               case L_RESETTOCONN:
00220                 delete remoteCD;
00221                 remoteCD = NULL;
00222                 break;
00223                 
00224               case L_LOST:
00225                 channel->channel->ChooseChannel(hs_header->channel);
00226                 LinkTable[linknum].LinkPtr->Restart(L_CONNECTING, 
00227                                                     channel->channel->GetStartingCD(), 
00228                                                     remoteCD);
00229                 remoteCD = NULL;
00230                 linked = true;
00231               }
00232             }
00233             break;
00234             
00235           default:
00236             cout << "\nFinder received unknown msg!";
00237           }
00238           delete hs_header;
00239           hs_header = NULL;
00240         }
00241       }
00242     }
00243 
00244     new_ts = GetTime();
00245     
00246     // multicast channel reactivation
00247     if ((starting_timer) && ((new_ts - start_ts) > STARTING_TIMER_VAL)) {
00248       dynamic_channels[0].active = true;
00249       starting_timer = false;
00250       cout << "\nreactivating ip channel!!!!!!\n";
00251     }
00252 
00253     // Periodical part:
00254     // If we haven't received a message and our period wasn't expired -> wait
00255     // Advise others agora that we are alive
00256     if (!received || ((passed = new_ts - old_ts) >= FINDER_PERIOD)) {
00257       if ((passed) < FINDER_PERIOD)
00258         Delay(FINDER_PERIOD - passed);
00259       
00260       old_ts = GetTime();
00261       
00262       // If the previous NS_NOTIFY message was sent more than NOTIFY_PERIOD usec ago start sendinga new message throught all the active channels.
00263       if ((new_ts - notify_ts) >= NOTIFY_PERIOD) {
00264         for (i=0; i<MAXCHANNELS; i++) {
00265           if (dynamic_channels[i].active) {
00266             try {
00267               dynamic_channels[i].channel->SendNotify();
00268             } catch(TimeOut e) {
00269               cout << "\nCan't send notify on channel " << i;
00270             } catch(ChannelError e) {
00271               cout << "\nError while sending notify on channel " << i;
00272             }
00273           }
00274         }
00275         notify_ts = old_ts;
00276       }
00277     }
00278   } catch (Exception e) {
00279     cout << "\nFinder exception!";
00280     exit(0);
00281   }
00282 }
00283 
00284 
00287 CommData* DCDT_Finder::CreateLink(int r_id, int status, CommData *l_cd, CommData *r_cd, unsigned int profile)
00288 {
00289   DCDT_LinkRx *newLink;
00290   int newLinkID;
00291   
00292   newLinkID = GetFirstFreeLinkTableSlot();
00293   newLink = new DCDT_LinkRx(myAgora, newLinkID, r_id);
00294   myAgora->AddSysMember(newLink, profile);
00295   LinkTable[newLinkID].LinkPtr = newLink;
00296   l_cd = newLink->Prepare(status, l_cd, r_cd);
00297 
00298   TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Link created!"));
00299   myAgora->ActivateMember(newLink);
00300   return l_cd;
00301 }
00302 
00305 int DCDT_Finder::IsPresent(int id)
00306 {
00307   for (int i=1; i<MAXLINKS; i++) {
00308     if ((LinkTable[i].freeSlot == false) && ((LinkTable[i].LinkPtr->ReadRemoteID()) == id))
00309       return i;
00310   }
00311   return 0;
00312 }
00313 
00314 
00315 void DCDT_Finder::CloseLink(int id) {
00316   TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Closing LINK %i", id));
00317 
00318   if (id < 1000) {
00319     DeleteLinkTableSlot(id);
00320     myAgora->RemoveMember(LinkTable[id].LinkPtr);
00321   }
00322 }
00323 
00330 void DCDT_Finder::StaticLinks() {
00331   // SI FA RIFERIMENTO ALLA STESSA STRUTTURA DATI dynamic_channels
00332   char ch_type[4], lk_type[15];
00333   int ch_count = 1, rv;
00334   char l_ip_addr[INET_ADDRSTRLEN], r_ip_addr[INET_ADDRSTRLEN];
00335   int l_ip_port, r_ip_port;
00336   unsigned int profile;
00337   char device_name[UNIX_PATH_MAX];
00338 
00339   /* 
00340    * <CHANNEL TYPE> <"link" or "gateway" or "bridge"> <LEVEL> <LOCAL IP> <LOCAL PORT> <REMOTE IP> <REMOTE PORT>\n
00341    * <CHANNEL TYPE> can be be: IP, SER ...\n
00342    * "link" = ( ??? )\n
00343    * "bridge" = ( ??? )\n
00344    * Ex:TCP link 1 131.175.127.61 1234 198.177.56.9 1444\n
00345    * NOTE: If the configuration file contains invalid settings the system starts in STANDALONE mode\n
00346    */
00347   
00348   rv = fscanf(finder_data->fp, "%s%s", ch_type, lk_type);
00349   if (((rv > 0) && (rv < 2))) {
00350     fclose(finder_data->fp);
00351     throw Exception();
00352   }
00353   while (!feof(finder_data->fp)) {
00354     TRC_PRINT( DCDT_TRC_COMM, TRC1, ("%s %s", ch_type, lk_type));
00355 
00356     profile = MPM_SYSTEM;
00357     if (!strcmp(lk_type, "link"))
00358       profile |= MPM_LINK;
00359     if (!strcmp(lk_type, "bridge"))
00360       profile |= MPM_BRIDGE;
00361     if (!strcmp(ch_type, "ip")) {
00362       rv = fscanf(finder_data->fp, "%s%i%s%i\n", l_ip_addr, &l_ip_port, r_ip_addr, &r_ip_port);
00363       if ((rv < 4) || (rv == EOF)) {
00364         fclose(finder_data->fp);
00365         throw Exception();
00366       }
00367 
00368       TRC_PRINT( DCDT_TRC_COMM, TRC1, (" %s %i %s %i", l_ip_addr, l_ip_port, r_ip_addr, r_ip_port));
00369 
00370       dynamic_channels[ch_count].active = true;
00371       dynamic_channels[ch_count].dynamic = false;
00372       dynamic_channels[ch_count].channel = new DCDT_Channel_IP(myAgoraID);
00373       ((DCDT_Channel_IP*)dynamic_channels[ch_count].channel)->StaticPrepare(profile, l_ip_addr, htons((short)l_ip_port), r_ip_addr, htons((short)r_ip_port));
00374 
00375       ch_count++;
00376     }
00377     else if (!strcmp(ch_type, "ser")) {
00378       rv = fscanf(finder_data->fp, "%s", device_name);
00379       if ((rv == 0) || (rv == EOF)) {
00380         fclose(finder_data->fp);
00381         throw Exception();
00382       }
00383 
00384       TRC_PRINT( DCDT_TRC_COMM, TRC1, (" %s", device_name));
00385       dynamic_channels[ch_count].active = true;
00386       dynamic_channels[ch_count].dynamic = false;
00387       dynamic_channels[ch_count].channel = new DCDT_Channel_Serial(myAgoraID);
00388       ((DCDT_Channel_Serial*)dynamic_channels[ch_count].channel)->StaticPrepare(profile, device_name);
00389 
00390       ch_count++;
00391     }
00392 
00393     rv = fscanf(finder_data->fp, "%s%s", ch_type, lk_type);
00394     if ((rv > 0) && (rv < 2)) {
00395       fclose(finder_data->fp);
00396       throw Exception();
00397     }
00398   }
00399 
00400   if (--ch_count)
00401     starting_timer = true;
00402 }
00403 
00404 #endif //define

Generated on Sun Jun 19 10:35:50 2005 for dcdt by  doxygen 1.3.9.1