Changeset 15 in flair-src for trunk/lib/FlairCore/src/ui_com.cpp


Ignore:
Timestamp:
Apr 8, 2016, 3:40:57 PM (6 years ago)
Author:
Bayard Gildas
Message:

sources reformatted with flair-format-dir script

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/ui_com.cpp

    r2 r15  
    3939using namespace flair::gui;
    4040
    41 ui_com::ui_com(const Object *parent,UDTSOCKET sock): Thread(parent,"send",2)
    42 {
    43     //buffer envoi
    44     send_buffer=(char*)malloc(3);
    45     send_size=3;//id(1)+period(2)
    46 
    47     //mutex
    48     send_mutex=NULL;
    49     send_mutex=new Mutex(this,ObjectName());
    50 
    51     socket_fd=sock;
    52     connection_lost=false;
     41ui_com::ui_com(const Object *parent, UDTSOCKET sock)
     42    : Thread(parent, "send", 2) {
     43  // buffer envoi
     44  send_buffer = (char *)malloc(3);
     45  send_size = 3; // id(1)+period(2)
     46
     47  // mutex
     48  send_mutex = NULL;
     49  send_mutex = new Mutex(this, ObjectName());
     50
     51  socket_fd = sock;
     52  connection_lost = false;
    5353
    5454#ifdef __XENO__
    55     int status;
    56     string tmp_name;
    57 
    58     is_running=true;
    59 
    60     //pipe
    61     tmp_name= getFrameworkManager()->ObjectName() + "-" + ObjectName()+ "-pipe";
    62     //xenomai limitation
    63     if(tmp_name.size()>31) Err("rt_pipe_create error (%s is too long)\n",tmp_name.c_str());
     55  int status;
     56  string tmp_name;
     57
     58  is_running = true;
     59
     60  // pipe
     61  tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe";
     62  // xenomai limitation
     63  if (tmp_name.size() > 31)
     64    Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
    6465#ifdef RT_PIPE_SIZE
    65     status=rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO,RT_PIPE_SIZE);
     66  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
    6667#else
    67     status=rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO,0);
    68 #endif
    69 
    70     if(status!=0)
    71     {
    72         Err("rt_pipe_create error (%s)\n",strerror(-status));
    73         //return -1;
    74     }
    75 
    76     //start user side thread
     68  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
     69#endif
     70
     71  if (status != 0) {
     72    Err("rt_pipe_create error (%s)\n", strerror(-status));
     73    // return -1;
     74  }
     75
     76// start user side thread
    7777#ifdef NRT_STACK_SIZE
    78     // Initialize thread creation attributes
    79     pthread_attr_t attr;
    80     if(pthread_attr_init(&attr) != 0)
    81     {
    82         Err("pthread_attr_init error\n");
    83     }
    84 
    85     if(pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0)
    86     {
    87        Err("pthread_attr_setstacksize error\n");
    88     }
    89 
    90     if(pthread_create(&thread,  &attr, user_thread, (void*)this) < 0)
    91 #else //NRT_STACK_SIZE
    92     if(pthread_create(&thread, NULL, user_thread, (void*)this) < 0)
    93 #endif //NRT_STACK_SIZE
    94     {
    95         Err("pthread_create error\n");
    96       //return -1;
    97     }
     78  // Initialize thread creation attributes
     79  pthread_attr_t attr;
     80  if (pthread_attr_init(&attr) != 0) {
     81    Err("pthread_attr_init error\n");
     82  }
     83
     84  if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
     85    Err("pthread_attr_setstacksize error\n");
     86  }
     87
     88  if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0)
     89#else  // NRT_STACK_SIZE
     90  if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0)
     91#endif // NRT_STACK_SIZE
     92  {
     93    Err("pthread_create error\n");
     94    // return -1;
     95  }
    9896#ifdef NRT_STACK_SIZE
    99     if(pthread_attr_destroy(&attr) != 0)
    100     {
    101         Err("pthread_attr_destroy error\n");
    102     }
    103 #endif
    104 
    105 #endif//__XENO__
    106     int timeout=100;
    107     if(UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int))!=0) Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
    108 
    109     bool blocking = true;
    110     if(UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool))!=0) Err("UDT::setsockopt error (UDT_SNDSYN)\n");
    111 
    112     if(UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool))!=0) Err("UDT::setsockopt error (UDT_RCVSYN)\n");
    113 //#endif //__XENO__
    114 
    115     Start();
    116 }
    117 
    118 
    119 ui_com::~ui_com()
    120 {
    121     //printf("destruction ui_com\n");
     97  if (pthread_attr_destroy(&attr) != 0) {
     98    Err("pthread_attr_destroy error\n");
     99  }
     100#endif
     101
     102#endif //__XENO__
     103  int timeout = 100;
     104  if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0)
     105    Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
     106
     107  bool blocking = true;
     108  if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0)
     109    Err("UDT::setsockopt error (UDT_SNDSYN)\n");
     110
     111  if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
     112    Err("UDT::setsockopt error (UDT_RCVSYN)\n");
     113  //#endif //__XENO__
     114
     115  Start();
     116}
     117
     118ui_com::~ui_com() {
     119// printf("destruction ui_com\n");
    122120
    123121#ifdef __XENO__
    124     is_running=false;
    125 
    126     pthread_join(thread, NULL);
    127 
    128     int status=rt_pipe_delete(&pipe);
    129     if(status!=0) Err("rt_pipe_delete error (%s)\n",strerror(-status));
    130 #endif
    131 
    132     SafeStop();
    133 
    134     if(IsSuspended()==true) Resume();
    135 
    136     Join();
    137 
    138     if(send_buffer!=NULL) free(send_buffer);
    139     send_buffer=NULL;
    140 
    141     //printf("destruction ui_com ok\n");
    142 }
    143 
    144 void ui_com::Send(char* buf,ssize_t size) {
    145     if(connection_lost==true) return;
    146 
    147     char* tosend=buf;
    148     ssize_t nb_write;
    149 
    150     if(buf[0]==XML_HEADER) {
    151         //cut xml header
    152         tosend=strstr(buf,"<root");
    153         size-=tosend-buf;
    154     }
     122  is_running = false;
     123
     124  pthread_join(thread, NULL);
     125
     126  int status = rt_pipe_delete(&pipe);
     127  if (status != 0)
     128    Err("rt_pipe_delete error (%s)\n", strerror(-status));
     129#endif
     130
     131  SafeStop();
     132
     133  if (IsSuspended() == true)
     134    Resume();
     135
     136  Join();
     137
     138  if (send_buffer != NULL)
     139    free(send_buffer);
     140  send_buffer = NULL;
     141
     142  // printf("destruction ui_com ok\n");
     143}
     144
     145void ui_com::Send(char *buf, ssize_t size) {
     146  if (connection_lost == true)
     147    return;
     148
     149  char *tosend = buf;
     150  ssize_t nb_write;
     151
     152  if (buf[0] == XML_HEADER) {
     153    // cut xml header
     154    tosend = strstr(buf, "<root");
     155    size -= tosend - buf;
     156  }
    155157
    156158#ifdef __XENO__
    157     nb_write=rt_pipe_write(&pipe,tosend,size,P_NORMAL);
    158 
    159     if(nb_write<0)
    160     {
    161         Err("rt_pipe_write error (%s)\n",strerror(-nb_write));
    162     }
    163     else if (nb_write != size)
    164     {
    165         Err("rt_pipe_write error %i/%i\n",nb_write,size);
    166     }
     159  nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
     160
     161  if (nb_write < 0) {
     162    Err("rt_pipe_write error (%s)\n", strerror(-nb_write));
     163  } else if (nb_write != size) {
     164    Err("rt_pipe_write error %i/%i\n", nb_write, size);
     165  }
    167166#else //__XENO__
    168167
    169168#ifdef COMPRESS_FRAMES
    170     bool sendItCompressed=false;
    171     if(buf[0]==XML_HEADER) {
    172         sendItCompressed=true;
    173     }
    174 
    175     if(sendItCompressed) {
    176         char *out;
    177         ssize_t out_size;
    178         if(compressBuffer(tosend, size,&out,&out_size, 9)==Z_OK) {
    179             size=out_size;
    180             nb_write = UDT::sendmsg(socket_fd,out,size, -1, true);
    181             free(out);
     169  bool sendItCompressed = false;
     170  if (buf[0] == XML_HEADER) {
     171    sendItCompressed = true;
     172  }
     173
     174  if (sendItCompressed) {
     175    char *out;
     176    ssize_t out_size;
     177    if (compressBuffer(tosend, size, &out, &out_size, 9) == Z_OK) {
     178      size = out_size;
     179      nb_write = UDT::sendmsg(socket_fd, out, size, -1, true);
     180      free(out);
     181    } else {
     182      Warn("Compress error, sending it uncompressed\n");
     183      sendItCompressed = false;
     184    }
     185  }
     186
     187  if (!sendItCompressed) {
     188    nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
     189  }
     190
     191#else // COMPRESS_FRAMES
     192  nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
     193#endif // COMPRESS_FRAMES
     194  // Printf("write %i %i\n",nb_write,size);
     195  if (nb_write < 0) {
     196    Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
     197    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
     198        UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
     199      connection_lost = true;
     200    }
     201  } else if (nb_write != size) {
     202    Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(),
     203        UDT::getlasterror().getErrorCode(), nb_write, size);
     204  }
     205#endif //__XENO__
     206}
     207
     208ssize_t ui_com::Receive(char *buf, ssize_t buf_size) {
     209  ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size);
     210
     211  if (bytesRead < 0) {
     212    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) {
     213      Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
     214      connection_lost = true;
     215    }
     216  }
     217
     218  return bytesRead;
     219}
     220
     221void ui_com::Run(void) {
     222  // check endianness
     223  char header;
     224  if (IsBigEndian()) {
     225    header = DATAS_BIG_ENDIAN;
     226    Printf("System is big endian\n");
     227  } else {
     228    header = DATAS_LITTLE_ENDIAN;
     229    Printf("System is little endian\n");
     230  }
     231
     232#ifdef __XENO__
     233  WarnUponSwitches(true);
     234  printf("\n"); // a revoir pourquoi??
     235// sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
     236#endif
     237  // on attend d'avoir des choses à faire
     238  Suspend();
     239
     240  while (!ToBeStopped()) {
     241    size_t resume_id;
     242    Time min = 0xffffffffffffffffULL;
     243
     244    // on recpuere l'id de la prochaine execution
     245    send_mutex->GetMutex();
     246    resume_id = resume_time.size();
     247    for (size_t i = 0; i < resume_time.size(); i++) {
     248      if (resume_time.at(i) < min && data_to_send.at(i)->IsEnabled() == true) {
     249        min = resume_time.at(i);
     250        resume_id = i;
     251      }
     252    }
     253
     254    // attente
     255    if (resume_id < resume_time.size()) {
     256      Time time = resume_time.at(resume_id);
     257      uint16_t resume_period = data_to_send.at(resume_id)->SendPeriod();
     258      send_mutex->ReleaseMutex();
     259      // on dort jusqu'a la prochaine execution
     260      SleepUntil(time);
     261
     262      // envoi des donnees
     263      int offset = 3;
     264      send_buffer[0] = header;
     265      send_mutex->GetMutex();
     266
     267      for (size_t i = 0; i < data_to_send.size(); i++) {
     268        if (data_to_send.at(i)->SendPeriod() == resume_period &&
     269            data_to_send.at(i)->IsEnabled() == true) {
     270          data_to_send.at(i)->CopyDatas(send_buffer + offset);
     271          offset += data_to_send.at(i)->SendSize();
     272        }
     273      }
     274      if (offset != 3) {
     275        memcpy(&send_buffer[1], &resume_period, sizeof(uint16_t));
     276        // printf("send %i %i %i %x
     277        // %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]);
     278        // for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]);
     279        // printf("\n");
     280        Send(send_buffer, offset);
     281      }
     282
     283      // on planifie la prochaine execution
     284      for (size_t i = 0; i < data_to_send.size(); i++) {
     285        if (data_to_send.at(i)->SendPeriod() == resume_period) {
     286          resume_time.at(i) += data_to_send.at(i)->SendPeriod() * 1000000;
     287        }
     288      }
     289      send_mutex->ReleaseMutex();
     290      // Printf("%i %lld\n",resume_period,GetTime()/1000000);
     291    } else {
     292      send_mutex->ReleaseMutex();
     293      // rien a faire, suspend
     294      // Printf("rien a faire suspend\n");
     295      Suspend();
     296      // Printf("wake\n");
     297      // on planifie la prochaine execution
     298      Time time = GetTime();
     299      send_mutex->GetMutex();
     300      for (size_t i = 0; i < data_to_send.size(); i++) {
     301        resume_time.at(i) =
     302            time + (Time)data_to_send.at(i)->SendPeriod() * 1000000;
     303      }
     304      send_mutex->ReleaseMutex();
     305    }
     306  }
     307}
     308#ifdef __XENO__
     309void *ui_com::user_thread(void *arg) {
     310  int pipe_fd = -1;
     311  string devname;
     312  char *buf = NULL;
     313  ui_com *caller = (ui_com *)arg;
     314  int rv;
     315  fd_set set;
     316  struct timeval timeout;
     317  ssize_t nb_read, nb_write;
     318
     319  buf = (char *)malloc(NRT_PIPE_SIZE);
     320  if (buf == NULL) {
     321    caller->Err("malloc error\n");
     322  }
     323
     324  devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
     325            caller->ObjectName() + "-pipe";
     326  while (pipe_fd < 0) {
     327    pipe_fd = open(devname.c_str(), O_RDWR);
     328    if (pipe_fd < 0 && errno != ENOENT)
     329      caller->Err("open pipe_fd error (%s)\n", strerror(-errno));
     330    usleep(1000);
     331  }
     332
     333  while (1) {
     334    FD_ZERO(&set);         // clear the set
     335    FD_SET(pipe_fd, &set); // add our file descriptor to the set
     336
     337    timeout.tv_sec = 0;
     338    timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
     339
     340    rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
     341
     342    if (rv == -1) {
     343      if (caller->is_running == false &&
     344          UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
     345        break; // timeout
     346      if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
     347        caller->Err("epoll_wait, %s, code %i\n",
     348                    UDT::getlasterror().getErrorMessage(),
     349                    UDT::getlasterror().getErrorCode());
     350    } else if (rv == 0) {
     351      // printf("timeout\n"); // a timeout occured
     352      if (caller->is_running == false)
     353        break;
     354
     355    } else {
     356      nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
     357      buf[nb_read] = 0;
     358// printf("envoi\n%s\n",buf);
     359
     360#ifdef COMPRESS_FRAMES
     361      char *out;
     362      ssize_t out_size;
     363
     364      if (buf[0] == XML_HEADER) {
     365        if (compressBuffer(buf, nb_read, &out, &out_size, 9) == Z_OK) {
     366          nb_read = out_size;
     367          nb_write = UDT::sendmsg(caller->socket_fd, out, out_size, -1, true);
     368          free(out);
    182369        } else {
    183             Warn("Compress error, sending it uncompressed\n");
    184             sendItCompressed=false;
     370          caller->Warn("Compress error, sending it uncompressed\n");
     371          nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
    185372        }
    186     }
    187 
    188     if(!sendItCompressed) {
    189         nb_write = UDT::sendmsg(socket_fd,tosend,size, -1, true);
    190     }
    191 
    192 #else //COMPRESS_FRAMES
    193     nb_write = UDT::sendmsg(socket_fd,tosend,size, -1, true);
    194 #endif //COMPRESS_FRAMES
    195 //Printf("write %i %i\n",nb_write,size);
    196     if(nb_write<0) {
    197         Err("UDT::sendmsg error (%s)\n",UDT::getlasterror().getErrorMessage());
    198         if(UDT::getlasterror().getErrorCode()==CUDTException::ECONNLOST || UDT::getlasterror().getErrorCode()==CUDTException::EINVSOCK) {
    199             connection_lost=true;
     373      } else {
     374        nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
     375      }
     376#else
     377      nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
     378#endif
     379
     380      if (nb_write < 0) {
     381        caller->Err("UDT::sendmsg error (%s)\n",
     382                    UDT::getlasterror().getErrorMessage());
     383        if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
     384            UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
     385          caller->connection_lost = true;
    200386        }
    201     } else if (nb_write != size) {
    202         Err("%s, code %i (%ld/%ld)\n",UDT::getlasterror().getErrorMessage(),UDT::getlasterror().getErrorCode(),nb_write,size);
    203     }
    204 #endif //__XENO__
    205 }
    206 
    207 ssize_t ui_com::Receive(char* buf,ssize_t buf_size) {
    208     ssize_t bytesRead= UDT::recvmsg(socket_fd,buf,buf_size);
    209 
    210     if(bytesRead<0) {
    211         if(UDT::getlasterror().getErrorCode()==CUDTException::ECONNLOST) {
    212             Err("UDT::recvmsg error (%s)\n",UDT::getlasterror().getErrorMessage());
    213             connection_lost=true;
    214         }
    215     }
    216 
    217     return bytesRead;
    218 }
    219 
    220 void ui_com::Run(void)
    221 {
    222     //check endianness
    223     char header;
    224     if(IsBigEndian())
    225     {
    226         header=DATAS_BIG_ENDIAN;
    227         Printf("System is big endian\n");
    228     }
    229     else
    230     {
    231         header=DATAS_LITTLE_ENDIAN;
    232         Printf("System is little endian\n");
    233     }
    234 
    235 #ifdef __XENO__
    236     WarnUponSwitches(true);
    237     printf("\n");//a revoir pourquoi??
    238     //sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
    239 #endif
    240     //on attend d'avoir des choses à faire
    241     Suspend();
    242 
    243     while(!ToBeStopped())
    244     {
    245         size_t resume_id;
    246         Time min=0xffffffffffffffffULL;
    247 
    248         //on recpuere l'id de la prochaine execution
    249         send_mutex->GetMutex();
    250         resume_id=resume_time.size();
    251         for(size_t i=0;i<resume_time.size();i++)
    252         {
    253             if(resume_time.at(i)<min && data_to_send.at(i)->IsEnabled()==true)
    254             {
    255                 min=resume_time.at(i);
    256                 resume_id=i;
    257             }
    258         }
    259 
    260         //attente
    261         if(resume_id<resume_time.size())
    262         {
    263             Time time=resume_time.at(resume_id);
    264             uint16_t resume_period=data_to_send.at(resume_id)->SendPeriod();
    265             send_mutex->ReleaseMutex();
    266             //on dort jusqu'a la prochaine execution
    267             SleepUntil(time);
    268 
    269             //envoi des donnees
    270             int offset=3;
    271             send_buffer[0]=header;
    272             send_mutex->GetMutex();
    273 
    274             for(size_t i=0;i<data_to_send.size();i++) {
    275                 if(data_to_send.at(i)->SendPeriod()==resume_period && data_to_send.at(i)->IsEnabled()==true) {
    276                     data_to_send.at(i)->CopyDatas(send_buffer+offset);
    277                     offset+=data_to_send.at(i)->SendSize();
    278                 }
    279             }
    280             if(offset!=3) {
    281                 memcpy(&send_buffer[1],&resume_period,sizeof(uint16_t));
    282                 //printf("send %i %i %i %x %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]);
    283                 //for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]);
    284                 //printf("\n");
    285                 Send(send_buffer,offset);
    286             }
    287 
    288             //on planifie la prochaine execution
    289             for(size_t i=0;i<data_to_send.size();i++)
    290             {
    291                 if(data_to_send.at(i)->SendPeriod()==resume_period)
    292                 {
    293                     resume_time.at(i)+=data_to_send.at(i)->SendPeriod()*1000000;
    294                 }
    295             }
    296             send_mutex->ReleaseMutex();
    297             //Printf("%i %lld\n",resume_period,GetTime()/1000000);
    298         }
    299         else
    300         {
    301             send_mutex->ReleaseMutex();
    302             //rien a faire, suspend
    303             //Printf("rien a faire suspend\n");
    304             Suspend();
    305             //Printf("wake\n");
    306             //on planifie la prochaine execution
    307             Time time=GetTime();
    308             send_mutex->GetMutex();
    309             for(size_t i=0;i<data_to_send.size();i++)
    310             {
    311                 resume_time.at(i)=time+(Time)data_to_send.at(i)->SendPeriod()*1000000;
    312             }
    313             send_mutex->ReleaseMutex();
    314         }
    315     }
    316 }
    317 #ifdef __XENO__
    318 void* ui_com::user_thread(void * arg)
    319 {
    320     int pipe_fd=-1;
    321     string devname;
    322     char* buf=NULL;
    323     ui_com *caller = (ui_com*)arg;
    324     int rv;
    325     fd_set set;
    326     struct timeval timeout;
    327     ssize_t nb_read,nb_write;
    328 
    329     buf=(char*)malloc(NRT_PIPE_SIZE);
    330     if(buf==NULL)
    331     {
    332         caller->Err("malloc error\n");
    333     }
    334 
    335     devname= NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" + caller->ObjectName()+ "-pipe";
    336     while(pipe_fd<0)
    337     {
    338         pipe_fd = open(devname.c_str(), O_RDWR);
    339         if (pipe_fd < 0 && errno!=ENOENT) caller->Err("open pipe_fd error (%s)\n",strerror(-errno));
    340         usleep(1000);
    341     }
    342 
    343     while(1)
    344     {
    345         FD_ZERO(&set); // clear the set
    346         FD_SET(pipe_fd, &set); // add our file descriptor to the set
    347 
    348         timeout.tv_sec = 0;
    349         timeout.tv_usec = SELECT_TIMEOUT_MS*1000;
    350 
    351         rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
    352 
    353         if(rv == -1)
    354         {
    355             if(caller->is_running==false && UDT::getlasterror().getErrorCode()==CUDTException::ETIMEOUT) break;//timeout
    356             if(UDT::getlasterror().getErrorCode()!=CUDTException::ETIMEOUT) caller->Err("epoll_wait, %s, code %i\n",UDT::getlasterror().getErrorMessage(),UDT::getlasterror().getErrorCode());
    357         }
    358         else if(rv == 0)
    359         {
    360             //printf("timeout\n"); // a timeout occured
    361             if(caller->is_running==false) break;
    362 
    363         }
    364         else
    365         {
    366             nb_read=read(pipe_fd,buf,NRT_PIPE_SIZE);
    367             buf[nb_read]=0;
    368             //printf("envoi\n%s\n",buf);
    369 
    370 #ifdef COMPRESS_FRAMES
    371             char *out;
    372             ssize_t out_size;
    373 
    374             if(buf[0]==XML_HEADER) {
    375                 if(compressBuffer(buf, nb_read,&out,&out_size, 9)==Z_OK) {
    376                     nb_read=out_size;
    377                     nb_write = UDT::sendmsg(caller->socket_fd,out,out_size, -1, true);
    378                     free(out);
    379                 } else {
    380                     caller->Warn("Compress error, sending it uncompressed\n");
    381                     nb_write = UDT::sendmsg(caller->socket_fd, buf,nb_read, -1, true);
    382                 }
    383             } else {
    384                 nb_write = UDT::sendmsg(caller->socket_fd, buf,nb_read, -1, true);
    385             }
    386 #else
    387             nb_write = UDT::sendmsg(caller->socket_fd, buf,nb_read, -1, true);
    388 #endif
    389 
    390             if(nb_write<0)
    391             {
    392                 caller->Err("UDT::sendmsg error (%s)\n",UDT::getlasterror().getErrorMessage());
    393                 if(UDT::getlasterror().getErrorCode()==CUDTException::ECONNLOST || UDT::getlasterror().getErrorCode()==CUDTException::EINVSOCK)
    394                 {
    395                     caller->connection_lost=true;
    396                 }
    397             }
    398             else if (nb_write != nb_read)
    399             {
    400                 caller->Err("UDT::sendmsg error %i/%i\n",nb_write,nb_read);
    401             }
    402         }
    403     }
    404 
    405     close(pipe_fd);
    406     if(buf!=NULL) free(buf);
    407     pthread_exit(0);
    408 }
    409 #endif
    410 
    411 int ui_com::compressBuffer(char *in, ssize_t in_size,char **out,ssize_t *out_size, int level)
    412 {
    413     int ret, flush;
    414     unsigned have;
    415     z_stream strm;
    416 
    417     /* allocate deflate state */
    418     strm.zalloc = Z_NULL;
    419     strm.zfree = Z_NULL;
    420     strm.opaque = Z_NULL;
    421     ret = deflateInit(&strm, level);
    422     if (ret != Z_OK)
    423         return ret;
    424 
    425     *out=(char*)malloc(COMPRESS_CHUNK);
    426     if(!(*out))
    427         return Z_BUF_ERROR;
    428 
    429     strm.next_in = (unsigned char*)in;
    430     strm.avail_out =  COMPRESS_CHUNK;
    431     strm.next_out = (unsigned char*)*out;
    432     strm.avail_in=in_size;
    433     flush=Z_FINISH;
    434 
    435     ret = deflate(&strm, flush);    /* no bad return value */
    436     if (ret == Z_STREAM_ERROR) {
    437         free(*out);
    438         return ret;
    439     }
    440 
    441     have = COMPRESS_CHUNK - strm.avail_out;
    442     *out_size=have;
    443 //printf("%i -> %i\n",in_size,have);
    444     /* clean up and return */
    445     (void)deflateEnd(&strm);
    446 
    447     if(strm.avail_out!=0) {
    448         return Z_OK;
    449     } else {
    450         return Z_STREAM_ERROR;
    451     }
    452 }
    453 
    454 int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,unsigned char **out,ssize_t *out_size)
    455 {
    456     int ret;
    457     //unsigned have;
    458     z_stream strm;
    459 
    460     /* allocate inflate state */
    461     strm.zalloc = Z_NULL;
    462     strm.zfree = Z_NULL;
    463     strm.opaque = Z_NULL;
    464     strm.avail_in = 0;
    465     strm.next_in = Z_NULL;
    466     ret = inflateInit(&strm);
    467     if (ret != Z_OK)
    468         return ret;
    469 
    470     *out=(unsigned char*)malloc(COMPRESS_CHUNK);
    471     if(!(*out))
    472         return Z_BUF_ERROR;
    473 
    474     strm.avail_in = in_size;
    475     strm.next_in = in;
    476     strm.avail_out = COMPRESS_CHUNK;
    477     strm.next_out = *out;
    478 
    479     ret = inflate(&strm, Z_NO_FLUSH);
    480     assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
    481     switch (ret) {
    482     case Z_NEED_DICT:
    483         ret = Z_DATA_ERROR;     /* and fall through */
    484     case Z_DATA_ERROR:
    485     case Z_MEM_ERROR:
    486         (void)inflateEnd(&strm);
    487         return ret;
    488     }
    489     //have = COMPRESS_CHUNK - strm.avail_out;
    490 
    491     /* clean up and return */
     387      } else if (nb_write != nb_read) {
     388        caller->Err("UDT::sendmsg error %i/%i\n", nb_write, nb_read);
     389      }
     390    }
     391  }
     392
     393  close(pipe_fd);
     394  if (buf != NULL)
     395    free(buf);
     396  pthread_exit(0);
     397}
     398#endif
     399
     400int ui_com::compressBuffer(char *in, ssize_t in_size, char **out,
     401                           ssize_t *out_size, int level) {
     402  int ret, flush;
     403  unsigned have;
     404  z_stream strm;
     405
     406  /* allocate deflate state */
     407  strm.zalloc = Z_NULL;
     408  strm.zfree = Z_NULL;
     409  strm.opaque = Z_NULL;
     410  ret = deflateInit(&strm, level);
     411  if (ret != Z_OK)
     412    return ret;
     413
     414  *out = (char *)malloc(COMPRESS_CHUNK);
     415  if (!(*out))
     416    return Z_BUF_ERROR;
     417
     418  strm.next_in = (unsigned char *)in;
     419  strm.avail_out = COMPRESS_CHUNK;
     420  strm.next_out = (unsigned char *)*out;
     421  strm.avail_in = in_size;
     422  flush = Z_FINISH;
     423
     424  ret = deflate(&strm, flush); /* no bad return value */
     425  if (ret == Z_STREAM_ERROR) {
     426    free(*out);
     427    return ret;
     428  }
     429
     430  have = COMPRESS_CHUNK - strm.avail_out;
     431  *out_size = have;
     432  // printf("%i -> %i\n",in_size,have);
     433  /* clean up and return */
     434  (void)deflateEnd(&strm);
     435
     436  if (strm.avail_out != 0) {
     437    return Z_OK;
     438  } else {
     439    return Z_STREAM_ERROR;
     440  }
     441}
     442
     443int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,
     444                             unsigned char **out, ssize_t *out_size) {
     445  int ret;
     446  // unsigned have;
     447  z_stream strm;
     448
     449  /* allocate inflate state */
     450  strm.zalloc = Z_NULL;
     451  strm.zfree = Z_NULL;
     452  strm.opaque = Z_NULL;
     453  strm.avail_in = 0;
     454  strm.next_in = Z_NULL;
     455  ret = inflateInit(&strm);
     456  if (ret != Z_OK)
     457    return ret;
     458
     459  *out = (unsigned char *)malloc(COMPRESS_CHUNK);
     460  if (!(*out))
     461    return Z_BUF_ERROR;
     462
     463  strm.avail_in = in_size;
     464  strm.next_in = in;
     465  strm.avail_out = COMPRESS_CHUNK;
     466  strm.next_out = *out;
     467
     468  ret = inflate(&strm, Z_NO_FLUSH);
     469  assert(ret != Z_STREAM_ERROR); /* state not clobbered */
     470  switch (ret) {
     471  case Z_NEED_DICT:
     472    ret = Z_DATA_ERROR; /* and fall through */
     473  case Z_DATA_ERROR:
     474  case Z_MEM_ERROR:
    492475    (void)inflateEnd(&strm);
    493     return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
    494 }
    495 
    496 bool ui_com::ConnectionLost()
    497 {
    498     return connection_lost;
    499 }
    500 
    501 void ui_com::AddSendData(const SendData *obj)
    502 {
    503     send_mutex->GetMutex();
    504 
    505     resume_time.push_back(GetTime()+(Time)obj->SendPeriod()*1000000);
    506 
    507     //on resume en meme temps tout ceux qui ont la meme periode
    508     for(size_t i=0;i<data_to_send.size();i++)
    509     {
    510         if(data_to_send.at(i)->SendPeriod()==obj->SendPeriod())
    511         {
    512             resume_time.at(resume_time.size()-1)=resume_time.at(i);
    513             break;
    514         }
    515     }
    516 
    517     data_to_send.push_back(obj);
    518 
    519     send_mutex->ReleaseMutex();
    520 }
    521 
    522 void ui_com::UpdateSendData(const SendData *obj)
    523 {
    524     //le mutex est deja pris par l'appellant
    525     size_t id,i;
    526 
    527     //on recupere l'id
    528     for(i=0;i<data_to_send.size();i++)
    529     {
    530         if(data_to_send.at(i)==obj)
    531         {
    532             id=i;
    533             break;
    534         }
    535     }
    536 
    537     //on resume en meme temps tout ceux qui ont la meme periode
    538     for(i=0;i<data_to_send.size();i++)
    539     {
    540         if(i==id) continue;
    541         if(data_to_send.at(i)->IsEnabled()==true && data_to_send.at(i)->SendPeriod()==obj->SendPeriod())
    542         {
    543             resume_time.at(id)=resume_time.at(i);
    544             break;
    545         }
    546     }
    547 
    548     //si aucun match, on planifie l'execution
    549     if(i==data_to_send.size()) resume_time.at(id)=GetTime()+(Time)obj->SendPeriod()*1000000;
    550 
    551     if(IsSuspended()==true) Resume();
    552 
    553     return;
    554 }
    555 
    556 void ui_com::UpdateDataToSendSize(void)
    557 {
    558     send_mutex->GetMutex();
    559     send_size=3;//id(1)+period(2)
    560     for(size_t i=0;i<data_to_send.size();i++)
    561     {
    562         if(data_to_send[i]!=NULL) send_size+=data_to_send[i]->SendSize();
    563     }
    564 
    565     //send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char));
    566     if(send_buffer!=NULL) free(send_buffer);
    567     send_buffer=(char*)malloc(send_size*sizeof(char));
    568     send_mutex->ReleaseMutex();
    569 }
    570 
    571 void ui_com::RemoveSendData(const SendData *obj)
    572 {
    573     //printf("remove_data_to_send %i\n",data_to_send.size());
    574 
    575     send_mutex->GetMutex();
    576     //on recupere l'id
    577     for(size_t i=0;i<data_to_send.size();i++)
    578     {
    579         if(data_to_send.at(i)==obj)
    580         {
    581             data_to_send.erase (data_to_send.begin()+i);
    582             resume_time.erase (resume_time.begin()+i);
    583             //printf("remove_data_to_send %i ok\n",data_to_send.size());
    584             break;
    585         }
    586     }
    587     send_mutex->ReleaseMutex();
    588 
    589     return;
    590 }
    591 
    592 void ui_com::Block(void)
    593 {
    594     send_mutex->GetMutex();
    595 }
    596 
    597 void ui_com::UnBlock(void)
    598 {
    599     send_mutex->ReleaseMutex();
    600 }
     476    return ret;
     477  }
     478  // have = COMPRESS_CHUNK - strm.avail_out;
     479
     480  /* clean up and return */
     481  (void)inflateEnd(&strm);
     482  return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
     483}
     484
     485bool ui_com::ConnectionLost() { return connection_lost; }
     486
     487void ui_com::AddSendData(const SendData *obj) {
     488  send_mutex->GetMutex();
     489
     490  resume_time.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
     491
     492  // on resume en meme temps tout ceux qui ont la meme periode
     493  for (size_t i = 0; i < data_to_send.size(); i++) {
     494    if (data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
     495      resume_time.at(resume_time.size() - 1) = resume_time.at(i);
     496      break;
     497    }
     498  }
     499
     500  data_to_send.push_back(obj);
     501
     502  send_mutex->ReleaseMutex();
     503}
     504
     505void ui_com::UpdateSendData(const SendData *obj) {
     506  // le mutex est deja pris par l'appellant
     507  size_t id, i;
     508
     509  // on recupere l'id
     510  for (i = 0; i < data_to_send.size(); i++) {
     511    if (data_to_send.at(i) == obj) {
     512      id = i;
     513      break;
     514    }
     515  }
     516
     517  // on resume en meme temps tout ceux qui ont la meme periode
     518  for (i = 0; i < data_to_send.size(); i++) {
     519    if (i == id)
     520      continue;
     521    if (data_to_send.at(i)->IsEnabled() == true &&
     522        data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
     523      resume_time.at(id) = resume_time.at(i);
     524      break;
     525    }
     526  }
     527
     528  // si aucun match, on planifie l'execution
     529  if (i == data_to_send.size())
     530    resume_time.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
     531
     532  if (IsSuspended() == true)
     533    Resume();
     534
     535  return;
     536}
     537
     538void ui_com::UpdateDataToSendSize(void) {
     539  send_mutex->GetMutex();
     540  send_size = 3; // id(1)+period(2)
     541  for (size_t i = 0; i < data_to_send.size(); i++) {
     542    if (data_to_send[i] != NULL)
     543      send_size += data_to_send[i]->SendSize();
     544  }
     545
     546  // send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char));
     547  if (send_buffer != NULL)
     548    free(send_buffer);
     549  send_buffer = (char *)malloc(send_size * sizeof(char));
     550  send_mutex->ReleaseMutex();
     551}
     552
     553void ui_com::RemoveSendData(const SendData *obj) {
     554  // printf("remove_data_to_send %i\n",data_to_send.size());
     555
     556  send_mutex->GetMutex();
     557  // on recupere l'id
     558  for (size_t i = 0; i < data_to_send.size(); i++) {
     559    if (data_to_send.at(i) == obj) {
     560      data_to_send.erase(data_to_send.begin() + i);
     561      resume_time.erase(resume_time.begin() + i);
     562      // printf("remove_data_to_send %i ok\n",data_to_send.size());
     563      break;
     564    }
     565  }
     566  send_mutex->ReleaseMutex();
     567
     568  return;
     569}
     570
     571void ui_com::Block(void) { send_mutex->GetMutex(); }
     572
     573void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note: See TracChangeset for help on using the changeset viewer.