Changeset 234 in flair-src for trunk/tools/FlairGCS/src/UdtSocket.cpp


Ignore:
Timestamp:
04/10/18 17:05:27 (6 years ago)
Author:
Sanahuja Guillaume
Message:

create file oscket only when necessary

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/tools/FlairGCS/src/UdtSocket.cpp

    r30 r234  
    44// %flair:license}
    55#include "UdtSocket.h"
     6#include "ConnectionLayout.h"
    67#include <stdio.h>
    78#include <stdlib.h>
     
    1011#include <QDir>
    1112#include <QDate>
    12 #include "file_ui.h"
    1313#include "communication.h"
     14#include <zlib.h>
     15#include <assert.h>
     16#include <QThread>
     17#define COMPRESS_CHUNK 1024*100
    1418
    1519#ifndef WIN32
     
    2024#endif
    2125
     26#define COMPUTE_UDT_STATS_TIMER 2000
     27#define HEARTBEAT_TIMER 200
     28
    2229using namespace std;
    2330
    24 UdtSocket::UdtSocket(UDTSOCKET file_socket, UDTSOCKET com_socket, QString name)
    25     : QObject() {
    26   this->file_socket = file_socket;
    27   this->com_socket = com_socket;
    28   this->name = name;
     31UdtSocket::UdtSocket(UDTSOCKET socket,QString name): QObject() {
     32  this->socket = socket;
     33  this->name=name;
    2934  stop = false;
    30   file_dialog = new file_ui();
    31 
    32   bool blocking = true;
    33   UDT::setsockopt(file_socket, 0, UDT_RCVSYN, &blocking, sizeof(bool));
    34 
     35  destroySocket=true;
     36  socketType=unknown;
     37  total_received=0;
     38 
     39  bool blocking = false;
     40  if (UDT::setsockopt(socket, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
     41    printf("UDT::setsockopt error (UDT_RCVSYN)\n");
     42   
    3543  heartbeat_timer = new QTimer(this);
    3644  connect(heartbeat_timer, SIGNAL(timeout()), this, SLOT(heartbeat()));
    37   heartbeat_timer->start(200);
     45  heartbeat_timer->start(HEARTBEAT_TIMER);
     46 
     47  udtstats_timer = new QTimer(this);
     48  connect(udtstats_timer, SIGNAL(timeout()), this, SLOT(getUTDStats()));
     49  udtstats_timer->start(COMPUTE_UDT_STATS_TIMER);
    3850}
    3951
    4052UdtSocket::~UdtSocket() {
    4153  heartbeat_timer->stop();
    42 
    43   UDT::close(file_socket);
    44   UDT::close(com_socket);
    45 
    46   file_dialog->deleteLater();
     54  udtstats_timer->stop();
     55  if(destroySocket) UDT::close(socket);         
     56}
     57
     58void UdtSocket::setName(QString name) {
     59  printf("  %s is %s\n",this->name.toLocal8Bit().constData(),name.toLocal8Bit().constData());
     60  this->name=name;
    4761}
    4862
     
    5266void UdtSocket::heartbeat(void) {
    5367  char data = WATCHDOG_HEADER;
    54   write(&data, 1);
     68  quint64 sent=write(&data, 1,HEARTBEAT_TIMER,false);
     69  if (sent != 1 && UDT::getlasterror().getErrorCode() == 2001) {
     70    heartbeat_timer->stop();
     71  }
     72 
     73}
     74
     75void UdtSocket::getUTDStats(void) {
     76  float rxRate=((float)total_received/(COMPUTE_UDT_STATS_TIMER/1000))/1000;//in Ko/s
     77  total_received=0;
     78 
     79  UDT::TRACEINFO perf;
     80  if (UDT::ERROR == UDT::perfmon(socket, &perf))   {
     81    printf("perfmon: %s\n",UDT::getlasterror().getErrorMessage());
     82  }/* else {
     83    printf("%s socket stats:\n",name.toLocal8Bit().constData());
     84    printf("total number of sent packets, including retransmissions: %i\n",perf.pktSentTotal);
     85    printf("total number of received packets: %i\n",perf.pktRecvTotal);
     86    printf("total number of lost packets, measured in the sending side: %i\n",perf.pktSndLossTotal);
     87    printf("total number of lost packets, measured in the receiving side: %i\n",perf.pktRcvLossTotal);
     88    printf("total number of retransmitted packets, measured in the sending side: %i\n",perf.pktRetransTotal);
     89    printf("total number of sent ACK packets: %i\n",perf.pktSentACKTotal);
     90    printf("total number of received ACK packets: %i\n",perf.pktRecvACKTotal);
     91    printf("total number of sent NAK packets: %i\n",perf.pktSentNAKTotal);
     92    printf("total number of received NAK packets: %i\n",perf.pktRecvNAKTotal);
     93    printf("round trip time: %fms\n",perf.msRTT);
     94     
     95  }*/
     96  stats=QString("rx rate %1kB/s, round trip %2ms, lost packets %3").arg(rxRate,0,'f',3).arg(perf.msRTT,0,'f',3).arg(perf.pktSndLossTotal);
     97  UDTStats(stats);
     98 
     99}
     100
     101QString UdtSocket::getUDTStats() {
     102  return stats;
    55103}
    56104
    57105void UdtSocket::kill(void) {
    58   printf("disconnected\n");
    59106  stop = true;
    60107  deleteLater();
    61108}
    62109
    63 void UdtSocket::handleConnections(void) {
     110void UdtSocket::receiveData(void) {
     111  int buf_size;
     112  int opt_size;
     113  UDT::getsockopt(socket, 0, UDT_RCVBUF, &buf_size, &opt_size);
     114  char *buf = (char *)malloc(buf_size);
     115  if (!buf) {
     116    printf("error malloc UdtSocket::receiveData buffer\n");
     117    return;
     118  }
     119  char *uncompressbuf=(char *)malloc(COMPRESS_CHUNK);
     120  if (!uncompressbuf) {
     121    printf("error malloc UdtSocket::receiveData uncompress buffer\n");
     122    free(buf);
     123    return;
     124  }
     125  //fprintf(stderr,"receiveData %x\n",thread());
     126 
    64127  while (!stop) {
     128    QCoreApplication::processEvents();
     129   
     130    //we need to use epoll?
     131    //tests are showing that UDT::recvmsg is waiting for the entire timeout before returning
     132    //note that in flair::core the behaviour of UDT::recvmsg timeout is as expected
     133    //is it a difference between client and server??
     134    //tests also show that we need to recreate the eid every time, otherwise wait returns immediately
    65135    int eid = UDT::epoll_create();
    66136    if (eid < 0) {
    67       printf("epoll_create error: %s\n", UDT::getlasterror().getErrorMessage());
    68     }
    69 
    70     if (UDT::epoll_add_usock(eid, file_socket) < 0) {
     137      printf("%s: epoll_create error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage());
     138    }
     139
     140    if (UDT::epoll_add_usock(eid, socket) < 0) {
    71141      if (UDT::getlasterror().getErrorCode() == 5004) {
    72         printf("epoll_add_usock error\n");
    73         break;
     142        printf("disconnected from %s\n",name.toLocal8Bit().constData());
     143        heartbeat_timer->stop();
     144        deleteLater();
     145        stop=true;;
    74146      } else {
    75         printf("epoll_add_usock error: %s\n",
    76                UDT::getlasterror().getErrorMessage());
     147        printf("%s: epoll_add_usock error (%s)\n",name.toLocal8Bit().constData(),UDT::getlasterror().getErrorMessage());
    77148      }
    78149    }
    79     if (UDT::epoll_add_usock(eid, com_socket) < 0) {
    80       if (UDT::getlasterror().getErrorCode() == 5004) {
    81         printf("epoll_add_usock error\n");
    82         break;
    83       } else {
    84         printf("epoll_add_usock error: %s\n",
    85                UDT::getlasterror().getErrorMessage());
    86       }
    87     }
    88 
    89     set<UDTSOCKET> readfds;
    90 
    91     int rv = UDT::epoll_wait(eid, &readfds, NULL, 10);
    92 
     150 
     151    int num = 1;
     152    UDTSOCKET readfds;
     153    int rv = UDT::epoll_wait2(eid, &readfds, &num,NULL, NULL,100);
     154 
    93155    if (rv == -1) {
    94156      if (UDT::getlasterror().getErrorCode() != 6003)
    95157        printf("prob %i\n", UDT::getlasterror().getErrorCode());
    96       // printf("wait\n");
    97     } else if (rv == 0) {
    98       printf("timeout\n"); // a timeout occured
     158    } else if(readfds==socket && num==1 && rv==1) {
     159       
     160      int size=UDT::recvmsg(socket, buf, buf_size);
     161      if (size > 0) {
     162        total_received+=size;
     163       
     164        switch ((unsigned char)buf[0]) {
     165          case ZLIB_HEADER: {
     166            ssize_t out_size;
     167            uncompressBuffer(buf, size, uncompressbuf, &out_size);
     168            if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) {
     169              socketType=gui;
     170              QString remoteName=ConnectionLayout::getDocRootName(uncompressbuf, out_size);
     171              setName(remoteName);
     172              emit newConnectionLayout(remoteName);//connection is Qt::BlockingQueuedConnection
     173            }
     174            emit dataReady(uncompressbuf, out_size);//connection is Qt::BlockingQueuedConnection, as we have only one buffer
     175            break;
     176          }
     177          case START_SENDING_FILES: {
     178            if((unsigned char)uncompressbuf[0]==XML_HEADER && socketType==unknown) {
     179              socketType=log;
     180            }
     181            setName("log files");
     182            heartbeat_timer->stop();
     183            emit newFileUI(socket);
     184            deleteLater();
     185            stop=true;
     186            destroySocket=false;
     187            break;
     188          }
     189          case XML_HEADER:
     190            if(socketType==unknown) {
     191              socketType=gui;
     192              QString remoteName=ConnectionLayout::getDocRootName(buf, size );
     193              setName(remoteName);
     194              emit newConnectionLayout(remoteName);
     195            }
     196          case DATAS_BIG_ENDIAN:
     197          case DATAS_LITTLE_ENDIAN:
     198            emit dataReady(buf, size );
     199            break;
     200        }
     201      } else {
     202        //if(UDT::getlasterror().getErrorCode()!=6002)
     203        printf("udt socket:%s\n",UDT::getlasterror().getErrorMessage());
     204        //UDT::close(socket);//si deconnecté
     205        //free(buf);
     206        //break;
     207      }
    99208    } else {
    100       /*
    101       if(UDT::getlasterror().getErrorCode()==2001) {
    102           UDT::epoll_release(eid);
    103       }*/
    104       for (set<UDTSOCKET>::iterator i = readfds.begin(); i != readfds.end();
    105            i++) {
    106         // printf("a\n");
    107         if (*i == file_socket)
    108           receiveFile();
    109         if (*i == com_socket)
    110           receiveData();
    111       }
    112     }
    113 
    114     UDT::epoll_remove_usock(eid, file_socket);
    115     UDT::epoll_remove_usock(eid, com_socket);
     209      printf("udt socket:%s\n",UDT::getlasterror().getErrorMessage());
     210    }
     211    UDT::epoll_remove_usock(eid, socket);
    116212    UDT::epoll_release(eid);
    117 
    118     QCoreApplication::processEvents();
    119   }
    120   kill();
    121 }
    122 
    123 void UdtSocket::receiveFile(void) {
    124   char *recv_buf;
    125   int bytesRead;
    126   static bool flag_new_seq = true;
    127   static QString folder_name;
    128 
    129   // receive file info
    130   recv_buf = (char *)malloc(1024);
    131   bytesRead = UDT::recvmsg(file_socket, recv_buf, 1024);
    132 
    133   if (bytesRead <= 0) {
    134     free(recv_buf);
    135     return;
    136   }
    137 
    138   int size;
    139   memcpy(&size, &recv_buf[1], sizeof(int));
    140   if (recv_buf[0] == FILE_INFO_BIG_ENDIAN)
    141     size = qFromBigEndian(size);
    142 
    143   // printf("file_ui recu %i %x\n",bytesRead,recv_buf[0]);
    144   if ((recv_buf[0] == FILE_INFO_LITTLE_ENDIAN ||
    145        recv_buf[0] == FILE_INFO_BIG_ENDIAN) &&
    146       size > 0) {
    147     if (flag_new_seq == true) {
    148       // create directory for storage
    149       QDateTime dateTime = QDateTime::currentDateTime();
    150       folder_name = dateTime.toString("yyyyMMdd_hhmm") + "_" + name;
    151       if (QDir().exists(folder_name) == true) {
    152         folder_name = dateTime.toString("yyyyMMdd_hhmm_ss") + "_" + name;
    153       }
    154       QDir().mkdir(folder_name);
    155 
    156       flag_new_seq = false;
    157       file_dialog->log("Creating directory " + folder_name);
    158     }
    159 
    160     QString file_name =
    161         QString::fromAscii((const char *)&recv_buf[5], bytesRead - 5);
    162     QString file_path = folder_name + "/" + file_name;
    163     file_dialog->log(
    164         QString("receiving %1 (%2 bytes)").arg(file_name).arg(size));
    165     QFile fichier(file_path);
    166 
    167     if (!fichier.open(QIODevice::WriteOnly)) {
    168       file_dialog->log("      could not write to file!");
    169     } else {
    170       // receive file
    171       recv_buf = (char *)realloc((void *)recv_buf, size);
    172       bytesRead = UDT::recvmsg(file_socket, recv_buf, size);
    173       if (bytesRead != size) {
    174         file_dialog->log(QString("      error receiving file! (%1/%2)")
    175                              .arg(bytesRead)
    176                              .arg(size));
    177         free(recv_buf);
    178         return;
    179       } else {
    180         file_dialog->log("      ok");
    181       }
    182 
    183       QDataStream stream(&fichier);
    184       stream.writeRawData(recv_buf, size);
    185       fichier.close();
    186 
    187       file_dialog->addFile(file_path);
    188     }
    189 
    190     free(recv_buf);
    191   } else if (recv_buf[0] == END) {
    192     file_dialog->endOfFiles();
    193     flag_new_seq = true;
    194     //end ack
    195     UDT::sendmsg(file_socket,&recv_buf[0],1);
    196   }
    197 }
    198 
    199 void UdtSocket::receiveData(void) {
    200   while (1) {
    201     int buf_size;
    202     int opt_size;
    203     UDT::getsockopt(com_socket, 0, UDT_RCVBUF, &buf_size, &opt_size);
    204 
    205     char *buf = (char *)malloc(buf_size);
    206     int size;
    207     size = UDT::recvmsg(com_socket, buf, buf_size);
    208     buf = (char *)realloc(buf, size + 1);
    209 
    210     if (size > 0) {
    211       buf[size] = 0;
    212       emit dataReady(buf, size + 1);
    213     } else {
    214       // if(UDT::getlasterror().getErrorCode()!=6002) printf("udt socket:
    215       // %s\n",UDT::getlasterror().getErrorMessage());
    216       free(buf);
    217       break;
    218     }
    219   }
    220 }
    221 
    222 void UdtSocket::write(const char *buf, qint64 size) {
    223   // printf("write\n%s\n",buf);
    224   qint64 sent = UDT::sendmsg(com_socket, buf, size, -1, true);
     213  }
     214  free(uncompressbuf);
     215  free(buf);
     216  thread()->quit();
     217}
     218
     219qint64 UdtSocket::write(const char *buf, qint64 size,int ttl,bool inOrder) {
     220  qint64 sent = UDT::sendmsg(socket, buf, size, ttl, inOrder);
    225221  if (sent != size) {
    226     printf("erreur envoi: %s\n", UDT::getlasterror().getErrorMessage());
     222    printf("%s, error writting to udt (%s)\n",name.toLocal8Bit().constData(), UDT::getlasterror().getErrorMessage());
    227223    if (UDT::getlasterror().getErrorCode() == 2001) {
    228224      stop = true;
    229     }
    230   }
    231 }
     225      heartbeat_timer->stop();
     226      deleteLater();
     227    }
     228  }
     229  return sent;
     230}
     231
     232int UdtSocket::uncompressBuffer(char *in, ssize_t in_size, char *out,
     233                                       ssize_t *out_size) {
     234  int ret;
     235  unsigned have;
     236  z_stream strm;
     237
     238  // allocate inflate state
     239  strm.zalloc = Z_NULL;
     240  strm.zfree = Z_NULL;
     241  strm.opaque = Z_NULL;
     242  strm.avail_in = 0;
     243  strm.next_in = Z_NULL;
     244  ret = inflateInit(&strm);
     245  if (ret != Z_OK)
     246    return ret;
     247
     248  strm.avail_in = in_size;
     249  strm.next_in = (unsigned char *)in;
     250  strm.avail_out = COMPRESS_CHUNK;
     251  strm.next_out = (unsigned char *)out;
     252
     253  ret = inflate(&strm, Z_NO_FLUSH);
     254  assert(ret != Z_STREAM_ERROR); // state not clobbered
     255  switch (ret) {
     256  case Z_NEED_DICT:
     257    ret = Z_DATA_ERROR; // and fall through
     258  case Z_DATA_ERROR:
     259  case Z_MEM_ERROR:
     260    (void)inflateEnd(&strm);
     261    return ret;
     262  }
     263  have = COMPRESS_CHUNK - strm.avail_out;
     264  *out_size = have;
     265
     266  // clean up and return
     267  (void)inflateEnd(&strm);
     268  return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
     269}
Note: See TracChangeset for help on using the changeset viewer.