00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef TCPSOCKET_H
00026 #define TCPSOCKET_H
00027
00028 #include <Socket.h>
00029 #include <netdb.h>
00030 #include <netinet/tcp.h>
00031
00032 class TCPSocket : public Socket
00033 {
00034 public:
00035 TCPSocket() {};
00036 ~TCPSocket() {};
00037
00038 inline void Open(int fd = 0);
00039 inline void Send(const DCDT_Msg* msg);
00040 inline DCDT_Msg* Receive();
00041 inline void Listen(int qlen = 5);
00042 inline TCPSocket* Accept();
00043 inline void SetBufs(int rcvbuf, int sndbuf, int rcvlowat, int sndlowat);
00044 inline void SetSegSize(int size);
00045
00046 };
00047
00053 inline void TCPSocket::Open(int fd)
00054 {
00055 struct sockaddr_in *sa=NULL;
00056 socklen_t salen = 0;
00057 if (opened)
00058 return;
00059 if (fd) {
00060 sockfd = fd;
00061 if (getpeername(sockfd, (struct sockaddr *) sa, &salen) < 0)
00062 throw ChannelError(errno);
00063 connected = 1;
00064 }
00065 else if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
00066 throw ChannelError(errno);
00067 opened = 1;
00068
00069
00070
00071
00072 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCPSocket successfully opened"));
00073 }
00074
00085 inline void TCPSocket::Send(const DCDT_Msg* msg)
00086 {
00087 fd_set wset;
00088 struct timeval tv, *tv_ptr = NULL;
00089 int sent, tosend = MSGHEADER_LEN;
00090 char *sending_ptr;
00091
00092 if (!opened || !connected)
00093 return;
00094
00095
00096 sending_ptr = (char*)msg->GetHeader();
00097 while (tosend) {
00098 FD_ZERO(&wset);
00099 FD_SET(sockfd, &wset);
00100 if (unblock_send_flag) {
00101 tv.tv_sec = 0;
00102 tv.tv_usec = 0;
00103 tv_ptr = &tv;
00104 }
00105 else if (tsend_flag) {
00106 tv.tv_sec = tsend_value.tv_sec;
00107 tv.tv_usec = tsend_value.tv_usec;
00108 tv_ptr = &tv;
00109 }
00110 else if (tlost_flag) {
00111 tv.tv_sec = tlost_value.tv_sec;
00112 tv.tv_usec = tlost_value.tv_usec;
00113 tv_ptr = &tv;
00114 }
00115 switch (select(sockfd+1, NULL, &wset, NULL, tv_ptr)) {
00116 case 0: throw TimeOut();
00117 case -1: throw ConnError(errno);
00118 }
00119
00120 if ((sent = write(sockfd, sending_ptr, tosend)) < 0)
00121 throw ConnError();
00122
00123 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP header byte sent: %i", sent));
00124
00125 tosend -= sent;
00126 sending_ptr += sent;
00127 }
00128
00129
00130 sending_ptr = (char*)msg->GetPayload();
00131 tosend = msg->ReadPayloadLen();
00132 while (tosend) {
00133 FD_ZERO(&wset);
00134 FD_SET(sockfd, &wset);
00135 if (tlost_flag) {
00136 tv.tv_sec = tlost_value.tv_sec;
00137 tv.tv_usec = tlost_value.tv_usec;
00138 tv_ptr = &tv;
00139 }
00140 switch (select(sockfd+1, NULL, &wset, NULL, tv_ptr)) {
00141 case 0:
00142 case -1: throw ConnError(errno);
00143 }
00144
00145 if ((sent = write(sockfd, sending_ptr, tosend)) < 0)
00146 throw ConnError();
00147
00148 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP payload byte sent: %i", sent));
00149
00150 tosend -= sent;
00151 sending_ptr += sent;
00152 }
00153
00154 }
00155
00164 inline DCDT_Msg*
00165 TCPSocket::Receive()
00166 {
00167 DCDT_MsgHeader *msg_header;
00168 DCDT_Msg *msg;
00169 int plen;
00170 void *msg_payload;
00171
00172 char *index;
00173 int nread = 0, toread = MSGHEADER_LEN;
00174 fd_set rset;
00175 struct timeval tv, *tv_ptr = NULL;
00176
00177 if (!opened || !connected)
00178 return NULL;
00179
00180
00181 FD_ZERO(&rset);
00182 FD_SET(sockfd, &rset);
00183 if (unblock_recv_flag)
00184 {
00185 tv.tv_sec = 0;
00186 tv.tv_usec = 0;
00187 tv_ptr = &tv;
00188 }
00189 else if (trecv_flag)
00190 {
00191 tv.tv_sec = trecv_value.tv_sec;
00192 tv.tv_usec = trecv_value.tv_usec;
00193 tv_ptr = &tv;
00194 }
00195 else if (tlost_flag)
00196 {
00197 tv.tv_sec = tlost_value.tv_sec;
00198 tv.tv_usec = tlost_value.tv_usec;
00199 tv_ptr = &tv;
00200 }
00201 switch (select(sockfd+1, &rset, NULL, NULL, tv_ptr))
00202 {
00203 case 0: return NULL;
00204 case -1: throw ConnError(errno);
00205 }
00206
00207
00208 msg_header = new DCDT_MsgHeader;
00209 index = (char*)msg_header;
00210 if ((nread = read(sockfd, index, toread)) < 0)
00211 {
00212 delete msg_header;
00213 throw ConnError(errno);
00214 }
00215 else if (nread == 0)
00216 {
00217 delete msg_header;
00218 throw ConnError();
00219 }
00220 index += nread;
00221
00222 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP header byte read: %i", nread));
00223
00224 toread -= nread;
00225
00226 while ( toread )
00227 {
00228 FD_ZERO(&rset);
00229 FD_SET(sockfd, &rset);
00230 if (tlost_flag)
00231 {
00232 tv.tv_sec = tlost_value.tv_sec;
00233 tv.tv_usec = tlost_value.tv_usec;
00234 tv_ptr = &tv;
00235 }
00236 switch (select(sockfd+1, &rset, NULL, NULL, tv_ptr))
00237 {
00238 case 0:
00239 case -1: throw ConnError(errno);
00240 }
00241 if ((nread = read(sockfd, index, toread)) < 0)
00242 {
00243 delete msg_header;
00244 throw ConnError(errno);
00245 }
00246 else if (nread == 0)
00247 {
00248 delete msg_header;
00249 throw ConnError();
00250 }
00251 index += nread;
00252
00253 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP header byte read: %i", nread));
00254
00255 toread -= nread;
00256 }
00257 msg = new DCDT_Msg(msg_header);
00258
00259
00260 msg_payload = malloc(plen = msg->ReadPayloadLen());
00261 index = (char*)msg_payload;
00262 toread = plen;
00263 while ( toread )
00264 {
00265 if (tlost_flag)
00266 {
00267 tv.tv_sec = tlost_value.tv_sec;
00268 tv.tv_usec = tlost_value.tv_usec;
00269 tv_ptr = &tv;
00270 }
00271 switch (select(sockfd+1, &rset, NULL, NULL, tv_ptr))
00272 {
00273 case 0:
00274 case -1: throw ConnError(errno);
00275 }
00276 if ((nread = read(sockfd, index, toread)) < 0)
00277 {
00278 free(msg_payload);
00279 delete msg_header;
00280 throw ConnError(errno);
00281 }
00282 else if (nread == 0)
00283 {
00284 free(msg_payload);
00285 delete msg_header;
00286 throw ConnError();
00287 }
00288 index += nread;
00289
00290 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP payload byte read: %i", nread));
00291 toread -= nread;
00292 }
00293 msg->SetPayload(msg_payload, plen);
00294
00295 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP message successfully received"));
00296
00297 return msg;
00298 }
00299
00304 inline void TCPSocket::Listen(int qlen)
00305 {
00306 if (!opened)
00307 return;
00308 if (listen(sockfd, qlen) < 0)
00309 throw ChannelError(errno);
00310
00311 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP socket listening"));
00312 }
00313
00323 inline TCPSocket*
00324 TCPSocket::Accept()
00325 {
00326 int conn, addrlen;
00327 struct sockaddr_in sinremote;
00328 TCPSocket *s=NULL;
00329 fd_set rset;
00330 struct timeval tv, *tv_ptr = NULL;
00331
00332 if (!opened)
00333 return NULL;
00334
00335 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("TCP socket accepting"));
00336
00337 addrlen = sizeof(sinremote);
00338
00339 FD_ZERO(&rset);
00340 FD_SET(sockfd, &rset);
00341 if (tconn_flag)
00342 {
00343 tv.tv_sec = tconn_value.tv_sec;
00344 tv.tv_usec = tconn_value.tv_usec;
00345 tv_ptr = &tv;
00346 }
00347 switch (select(sockfd+1, &rset, NULL, NULL, tv_ptr))
00348 {
00349 case 0: throw TimeOut();
00350 case -1: throw ConnError(errno);
00351 }
00352
00353 if ((conn = accept(sockfd, (struct sockaddr*) &sinremote, (socklen_t*) &addrlen)) < 0)
00354 throw ConnError(errno);
00355 try
00356 {
00357 s = new TCPSocket();
00358 s->Open(conn);
00359 }
00360 catch (ChannelError e)
00361 {
00362 delete s;
00363 throw;
00364 }
00365 if (tconn_flag) {
00366 s->SetConnTimer(tconn_value.tv_sec + tconn_value.tv_usec);
00367 s->ConnTimerOn();
00368 }
00369 if (tlost_flag) {
00370 s->SetLostTimer(tlost_value.tv_sec + tlost_value.tv_usec);
00371 s->LostTimerOn();
00372 }
00373 if (trecv_flag) {
00374 s->SetReceiveTimer(trecv_value.tv_sec + trecv_value.tv_usec);
00375 s->ReceiveTimerOn();
00376 }
00377 if (tsend_flag) {
00378 s->SetSendTimer(tsend_value.tv_sec + tsend_value.tv_usec);
00379 s->SendTimerOn();
00380 }
00381 return s;
00382 }
00383
00384 inline void TCPSocket::SetBufs(int rcvbuf, int sndbuf, int rcvlowat, int sndlowat)
00385 {
00386 GetOption(SOL_SOCKET, SO_RCVBUF);
00387 if (rcvbuf) {
00388 if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0)
00389 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("Warning: can't set receive buffer for tcp socket"));
00390 }
00391 GetOption(SOL_SOCKET, SO_RCVBUF);
00392 GetOption(SOL_SOCKET, SO_SNDBUF);
00393 if (sndbuf) {
00394 if (setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) < 0)
00395 TRC_PRINT(DCDT_TRC_COMM, TRC0, ("Warning: can't set send buffer for tcp socket"));
00396 }
00397 GetOption(SOL_SOCKET, SO_SNDBUF);
00398 GetOption(SOL_SOCKET, SO_RCVLOWAT);
00399 if (rcvlowat) {
00400 if (setsockopt(sockfd, SOL_SOCKET, SO_RCVLOWAT, &rcvlowat, sizeof(rcvlowat)) < 0)
00401 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("Warning: can't set receive low water mark for tcp socket"));
00402 }
00403 GetOption(SOL_SOCKET, SO_RCVLOWAT);
00404 GetOption(SOL_SOCKET, SO_SNDLOWAT);
00405 if (sndlowat) {
00406 if (setsockopt(sockfd, SOL_SOCKET, SO_SNDLOWAT, &sndlowat, sizeof(sndlowat)) < 0)
00407 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("Warning: can't set send low water mark for tcp socket"));
00408 }
00409 GetOption(SOL_SOCKET, SO_SNDLOWAT);
00410 }
00411
00412 inline void TCPSocket::SetSegSize(int size)
00413 {
00414 GetOption(IPPROTO_TCP, TCP_MAXSEG);
00415 if (setsockopt(sockfd, IPPROTO_TCP, TCP_MAXSEG, &size, sizeof(size)) < 0)
00416 TRC_PRINT( DCDT_TRC_COMM, TRC0, ("Warning: can't set maximum segment size for tcp socket"));
00417 GetOption(IPPROTO_TCP, TCP_MAXSEG);
00418 }
00419
00420 #endif