source: flair-src/trunk/lib/FlairCore/src/ui_com.cpp @ 443

Last change on this file since 443 was 443, checked in by Sanahuja Guillaume, 3 months ago

update buffering (gcs part)

File size: 21.3 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5//  created:    2011/05/01
6//  filename:   ui_com.cpp
7//
8//  author:     Guillaume Sanahuja
9//              Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11//  version:    $Id: $
12//
13//  purpose:    classe permettant la lecture et l'ecriture sur socket UDT
14//
15//
16/*********************************************************************/
17
18#include "ui_com.h"
19#include <cstdlib>
20#include <string.h>
21#include <unistd.h>
22#include "Mutex.h"
23#include "SendData.h"
24#include "communication.h"
25#include "FrameworkManager.h"
26#include "zlib.h"
27#include <assert.h>
28#include "config.h"
29
30#ifdef __XENO__
31#include <pthread.h>
32#include <native/task.h>
33#include <native/task.h>
34#include <fcntl.h>
35#endif
36
37using std::string;
38using namespace flair::core;
39using namespace flair::gui;
40
41ui_com::ui_com(const Object *parent, UDTSOCKET sock)
42    : Thread(parent, "send", 2,16384*2) {
43  // buffer envoi
44  send_buffer = (char *)malloc(3);
45  send_size = 3; // id(1)+period(2)
46
47  // mutex
48  send_mutex = NULL;
49  send_mutex = new Mutex(this, ObjectName());
50
51  socket_fd = sock;
52  connection_lost = false;
53
54#ifdef __XENO__
55 Err("multi buffering is not well implemented in RT\n");
56#endif
57
58#ifdef __XENO__
59  int status;
60  string tmp_name;
61
62  is_running = true;
63
64  // pipe
65  tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe";
66  // xenomai limitation
67  if (tmp_name.size() > 31)
68    Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
69#ifdef RT_PIPE_SIZE
70  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
71#else
72  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
73#endif
74
75  if (status != 0) {
76                char errorMsg[256];
77    Err("rt_pipe_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
78    // return -1;
79  }
80
81// start user side thread
82#ifdef NRT_STACK_SIZE
83  // Initialize thread creation attributes
84  pthread_attr_t attr;
85  if (pthread_attr_init(&attr) != 0) {
86    Err("pthread_attr_init error\n");
87  }
88
89  if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
90    Err("pthread_attr_setstacksize error\n");
91  }
92
93  if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0)
94#else  // NRT_STACK_SIZE
95  if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0)
96#endif // NRT_STACK_SIZE
97  {
98    Err("pthread_create error\n");
99    // return -1;
100  }
101#ifdef NRT_STACK_SIZE
102  if (pthread_attr_destroy(&attr) != 0) {
103    Err("pthread_attr_destroy error\n");
104  }
105#endif
106
107#endif //__XENO__
108  int timeout = 100;
109  if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0)
110    Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
111    /*
112  timeout=-1;
113  if (UDT::setsockopt(socket_fd, 0, UDT_SNDTIMEO, &timeout, sizeof(int)) != 0)
114    Err("UDT::setsockopt error (UDT_SNDTIMEO)\n");
115*/
116  bool blocking = true;
117  if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0)
118    Err("UDT::setsockopt error (UDT_SNDSYN)\n");
119
120  if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
121    Err("UDT::setsockopt error (UDT_RCVSYN)\n");
122  //#endif //__XENO__
123
124  Start();
125}
126
127ui_com::~ui_com() {
128//Printf("destruction ui_com\n");
129
130#ifdef __XENO__
131  is_running = false;
132
133  pthread_join(thread, NULL);
134
135  int status = rt_pipe_delete(&pipe);
136  if (status != 0) {
137                char errorMsg[256];
138    Err("rt_pipe_delete error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
139        }
140#endif
141
142  SafeStop();
143
144  if (IsSuspended() == true)
145    Resume();
146
147  Join();
148 
149  char buf=CLOSING_CONNECTION;
150  Send(&buf,1);
151 
152  if (send_buffer != NULL)
153    free(send_buffer);
154  send_buffer = NULL;
155
156  //Printf("destruction ui_com ok\n");
157}
158
159//send, public part; dispatch to SendNRT (nrt) or rt_pipe (rt)
160//rt_pipe is read by user thread which calls SendNRT
161void ui_com::Send(char *buf, ssize_t size,int ttl) {
162  if (connection_lost == true)
163    return;
164
165  char *tosend = buf;
166  if (buf[0] == XML_HEADER) {
167    // cut xml header
168    tosend = strstr(buf, "<root");
169    size -= tosend - buf;
170  }
171
172#ifdef __XENO__
173  //data buf
174  ssize_t nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
175  if (nb_write < 0) {
176                char errorMsg[256];
177    Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg)));
178  } else if (nb_write != size) {
179    Err("rt_pipe_write error %i/%i\n", nb_write, size);
180  }
181  //ttl
182  nb_write = rt_pipe_write(&pipe, &ttl, sizeof(ttl), P_NORMAL);
183  if (nb_write < 0) {
184                char errorMsg[256];
185    Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg)));
186  } else if (nb_write != sizeof(ttl)) {
187    Err("rt_pipe_write error %i/%i\n", nb_write, size);
188  }
189#else //__XENO__
190  SendNRT(tosend,size,ttl);
191#endif //__XENO__
192}
193
194//send, private part; called to effectively send to udt
195//this is always in nrt
196void ui_com::SendNRT(char *buf, ssize_t size,int ttl) {
197  ssize_t nb_write;
198  bool sendItCompressed = false;
199 
200#ifdef COMPRESS_FRAMES
201  if (buf[0] == XML_HEADER) {
202    sendItCompressed = true;
203  }
204#endif // COMPRESS_FRAMES
205
206  if (sendItCompressed) {
207    char *out;
208    ssize_t out_size;
209    if (compressBuffer(buf, size, &out, &out_size, 9) == Z_OK) {
210      size = out_size;
211      nb_write = UDT::sendmsg(socket_fd, out, size,ttl, true);
212      free(out);
213    } else {
214      Warn("Compress error, sending it uncompressed\n");
215      sendItCompressed = false;
216    }
217  }
218
219  if (!sendItCompressed) {
220    nb_write = UDT::sendmsg(socket_fd, buf, size, ttl, true);
221  }
222
223   //Printf("write %i %i\n",nb_write,size);
224  if (nb_write < 0) {
225    Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
226    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
227        UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
228      connection_lost = true;
229    }
230  } else if (nb_write != size) {
231    Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(),
232        UDT::getlasterror().getErrorCode(), nb_write, size);
233  }
234}
235
236ssize_t ui_com::Receive(char *buf, ssize_t buf_size) {
237  ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size);
238
239  if (bytesRead < 0) {
240    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) {
241      Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
242      connection_lost = true;
243    }
244  }
245
246  return bytesRead;
247}
248
249void ui_com::CheckConnection(void) {
250  int32_t val,len;
251 
252  int result=UDT::getsockopt(socket_fd,0,UDT_STATE,&val,&len); 
253   if (result < 0) {
254    Printf("UDT::getsockopt error (%s)\n", UDT::getlasterror().getErrorMessage());
255   }
256 // printf("opt: %i %i %i\n",result,val,len);
257}
258
259//todo: le run (rt) push les données une par une
260//le nrt recupere et decide quand envoyer :
261// -quand toutes les données d'une meme periode sont recues
262// -quand on a atteind le nb buffering
263// -le nrt alloue dynamiquement le buffer d'envoi
264//-> enelver l'alloc du buffer au startup (UpdateDataToSendSize)
265void ui_com::Run(void) {
266  // check endianness
267  char header;
268  if (IsBigEndian()) {
269    header = DATA_BIG_ENDIAN;
270    Printf("System is big endian\n");
271  } else {
272    header = DATA_LITTLE_ENDIAN;
273    Printf("System is little endian\n");
274  }
275
276#ifdef __XENO__
277  WarnUponSwitches(true);
278  printf("\n"); // a revoir pourquoi??
279// sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
280#endif
281  // on attend d'avoir des choses à faire
282  Suspend();
283
284  while (!ToBeStopped()) {
285    size_t resume_id;
286    Time min = 0xffffffffffffffffULL;
287
288    // on recpuere l'id de la prochaine execution
289    send_mutex->GetMutex();
290    resume_id = resumeTimes.size();
291    for (size_t i = 0; i < resumeTimes.size(); i++) {
292      if (resumeTimes.at(i) < min && datasToSend.at(i)->IsEnabled() == true) {
293        min = resumeTimes.at(i);
294        resume_id = i;
295      }
296    }
297
298    // attente
299    if (resume_id < resumeTimes.size()) {
300      Time time = resumeTimes.at(resume_id);
301      uint16_t resume_period = datasToSend.at(resume_id)->SendPeriod();
302      send_mutex->ReleaseMutex();
303      // on dort jusqu'a la prochaine execution
304      SleepUntil(time);
305
306      // envoi des donnees
307      int offset = 3;
308      send_buffer[0] = header;
309      send_mutex->GetMutex();
310
311      for (size_t i = 0; i < datasToSend.size(); i++) {
312        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
313          datasToSend.at(i)->CopyDatas(send_buffer + offset);
314          offset += datasToSend.at(i)->SendSize();
315        }
316      }
317      if (offset != 3) {
318        memcpy(&send_buffer[1], &resume_period, sizeof(uint16_t));
319        //printf("send %i %i %i %x %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]);
320        // for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]);
321        // printf("\n");
322        //Send(send_buffer, offset,resume_period);
323      }
324
325      //test to push datas
326      for (size_t i = 0; i < datasToSend.size(); i++) {
327        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
328          PushDatasToSend(datasToSend.at(i));
329        }
330      }
331      //end test
332     
333      // on planifie la prochaine execution
334      for (size_t i = 0; i < datasToSend.size(); i++) {
335        if (datasToSend.at(i)->SendPeriod() == resume_period) {
336          resumeTimes.at(i) += datasToSend.at(i)->SendPeriod() * 1000000;
337        }
338      }
339      send_mutex->ReleaseMutex();
340      //Printf("%i %lld\n",resume_period,GetTime()/1000000);
341    } else {
342      send_mutex->ReleaseMutex();
343      // rien a faire, suspend
344      //Printf("rien a faire suspend\n");
345      Suspend();
346      //Printf("wake\n");
347      // on planifie la prochaine execution
348      Time time = GetTime();
349      send_mutex->GetMutex();
350      for (size_t i = 0; i < datasToSend.size(); i++) {
351        resumeTimes.at(i) =
352            time + (Time)datasToSend.at(i)->SendPeriod() * 1000000;
353      }
354      send_mutex->ReleaseMutex();
355    }
356  }
357}
358
359
360//called with send_mutex locked
361//in nrt
362//TODO: RT part has to put datas in pipe, then user thread call this
363void ui_com::PushDatasToSend(const SendData *dataToSend) {
364  PushedData_t* ptr=NULL;
365  //check if we already have one buffer for this couple period/nb_buffering
366  for (size_t i = 0; i < pushedDatas.size(); i++) {
367      if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) {
368          ptr=&pushedDatas.at(i);
369        }
370  }
371  if(ptr==NULL) {
372      Warn("no match in buffers\n");
373      return;
374  }
375 
376  if(ptr->actual_size==0) {
377      if (IsBigEndian()) {
378        ptr->buf[0] = MULTIPLE_DATA_BIG_ENDIAN;
379      } else {
380        ptr->buf[0]  = MULTIPLE_DATA_LITTLE_ENDIAN;
381      }
382      ptr->actual_size+=sizeof(char);
383      memcpy(ptr->buf+ptr->actual_size,&(ptr->period),sizeof(uint16_t));
384      ptr->actual_size+=sizeof(uint16_t);
385      memcpy(ptr->buf+ptr->actual_size,&(ptr->nb_buffering),sizeof(uint16_t));
386      ptr->actual_size+=sizeof(uint16_t);
387  }
388 
389  dataToSend->CopyDatas(ptr->buf+ptr->actual_size);
390  ptr->actual_size+=dataToSend->SendSize();
391  Printf("nb buffered %i/%i\n",ptr->actual_size,ptr->final_size);
392  Printf("pushed size %i period %i nb buffering %i\n",dataToSend->SendSize(),dataToSend->SendPeriod(),dataToSend->NbBuffering());
393  if(ptr->actual_size>=ptr->final_size) {//par securité on test aussi le cas ou supérieur
394      Printf("ready to send\n");
395      SendNRT(ptr->buf, ptr->actual_size,ptr->period);
396      //clean
397      ptr->actual_size=0;
398  }
399}
400
401
402#ifdef __XENO__
403void *ui_com::user_thread(void *arg) {
404  int pipe_fd = -1;
405  string devname;
406  char *buf = NULL;
407  ui_com *caller = (ui_com *)arg;
408  int rv;
409  fd_set set;
410  struct timeval timeout;
411  ssize_t nb_read;
412  int ttl;
413
414  buf = (char *)malloc(NRT_PIPE_SIZE);
415  if (buf == NULL) {
416    caller->Err("malloc error\n");
417  }
418
419  devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
420            caller->ObjectName() + "-pipe";
421  while (pipe_fd < 0) {
422    pipe_fd = open(devname.c_str(), O_RDWR);
423    if (pipe_fd < 0 && errno != ENOENT) {
424                        char errorMsg[256];
425      caller->Err("open pipe_fd error (%s)\n", strerror_r(errno, errorMsg, sizeof(errorMsg)));
426                }
427    usleep(1000);
428  }
429
430  while (1) {
431    FD_ZERO(&set);         // clear the set
432    FD_SET(pipe_fd, &set); // add our file descriptor to the set
433
434    timeout.tv_sec = 0;
435    timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
436
437    rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
438
439    if (rv == -1) {
440      if (caller->is_running == false && UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
441        break; // timeout
442      if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
443        caller->Err("epoll_wait, %s, code %i\n",
444                    UDT::getlasterror().getErrorMessage(), UDT::getlasterror().getErrorCode());
445    } else if (rv == 0) {
446      // printf("timeout\n"); // a timeout occured
447      if (caller->is_running == false)
448        break;
449
450    } else {
451      //read data buf and ttl
452      //RT part has made 2 rt_pipe_write; they should be available in nrt
453      //(select ensure at least one data is ready)
454      nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
455      buf[nb_read] = 0;
456      read(pipe_fd, &ttl, sizeof(ttl));
457// printf("envoi\n%s\n",buf);
458
459      caller->SendNRT(buf, nb_read,ttl);
460    }
461  }
462
463  close(pipe_fd);
464  if (buf != NULL)
465    free(buf);
466  pthread_exit(0);
467}
468#endif
469
470int ui_com::compressBuffer(char *in, ssize_t in_size, char **out,
471                           ssize_t *out_size, int level) {
472  int ret, flush;
473  unsigned have;
474  z_stream strm;
475
476  /* allocate deflate state */
477  strm.zalloc = Z_NULL;
478  strm.zfree = Z_NULL;
479  strm.opaque = Z_NULL;
480  ret = deflateInit(&strm, level);
481  if (ret != Z_OK)
482    return ret;
483
484  *out = (char *)malloc(COMPRESS_CHUNK);
485  if (!(*out))
486    return Z_BUF_ERROR;
487
488  strm.next_in = (unsigned char *)in;
489  strm.avail_out = COMPRESS_CHUNK;
490  strm.next_out = (unsigned char *)*out;
491  strm.avail_in = in_size;
492  flush = Z_FINISH;
493
494  ret = deflate(&strm, flush); /* no bad return value */
495  if (ret == Z_STREAM_ERROR) {
496    free(*out);
497    return ret;
498  }
499
500  have = COMPRESS_CHUNK - strm.avail_out;
501  *out_size = have;
502  // printf("%i -> %i\n",in_size,have);
503  /* clean up and return */
504  (void)deflateEnd(&strm);
505
506  if (strm.avail_out != 0) {
507    return Z_OK;
508  } else {
509    return Z_STREAM_ERROR;
510  }
511}
512
513int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,
514                             unsigned char **out, ssize_t *out_size) {
515  int ret;
516  // unsigned have;
517  z_stream strm;
518
519  /* allocate inflate state */
520  strm.zalloc = Z_NULL;
521  strm.zfree = Z_NULL;
522  strm.opaque = Z_NULL;
523  strm.avail_in = 0;
524  strm.next_in = Z_NULL;
525  ret = inflateInit(&strm);
526  if (ret != Z_OK)
527    return ret;
528
529  *out = (unsigned char *)malloc(COMPRESS_CHUNK);
530  if (!(*out))
531    return Z_BUF_ERROR;
532
533  strm.avail_in = in_size;
534  strm.next_in = in;
535  strm.avail_out = COMPRESS_CHUNK;
536  strm.next_out = *out;
537
538  ret = inflate(&strm, Z_NO_FLUSH);
539  assert(ret != Z_STREAM_ERROR); /* state not clobbered */
540  switch (ret) {
541  case Z_NEED_DICT:
542    ret = Z_DATA_ERROR; /* and fall through */
543  case Z_DATA_ERROR:
544  case Z_MEM_ERROR:
545    (void)inflateEnd(&strm);
546    return ret;
547  }
548  // have = COMPRESS_CHUNK - strm.avail_out;
549
550  /* clean up and return */
551  (void)inflateEnd(&strm);
552  return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
553}
554
555bool ui_com::ConnectionLost() { return connection_lost; }
556
557void ui_com::AddSendData(const SendData *obj) {
558  send_mutex->GetMutex();
559
560  resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
561
562  // on resume en meme temps tout ceux qui ont la meme periode
563  for (size_t i = 0; i < datasToSend.size(); i++) {
564    if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
565      resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i);
566      break;
567    }
568  }
569
570  datasToSend.push_back(obj);
571
572  send_mutex->ReleaseMutex();
573}
574
575//TODO: check if it is RT compatible
576//must be called with mutex locked
577void ui_com::UpdateSendData(const SendData *obj) {
578  // le mutex est deja pris par l'appellant
579  size_t id, i;
580
581  // on recupere l'id
582  for (i = 0; i < datasToSend.size(); i++) {
583    if (datasToSend.at(i) == obj) {
584      id = i;
585      break;
586    }
587  }
588
589  // on resume en meme temps tout ceux qui ont la meme periode
590  for (i = 0; i < datasToSend.size(); i++) {
591    if (i == id)
592      continue;
593    if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
594      resumeTimes.at(id) = resumeTimes.at(i);
595      break;
596    }
597  }
598
599  // si aucun match, on planifie l'execution
600  if (i == datasToSend.size())
601    resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
602
603  if (IsSuspended() == true) Resume();
604
605//update all buffers if necessary, discard datas
606    std::vector<PushedData_t>::iterator pushedDatasIterator = pushedDatas.begin();
607    while(pushedDatasIterator != pushedDatas.end()) {
608        size_t bufSize=0;
609        for (size_t i = 0; i < datasToSend.size(); i++) {
610            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) {
611                bufSize+=datasToSend.at(i)->SendSize();
612            }
613        }
614        bufSize*=(*pushedDatasIterator).nb_buffering;
615        if(bufSize!=0) bufSize+=sizeof(char)+sizeof(uint16_t)+sizeof(uint16_t);//header+period+nb_buffering
616        if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) {
617            Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize);
618            (*pushedDatasIterator).actual_size=0;
619            (*pushedDatasIterator).final_size=bufSize;
620            free((*pushedDatasIterator).buf);
621            (*pushedDatasIterator).buf=(char*)malloc(bufSize);
622        }
623        if(bufSize==0) {
624            Printf("delete buf\n");
625            free((*pushedDatasIterator).buf);
626            pushedDatasIterator = pushedDatas.erase(pushedDatasIterator);
627        } else {
628            ++pushedDatasIterator;
629        }
630    }
631 
632  //check if we need a new buffer for this couple period/nb_buffering
633  if(obj->IsEnabled()) {
634      bool match=false;
635      for (size_t i = 0; i < pushedDatas.size(); i++) {
636          if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
637              Printf("match item %i, period %i nb buff %i\n",i,obj->SendPeriod(),obj->NbBuffering());
638              match=true;
639            }
640      }
641      //create new one
642      if(!match) {
643        printf("no match\n");
644        PushedData_t tmp;
645        tmp.period=obj->SendPeriod();
646        tmp.nb_buffering=obj->NbBuffering();
647        tmp.actual_size=0;
648        tmp.final_size=0;
649        for (size_t i = 0; i < datasToSend.size(); i++) {
650            Printf("data %i/%i %i %i\n",i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering());
651            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) {
652              tmp.final_size+=datasToSend.at(i)->SendSize();
653              Printf("add %i\n",datasToSend.at(i)->SendSize());
654            }
655        }
656        tmp.final_size*=obj->NbBuffering();
657        tmp.final_size+=sizeof(char)+sizeof(uint16_t)+sizeof(uint16_t);//header+period+nb_buffering
658        printf("final size %i\n",tmp.final_size);
659        tmp.buf=(char*)malloc(tmp.final_size);
660        if(tmp.buf!=NULL) {
661            pushedDatas.push_back(tmp);
662        } else {
663            Warn("could not allocate buffer of size %i\n",tmp.final_size);
664        }
665      }
666  }
667  Printf("nb buf %i\n",pushedDatas.size());
668  return;
669}
670
671//allocate a buffer at startup for the worst case
672//(every data send with same pedriod)
673//TODO: dynamic size depending on what gcs asks?
674void ui_com::UpdateDataToSendSize(void) {
675  send_mutex->GetMutex();
676  send_size = 3; // id(1)+period(2)
677  for (size_t i = 0; i < datasToSend.size(); i++) {
678    if (datasToSend[i] != NULL)
679      send_size += datasToSend[i]->SendSize();
680  }
681
682  // send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char));
683  if (send_buffer != NULL)
684    free(send_buffer);
685  send_buffer = (char *)malloc(send_size * sizeof(char));
686  send_mutex->ReleaseMutex();
687}
688
689void ui_com::UpdateDataToSendBufferSize(const SendData *obj) {
690    send_mutex->GetMutex();
691   
692    PushedData_t* ptr=NULL;
693    //get buffer for this couple period/nb_buffering
694    for (size_t i = 0; i < pushedDatas.size(); i++) {
695      if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
696          ptr=&pushedDatas.at(i);
697        }
698    }
699    if(ptr==NULL) {
700        Err("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering());
701        return;
702    }
703   
704    ptr->final_size=sizeof(char)+sizeof(uint16_t)+sizeof(uint16_t);//header+period+nb_buffering
705    ptr->final_size+=obj->SendSize()*obj->NbBuffering();
706    Printf("final %i\n",ptr->final_size);
707    free(ptr->buf);
708    ptr->buf=(char*)malloc(ptr->final_size);
709   
710    send_mutex->ReleaseMutex();
711}
712
713void ui_com::RemoveSendData(const SendData *obj) {
714  // printf("remove_data_to_send %i\n",data_to_send.size());
715
716  send_mutex->GetMutex();
717  // on recupere l'id
718  for (size_t i = 0; i < datasToSend.size(); i++) {
719    if (datasToSend.at(i) == obj) {
720      datasToSend.erase(datasToSend.begin() + i);
721      resumeTimes.erase(resumeTimes.begin() + i);
722      // printf("remove_data_to_send %i ok\n",data_to_send.size());
723      break;
724    }
725  }
726  send_mutex->ReleaseMutex();
727
728  return;
729}
730
731void ui_com::Block(void) { send_mutex->GetMutex(); }
732
733void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note: See TracBrowser for help on using the repository browser.