Changeset 439 in flair-src for trunk/lib/FlairCore/src/ui_com.cpp


Ignore:
Timestamp:
08/25/21 13:13:36 (3 years ago)
Author:
Sanahuja Guillaume
Message:

refactor

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/ui_com.cpp

    r253 r439  
    153153}
    154154
     155//send, public part; dispatch to SendNRT (nrt) or rt_pipe (rt)
     156//rt_pipe is read by user thread which calls SendNRT
    155157void ui_com::Send(char *buf, ssize_t size,int ttl) {
    156158  if (connection_lost == true)
     
    158160
    159161  char *tosend = buf;
    160   ssize_t nb_write;
    161 
    162162  if (buf[0] == XML_HEADER) {
    163163    // cut xml header
     
    167167
    168168#ifdef __XENO__
    169   nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
    170 
     169  //data buf
     170  ssize_t nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
    171171  if (nb_write < 0) {
    172172                char errorMsg[256];
     
    175175    Err("rt_pipe_write error %i/%i\n", nb_write, size);
    176176  }
     177  //ttl
     178  nb_write = rt_pipe_write(&pipe, &ttl, sizeof(ttl), P_NORMAL);
     179  if (nb_write < 0) {
     180                char errorMsg[256];
     181    Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg)));
     182  } else if (nb_write != sizeof(ttl)) {
     183    Err("rt_pipe_write error %i/%i\n", nb_write, size);
     184  }
    177185#else //__XENO__
    178 
     186  SendNRT(tosend,size,ttl);
     187#endif //__XENO__
     188}
     189
     190//send, private part; called to effectively send to udt
     191//this is always in nrt
     192void ui_com::SendNRT(char *buf, ssize_t size,int ttl) {
     193  ssize_t nb_write;
     194  bool sendItCompressed = false;
     195 
    179196#ifdef COMPRESS_FRAMES
    180   bool sendItCompressed = false;
    181197  if (buf[0] == XML_HEADER) {
    182198    sendItCompressed = true;
    183199  }
     200#endif // COMPRESS_FRAMES
    184201
    185202  if (sendItCompressed) {
    186203    char *out;
    187204    ssize_t out_size;
    188     if (compressBuffer(tosend, size, &out, &out_size, 9) == Z_OK) {
     205    if (compressBuffer(buf, size, &out, &out_size, 9) == Z_OK) {
    189206      size = out_size;
    190207      nb_write = UDT::sendmsg(socket_fd, out, size,ttl, true);
     
    197214
    198215  if (!sendItCompressed) {
    199     nb_write = UDT::sendmsg(socket_fd, tosend, size, ttl, true);
    200   }
    201 
    202 #else // COMPRESS_FRAMES
    203   nb_write = UDT::sendmsg(socket_fd, tosend, size, ttl, true);
    204 #endif // COMPRESS_FRAMES
     216    nb_write = UDT::sendmsg(socket_fd, buf, size, ttl, true);
     217  }
     218
    205219   //Printf("write %i %i\n",nb_write,size);
    206220  if (nb_write < 0) {
     
    214228        UDT::getlasterror().getErrorCode(), nb_write, size);
    215229  }
    216 #endif //__XENO__
    217230}
    218231
     
    240253}
    241254
     255//todo: le run (rt) push les données une par une
     256//le nrt recupere et decide quand envoyer :
     257// -quand toutes les données d'une meme periode sont recues
     258// -quand on a atteind le nb buffering
     259// -le nrt alloue dynamiquement le buffer d'envoi
     260//-> enelver l'alloc du buffer au startup (UpdateDataToSendSize)
    242261void ui_com::Run(void) {
    243262  // check endianness
     
    287306
    288307      for (size_t i = 0; i < data_to_send.size(); i++) {
    289         if (data_to_send.at(i)->SendPeriod() == resume_period &&
    290             data_to_send.at(i)->IsEnabled() == true) {
     308        if (data_to_send.at(i)->SendPeriod() == resume_period && data_to_send.at(i)->IsEnabled() == true) {
    291309          data_to_send.at(i)->CopyDatas(send_buffer + offset);
    292310          offset += data_to_send.at(i)->SendSize();
     
    335353  fd_set set;
    336354  struct timeval timeout;
    337   ssize_t nb_read, nb_write;
     355  ssize_t nb_read;
     356  int ttl;
    338357
    339358  buf = (char *)malloc(NRT_PIPE_SIZE);
     
    363382
    364383    if (rv == -1) {
    365       if (caller->is_running == false &&
    366           UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
     384      if (caller->is_running == false && UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
    367385        break; // timeout
    368386      if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
    369387        caller->Err("epoll_wait, %s, code %i\n",
    370                     UDT::getlasterror().getErrorMessage(),
    371                     UDT::getlasterror().getErrorCode());
     388                    UDT::getlasterror().getErrorMessage(), UDT::getlasterror().getErrorCode());
    372389    } else if (rv == 0) {
    373390      // printf("timeout\n"); // a timeout occured
     
    376393
    377394    } else {
     395      //read data buf and ttl
     396      //RT part has made 2 rt_pipe_write; they should be available in nrt
     397      //(select ensure at least one data is ready)
    378398      nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
    379399      buf[nb_read] = 0;
     400      read(pipe_fd, &ttl, sizeof(ttl));
    380401// printf("envoi\n%s\n",buf);
    381402
    382 #ifdef COMPRESS_FRAMES
    383       char *out;
    384       ssize_t out_size;
    385 
    386       if (buf[0] == XML_HEADER) {
    387         if (compressBuffer(buf, nb_read, &out, &out_size, 9) == Z_OK) {
    388           nb_read = out_size;
    389           nb_write = UDT::sendmsg(caller->socket_fd, out, out_size, -1, true);
    390           free(out);
    391         } else {
    392           caller->Warn("Compress error, sending it uncompressed\n");
    393           nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
    394         }
    395       } else {
    396         nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
    397       }
    398 #else
    399       nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
    400 #endif
    401 
    402       if (nb_write < 0) {
    403         caller->Err("UDT::sendmsg error (%s)\n",
    404                     UDT::getlasterror().getErrorMessage());
    405         if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
    406             UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
    407           caller->connection_lost = true;
    408         }
    409       } else if (nb_write != nb_read) {
    410         caller->Err("UDT::sendmsg error %i/%i\n", nb_write, nb_read);
    411       }
     403      caller->SendNRT(buf, nb_read,ttl);
    412404    }
    413405  }
     
    558550}
    559551
     552//allocate a buffer at startup for the worst case
     553//(every data send with same pedriod)
     554//TODO: dynamic size depending on what gcs asks?
    560555void ui_com::UpdateDataToSendSize(void) {
    561556  send_mutex->GetMutex();
Note: See TracChangeset for help on using the changeset viewer.