Changeset 234 in flair-src for trunk/tools/FlairGCS/src/UdtSocket.cpp
- Timestamp:
- Apr 10, 2018, 5:05:27 PM (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/tools/FlairGCS/src/UdtSocket.cpp
r30 r234 4 4 // %flair:license} 5 5 #include "UdtSocket.h" 6 #include "ConnectionLayout.h" 6 7 #include <stdio.h> 7 8 #include <stdlib.h> … … 10 11 #include <QDir> 11 12 #include <QDate> 12 #include "file_ui.h"13 13 #include "communication.h" 14 #include <zlib.h> 15 #include <assert.h> 16 #include <QThread> 17 #define COMPRESS_CHUNK 1024*100 14 18 15 19 #ifndef WIN32 … … 20 24 #endif 21 25 26 #define COMPUTE_UDT_STATS_TIMER 2000 27 #define HEARTBEAT_TIMER 200 28 22 29 using namespace std; 23 30 24 UdtSocket::UdtSocket(UDTSOCKET file_socket, UDTSOCKET com_socket, QString name) 25 : QObject() { 26 this->file_socket = file_socket; 27 this->com_socket = com_socket; 28 this->name = name; 31 UdtSocket::UdtSocket(UDTSOCKET socket,QString name): QObject() { 32 this->socket = socket; 33 this->name=name; 29 34 stop = false; 30 file_dialog = new file_ui(); 31 32 bool blocking = true; 33 UDT::setsockopt(file_socket, 0, UDT_RCVSYN, &blocking, sizeof(bool)); 34 35 destroySocket=true; 36 socketType=unknown; 37 total_received=0; 38 39 bool blocking = false; 40 if (UDT::setsockopt(socket, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0) 41 printf("UDT::setsockopt error (UDT_RCVSYN)\n"); 42 35 43 heartbeat_timer = new QTimer(this); 36 44 connect(heartbeat_timer, SIGNAL(timeout()), this, SLOT(heartbeat())); 37 heartbeat_timer->start(200); 45 heartbeat_timer->start(HEARTBEAT_TIMER); 46 47 udtstats_timer = new QTimer(this); 48 connect(udtstats_timer, SIGNAL(timeout()), this, SLOT(getUTDStats())); 49 udtstats_timer->start(COMPUTE_UDT_STATS_TIMER); 38 50 } 39 51 40 52 UdtSocket::~UdtSocket() { 41 53 heartbeat_timer->stop(); 42 43 UDT::close(file_socket); 44 UDT::close(com_socket); 45 46 file_dialog->deleteLater(); 54 udtstats_timer->stop(); 55 if(destroySocket) UDT::close(socket); 56 } 57 58 void UdtSocket::setName(QString name) { 59 printf(" %s is %s\n",this->name.toLocal8Bit().constData(),name.toLocal8Bit().constData()); 60 this->name=name; 47 61 } 48 62 … … 52 66 void UdtSocket::heartbeat(void) { 53 67 char data = WATCHDOG_HEADER; 54 write(&data, 1); 68 quint64 sent=write(&data, 1,HEARTBEAT_TIMER,false); 69 if (sent != 1 && UDT::getlasterror().getErrorCode() == 2001) { 70 heartbeat_timer->stop(); 71 } 72 73 } 74 75 void UdtSocket::getUTDStats(void) { 76 float rxRate=((float)total_received/(COMPUTE_UDT_STATS_TIMER/1000))/1000;//in Ko/s 77 total_received=0; 78 79 UDT::TRACEINFO perf; 80 if (UDT::ERROR == UDT::perfmon(socket, &perf)) { 81 printf("perfmon: %s\n",UDT::getlasterror().getErrorMessage()); 82 }/* else { 83 printf("%s socket stats:\n",name.toLocal8Bit().constData()); 84 printf("total number of sent packets, including retransmissions: %i\n",perf.pktSentTotal); 85 printf("total number of received packets: %i\n",perf.pktRecvTotal); 86 printf("total number of lost packets, measured in the sending side: %i\n",perf.pktSndLossTotal); 87 printf("total number of lost packets, measured in the receiving side: %i\n",perf.pktRcvLossTotal); 88 printf("total number of retransmitted packets, measured in the sending side: %i\n",perf.pktRetransTotal); 89 printf("total number of sent ACK packets: %i\n",perf.pktSentACKTotal); 90 printf("total number of received ACK packets: %i\n",perf.pktRecvACKTotal); 91 printf("total number of sent NAK packets: %i\n",perf.pktSentNAKTotal); 92 printf("total number of received NAK packets: %i\n",perf.pktRecvNAKTotal); 93 printf("round trip time: %fms\n",perf.msRTT); 94 95 }*/ 96 stats=QString("rx rate %1kB/s, round trip %2ms, lost packets %3").arg(rxRate,0,'f',3).arg(perf.msRTT,0,'f',3).arg(perf.pktSndLossTotal); 97 UDTStats(stats); 98 99 } 100 101 QString UdtSocket::getUDTStats() { 102 return stats; 55 103 } 56 104 57 105 void UdtSocket::kill(void) { 58 printf("disconnected\n");59 106 stop = true; 60 107 deleteLater(); 61 108 } 62 109 63 void UdtSocket::handleConnections(void) { 110 void UdtSocket::receiveData(void) { 111 int buf_size; 112 int opt_size; 113 UDT::getsockopt(socket, 0, UDT_RCVBUF, &buf_size, &opt_size); 114 char *buf = (char *)malloc(buf_size); 115 if (!buf) { 116 printf("error malloc UdtSocket::receiveData buffer\n"); 117 return; 118 } 119 char *uncompressbuf=(char *)malloc(COMPRESS_CHUNK); 120 if (!uncompressbuf) { 121 printf("error malloc UdtSocket::receiveData uncompress buffer\n"); 122 free(buf); 123 return; 124 } 125 //fprintf(stderr,"receiveData %x\n",thread()); 126 64 127 while (!stop) { 128 QCoreApplication::processEvents(); 129 130 //we need to use epoll? 131 //tests are showing that UDT::recvmsg is waiting for the entire timeout before returning 132 //note that in flair::core the behaviour of UDT::recvmsg timeout is as expected 133 //is it a difference between client and server?? 134 //tests also show that we need to recreate the eid every time, otherwise wait returns immediately 65 135 int eid = UDT::epoll_create(); 66 136 if (eid < 0) { 67 printf(" epoll_create error: %s\n",UDT::getlasterror().getErrorMessage());68 } 69 70 if (UDT::epoll_add_usock(eid, file_socket) < 0) {137 printf("%s: epoll_create error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage()); 138 } 139 140 if (UDT::epoll_add_usock(eid, socket) < 0) { 71 141 if (UDT::getlasterror().getErrorCode() == 5004) { 72 printf("epoll_add_usock error\n"); 73 break; 142 printf("disconnected from %s\n",name.toLocal8Bit().constData()); 143 heartbeat_timer->stop(); 144 deleteLater(); 145 stop=true;; 74 146 } else { 75 printf("epoll_add_usock error: %s\n", 76 UDT::getlasterror().getErrorMessage()); 147 printf("%s: epoll_add_usock error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage()); 77 148 } 78 149 } 79 if (UDT::epoll_add_usock(eid, com_socket) < 0) { 80 if (UDT::getlasterror().getErrorCode() == 5004) { 81 printf("epoll_add_usock error\n"); 82 break; 83 } else { 84 printf("epoll_add_usock error: %s\n", 85 UDT::getlasterror().getErrorMessage()); 86 } 87 } 88 89 set<UDTSOCKET> readfds; 90 91 int rv = UDT::epoll_wait(eid, &readfds, NULL, 10); 92 150 151 int num = 1; 152 UDTSOCKET readfds; 153 int rv = UDT::epoll_wait2(eid, &readfds, &num,NULL, NULL,100); 154 93 155 if (rv == -1) { 94 156 if (UDT::getlasterror().getErrorCode() != 6003) 95 157 printf("prob %i\n", UDT::getlasterror().getErrorCode()); 96 // printf("wait\n"); 97 } else if (rv == 0) { 98 printf("timeout\n"); // a timeout occured 158 } else if(readfds==socket && num==1 && rv==1) { 159 160 int size=UDT::recvmsg(socket, buf, buf_size); 161 if (size > 0) { 162 total_received+=size; 163 164 switch ((unsigned char)buf[0]) { 165 case ZLIB_HEADER: { 166 ssize_t out_size; 167 uncompressBuffer(buf, size, uncompressbuf, &out_size); 168 if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) { 169 socketType=gui; 170 QString remoteName=ConnectionLayout::getDocRootName(uncompressbuf, out_size); 171 setName(remoteName); 172 emit newConnectionLayout(remoteName);//connection is Qt::BlockingQueuedConnection 173 } 174 emit dataReady(uncompressbuf, out_size);//connection is Qt::BlockingQueuedConnection, as we have only one buffer 175 break; 176 } 177 case START_SENDING_FILES: { 178 if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) { 179 socketType=log; 180 } 181 setName("log files"); 182 heartbeat_timer->stop(); 183 emit newFileUI(socket); 184 deleteLater(); 185 stop=true; 186 destroySocket=false; 187 break; 188 } 189 case XML_HEADER: 190 if(socketType==unknown) { 191 socketType=gui; 192 QString remoteName=ConnectionLayout::getDocRootName(buf, size ); 193 setName(remoteName); 194 emit newConnectionLayout(remoteName); 195 } 196 case DATAS_BIG_ENDIAN: 197 case DATAS_LITTLE_ENDIAN: 198 emit dataReady(buf, size ); 199 break; 200 } 201 } else { 202 //if(UDT::getlasterror().getErrorCode()!=6002) 203 printf("udt socket:%s\n",UDT::getlasterror().getErrorMessage()); 204 //UDT::close(socket);//si deconnecté 205 //free(buf); 206 //break; 207 } 99 208 } else { 100 /* 101 if(UDT::getlasterror().getErrorCode()==2001) { 102 UDT::epoll_release(eid); 103 }*/ 104 for (set<UDTSOCKET>::iterator i = readfds.begin(); i != readfds.end(); 105 i++) { 106 // printf("a\n"); 107 if (*i == file_socket) 108 receiveFile(); 109 if (*i == com_socket) 110 receiveData(); 111 } 112 } 113 114 UDT::epoll_remove_usock(eid, file_socket); 115 UDT::epoll_remove_usock(eid, com_socket); 209 printf("udt socket:%s\n",UDT::getlasterror().getErrorMessage()); 210 } 211 UDT::epoll_remove_usock(eid, socket); 116 212 UDT::epoll_release(eid); 117 118 QCoreApplication::processEvents(); 119 } 120 kill(); 121 } 122 123 void UdtSocket::receiveFile(void) { 124 char *recv_buf; 125 int bytesRead; 126 static bool flag_new_seq = true; 127 static QString folder_name; 128 129 // receive file info 130 recv_buf = (char *)malloc(1024); 131 bytesRead = UDT::recvmsg(file_socket, recv_buf, 1024); 132 133 if (bytesRead <= 0) { 134 free(recv_buf); 135 return; 136 } 137 138 int size; 139 memcpy(&size, &recv_buf[1], sizeof(int)); 140 if (recv_buf[0] == FILE_INFO_BIG_ENDIAN) 141 size = qFromBigEndian(size); 142 143 // printf("file_ui recu %i %x\n",bytesRead,recv_buf[0]); 144 if ((recv_buf[0] == FILE_INFO_LITTLE_ENDIAN || 145 recv_buf[0] == FILE_INFO_BIG_ENDIAN) && 146 size > 0) { 147 if (flag_new_seq == true) { 148 // create directory for storage 149 QDateTime dateTime = QDateTime::currentDateTime(); 150 folder_name = dateTime.toString("yyyyMMdd_hhmm") + "_" + name; 151 if (QDir().exists(folder_name) == true) { 152 folder_name = dateTime.toString("yyyyMMdd_hhmm_ss") + "_" + name; 153 } 154 QDir().mkdir(folder_name); 155 156 flag_new_seq = false; 157 file_dialog->log("Creating directory " + folder_name); 158 } 159 160 QString file_name = 161 QString::fromAscii((const char *)&recv_buf[5], bytesRead - 5); 162 QString file_path = folder_name + "/" + file_name; 163 file_dialog->log( 164 QString("receiving %1 (%2 bytes)").arg(file_name).arg(size)); 165 QFile fichier(file_path); 166 167 if (!fichier.open(QIODevice::WriteOnly)) { 168 file_dialog->log(" could not write to file!"); 169 } else { 170 // receive file 171 recv_buf = (char *)realloc((void *)recv_buf, size); 172 bytesRead = UDT::recvmsg(file_socket, recv_buf, size); 173 if (bytesRead != size) { 174 file_dialog->log(QString(" error receiving file! (%1/%2)") 175 .arg(bytesRead) 176 .arg(size)); 177 free(recv_buf); 178 return; 179 } else { 180 file_dialog->log(" ok"); 181 } 182 183 QDataStream stream(&fichier); 184 stream.writeRawData(recv_buf, size); 185 fichier.close(); 186 187 file_dialog->addFile(file_path); 188 } 189 190 free(recv_buf); 191 } else if (recv_buf[0] == END) { 192 file_dialog->endOfFiles(); 193 flag_new_seq = true; 194 //end ack 195 UDT::sendmsg(file_socket,&recv_buf[0],1); 196 } 197 } 198 199 void UdtSocket::receiveData(void) { 200 while (1) { 201 int buf_size; 202 int opt_size; 203 UDT::getsockopt(com_socket, 0, UDT_RCVBUF, &buf_size, &opt_size); 204 205 char *buf = (char *)malloc(buf_size); 206 int size; 207 size = UDT::recvmsg(com_socket, buf, buf_size); 208 buf = (char *)realloc(buf, size + 1); 209 210 if (size > 0) { 211 buf[size] = 0; 212 emit dataReady(buf, size + 1); 213 } else { 214 // if(UDT::getlasterror().getErrorCode()!=6002) printf("udt socket: 215 // %s\n",UDT::getlasterror().getErrorMessage()); 216 free(buf); 217 break; 218 } 219 } 220 } 221 222 void UdtSocket::write(const char *buf, qint64 size) { 223 // printf("write\n%s\n",buf); 224 qint64 sent = UDT::sendmsg(com_socket, buf, size, -1, true); 213 } 214 free(uncompressbuf); 215 free(buf); 216 thread()->quit(); 217 } 218 219 qint64 UdtSocket::write(const char *buf, qint64 size,int ttl,bool inOrder) { 220 qint64 sent = UDT::sendmsg(socket, buf, size, ttl, inOrder); 225 221 if (sent != size) { 226 printf(" erreur envoi: %s\n", UDT::getlasterror().getErrorMessage());222 printf("%s, error writting to udt (%s)\n",name.toLocal8Bit().constData(), UDT::getlasterror().getErrorMessage()); 227 223 if (UDT::getlasterror().getErrorCode() == 2001) { 228 224 stop = true; 229 } 230 } 231 } 225 heartbeat_timer->stop(); 226 deleteLater(); 227 } 228 } 229 return sent; 230 } 231 232 int UdtSocket::uncompressBuffer(char *in, ssize_t in_size, char *out, 233 ssize_t *out_size) { 234 int ret; 235 unsigned have; 236 z_stream strm; 237 238 // allocate inflate state 239 strm.zalloc = Z_NULL; 240 strm.zfree = Z_NULL; 241 strm.opaque = Z_NULL; 242 strm.avail_in = 0; 243 strm.next_in = Z_NULL; 244 ret = inflateInit(&strm); 245 if (ret != Z_OK) 246 return ret; 247 248 strm.avail_in = in_size; 249 strm.next_in = (unsigned char *)in; 250 strm.avail_out = COMPRESS_CHUNK; 251 strm.next_out = (unsigned char *)out; 252 253 ret = inflate(&strm, Z_NO_FLUSH); 254 assert(ret != Z_STREAM_ERROR); // state not clobbered 255 switch (ret) { 256 case Z_NEED_DICT: 257 ret = Z_DATA_ERROR; // and fall through 258 case Z_DATA_ERROR: 259 case Z_MEM_ERROR: 260 (void)inflateEnd(&strm); 261 return ret; 262 } 263 have = COMPRESS_CHUNK - strm.avail_out; 264 *out_size = have; 265 266 // clean up and return 267 (void)inflateEnd(&strm); 268 return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR; 269 }
Note:
See TracChangeset
for help on using the changeset viewer.