// %flair:license{ // This file is part of the Flair framework distributed under the // CECILL-C License, Version 1.0. // %flair:license} #include "UdtSocket.h" #include "ConnectionLayout.h" #include #include #include #include #include #include #include "communication.h" #include #include #include #define COMPRESS_CHUNK 1024*100 #ifndef WIN32 #include #else #include #include #endif #define COMPUTE_UDT_STATS_TIMER 2000 #define HEARTBEAT_TIMER 200 using namespace std; UdtSocket::UdtSocket(UDTSOCKET socket,QString name): QObject() { this->socket = socket; this->name=name; stop = false; destroySocket=true; socketType=unknown; total_received=0; pktSndLossTotal=0; bool blocking = false; if (UDT::setsockopt(socket, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0) fprintf(stderr,"UDT::setsockopt error (UDT_RCVSYN)\n"); heartbeat_timer = new QTimer(this); connect(heartbeat_timer, SIGNAL(timeout()), this, SLOT(heartbeat())); heartbeat_timer->start(HEARTBEAT_TIMER); udtstats_timer = new QTimer(this); connect(udtstats_timer, SIGNAL(timeout()), this, SLOT(getUTDStats())); udtstats_timer->start(COMPUTE_UDT_STATS_TIMER); } UdtSocket::~UdtSocket() { heartbeat_timer->stop(); udtstats_timer->stop(); if(destroySocket) UDT::close(socket); } void UdtSocket::setName(QString name) { fprintf(stderr," %s is %s\n",this->name.toLocal8Bit().constData(),name.toLocal8Bit().constData()); this->name=name; } // send signal to uav, to check connectivity through a watchdog // this is necessary because we use udt (udp based), and we cannot check // disconnection of ground station void UdtSocket::heartbeat(void) { char data = WATCHDOG_HEADER; quint64 sent=write(&data, 1,HEARTBEAT_TIMER,false); if (sent != 1 && UDT::getlasterror().getErrorCode() == 2001) { heartbeat_timer->stop(); } } void UdtSocket::getUTDStats(void) { float rxRate=((float)total_received/(COMPUTE_UDT_STATS_TIMER/1000))/1000;//in Ko/s total_received=0; UDT::TRACEINFO perf; if (UDT::ERROR == UDT::perfmon(socket, &perf)) { fprintf(stderr,"perfmon: %s\n",UDT::getlasterror().getErrorMessage()); }/* else { fprintf(stderr,"%s socket stats:\n",name.toLocal8Bit().constData()); fprintf(stderr,"total number of sent packets, including retransmissions: %i\n",perf.pktSentTotal); fprintf(stderr,"total number of received packets: %i\n",perf.pktRecvTotal); fprintf(stderr,"total number of lost packets, measured in the sending side: %i\n",perf.pktSndLossTotal); fprintf(stderr,"total number of lost packets, measured in the receiving side: %i\n",perf.pktRcvLossTotal); fprintf(stderr,"total number of retransmitted packets, measured in the sending side: %i\n",perf.pktRetransTotal); fprintf(stderr,"total number of sent ACK packets: %i\n",perf.pktSentACKTotal); fprintf(stderr,"total number of received ACK packets: %i\n",perf.pktRecvACKTotal); fprintf(stderr,"total number of sent NAK packets: %i\n",perf.pktSentNAKTotal); fprintf(stderr,"total number of received NAK packets: %i\n",perf.pktRecvNAKTotal); fprintf(stderr,"round trip time: %fms\n",perf.msRTT); }*/ bool loosingPackets=false; if(perf.pktSndLossTotal>pktSndLossTotal) loosingPackets=true; pktSndLossTotal=perf.pktSndLossTotal; stats=QString("rx rate %1kB/s, round trip %2ms, lost packets %3").arg(rxRate,0,'f',3).arg(perf.msRTT,0,'f',3).arg(perf.pktSndLossTotal); UDTStats(stats,loosingPackets); } QString UdtSocket::getUDTStats() { return stats; } void UdtSocket::kill(void) { stop = true; deleteLater(); } void UdtSocket::receiveData(void) { int buf_size; int opt_size; UDT::getsockopt(socket, 0, UDT_RCVBUF, &buf_size, &opt_size); char *buf = (char *)malloc(buf_size); if (!buf) { fprintf(stderr,"error malloc UdtSocket::receiveData buffer\n"); return; } char *uncompressbuf=(char *)malloc(COMPRESS_CHUNK); if (!uncompressbuf) { fprintf(stderr,"error malloc UdtSocket::receiveData uncompress buffer\n"); free(buf); return; } //fprintf(stderr,stderr,"receiveData %x\n",thread()); while (!stop) { QCoreApplication::processEvents(); //we need to use epoll? //tests are showing that UDT::recvmsg is waiting for the entire timeout before returning //note that in flair::core the behaviour of UDT::recvmsg timeout is as expected //is it a difference between client and server?? //tests also show that we need to recreate the eid every time, otherwise wait returns immediately int eid = UDT::epoll_create(); if (eid < 0) { fprintf(stderr,"%s: epoll_create error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage()); } if (UDT::epoll_add_usock(eid, socket) < 0) { if (UDT::getlasterror().getErrorCode() == 5004) { fprintf(stderr,"disconnected from %s\n",name.toLocal8Bit().constData()); heartbeat_timer->stop(); deleteLater(); stop=true;; } else { fprintf(stderr,"%s: epoll_add_usock error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage()); } } int num = 1; UDTSOCKET readfds; int rv = UDT::epoll_wait2(eid, &readfds, &num,NULL, NULL,100); if (rv == -1) { if (UDT::getlasterror().getErrorCode() != 6003) fprintf(stderr,"prob %i\n", UDT::getlasterror().getErrorCode()); } else if(readfds==socket && num==1 && rv==1) { int size; do { size=UDT::recvmsg(socket, buf, buf_size); //fprintf(stderr,"recu %i\n",size); if (size > 0) { total_received+=size; switch ((unsigned char)buf[0]) { case ZLIB_HEADER: { ssize_t out_size; uncompressBuffer(buf, size, uncompressbuf, &out_size); if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) { socketType=gui; QString remoteName=ConnectionLayout::getDocRootName(uncompressbuf, out_size); setName(remoteName); emit newConnectionLayout(remoteName);//connection is Qt::BlockingQueuedConnection } emit dataReady(uncompressbuf, out_size);//connection is Qt::BlockingQueuedConnection, as we have only one buffer break; } case START_SENDING_FILES: { if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) { socketType=log; } setName("log files"); heartbeat_timer->stop(); emit newFileUI(socket); deleteLater(); stop=true; destroySocket=false; break; } case XML_HEADER: if(socketType==unknown) { socketType=gui; QString remoteName=ConnectionLayout::getDocRootName(buf, size ); setName(remoteName); emit newConnectionLayout(remoteName); } case DATAS_BIG_ENDIAN: case DATAS_LITTLE_ENDIAN: emit dataReady(buf, size ); break; case CLOSING_CONNECTION: fprintf(stderr,"%s, connection closed\n",name.toLocal8Bit().constData()); emit dataReady(buf, size ); stop = true; heartbeat_timer->stop(); deleteLater(); break; default: fprintf(stderr,"trame non supportée %x\n", buf[0]); } } else { //not necessary to check, watchdog (heartbeat_timer) can do it //if(UDT::getlasterror().getErrorCode()!=6002 && !stop) //fprintf(stderr,"udt socket: %s\n",UDT::getlasterror().getErrorMessage()); //UDT::close(socket);//si deconnecté //free(buf); //break; } } while(size>0); } else { //not necessary to check, watchdog (heartbeat_timer) can do it //fprintf(stderr,"udt socket: %s\n",UDT::getlasterror().getErrorMessage()); } UDT::epoll_remove_usock(eid, socket); UDT::epoll_release(eid); } free(uncompressbuf); free(buf); thread()->quit(); } qint64 UdtSocket::write(const char *buf, qint64 size,int ttl,bool inOrder) { qint64 sent = UDT::sendmsg(socket, buf, size, ttl, inOrder); if (sent != size) { fprintf(stderr,"%s, error writting to udt (%s)\n",name.toLocal8Bit().constData(), UDT::getlasterror().getErrorMessage()); if (UDT::getlasterror().getErrorCode() == 2001) { fprintf(stderr,"%s, closing connection\n",name.toLocal8Bit().constData()); stop = true; heartbeat_timer->stop(); deleteLater(); } } return sent; } int UdtSocket::uncompressBuffer(char *in, ssize_t in_size, char *out, ssize_t *out_size) { int ret; unsigned have; z_stream strm; // allocate inflate state strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; strm.avail_in = 0; strm.next_in = Z_NULL; ret = inflateInit(&strm); if (ret != Z_OK) return ret; strm.avail_in = in_size; strm.next_in = (unsigned char *)in; strm.avail_out = COMPRESS_CHUNK; strm.next_out = (unsigned char *)out; ret = inflate(&strm, Z_NO_FLUSH); assert(ret != Z_STREAM_ERROR); // state not clobbered switch (ret) { case Z_NEED_DICT: ret = Z_DATA_ERROR; // and fall through case Z_DATA_ERROR: case Z_MEM_ERROR: (void)inflateEnd(&strm); return ret; } have = COMPRESS_CHUNK - strm.avail_out; *out_size = have; // clean up and return (void)inflateEnd(&strm); return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR; }