source: flair-src/trunk/tools/FlairGCS/src/UdtSocket.cpp@ 399

Last change on this file since 399 was 399, checked in by Sanahuja Guillaume, 23 months ago

change color of status bar in gcs if high bandwidth usage is detected

File size: 10.8 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5#include "UdtSocket.h"
6#include "ConnectionLayout.h"
7#include <stdio.h>
8#include <stdlib.h>
9#include <QApplication>
10#include <QtEndian>
11#include <QDir>
12#include <QDate>
13#include "communication.h"
14#include <zlib.h>
15#include <assert.h>
16#include <QThread>
17
18#define COMPRESS_CHUNK 1024*100
19
20#ifndef WIN32
21#include <arpa/inet.h>
22#else
23#include <winsock2.h>
24#include <ws2tcpip.h>
25#endif
26
27#define COMPUTE_UDT_STATS_TIMER 2000
28#define HEARTBEAT_TIMER 200
29
30using namespace std;
31
32UdtSocket::UdtSocket(UDTSOCKET socket,QString name): QObject() {
33 this->socket = socket;
34 this->name=name;
35 stop = false;
36 destroySocket=true;
37 socketType=unknown;
38 total_received=0;
39 pktSndLossTotal=0;
40 lastpktRecvTotal=0;
41
42 bool blocking = false;
43 if (UDT::setsockopt(socket, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
44 fprintf(stderr,"UDT::setsockopt error (UDT_RCVSYN)\n");
45
46 heartbeat_timer = new QTimer(this);
47 connect(heartbeat_timer, SIGNAL(timeout()), this, SLOT(heartbeat()));
48 heartbeat_timer->start(HEARTBEAT_TIMER);
49 //heartbeat();//send directly the first one, but conflicts if this socket is for logs... TODO: delay start of watchdog timer on uav side
50 udtstats_timer = new QTimer(this);
51 connect(udtstats_timer, SIGNAL(timeout()), this, SLOT(getUTDStats()));
52 udtstats_timer->start(COMPUTE_UDT_STATS_TIMER);
53}
54
55UdtSocket::~UdtSocket() {
56 heartbeat_timer->stop();
57 udtstats_timer->stop();
58 if(destroySocket) UDT::close(socket);
59}
60
61void UdtSocket::setName(QString name) {
62 fprintf(stderr," %s is %s\n",this->name.toLocal8Bit().constData(),name.toLocal8Bit().constData());
63 this->name=name;
64}
65
66// send signal to uav, to check connectivity through a watchdog
67// this is necessary because we use udt (udp based), and we cannot check
68// disconnection of ground station
69void UdtSocket::heartbeat(void) {
70 //printf("h %s %i.%i\n",name.toLocal8Bit().constData(),QTime::currentTime().second(),QTime::currentTime().msec());
71 char data = WATCHDOG_HEADER;
72 quint64 sent=write(&data, 1,HEARTBEAT_TIMER,true);
73 if (sent != 1 && UDT::getlasterror().getErrorCode() == 2001) {
74 heartbeat_timer->stop();
75 }
76
77}
78
79void UdtSocket::getUTDStats(void) {
80 float rxRate=((float)total_received/(COMPUTE_UDT_STATS_TIMER/1000))/1000;//in Ko/s
81 total_received=0;
82
83 UDT::TRACEINFO perf;
84 if (UDT::ERROR == UDT::perfmon(socket, &perf)) {
85 fprintf(stderr,"perfmon: %s\n",UDT::getlasterror().getErrorMessage());
86 }/* else {
87 fprintf(stderr,"%s socket stats:\n",name.toLocal8Bit().constData());
88 fprintf(stderr,"total number of sent packets, including retransmissions: %i\n",perf.pktSentTotal);
89 fprintf(stderr,"total number of received packets: %i\n",perf.pktRecvTotal);
90 fprintf(stderr,"total number of lost packets, measured in the sending side: %i\n",perf.pktSndLossTotal);
91 fprintf(stderr,"total number of lost packets, measured in the receiving side: %i\n",perf.pktRcvLossTotal);
92 fprintf(stderr,"total number of retransmitted packets, measured in the sending side: %i\n",perf.pktRetransTotal);
93 fprintf(stderr,"total number of sent ACK packets: %i\n",perf.pktSentACKTotal);
94 fprintf(stderr,"total number of received ACK packets: %i\n",perf.pktRecvACKTotal);
95 fprintf(stderr,"total number of sent NAK packets: %i\n",perf.pktSentNAKTotal);
96 fprintf(stderr,"total number of received NAK packets: %i\n",perf.pktRecvNAKTotal);
97 fprintf(stderr,"round trip time: %fms\n",perf.msRTT);
98 }*/
99 bool loosingPackets=false;
100 if(perf.pktSndLossTotal>pktSndLossTotal) loosingPackets=true;
101 pktSndLossTotal=perf.pktSndLossTotal;
102 unsigned int pktPerSec=(perf.pktRecvTotal-lastpktRecvTotal)*1000/COMPUTE_UDT_STATS_TIMER;
103 QString stylesheet="color: black";
104 if(pktPerSec>=100) {
105 stylesheet="color: red";
106 }
107 else if(pktPerSec>=50) {
108 stylesheet="color: orange";
109 }
110 stats=QString("rx rate %1kB/s, %2 rx packets/s, round trip %3ms, lost packets %4").arg(rxRate,0,'f',3).arg(pktPerSec).arg(perf.msRTT,0,'f',3).arg(perf.pktSndLossTotal);
111 lastpktRecvTotal=perf.pktRecvTotal;
112 UDTStats(stats,stylesheet,loosingPackets);
113}
114
115QString UdtSocket::getUDTStats() {
116 return stats;
117}
118
119void UdtSocket::kill(void) {
120 stop = true;
121 deleteLater();
122}
123
124void UdtSocket::receiveData(void) {
125 int buf_size;
126 int opt_size;
127 UDT::getsockopt(socket, 0, UDT_RCVBUF, &buf_size, &opt_size);
128 char *buf = (char *)malloc(buf_size);
129 if (!buf) {
130 fprintf(stderr,"error malloc UdtSocket::receiveData buffer\n");
131 return;
132 }
133 char *uncompressbuf=(char *)malloc(COMPRESS_CHUNK);
134 if (!uncompressbuf) {
135 fprintf(stderr,"error malloc UdtSocket::receiveData uncompress buffer\n");
136 free(buf);
137 return;
138 }
139 //fprintf(stderr,stderr,"receiveData %x\n",thread());
140
141 while (!stop) {
142 QCoreApplication::processEvents();
143
144 //we need to use epoll?
145 //tests are showing that UDT::recvmsg is waiting for the entire timeout before returning
146 //note that in flair::core the behaviour of UDT::recvmsg timeout is as expected
147 //is it a difference between client and server??
148 //tests also show that we need to recreate the eid every time, otherwise wait returns immediately
149 int eid = UDT::epoll_create();
150 if (eid < 0) {
151 fprintf(stderr,"%s: epoll_create error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage());
152 }
153
154 if (UDT::epoll_add_usock(eid, socket) < 0) {
155 if (UDT::getlasterror().getErrorCode() == 5004) {
156 fprintf(stderr,"disconnected from %s\n",name.toLocal8Bit().constData());
157 heartbeat_timer->stop();
158 deleteLater();
159 stop=true;;
160 } else {
161 fprintf(stderr,"%s: epoll_add_usock error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage());
162 }
163 }
164
165 int num = 1;
166 UDTSOCKET readfds;
167
168 int rv = UDT::epoll_wait2(eid, &readfds, &num,NULL, NULL,100);
169 if (rv == -1) {
170 if (UDT::getlasterror().getErrorCode() != 6003)
171 fprintf(stderr,"prob %i\n", UDT::getlasterror().getErrorCode());
172 } else if(readfds==socket && num==1 && rv==1) {
173 int size;
174 QTime initTime;
175 initTime.start();
176 do {
177 //we are in "infinite" loop, so let the heartbeat run if needed
178 if(initTime.elapsed()>HEARTBEAT_TIMER) {
179 initTime.start();
180 //do not use error check of heartbeat() method: (otherwise, closes com too early)
181 char data = WATCHDOG_HEADER;
182 UDT::sendmsg(socket, &data, 1, HEARTBEAT_TIMER, true);
183 }
184
185 size=UDT::recvmsg(socket, buf, buf_size);
186
187 //fprintf(stderr,"recu %i %x\n",size,buf[0]);
188 if (size > 0) {
189 total_received+=size;
190
191 switch ((unsigned char)buf[0]) {
192 case ZLIB_HEADER: {
193 ssize_t out_size;
194 uncompressBuffer(buf, size, uncompressbuf, &out_size);
195 if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) {
196 socketType=gui;
197 QString remoteName=ConnectionLayout::getDocRootName(uncompressbuf, out_size);
198 setName(remoteName);
199 emit newConnectionLayout(remoteName);//connection is Qt::BlockingQueuedConnection
200 }
201 emit dataReady(uncompressbuf, out_size);//connection is Qt::BlockingQueuedConnection, as we have only one buffer
202 break;
203 }
204 case START_SENDING_FILES: {
205 //printf("log\n");
206 if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) {
207 socketType=log;
208 }
209 setName("log files");
210 heartbeat_timer->stop();
211 emit newFileUI(socket);
212 deleteLater();
213 stop=true;
214 size=-1;//exit from do while loop
215 destroySocket=false;
216 break;
217 }
218 case XML_HEADER:
219 if(socketType==unknown) {
220 socketType=gui;
221 QString remoteName=ConnectionLayout::getDocRootName(buf, size );
222 setName(remoteName);
223 emit newConnectionLayout(remoteName);
224 }
225 case DATA_BIG_ENDIAN:
226 case DATA_LITTLE_ENDIAN:
227 emit dataReady(buf, size );
228 break;
229 case CLOSING_CONNECTION:
230 fprintf(stderr,"%s, connection closed by remote\n",name.toLocal8Bit().constData());
231 emit dataReady(buf, size );
232 stop = true;
233 heartbeat_timer->stop();
234 deleteLater();
235 break;
236 default:
237 fprintf(stderr,"udt trame non supportée %x\n", buf[0]);
238 }
239 } else {
240 //not necessary to check, watchdog (heartbeat_timer) can do it
241 //if(UDT::getlasterror().getErrorCode()!=6002 && !stop)
242 //fprintf(stderr,"udt socket: %s\n",UDT::getlasterror().getErrorMessage());
243 //UDT::close(socket);//si deconnecté
244 //free(buf);
245 //break;
246 }
247 } while(size>0);
248 } else {
249 //not necessary to check, watchdog (heartbeat_timer) can do it
250 //fprintf(stderr,"udt socket: %s\n",UDT::getlasterror().getErrorMessage());
251 }
252 UDT::epoll_remove_usock(eid, socket);
253 UDT::epoll_release(eid);
254 }
255
256 free(uncompressbuf);
257 free(buf);
258 thread()->quit();
259}
260
261qint64 UdtSocket::write(const char *buf, qint64 size,int ttl,bool inOrder) {
262 qint64 sent = UDT::sendmsg(socket, buf, size, ttl, inOrder);
263 if (sent != size) {
264 fprintf(stderr,"sent %lld/%lld %s\n",sent ,size,UDT::getlasterror().getErrorMessage());
265 fprintf(stderr,"%s, error writting to udt (%s)\n",name.toLocal8Bit().constData(), UDT::getlasterror().getErrorMessage());
266 if (UDT::getlasterror().getErrorCode() == 2001) {
267 fprintf(stderr,"%s, closing connection\n",name.toLocal8Bit().constData());
268 stop = true;
269 heartbeat_timer->stop();
270 deleteLater();
271 }
272 }
273 return sent;
274}
275
276int UdtSocket::uncompressBuffer(char *in, ssize_t in_size, char *out,
277 ssize_t *out_size) {
278 int ret;
279 unsigned have;
280 z_stream strm;
281
282 // allocate inflate state
283 strm.zalloc = Z_NULL;
284 strm.zfree = Z_NULL;
285 strm.opaque = Z_NULL;
286 strm.avail_in = 0;
287 strm.next_in = Z_NULL;
288 ret = inflateInit(&strm);
289 if (ret != Z_OK)
290 return ret;
291
292 strm.avail_in = in_size;
293 strm.next_in = (unsigned char *)in;
294 strm.avail_out = COMPRESS_CHUNK;
295 strm.next_out = (unsigned char *)out;
296
297 ret = inflate(&strm, Z_NO_FLUSH);
298 assert(ret != Z_STREAM_ERROR); // state not clobbered
299 switch (ret) {
300 case Z_NEED_DICT:
301 ret = Z_DATA_ERROR; // and fall through
302 case Z_DATA_ERROR:
303 case Z_MEM_ERROR:
304 (void)inflateEnd(&strm);
305 return ret;
306 }
307 have = COMPRESS_CHUNK - strm.avail_out;
308 *out_size = have;
309
310 // clean up and return
311 (void)inflateEnd(&strm);
312 return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
313}
Note: See TracBrowser for help on using the repository browser.