00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef LINKRX_CPP
00026 #define LINKRX_CPP
00027
00028 using namespace std;
00029
00030 #include <DCDT_LinkRx.h>
00031 #include <DCDT_Finder.h>
00032
00033 DCDT_LinkRx::DCDT_LinkRx(DCDT_Agora *agora, int id, int r_id) : DCDT_Member(agora)
00034 {
00035 linkID = id;
00036 remoteID = r_id;
00037 status = restart_counter = 0;
00038 for (int i=0; i<NUMINT_MSGTYPE; i++)
00039 mymask[i] = 0;
00040 localCD = NULL;
00041 remoteCD = NULL;
00042 msgrcv = NULL;
00043 LinkTxPtr = NULL;
00044 }
00045
00046 void DCDT_LinkRx::Init()
00047 {
00048 }
00049
00055 CommData* DCDT_LinkRx::Prepare(int stat, CommData *l_cd, CommData *r_cd)
00056 {
00057 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Preparing LINKRX - Status %d",stat));
00058 channel = l_cd->CreateChannel(myAgoraID);
00059 channel->Prepare(l_cd, r_cd);
00060 channel->Open(creation_status = status = stat);
00061 channel->SetTimers();
00062 return channel->GetCommData();
00063 }
00064
00065
00077 void DCDT_LinkRx::DoYourJob(int par)
00078 {
00079 try {
00080 switch (status) {
00081
00082 case L_WAITING:
00083 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("LINKRX %i channel waiting", linkID));
00084 try {
00085 channel->WaitConn();
00086 } catch (TimeOut t) {
00087 if (status != L_RESETTOCONN)
00088 throw;
00089 else break;
00090 }
00091
00092
00093
00094 status = L_WORKING;
00095 channel->SetTimers();
00096 channel->ReceiveTimerOff();
00097 if (LinkTxPtr) {
00098 LinkTxPtr->Restart(channel);
00099 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Restarting LinkTx"));
00100 }
00101 else {
00102 LinkTxPtr = new DCDT_LinkTx(myAgora, linkID, remoteID, this);
00103 myAgora->AddSysMember(LinkTxPtr, Profile);
00104 LinkTxPtr->Prepare(channel);
00105 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("LinkTx created!"));
00106 myAgora->ActivateMember(LinkTxPtr);
00107 }
00108 break;
00109
00110 case L_CONNECTING:
00111 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("LINKRX %i Wow, we're connecting!!!", linkID));
00112 channel->StartConn();
00113 status = L_WORKING;
00114 channel->SetTimers();
00115 channel->ReceiveTimerOff();
00116 if (LinkTxPtr) {
00117 LinkTxPtr->Restart(channel);
00118 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Restarting LinkTx"));
00119 }
00120 else {
00121 LinkTxPtr = new DCDT_LinkTx(myAgora, linkID, remoteID, this);
00122 myAgora->AddSysMember(LinkTxPtr, Profile);
00123 LinkTxPtr->Prepare(channel);
00124 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("LinkTx created!"));
00125 myAgora->ActivateMember(LinkTxPtr);
00126 }
00127 break;
00128
00129 case L_RESETTOCONN:
00130 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("Reset LINKRX %i to L_CONNECTING", linkID));
00131 channel->Stop();
00132 channel->Dispose();
00133 Restart(L_CONNECTING, localCD, remoteCD);
00134 break;
00135
00136 case L_WORKING:
00137 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("LINKRX %i WORKING", linkID));
00138
00139
00140 if ((msgrcv = channel->Receive())) {
00141 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("LINKRX %i received remote msg type %i", linkID, msgrcv->ReadType()));
00142 if ((msgrcv->ReadType()) == MT_SUBSUPD) {
00143 LinkTxPtr->UnSubscribeAll();
00144 subs_mask_rcv = (unsigned int*)msgrcv->GetPayload();
00145 LinkTxPtr->Subscribe(subs_mask_rcv);
00146 delete msgrcv;
00147 }
00148 else if ((msgrcv->ReadType()) == MT_KEEPALIVE) {
00149 delete msgrcv;
00150 }
00151 else {
00152 LinkTxPtr->ShareMsg(msgrcv);
00153 }
00154 }
00155 break;
00156
00157 case L_STOPPING:
00158 channel->Stop();
00159 channel->Dispose();
00160 lost_counter = 0;
00161 status = L_LOST;
00162
00163 case L_LOST:
00164 if ((lost_counter++) == LINK_LOST_NUM) {
00165 Close();
00166 myAgora->RemoveMember(LinkTxPtr);
00167 myFinder->CloseLink(linkID);
00168 break;
00169 }
00170 Delay(LINK_LOST_TIMER);
00171 }
00172 } catch(TimeOut e) {
00173 cout << "\nTimeout on linkrx " << linkID << "\nConnection lost";
00174 status = L_STOPPING;
00175 LinkTxPtr->UnSubscribeAll();
00176 LinkTxPtr->ChangeStatus(L_LOST);
00177 } catch(ConnError e) {
00178 cout << "\nError on linkrx " << linkID << "\nerrno=" << e.errval << "\nConnection lost";
00179 status = L_STOPPING;
00180 if ( LinkTxPtr ) {
00181 LinkTxPtr->UnSubscribeAll();
00182 LinkTxPtr->ChangeStatus(L_LOST);
00183 }
00184 } catch(ChannelError e) {
00185 cout << "\nError on linkrx " << linkID << "\nerrno=" << e.errval << "\nConnection lost";
00186 status = L_STOPPING;
00187 LinkTxPtr->UnSubscribeAll();
00188 LinkTxPtr->ChangeStatus(L_LOST);
00189 } catch(Exception e) {
00190 cout << "\nException! Closing linkrx " << linkID << "\n";
00191 Close();
00192 myAgora->RemoveMember(LinkTxPtr);
00193 myFinder->CloseLink(linkID);
00194 }
00195 }
00196
00197
00198 CommData* DCDT_LinkRx::Restart(int re_status, CommData *l_cd, CommData *r_cd)
00199 {
00200 cout << "\nRestarting LINKRX " << linkID;
00201 channel->Prepare(l_cd, r_cd);
00202 channel->Reopen(creation_status = status = re_status);
00203 l_cd = channel->GetCommData();
00204 channel->SetTimers();
00205 return l_cd;
00206 }
00207
00208 CommData* DCDT_LinkRx::GetCommData()
00209 {
00210 return channel->GetCommData();
00211 }
00212
00213 void DCDT_LinkRx::ResetToConnecting(CommData *l_cd, CommData *r_cd)
00214 {
00215 SetCDs(l_cd, r_cd);
00216 status = L_RESETTOCONN;
00217 }
00218
00219 void DCDT_LinkRx::SetCDs(CommData *l_cd, CommData *r_cd)
00220 {
00221 if (localCD)
00222 delete localCD;
00223 localCD = l_cd;
00224 if (remoteCD)
00225 delete remoteCD;
00226 remoteCD = r_cd;
00227 }
00228
00229 void DCDT_LinkRx::Close()
00230 {
00231 channel->Close();
00232 }
00233
00234 #endif
00235