source: flair-src/trunk/lib/FlairCore/src/ui_com.cpp @ 67

Last change on this file since 67 was 15, checked in by Bayard Gildas, 6 years ago

sources reformatted with flair-format-dir script

File size: 15.0 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5//  created:    2011/05/01
6//  filename:   ui_com.cpp
7//
8//  author:     Guillaume Sanahuja
9//              Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11//  version:    $Id: $
12//
13//  purpose:    classe permettant la lecture et l'ecriture sur socket UDT
14//
15//
16/*********************************************************************/
17
18#include "ui_com.h"
19#include <cstdlib>
20#include <string.h>
21#include <unistd.h>
22#include "Mutex.h"
23#include "SendData.h"
24#include "communication.h"
25#include "FrameworkManager.h"
26#include "zlib.h"
27#include <assert.h>
28#include "config.h"
29
30#ifdef __XENO__
31#include <pthread.h>
32#include <native/task.h>
33#include <native/task.h>
34#include <fcntl.h>
35#endif
36
37using std::string;
38using namespace flair::core;
39using namespace flair::gui;
40
41ui_com::ui_com(const Object *parent, UDTSOCKET sock)
42    : Thread(parent, "send", 2) {
43  // buffer envoi
44  send_buffer = (char *)malloc(3);
45  send_size = 3; // id(1)+period(2)
46
47  // mutex
48  send_mutex = NULL;
49  send_mutex = new Mutex(this, ObjectName());
50
51  socket_fd = sock;
52  connection_lost = false;
53
54#ifdef __XENO__
55  int status;
56  string tmp_name;
57
58  is_running = true;
59
60  // pipe
61  tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe";
62  // xenomai limitation
63  if (tmp_name.size() > 31)
64    Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
65#ifdef RT_PIPE_SIZE
66  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
67#else
68  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
69#endif
70
71  if (status != 0) {
72    Err("rt_pipe_create error (%s)\n", strerror(-status));
73    // return -1;
74  }
75
76// start user side thread
77#ifdef NRT_STACK_SIZE
78  // Initialize thread creation attributes
79  pthread_attr_t attr;
80  if (pthread_attr_init(&attr) != 0) {
81    Err("pthread_attr_init error\n");
82  }
83
84  if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
85    Err("pthread_attr_setstacksize error\n");
86  }
87
88  if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0)
89#else  // NRT_STACK_SIZE
90  if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0)
91#endif // NRT_STACK_SIZE
92  {
93    Err("pthread_create error\n");
94    // return -1;
95  }
96#ifdef NRT_STACK_SIZE
97  if (pthread_attr_destroy(&attr) != 0) {
98    Err("pthread_attr_destroy error\n");
99  }
100#endif
101
102#endif //__XENO__
103  int timeout = 100;
104  if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0)
105    Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
106
107  bool blocking = true;
108  if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0)
109    Err("UDT::setsockopt error (UDT_SNDSYN)\n");
110
111  if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
112    Err("UDT::setsockopt error (UDT_RCVSYN)\n");
113  //#endif //__XENO__
114
115  Start();
116}
117
118ui_com::~ui_com() {
119// printf("destruction ui_com\n");
120
121#ifdef __XENO__
122  is_running = false;
123
124  pthread_join(thread, NULL);
125
126  int status = rt_pipe_delete(&pipe);
127  if (status != 0)
128    Err("rt_pipe_delete error (%s)\n", strerror(-status));
129#endif
130
131  SafeStop();
132
133  if (IsSuspended() == true)
134    Resume();
135
136  Join();
137
138  if (send_buffer != NULL)
139    free(send_buffer);
140  send_buffer = NULL;
141
142  // printf("destruction ui_com ok\n");
143}
144
145void ui_com::Send(char *buf, ssize_t size) {
146  if (connection_lost == true)
147    return;
148
149  char *tosend = buf;
150  ssize_t nb_write;
151
152  if (buf[0] == XML_HEADER) {
153    // cut xml header
154    tosend = strstr(buf, "<root");
155    size -= tosend - buf;
156  }
157
158#ifdef __XENO__
159  nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
160
161  if (nb_write < 0) {
162    Err("rt_pipe_write error (%s)\n", strerror(-nb_write));
163  } else if (nb_write != size) {
164    Err("rt_pipe_write error %i/%i\n", nb_write, size);
165  }
166#else //__XENO__
167
168#ifdef COMPRESS_FRAMES
169  bool sendItCompressed = false;
170  if (buf[0] == XML_HEADER) {
171    sendItCompressed = true;
172  }
173
174  if (sendItCompressed) {
175    char *out;
176    ssize_t out_size;
177    if (compressBuffer(tosend, size, &out, &out_size, 9) == Z_OK) {
178      size = out_size;
179      nb_write = UDT::sendmsg(socket_fd, out, size, -1, true);
180      free(out);
181    } else {
182      Warn("Compress error, sending it uncompressed\n");
183      sendItCompressed = false;
184    }
185  }
186
187  if (!sendItCompressed) {
188    nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
189  }
190
191#else // COMPRESS_FRAMES
192  nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
193#endif // COMPRESS_FRAMES
194  // Printf("write %i %i\n",nb_write,size);
195  if (nb_write < 0) {
196    Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
197    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
198        UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
199      connection_lost = true;
200    }
201  } else if (nb_write != size) {
202    Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(),
203        UDT::getlasterror().getErrorCode(), nb_write, size);
204  }
205#endif //__XENO__
206}
207
208ssize_t ui_com::Receive(char *buf, ssize_t buf_size) {
209  ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size);
210
211  if (bytesRead < 0) {
212    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) {
213      Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
214      connection_lost = true;
215    }
216  }
217
218  return bytesRead;
219}
220
221void ui_com::Run(void) {
222  // check endianness
223  char header;
224  if (IsBigEndian()) {
225    header = DATAS_BIG_ENDIAN;
226    Printf("System is big endian\n");
227  } else {
228    header = DATAS_LITTLE_ENDIAN;
229    Printf("System is little endian\n");
230  }
231
232#ifdef __XENO__
233  WarnUponSwitches(true);
234  printf("\n"); // a revoir pourquoi??
235// sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
236#endif
237  // on attend d'avoir des choses à faire
238  Suspend();
239
240  while (!ToBeStopped()) {
241    size_t resume_id;
242    Time min = 0xffffffffffffffffULL;
243
244    // on recpuere l'id de la prochaine execution
245    send_mutex->GetMutex();
246    resume_id = resume_time.size();
247    for (size_t i = 0; i < resume_time.size(); i++) {
248      if (resume_time.at(i) < min && data_to_send.at(i)->IsEnabled() == true) {
249        min = resume_time.at(i);
250        resume_id = i;
251      }
252    }
253
254    // attente
255    if (resume_id < resume_time.size()) {
256      Time time = resume_time.at(resume_id);
257      uint16_t resume_period = data_to_send.at(resume_id)->SendPeriod();
258      send_mutex->ReleaseMutex();
259      // on dort jusqu'a la prochaine execution
260      SleepUntil(time);
261
262      // envoi des donnees
263      int offset = 3;
264      send_buffer[0] = header;
265      send_mutex->GetMutex();
266
267      for (size_t i = 0; i < data_to_send.size(); i++) {
268        if (data_to_send.at(i)->SendPeriod() == resume_period &&
269            data_to_send.at(i)->IsEnabled() == true) {
270          data_to_send.at(i)->CopyDatas(send_buffer + offset);
271          offset += data_to_send.at(i)->SendSize();
272        }
273      }
274      if (offset != 3) {
275        memcpy(&send_buffer[1], &resume_period, sizeof(uint16_t));
276        // printf("send %i %i %i %x
277        // %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]);
278        // for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]);
279        // printf("\n");
280        Send(send_buffer, offset);
281      }
282
283      // on planifie la prochaine execution
284      for (size_t i = 0; i < data_to_send.size(); i++) {
285        if (data_to_send.at(i)->SendPeriod() == resume_period) {
286          resume_time.at(i) += data_to_send.at(i)->SendPeriod() * 1000000;
287        }
288      }
289      send_mutex->ReleaseMutex();
290      // Printf("%i %lld\n",resume_period,GetTime()/1000000);
291    } else {
292      send_mutex->ReleaseMutex();
293      // rien a faire, suspend
294      // Printf("rien a faire suspend\n");
295      Suspend();
296      // Printf("wake\n");
297      // on planifie la prochaine execution
298      Time time = GetTime();
299      send_mutex->GetMutex();
300      for (size_t i = 0; i < data_to_send.size(); i++) {
301        resume_time.at(i) =
302            time + (Time)data_to_send.at(i)->SendPeriod() * 1000000;
303      }
304      send_mutex->ReleaseMutex();
305    }
306  }
307}
308#ifdef __XENO__
309void *ui_com::user_thread(void *arg) {
310  int pipe_fd = -1;
311  string devname;
312  char *buf = NULL;
313  ui_com *caller = (ui_com *)arg;
314  int rv;
315  fd_set set;
316  struct timeval timeout;
317  ssize_t nb_read, nb_write;
318
319  buf = (char *)malloc(NRT_PIPE_SIZE);
320  if (buf == NULL) {
321    caller->Err("malloc error\n");
322  }
323
324  devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
325            caller->ObjectName() + "-pipe";
326  while (pipe_fd < 0) {
327    pipe_fd = open(devname.c_str(), O_RDWR);
328    if (pipe_fd < 0 && errno != ENOENT)
329      caller->Err("open pipe_fd error (%s)\n", strerror(-errno));
330    usleep(1000);
331  }
332
333  while (1) {
334    FD_ZERO(&set);         // clear the set
335    FD_SET(pipe_fd, &set); // add our file descriptor to the set
336
337    timeout.tv_sec = 0;
338    timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
339
340    rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
341
342    if (rv == -1) {
343      if (caller->is_running == false &&
344          UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
345        break; // timeout
346      if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
347        caller->Err("epoll_wait, %s, code %i\n",
348                    UDT::getlasterror().getErrorMessage(),
349                    UDT::getlasterror().getErrorCode());
350    } else if (rv == 0) {
351      // printf("timeout\n"); // a timeout occured
352      if (caller->is_running == false)
353        break;
354
355    } else {
356      nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
357      buf[nb_read] = 0;
358// printf("envoi\n%s\n",buf);
359
360#ifdef COMPRESS_FRAMES
361      char *out;
362      ssize_t out_size;
363
364      if (buf[0] == XML_HEADER) {
365        if (compressBuffer(buf, nb_read, &out, &out_size, 9) == Z_OK) {
366          nb_read = out_size;
367          nb_write = UDT::sendmsg(caller->socket_fd, out, out_size, -1, true);
368          free(out);
369        } else {
370          caller->Warn("Compress error, sending it uncompressed\n");
371          nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
372        }
373      } else {
374        nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
375      }
376#else
377      nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
378#endif
379
380      if (nb_write < 0) {
381        caller->Err("UDT::sendmsg error (%s)\n",
382                    UDT::getlasterror().getErrorMessage());
383        if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
384            UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
385          caller->connection_lost = true;
386        }
387      } else if (nb_write != nb_read) {
388        caller->Err("UDT::sendmsg error %i/%i\n", nb_write, nb_read);
389      }
390    }
391  }
392
393  close(pipe_fd);
394  if (buf != NULL)
395    free(buf);
396  pthread_exit(0);
397}
398#endif
399
400int ui_com::compressBuffer(char *in, ssize_t in_size, char **out,
401                           ssize_t *out_size, int level) {
402  int ret, flush;
403  unsigned have;
404  z_stream strm;
405
406  /* allocate deflate state */
407  strm.zalloc = Z_NULL;
408  strm.zfree = Z_NULL;
409  strm.opaque = Z_NULL;
410  ret = deflateInit(&strm, level);
411  if (ret != Z_OK)
412    return ret;
413
414  *out = (char *)malloc(COMPRESS_CHUNK);
415  if (!(*out))
416    return Z_BUF_ERROR;
417
418  strm.next_in = (unsigned char *)in;
419  strm.avail_out = COMPRESS_CHUNK;
420  strm.next_out = (unsigned char *)*out;
421  strm.avail_in = in_size;
422  flush = Z_FINISH;
423
424  ret = deflate(&strm, flush); /* no bad return value */
425  if (ret == Z_STREAM_ERROR) {
426    free(*out);
427    return ret;
428  }
429
430  have = COMPRESS_CHUNK - strm.avail_out;
431  *out_size = have;
432  // printf("%i -> %i\n",in_size,have);
433  /* clean up and return */
434  (void)deflateEnd(&strm);
435
436  if (strm.avail_out != 0) {
437    return Z_OK;
438  } else {
439    return Z_STREAM_ERROR;
440  }
441}
442
443int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,
444                             unsigned char **out, ssize_t *out_size) {
445  int ret;
446  // unsigned have;
447  z_stream strm;
448
449  /* allocate inflate state */
450  strm.zalloc = Z_NULL;
451  strm.zfree = Z_NULL;
452  strm.opaque = Z_NULL;
453  strm.avail_in = 0;
454  strm.next_in = Z_NULL;
455  ret = inflateInit(&strm);
456  if (ret != Z_OK)
457    return ret;
458
459  *out = (unsigned char *)malloc(COMPRESS_CHUNK);
460  if (!(*out))
461    return Z_BUF_ERROR;
462
463  strm.avail_in = in_size;
464  strm.next_in = in;
465  strm.avail_out = COMPRESS_CHUNK;
466  strm.next_out = *out;
467
468  ret = inflate(&strm, Z_NO_FLUSH);
469  assert(ret != Z_STREAM_ERROR); /* state not clobbered */
470  switch (ret) {
471  case Z_NEED_DICT:
472    ret = Z_DATA_ERROR; /* and fall through */
473  case Z_DATA_ERROR:
474  case Z_MEM_ERROR:
475    (void)inflateEnd(&strm);
476    return ret;
477  }
478  // have = COMPRESS_CHUNK - strm.avail_out;
479
480  /* clean up and return */
481  (void)inflateEnd(&strm);
482  return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
483}
484
485bool ui_com::ConnectionLost() { return connection_lost; }
486
487void ui_com::AddSendData(const SendData *obj) {
488  send_mutex->GetMutex();
489
490  resume_time.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
491
492  // on resume en meme temps tout ceux qui ont la meme periode
493  for (size_t i = 0; i < data_to_send.size(); i++) {
494    if (data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
495      resume_time.at(resume_time.size() - 1) = resume_time.at(i);
496      break;
497    }
498  }
499
500  data_to_send.push_back(obj);
501
502  send_mutex->ReleaseMutex();
503}
504
505void ui_com::UpdateSendData(const SendData *obj) {
506  // le mutex est deja pris par l'appellant
507  size_t id, i;
508
509  // on recupere l'id
510  for (i = 0; i < data_to_send.size(); i++) {
511    if (data_to_send.at(i) == obj) {
512      id = i;
513      break;
514    }
515  }
516
517  // on resume en meme temps tout ceux qui ont la meme periode
518  for (i = 0; i < data_to_send.size(); i++) {
519    if (i == id)
520      continue;
521    if (data_to_send.at(i)->IsEnabled() == true &&
522        data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
523      resume_time.at(id) = resume_time.at(i);
524      break;
525    }
526  }
527
528  // si aucun match, on planifie l'execution
529  if (i == data_to_send.size())
530    resume_time.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
531
532  if (IsSuspended() == true)
533    Resume();
534
535  return;
536}
537
538void ui_com::UpdateDataToSendSize(void) {
539  send_mutex->GetMutex();
540  send_size = 3; // id(1)+period(2)
541  for (size_t i = 0; i < data_to_send.size(); i++) {
542    if (data_to_send[i] != NULL)
543      send_size += data_to_send[i]->SendSize();
544  }
545
546  // send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char));
547  if (send_buffer != NULL)
548    free(send_buffer);
549  send_buffer = (char *)malloc(send_size * sizeof(char));
550  send_mutex->ReleaseMutex();
551}
552
553void ui_com::RemoveSendData(const SendData *obj) {
554  // printf("remove_data_to_send %i\n",data_to_send.size());
555
556  send_mutex->GetMutex();
557  // on recupere l'id
558  for (size_t i = 0; i < data_to_send.size(); i++) {
559    if (data_to_send.at(i) == obj) {
560      data_to_send.erase(data_to_send.begin() + i);
561      resume_time.erase(resume_time.begin() + i);
562      // printf("remove_data_to_send %i ok\n",data_to_send.size());
563      break;
564    }
565  }
566  send_mutex->ReleaseMutex();
567
568  return;
569}
570
571void ui_com::Block(void) { send_mutex->GetMutex(); }
572
573void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note: See TracBrowser for help on using the repository browser.