00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef DCDT_CHANNEL_IP_H
00026 #define DCDT_CHANNEL_IP_H
00027
00028 #include <UDPSocket.h>
00029 #include <TCPSocket.h>
00030 #include <DCDT_Channel.h>
00031 #include <DCDT_ctrace.h>
00032
// Default timer values handed to the sockets' Set*Timer() calls in this file.
// NOTE(review): the Set*Timer parameters are named `usec`, so these are
// presumably microseconds -- confirm against the socket classes.
#define IP_LOST_TIMER_VAL 10000000
#define IP_SEND_TIMER_VAL 1000000
#define IP_RECV_TIMER_VAL 100000
#define IP_CONN_TIMER_VAL 10000000

// Buffer tuning values passed to TCPSocket::SetBufs() by Open()/Reopen().
#define SNDLOWAT_VAL 500
// Parenthesised so the product keeps its value when the macro is embedded in
// a larger expression (e.g. `x / SNDBUF_VAL`).
#define SNDBUF_VAL (10*SNDLOWAT_VAL)
#define MSS_VAL 500
00041
// Addressing record of one endpoint: a TCP and a UDP address/port pair.
// The record is copied verbatim into message payloads (see StartConn() and
// WaitConn()), so the member order must stay exactly as declared here.
typedef struct _Data_IP {
    int tcp_addr;
    int udp_addr;
    int tcp_port;
    int udp_port;
} Data_IP;

