00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef FINDER_CPP
00026 #define FINDER_CPP
00027
00028 using namespace std;
00029
#include <stdio.h>
#include <string.h>

#include <DCDT_Finder.h>
#include <DCDT_Agora.h>
00034
00035 DCDT_Finder::DCDT_Finder(DCDT_Agora* agora, FinderData *fd) : DCDT_Member(agora)
00036 {
00037 for(int i=1; i<MAXLINKS; i++) {
00038 DeleteLinkTableSlot(i);
00039 }
00040
00041 for (int i=0; i<MAXCHANNELS; i++) {
00042 dynamic_channels[i].active = false;
00043 dynamic_channels[i].channel = NULL;
00044 }
00045
00046 hs_header = NULL;
00047 localCD = remoteCD = NULL;
00048 finder_data = fd;
00049 starting_timer = false;
00050 passed=GetTime()-GetTime();
00051 }
00052
00053
00054 DCDT_Finder::~DCDT_Finder()
00055 {
00056 if ( finder_data ) {
00057 if ( finder_data->fp )
00058 fclose ( finder_data->fp );
00059
00060 delete finder_data;
00061 }
00062
00063 for (int idx=0; idx<MAXCHANNELS; idx++ ) {
00064 if ( dynamic_channels[idx].channel ) {
00065 delete dynamic_channels[idx].channel;
00066 }
00067 }
00068 }
00069
00070
00076 void DCDT_Finder::Init()
00077 {
00078
00079 try {
00080 dynamic_channels[0].active = true;
00081 dynamic_channels[0].dynamic = true;
00082 dynamic_channels[0].channel = new DCDT_Channel_IP(myAgoraID);
00083 ((DCDT_Channel_IP*)dynamic_channels[0].channel)->DynamicPrepare(finder_data->IfNumAddr, finder_data->McNumAddr, finder_data->McPort);
00084 } catch (ChannelError e) {
00085 cout << "\nError while preparing channel 0 (" << strerror(e.errval) << ")\n";
00086 exit(1);
00087 }
00088
00089
00090 try {
00091 StaticLinks();
00092 } catch (Exception e) {
00093 cout << "\nError while preparing static links\n";
00094 }
00095
00096 if (starting_timer) {
00097 cout << "\ndeactivating ip channel!!!!!!\n";
00098 dynamic_channels[0].active = false;
00099 start_ts = GetTime();
00100 }
00101
00102
00103 for (i=0; i<MAXCHANNELS; i++) {
00104 if (dynamic_channels[i].active) {
00105 try {
00106 dynamic_channels[i].channel->SendNotify();
00107 } catch(TimeOut e) {
00108 cout << "\nCan't send notify on channel " << i << "( Timeout )" << "\n";
00109 } catch(ChannelError e) {
00110 cout << "\nError while sending notify on channel " << i << "(" << strerror(e.errval) << ")\n";
00111 }
00112 }
00113 }
00114 old_ts = notify_ts = GetTime();
00115 }
00116
00117
00118
00119
00120
00121
00122
00123
00124 void DCDT_Finder::DoYourJob(int par)
00125 {
00126
00127
00128
00129
00130
00131
00132
00133
00134 ChannelTableElem *channel;
00135 int idx;
00136
00137
00138 linked = received = false;
00139
00140 try {
00141 for (idx=0; idx<MAXCHANNELS; idx++) {
00142 channel=&(dynamic_channels[idx]);
00143 if (channel->active) {
00144 try {
00145 channel->channel->ReceiveHS(hs_header, remoteCD);
00146 } catch (TimeOut t) {
00147 } catch (ChannelError e) {
00148 cout << "\nError while receiving on channel " << i;
00149 }
00150
00151 if (hs_header) {
00152 received = true;
00153 switch (hs_header->type) {
00154
00155 case HS_NOTIFY:
00156
00157 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Notify received from AgoraId %i", hs_header->AgoraID));
00158
00159 linknum = IsPresent(hs_header->AgoraID);
00160 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("IsPresent: %i", linknum));
00161
00162 if (!linknum) {
00163 channel->channel->ChooseChannel(hs_header->channel);
00164 localCD = CreateLink(hs_header->AgoraID, L_WAITING, channel->channel->GetStartingCD(), NULL, channel->channel->ReadStartingProfile());
00165 channel->channel->SendAnswer(remoteCD, localCD);
00166 linked = true;
00167 }
00168 else {
00169 switch (LinkTable[linknum].LinkPtr->ReadStatus()) {
00170
00171 case L_WAITING:
00172 channel->channel->SendAnswer(remoteCD, LinkTable[linknum].LinkPtr->GetCommData());
00173 break;
00174
00175 case L_LOST:
00176 channel->channel->ChooseChannel(hs_header->channel);
00177 localCD = LinkTable[linknum].LinkPtr->Restart(L_WAITING,
00178 channel->channel->GetStartingCD(),
00179 NULL);
00180 channel->channel->SendAnswer(remoteCD, localCD);
00181 linked = true;
00182
00183 }
00184 }
00185 delete remoteCD;
00186 remoteCD = NULL;
00187 break;
00188
00189 case HS_ANSWER:
00190
00191 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Answer received from AgoraId %i", hs_header->AgoraID));
00192
00193 linknum = IsPresent(hs_header->AgoraID);
00194 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("IsPresent: %i", linknum));
00195
00196 if (!linknum) {
00197 channel->channel->ChooseChannel(hs_header->channel);
00198 CreateLink(hs_header->AgoraID, L_CONNECTING, channel->channel->GetStartingCD(), remoteCD, channel->channel->ReadStartingProfile());
00199 remoteCD = NULL;
00200 linked = true;
00201 }
00202 else {
00203 switch (LinkTable[linknum].LinkPtr->ReadStatus()) {
00204
00205 case L_WAITING:
00206 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("RemoteAgoraID=%i MyAgoraID=%i", hs_header->AgoraID, myAgoraID));
00207
00208 if (hs_header->AgoraID > myAgoraID) {
00209 LinkTable[linknum].LinkPtr->ResetToConnecting(channel->channel->GetStartingCD(), remoteCD);
00210 remoteCD = NULL;
00211 }
00212 else {
00213 delete remoteCD;
00214 remoteCD = NULL;
00215 }
00216 break;
00217
00218 case L_CONNECTING:
00219 case L_RESETTOCONN:
00220 delete remoteCD;
00221 remoteCD = NULL;
00222 break;
00223
00224 case L_LOST:
00225 channel->channel->ChooseChannel(hs_header->channel);
00226 LinkTable[linknum].LinkPtr->Restart(L_CONNECTING,
00227 channel->channel->GetStartingCD(),
00228 remoteCD);
00229 remoteCD = NULL;
00230 linked = true;
00231 }
00232 }
00233 break;
00234
00235 default:
00236 cout << "\nFinder received unknown msg!";
00237 }
00238 delete hs_header;
00239 hs_header = NULL;
00240 }
00241 }
00242 }
00243
00244 new_ts = GetTime();
00245
00246
00247 if ((starting_timer) && ((new_ts - start_ts) > STARTING_TIMER_VAL)) {
00248 dynamic_channels[0].active = true;
00249 starting_timer = false;
00250 cout << "\nreactivating ip channel!!!!!!\n";
00251 }
00252
00253
00254
00255
00256 if (!received || ((passed = new_ts - old_ts) >= FINDER_PERIOD)) {
00257 if ((passed) < FINDER_PERIOD)
00258 Delay(FINDER_PERIOD - passed);
00259
00260 old_ts = GetTime();
00261
00262
00263 if ((new_ts - notify_ts) >= NOTIFY_PERIOD) {
00264 for (i=0; i<MAXCHANNELS; i++) {
00265 if (dynamic_channels[i].active) {
00266 try {
00267 dynamic_channels[i].channel->SendNotify();
00268 } catch(TimeOut e) {
00269 cout << "\nCan't send notify on channel " << i;
00270 } catch(ChannelError e) {
00271 cout << "\nError while sending notify on channel " << i;
00272 }
00273 }
00274 }
00275 notify_ts = old_ts;
00276 }
00277 }
00278 } catch (Exception e) {
00279 cout << "\nFinder exception!";
00280 exit(0);
00281 }
00282 }
00283
00284
00287 CommData* DCDT_Finder::CreateLink(int r_id, int status, CommData *l_cd, CommData *r_cd, unsigned int profile)
00288 {
00289 DCDT_LinkRx *newLink;
00290 int newLinkID;
00291
00292 newLinkID = GetFirstFreeLinkTableSlot();
00293 newLink = new DCDT_LinkRx(myAgora, newLinkID, r_id);
00294 myAgora->AddSysMember(newLink, profile);
00295 LinkTable[newLinkID].LinkPtr = newLink;
00296 l_cd = newLink->Prepare(status, l_cd, r_cd);
00297
00298 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Link created!"));
00299 myAgora->ActivateMember(newLink);
00300 return l_cd;
00301 }
00302
00305 int DCDT_Finder::IsPresent(int id)
00306 {
00307 for (int i=1; i<MAXLINKS; i++) {
00308 if ((LinkTable[i].freeSlot == false) && ((LinkTable[i].LinkPtr->ReadRemoteID()) == id))
00309 return i;
00310 }
00311 return 0;
00312 }
00313
00314
00315 void DCDT_Finder::CloseLink(int id) {
00316 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Closing LINK %i", id));
00317
00318 if (id < 1000) {
00319 DeleteLinkTableSlot(id);
00320 myAgora->RemoveMember(LinkTable[id].LinkPtr);
00321 }
00322 }
00323
00330 void DCDT_Finder::StaticLinks() {
00331
00332 char ch_type[4], lk_type[15];
00333 int ch_count = 1, rv;
00334 char l_ip_addr[INET_ADDRSTRLEN], r_ip_addr[INET_ADDRSTRLEN];
00335 int l_ip_port, r_ip_port;
00336 unsigned int profile;
00337 char device_name[UNIX_PATH_MAX];
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348 rv = fscanf(finder_data->fp, "%s%s", ch_type, lk_type);
00349 if (((rv > 0) && (rv < 2))) {
00350 fclose(finder_data->fp);
00351 throw Exception();
00352 }
00353 while (!feof(finder_data->fp)) {
00354 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("%s %s", ch_type, lk_type));
00355
00356 profile = MPM_SYSTEM;
00357 if (!strcmp(lk_type, "link"))
00358 profile |= MPM_LINK;
00359 if (!strcmp(lk_type, "bridge"))
00360 profile |= MPM_BRIDGE;
00361 if (!strcmp(ch_type, "ip")) {
00362 rv = fscanf(finder_data->fp, "%s%i%s%i\n", l_ip_addr, &l_ip_port, r_ip_addr, &r_ip_port);
00363 if ((rv < 4) || (rv == EOF)) {
00364 fclose(finder_data->fp);
00365 throw Exception();
00366 }
00367
00368 TRC_PRINT( DCDT_TRC_COMM, TRC1, (" %s %i %s %i", l_ip_addr, l_ip_port, r_ip_addr, r_ip_port));
00369
00370 dynamic_channels[ch_count].active = true;
00371 dynamic_channels[ch_count].dynamic = false;
00372 dynamic_channels[ch_count].channel = new DCDT_Channel_IP(myAgoraID);
00373 ((DCDT_Channel_IP*)dynamic_channels[ch_count].channel)->StaticPrepare(profile, l_ip_addr, htons((short)l_ip_port), r_ip_addr, htons((short)r_ip_port));
00374
00375 ch_count++;
00376 }
00377 else if (!strcmp(ch_type, "ser")) {
00378 rv = fscanf(finder_data->fp, "%s", device_name);
00379 if ((rv == 0) || (rv == EOF)) {
00380 fclose(finder_data->fp);
00381 throw Exception();
00382 }
00383
00384 TRC_PRINT( DCDT_TRC_COMM, TRC1, (" %s", device_name));
00385 dynamic_channels[ch_count].active = true;
00386 dynamic_channels[ch_count].dynamic = false;
00387 dynamic_channels[ch_count].channel = new DCDT_Channel_Serial(myAgoraID);
00388 ((DCDT_Channel_Serial*)dynamic_channels[ch_count].channel)->StaticPrepare(profile, device_name);
00389
00390 ch_count++;
00391 }
00392
00393 rv = fscanf(finder_data->fp, "%s%s", ch_type, lk_type);
00394 if ((rv > 0) && (rv < 2)) {
00395 fclose(finder_data->fp);
00396 throw Exception();
00397 }
00398 }
00399
00400 if (--ch_count)
00401 starting_timer = true;
00402 }
00403
00404 #endif //define