// Size in bytes of one Data_IP record.
#define DATA_IP_LEN sizeof(Data_IP)
00047
00048 class CommData_IP;
00049
00050 class DCDT_Channel_IP : public DCDT_Channel {
00051 public:
00052 DCDT_Channel_IP(int AgoraID) : DCDT_Channel(AgoraID) {
00053 udp_sock = NULL;
00054 tcp_sock1 = tcp_sock2 = tcp_wsock = NULL;
00055 notify.payload_len = answer.payload_len = DATA_IP_LEN;
00056 localCD = remoteCD = NULL;
00057 startCD = NULL;
00058 udp_ready = false;
00059 };
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071 inline void Open(int link_status = 0);
00072 inline void Reopen(int link_status);
00073 inline void Prepare(CommData *local, CommData *remote);
00074 inline void ChooseChannel(int channel){channel=0;};
00075 inline void DynamicPrepare(int local_addr, int mc_addr, short port);
00076 inline void StaticPrepare(unsigned int prof, char *local_addr, short local_port, char *remote_addr, short remote_port);
00077 inline void WaitConn();
00078 inline void StartConn();
00079 inline void Send(const DCDT_Msg *msg);
00080 inline void SendNotify();
00081 inline void SendAnswer(CommData* to, CommData* local);
00082 inline DCDT_Msg* Receive();
00083 inline void ReceiveHS(HSMsgHeader*& header, CommData*& remote);
00084 inline void Dispose();
00085 inline void Close();
00086 inline void KeepAlive(){};
00087 inline CommData* GetCommData();
00088 inline CommData* GetStartingCD();
00089 inline unsigned int ReadStartingProfile();
00090 inline void SetTimers();
00091 inline void SetLostTimer(int usec);
00092 inline void LostTimerOn();
00093 inline void LostTimerOff();
00094 inline void SetConnTimer(int usec);
00095 inline void ConnTimerOn();
00096 inline void ConnTimerOff();
00097 inline void SetSendTimer(int usec);
00098 inline void SendTimerOn();
00099 inline void SendTimerOff();
00100 inline void UnblockSend();
00101 inline void SetReceiveTimer(int usec);
00102 inline void ReceiveTimerOn();
00103 inline void ReceiveTimerOff();
00104 inline void UnblockReceive();
00105 void Restart(int link_status);
00106 void Stop();
00107
00108 private:
00109 bool
00111 dynamic,
00113 udp_ready;
00114
00115 unsigned int profile;
00116 UDPSocket *udp_sock;
00117 TCPSocket
00119 *tcp_sock1,
00121 *tcp_sock2,
00123 *tcp_wsock;
00124
00125 int multi_addr, local_addr;
00126 short multi_port;
00127 CommData_IP
00129 *startCD,
00131 *localCD,
00133 *remoteCD;
00134
00136 DCDT_Msg *msg;
00137 int fd_tcp, fd_udp;
00138 fd_set set;
00139 };
00140
00141 class CommData_IP : public CommData {
00142 public:
00143 Data_IP* data;
00144
00145 CommData_IP() {
00146 data = new Data_IP();
00147 memset ( data, 0, sizeof(Data_IP) );
00148 }
00149
00150 CommData_IP(int t_a, int t_p, int u_a, int u_p) {
00151 data = new Data_IP();
00152 data->tcp_addr = t_a;
00153 data->tcp_port = t_p;
00154 data->udp_addr = u_a;
00155 data->udp_port = u_p;
00156 }
00157
00158 CommData_IP(Data_IP* d) {
00159
00160 data = new Data_IP();
00161 memcpy ( data, d, sizeof(Data_IP) );
00162 }
00163
00164 CommData_IP(CommData_IP *cd) {
00165 data = new Data_IP();
00166 data->tcp_addr = cd->data->tcp_addr;
00167 data->tcp_port = cd->data->tcp_port;
00168 data->udp_addr = cd->data->udp_addr;
00169 data->udp_port = cd->data->udp_port;
00170 }
00171
00172 CommData_IP& operator=(const CommData_IP& cd) {
00173 data->tcp_addr = cd.data->tcp_addr;
00174 data->tcp_port = cd.data->tcp_port;
00175 data->udp_addr = cd.data->udp_addr;
00176 data->udp_port = cd.data->udp_port;
00177 return (*this);
00178 }
00179
00180 DCDT_Channel* CreateChannel(int AgoraID) {
00181 return (new DCDT_Channel_IP(AgoraID));
00182 }
00183
00184 virtual ~CommData_IP() {
00185 if ( data )
00186 delete data;
00187 }
00188 };
00189
00194 inline void DCDT_Channel_IP::Prepare(CommData *local, CommData *remote)
00195 {
00196 TRC_PRINT ( DCDT_TRC_COMM, TRC1, ("Prepare channel IP") );
00197 if (local) {
00198 if (localCD)
00199 delete localCD;
00200 localCD = new CommData_IP((CommData_IP*)local);
00201
00202 TRC_PRINT ( DCDT_TRC_COMM, TRC1, ("localCD: tcp_addr=%x tcp_port=%hx udp_addr=%x udp_port=%hx", localCD->data->tcp_addr, localCD->data->tcp_port, localCD->data->udp_addr, localCD->data->udp_port));
00203 } else
00204 localCD = new CommData_IP();
00205
00206 if (remote) {
00207 if (remoteCD)
00208 delete remoteCD;
00209 remoteCD = new CommData_IP((CommData_IP*)remote);
00210
00211 TRC_PRINT ( DCDT_TRC_COMM, TRC1, ("remoteCD: tcp_addr=%x tcp_port=%hx udp_addr=%x udp_port=%hx", remoteCD->data->tcp_addr, remoteCD->data->tcp_port, remoteCD->data->udp_addr, remoteCD->data->udp_port));
00212 } else
00213 remoteCD = new CommData_IP();
00214
00215 }
00216
00221 inline void DCDT_Channel_IP::DynamicPrepare(int l_addr, int mc_addr, short mc_port)
00222 {
00223 dynamic = true;
00224 profile = MPM_SYSTEM | MPM_LINK;
00225
00226 if ( udp_sock )
00227 delete udp_sock;
00228
00229 udp_sock = new UDPSocket();
00230 udp_sock->Open();
00231 udp_sock->MC_Set((multi_addr = mc_addr), local_addr = l_addr);
00232
00233 udp_sock->Bind(0, (multi_port = mc_port));
00234
00235 localCD = new CommData_IP();
00236 localCD->data->udp_addr = local_addr;
00237 localCD->data->udp_port = mc_port;
00238
00239 TRC_PRINT ( DCDT_TRC_COMM, TRC1, ("UDP localCD: addr=%x port=%hx", localCD->data->udp_addr, localCD->data->udp_port));
00240
00241 udp_sock->SetReceiveTimer(0);
00242 udp_sock->ReceiveTimerOn();
00243 udp_sock->SetLostTimer(IP_LOST_TIMER_VAL);
00244 udp_sock->LostTimerOn();
00245 udp_sock->SetConnTimer(0);
00246 udp_sock->ConnTimerOn();
00247
00248 startCD = new CommData_IP();
00249 startCD->data->tcp_addr = startCD->data->udp_addr = local_addr;
00250 startCD->data->tcp_port = startCD->data->udp_port = 0;
00251
00252 }
00253
00258 inline void DCDT_Channel_IP::StaticPrepare(unsigned int prof, char *l_addr, short l_port, char *r_addr, short r_port)
00259 {
00260 dynamic = false;
00261 profile = prof;
00262
00263
00264 localCD = new CommData_IP();
00265 inet_pton(AF_INET, l_addr, &localCD->data->udp_addr);
00266 local_addr = localCD->data->udp_addr;
00267 localCD->data->udp_port = l_port;
00268
00269 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("UDP localCD: addr=%x port=%hx", localCD->data->udp_addr, localCD->data->udp_port));
00270
00271
00272 remoteCD = new CommData_IP();
00273 inet_pton(AF_INET, r_addr, &remoteCD->data->udp_addr);
00274 remoteCD->data->udp_port = r_port;
00275
00276 udp_sock = new UDPSocket();
00277 udp_sock->Open();
00278 udp_sock->Bind(localCD->data->udp_addr, (short)localCD->data->udp_port);
00279
00280 udp_sock->SetReceiveTimer(0);
00281 udp_sock->ReceiveTimerOn();
00282 udp_sock->SetLostTimer(IP_LOST_TIMER_VAL);
00283 udp_sock->LostTimerOn();
00284 udp_sock->SetConnTimer(0);
00285 udp_sock->ConnTimerOn();
00286
00287 startCD = new CommData_IP();
00288 startCD->data->tcp_addr = startCD->data->udp_addr = localCD->data->udp_addr;
00289 startCD->data->tcp_port = startCD->data->udp_port = 0;
00290 }
00291
00296 inline void DCDT_Channel_IP::Open(int link_status )
00297 {
00298 short port;
00299 DCDT_Msg m(1,1);
00300
00301 tcp_sock1 = new TCPSocket();
00302 tcp_sock1->Open();
00303 tcp_sock1->Bind(localCD->data->tcp_addr, (short)localCD->data->tcp_port);
00304 tcp_sock1->SetBufs(0, (int)SNDBUF_VAL/2, 0, SNDLOWAT_VAL);
00305
00306
00307 if (link_status == L_WAITING)
00308 tcp_sock1->Listen();
00309 tcp_sock1->GetCommData(localCD->data->tcp_addr, port);
00310 localCD->data->tcp_port = (int)port;
00311
00312 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("TCP socket bound to: addr=%x port=%hx", localCD->data->tcp_addr, localCD->data->tcp_port));
00313 tcp_wsock = tcp_sock1;
00314
00315 udp_sock = new UDPSocket();
00316 udp_sock->Open();
00317 udp_sock->Bind(localCD->data->udp_addr, (short)localCD->data->udp_port);
00318 udp_sock->Send(&m, "127.0.0.1", 1234);
00319 udp_sock->GetCommData(localCD->data->udp_addr, port);
00320 localCD->data->udp_port = (int)port;
00321
00322 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("UDP socket bound to: addr=%x port=%hx", localCD->data->udp_addr, localCD->data->udp_port));
00323 }
00324
00327 inline void DCDT_Channel_IP::Reopen(int link_status)
00328 {
00329 short port;
00330 DCDT_Msg m(1,1);
00331
00332 tcp_sock1->Open();
00333 tcp_sock1->Bind(localCD->data->tcp_addr, (short)localCD->data->tcp_port);
00334 tcp_sock1->SetBufs(0, (int)SNDBUF_VAL/2, 0, SNDLOWAT_VAL);
00335
00336
00337 if (link_status == L_WAITING)
00338 tcp_sock1->Listen();
00339 tcp_sock1->GetCommData(localCD->data->tcp_addr, port);
00340 localCD->data->tcp_port = (int)port;
00341 tcp_wsock = tcp_sock1;
00342
00343 udp_sock->Open();
00344 udp_sock->Bind(localCD->data->udp_addr, (short)localCD->data->udp_port);
00345 udp_sock->Send(&m, "127.0.0.1", 1234);
00346 udp_sock->GetCommData(localCD->data->udp_addr, port);
00347 localCD->data->udp_port = (int)port;
00348
00349 TRC_PRINT( DCDT_TRC_COMM, TRC1, ("UDP socket bound to: addr=%x port=%hx", localCD->data->udp_addr, localCD->data->udp_port));
00350 }
00351
00353 inline CommData* DCDT_Channel_IP::GetCommData()
00354 {
00355 return localCD;
00356 }
00357
00359 inline CommData* DCDT_Channel_IP::GetStartingCD()
00360 {
00361
00362 return startCD;
00363 }
00364
00366 inline unsigned int DCDT_Channel_IP::ReadStartingProfile()
00367 {
00368 return profile;
00369 }
00370
00375 inline void DCDT_Channel_IP::WaitConn()
00376 {
00377 DCDT_Msg *m;
00378 Data_IP *d;
00379
00380
00381 tcp_sock2 = tcp_wsock->Accept();
00382 tcp_wsock = tcp_sock2;
00383 tcp_wsock->SetReceiveTimer(IP_CONN_TIMER_VAL);
00384 m = tcp_wsock->Receive();
00385
00386
00387
00388 if (m->ReadType() == MT_CONNECT) {
00389 d = (Data_IP*)m->GetPayload();
00390
00391 if (remoteCD) {
00392 remoteCD->data->udp_addr = d->udp_addr;
00393 remoteCD->data->udp_port = d->udp_port;
00394 }
00395 else
00396 remoteCD = new CommData_IP(d->tcp_addr, d->tcp_port, d->udp_addr, d->udp_port);
00397
00398 udp_sock->Connect(d->udp_addr, d->udp_port);
00399 }
00400
00401 delete m;
00402 tcp_wsock->SetReceiveTimer(IP_RECV_TIMER_VAL);
00403 fd_tcp = tcp_wsock->GetFd();
00404 fd_udp = udp_sock->GetFd();
00405 }
00406
00411 inline void DCDT_Channel_IP::StartConn()
00412 {
00413
00414 tcp_wsock->Connect(remoteCD->data->tcp_addr, remoteCD->data->tcp_port);
00415
00416 DCDT_Msg m(MT_CONNECT, 1);
00417 m.SetPayload(localCD->data, sizeof(Data_IP));
00418
00419
00420 udp_sock->Connect(remoteCD->data->udp_addr, remoteCD->data->udp_port);
00421
00422 tcp_wsock->Send(&m);
00423 fd_tcp = tcp_wsock->GetFd();
00424 fd_udp = udp_sock->GetFd();
00425
00426 }
00427
00436 inline DCDT_Msg* DCDT_Channel_IP::Receive()
00437 {
00438 msg = NULL;
00439
00440 if (udp_ready) {
00441 udp_ready = false;
00442 msg = udp_sock->Receive();
00443 } else {
00444 FD_ZERO(&set);
00445 FD_SET(fd_tcp, &set);
00446 FD_SET(fd_udp, &set);
00447 if (fd_tcp > fd_udp) {
00448 switch (select(fd_tcp+1, &set, NULL, NULL, NULL))
00449 {
00450 case 0: throw TimeOut();
00451 case -1: throw ConnError(errno);
00452 }
00453 } else {
00454 switch (select(fd_udp+1, &set, NULL, NULL, NULL))
00455 {
00456 case 0: throw TimeOut();
00457 case -1: throw ConnError(errno);
00458 }
00459 }
00460 if (FD_ISSET(fd_tcp, &set)) {
00461 msg = tcp_wsock->Receive();
00462 if (FD_ISSET(fd_udp, &set))
00463 udp_ready = true;
00464 } else if (FD_ISSET(fd_udp, &set))
00465 msg = udp_sock->Receive();
00466 }
00467
00468 return msg;
00469 }
00470
00473 inline void DCDT_Channel_IP::ReceiveHS(HSMsgHeader*& header, CommData*& remote)
00474 {
00475 void* data;
00476 udp_sock->ReceiveHS(header, data);
00477
00478 remote = new CommData_IP((Data_IP*)data);
00479 }
00480
00485 inline void DCDT_Channel_IP::Send(const DCDT_Msg *msg)
00486 {
00487 if (msg->ReadDeliveryWarranty() == TCP_WARRANTY) {
00488 if (!tcp_wsock)
00489 throw ChannelError();
00490 tcp_wsock->Send(msg);
00491 }
00492 else {
00493 if (!udp_sock)
00494 throw ChannelError();
00495 udp_sock->Send(msg);
00496 }
00497 }
00498
00499 inline void DCDT_Channel_IP::SendNotify()
00500 {
00501 TRC_PRINT ( DCDT_TRC_COMM, TRC1, ("Sending HS_NOTIFY"));
00502 notify.channel = channel_type;
00503 if (dynamic)
00504 udp_sock->SendHS(multi_addr, multi_port, ¬ify, localCD->data);
00505 else udp_sock->SendHS(remoteCD->data->udp_addr, remoteCD->data->udp_port, ¬ify, localCD->data);
00506 }
00507
00508 inline void DCDT_Channel_IP::SendAnswer(CommData* to, CommData* local)
00509 {
00510 TRC_PRINT ( DCDT_TRC_COMM, TRC1, ("Sending HS_ANSWER"));
00511 answer.channel = channel_type;
00512 udp_sock->SendHS(((CommData_IP*)to)->data->udp_addr, ((CommData_IP*)to)->data->udp_port, &answer, ((CommData_IP*)local)->data);
00513 }
00514
00515 inline void DCDT_Channel_IP::Dispose()
00516 {
00517 }
00518
00519 inline void DCDT_Channel_IP::Close()
00520 {
00521 if (tcp_sock1)
00522 tcp_sock1->Close();
00523 if (tcp_sock2)
00524 tcp_sock2->Close();
00525 tcp_wsock = NULL;
00526 if (udp_sock)
00527 udp_sock->Close();
00528 }
00529
00530 inline void DCDT_Channel_IP::SetTimers()
00531 {
00532 udp_sock->SetLostTimer(IP_LOST_TIMER_VAL);
00533 udp_sock->LostTimerOn();
00534 udp_sock->SetConnTimer(IP_CONN_TIMER_VAL);
00535 udp_sock->ConnTimerOn();
00536 udp_sock->SetSendTimer(IP_SEND_TIMER_VAL);
00537 udp_sock->SendTimerOn();
00538 udp_sock->SetReceiveTimer(IP_RECV_TIMER_VAL/2);
00539 udp_sock->ReceiveTimerOn();
00540
00541 tcp_wsock->SetLostTimer(IP_LOST_TIMER_VAL);
00542 tcp_wsock->LostTimerOn();
00543 tcp_wsock->SetConnTimer(IP_CONN_TIMER_VAL);
00544 tcp_wsock->ConnTimerOn();
00545 tcp_wsock->SetSendTimer(IP_SEND_TIMER_VAL);
00546 tcp_wsock->SendTimerOn();
00547 tcp_wsock->SetReceiveTimer(IP_RECV_TIMER_VAL/2);
00548 tcp_wsock->ReceiveTimerOn();
00549 }
00550
00551 inline void DCDT_Channel_IP::SetLostTimer(int usec)
00552 {
00553 udp_sock->SetLostTimer(usec);
00554 if (tcp_wsock)
00555 tcp_wsock->SetLostTimer(usec);
00556 }
00557
00558 inline void DCDT_Channel_IP::LostTimerOn()
00559 {
00560 udp_sock->LostTimerOn();
00561 if (tcp_wsock)
00562 tcp_wsock->LostTimerOn();
00563 }
00564
00565 inline void DCDT_Channel_IP::LostTimerOff()
00566 {
00567 udp_sock->LostTimerOff();
00568 if (tcp_wsock)
00569 tcp_wsock->LostTimerOff();
00570 }
00571
00572 inline void DCDT_Channel_IP::SetConnTimer(int usec)
00573 {
00574 udp_sock->SetConnTimer(usec);
00575 if (tcp_wsock)
00576 tcp_wsock->SetConnTimer(usec);
00577 }
00578
00579 inline void DCDT_Channel_IP::ConnTimerOn()
00580 {
00581 udp_sock->ConnTimerOn();
00582 if (tcp_wsock)
00583 tcp_wsock->ConnTimerOn();
00584 }
00585
00586 inline void DCDT_Channel_IP::ConnTimerOff()
00587 {
00588 udp_sock->ConnTimerOff();
00589 if (tcp_wsock)
00590 tcp_wsock->ConnTimerOff();
00591 }
00592
00593 inline void DCDT_Channel_IP::SetSendTimer(int usec)
00594 {
00595 udp_sock->SetSendTimer(usec);
00596 if (tcp_wsock)
00597 tcp_wsock->SetSendTimer(usec);
00598 }
00599
00600 inline void DCDT_Channel_IP::SendTimerOn()
00601 {
00602 udp_sock->SendTimerOn();
00603 if (tcp_wsock)
00604 tcp_wsock->SendTimerOn();
00605 }
00606
00607 inline void DCDT_Channel_IP::SendTimerOff()
00608 {
00609 udp_sock->SendTimerOff();
00610 if (tcp_wsock)
00611 tcp_wsock->SendTimerOff();
00612 }
00613
00614 inline void DCDT_Channel_IP::UnblockSend()
00615 {
00616 udp_sock->UnblockSend();
00617 if (tcp_wsock)
00618 tcp_wsock->UnblockSend();
00619 }
00620
00621 inline void DCDT_Channel_IP::SetReceiveTimer(int usec)
00622 {
00623 udp_sock->SetReceiveTimer(usec);
00624 if (tcp_wsock)
00625 tcp_wsock->SetReceiveTimer(usec);
00626 }
00627
00628 inline void DCDT_Channel_IP::ReceiveTimerOn()
00629 {
00630 udp_sock->ReceiveTimerOn();
00631 if (tcp_wsock)
00632 tcp_wsock->ReceiveTimerOn();
00633 }
00634
00635 inline void DCDT_Channel_IP::ReceiveTimerOff()
00636 {
00637 udp_sock->ReceiveTimerOff();
00638 if (tcp_wsock)
00639 tcp_wsock->ReceiveTimerOff();
00640 }
00641
00642 inline void DCDT_Channel_IP::UnblockReceive()
00643 {
00644 udp_sock->UnblockReceive();
00645 if (tcp_wsock)
00646 tcp_wsock->UnblockReceive();
00647 }
00648
00649 inline void DCDT_Channel_IP::Restart(int link_status)
00650 {
00651 Open(link_status);
00652 }
00653
00654 inline void DCDT_Channel_IP::Stop()
00655 {
00656 if (tcp_sock1)
00657 tcp_sock1->ForcedClose();
00658 if (tcp_sock2)
00659 tcp_sock2->ForcedClose();
00660 tcp_wsock = NULL;
00661 if (udp_sock)
00662 udp_sock->ForcedClose();
00663 }
00664
00665
00666 #endif
00667