source: flair-src/trunk/lib/FlairCore/src/ui_com.cpp @ 442

Last change on this file since 442 was 442, checked in by Sanahuja Guillaume, 3 months ago

update buffering

File size: 21.3 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5//  created:    2011/05/01
6//  filename:   ui_com.cpp
7//
8//  author:     Guillaume Sanahuja
9//              Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11//  version:    $Id: $
12//
13//  purpose:    classe permettant la lecture et l'ecriture sur socket UDT
14//
15//
16/*********************************************************************/
17
18#include "ui_com.h"
19#include <cstdlib>
20#include <string.h>
21#include <unistd.h>
22#include "Mutex.h"
23#include "SendData.h"
24#include "communication.h"
25#include "FrameworkManager.h"
26#include "zlib.h"
27#include <assert.h>
28#include "config.h"
29
30#ifdef __XENO__
31#include <pthread.h>
32#include <native/task.h>
33#include <native/task.h>
34#include <fcntl.h>
35#endif
36
37using std::string;
38using namespace flair::core;
39using namespace flair::gui;
40
41ui_com::ui_com(const Object *parent, UDTSOCKET sock)
42    : Thread(parent, "send", 2,16384*2) {
43  // buffer envoi
44  send_buffer = (char *)malloc(3);
45  send_size = 3; // id(1)+period(2)
46
47  // mutex
48  send_mutex = NULL;
49  send_mutex = new Mutex(this, ObjectName());
50
51  socket_fd = sock;
52  connection_lost = false;
53
54#ifdef __XENO__
55 Err("multi buffering is not well implemented in RT\n");
56#endif
57
58#ifdef __XENO__
59  int status;
60  string tmp_name;
61
62  is_running = true;
63
64  // pipe
65  tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe";
66  // xenomai limitation
67  if (tmp_name.size() > 31)
68    Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
69#ifdef RT_PIPE_SIZE
70  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
71#else
72  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
73#endif
74
75  if (status != 0) {
76                char errorMsg[256];
77    Err("rt_pipe_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
78    // return -1;
79  }
80
81// start user side thread
82#ifdef NRT_STACK_SIZE
83  // Initialize thread creation attributes
84  pthread_attr_t attr;
85  if (pthread_attr_init(&attr) != 0) {
86    Err("pthread_attr_init error\n");
87  }
88
89  if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
90    Err("pthread_attr_setstacksize error\n");
91  }
92
93  if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0)
94#else  // NRT_STACK_SIZE
95  if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0)
96#endif // NRT_STACK_SIZE
97  {
98    Err("pthread_create error\n");
99    // return -1;
100  }
101#ifdef NRT_STACK_SIZE
102  if (pthread_attr_destroy(&attr) != 0) {
103    Err("pthread_attr_destroy error\n");
104  }
105#endif
106
107#endif //__XENO__
108  int timeout = 100;
109  if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0)
110    Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
111    /*
112  timeout=-1;
113  if (UDT::setsockopt(socket_fd, 0, UDT_SNDTIMEO, &timeout, sizeof(int)) != 0)
114    Err("UDT::setsockopt error (UDT_SNDTIMEO)\n");
115*/
116  bool blocking = true;
117  if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0)
118    Err("UDT::setsockopt error (UDT_SNDSYN)\n");
119
120  if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
121    Err("UDT::setsockopt error (UDT_RCVSYN)\n");
122  //#endif //__XENO__
123
124  Start();
125}
126
127ui_com::~ui_com() {
128//Printf("destruction ui_com\n");
129
130#ifdef __XENO__
131  is_running = false;
132
133  pthread_join(thread, NULL);
134
135  int status = rt_pipe_delete(&pipe);
136  if (status != 0) {
137                char errorMsg[256];
138    Err("rt_pipe_delete error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
139        }
140#endif
141
142  SafeStop();
143
144  if (IsSuspended() == true)
145    Resume();
146
147  Join();
148 
149  char buf=CLOSING_CONNECTION;
150  Send(&buf,1);
151 
152  if (send_buffer != NULL)
153    free(send_buffer);
154  send_buffer = NULL;
155
156  //Printf("destruction ui_com ok\n");
157}
158
159//send, public part; dispatch to SendNRT (nrt) or rt_pipe (rt)
160//rt_pipe is read by user thread which calls SendNRT
161void ui_com::Send(char *buf, ssize_t size,int ttl) {
162  if (connection_lost == true)
163    return;
164
165  char *tosend = buf;
166  if (buf[0] == XML_HEADER) {
167    // cut xml header
168    tosend = strstr(buf, "<root");
169    size -= tosend - buf;
170  }
171
172#ifdef __XENO__
173  //data buf
174  ssize_t nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
175  if (nb_write < 0) {
176                char errorMsg[256];
177    Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg)));
178  } else if (nb_write != size) {
179    Err("rt_pipe_write error %i/%i\n", nb_write, size);
180  }
181  //ttl
182  nb_write = rt_pipe_write(&pipe, &ttl, sizeof(ttl), P_NORMAL);
183  if (nb_write < 0) {
184                char errorMsg[256];
185    Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg)));
186  } else if (nb_write != sizeof(ttl)) {
187    Err("rt_pipe_write error %i/%i\n", nb_write, size);
188  }
189#else //__XENO__
190  SendNRT(tosend,size,ttl);
191#endif //__XENO__
192}
193
194//send, private part; called to effectively send to udt
195//this is always in nrt
196void ui_com::SendNRT(char *buf, ssize_t size,int ttl) {
197  ssize_t nb_write;
198  bool sendItCompressed = false;
199 
200#ifdef COMPRESS_FRAMES
201  if (buf[0] == XML_HEADER) {
202    sendItCompressed = true;
203  }
204#endif // COMPRESS_FRAMES
205
206  if (sendItCompressed) {
207    char *out;
208    ssize_t out_size;
209    if (compressBuffer(buf, size, &out, &out_size, 9) == Z_OK) {
210      size = out_size;
211      nb_write = UDT::sendmsg(socket_fd, out, size,ttl, true);
212      free(out);
213    } else {
214      Warn("Compress error, sending it uncompressed\n");
215      sendItCompressed = false;
216    }
217  }
218
219  if (!sendItCompressed) {
220    nb_write = UDT::sendmsg(socket_fd, buf, size, ttl, true);
221  }
222
223   //Printf("write %i %i\n",nb_write,size);
224  if (nb_write < 0) {
225    Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
226    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
227        UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
228      connection_lost = true;
229    }
230  } else if (nb_write != size) {
231    Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(),
232        UDT::getlasterror().getErrorCode(), nb_write, size);
233  }
234}
235
236ssize_t ui_com::Receive(char *buf, ssize_t buf_size) {
237  ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size);
238
239  if (bytesRead < 0) {
240    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) {
241      Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
242      connection_lost = true;
243    }
244  }
245
246  return bytesRead;
247}
248
249void ui_com::CheckConnection(void) {
250  int32_t val,len;
251 
252  int result=UDT::getsockopt(socket_fd,0,UDT_STATE,&val,&len); 
253   if (result < 0) {
254    Printf("UDT::getsockopt error (%s)\n", UDT::getlasterror().getErrorMessage());
255   }
256 // printf("opt: %i %i %i\n",result,val,len);
257}
258
259//todo: le run (rt) push les données une par une
260//le nrt recupere et decide quand envoyer :
261// -quand toutes les données d'une meme periode sont recues
262// -quand on a atteind le nb buffering
263// -le nrt alloue dynamiquement le buffer d'envoi
264//-> enelver l'alloc du buffer au startup (UpdateDataToSendSize)
265void ui_com::Run(void) {
266  // check endianness
267  char header;
268  if (IsBigEndian()) {
269    header = DATA_BIG_ENDIAN;
270    Printf("System is big endian\n");
271  } else {
272    header = DATA_LITTLE_ENDIAN;
273    Printf("System is little endian\n");
274  }
275
276#ifdef __XENO__
277  WarnUponSwitches(true);
278  printf("\n"); // a revoir pourquoi??
279// sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
280#endif
281  // on attend d'avoir des choses à faire
282  Suspend();
283
284  while (!ToBeStopped()) {
285    size_t resume_id;
286    Time min = 0xffffffffffffffffULL;
287
288    // on recpuere l'id de la prochaine execution
289    send_mutex->GetMutex();
290    resume_id = resumeTimes.size();
291    for (size_t i = 0; i < resumeTimes.size(); i++) {
292      if (resumeTimes.at(i) < min && datasToSend.at(i)->IsEnabled() == true) {
293        min = resumeTimes.at(i);
294        resume_id = i;
295      }
296    }
297
298    // attente
299    if (resume_id < resumeTimes.size()) {
300      Time time = resumeTimes.at(resume_id);
301      uint16_t resume_period = datasToSend.at(resume_id)->SendPeriod();
302      send_mutex->ReleaseMutex();
303      // on dort jusqu'a la prochaine execution
304      SleepUntil(time);
305
306      // envoi des donnees
307      int offset = 3;
308      send_buffer[0] = header;
309      send_mutex->GetMutex();
310
311      for (size_t i = 0; i < datasToSend.size(); i++) {
312        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
313          datasToSend.at(i)->CopyDatas(send_buffer + offset);
314          offset += datasToSend.at(i)->SendSize();
315        }
316      }
317      if (offset != 3) {
318        memcpy(&send_buffer[1], &resume_period, sizeof(uint16_t));
319        //printf("send %i %i %i %x %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]);
320        // for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]);
321        // printf("\n");
322        Send(send_buffer, offset,resume_period);
323      }
324
325      //test to push datas
326      for (size_t i = 0; i < datasToSend.size(); i++) {
327        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
328          PushDatasToSend(datasToSend.at(i));
329        }
330      }
331      //end test
332     
333      // on planifie la prochaine execution
334      for (size_t i = 0; i < datasToSend.size(); i++) {
335        if (datasToSend.at(i)->SendPeriod() == resume_period) {
336          resumeTimes.at(i) += datasToSend.at(i)->SendPeriod() * 1000000;
337        }
338      }
339      send_mutex->ReleaseMutex();
340      //Printf("%i %lld\n",resume_period,GetTime()/1000000);
341    } else {
342      send_mutex->ReleaseMutex();
343      // rien a faire, suspend
344      //Printf("rien a faire suspend\n");
345      Suspend();
346      //Printf("wake\n");
347      // on planifie la prochaine execution
348      Time time = GetTime();
349      send_mutex->GetMutex();
350      for (size_t i = 0; i < datasToSend.size(); i++) {
351        resumeTimes.at(i) =
352            time + (Time)datasToSend.at(i)->SendPeriod() * 1000000;
353      }
354      send_mutex->ReleaseMutex();
355    }
356  }
357}
358
359
360//called with send_mutex locked
361//in nrt
362//TODO: RT part has to put datas in pipe, then user thread call this
363void ui_com::PushDatasToSend(const SendData *dataToSend) {
364  PushedData_t* ptr=NULL;
365  //check if we already have one buffer for this couple period/nb_buffering
366  for (size_t i = 0; i < pushedDatas.size(); i++) {
367      if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) {
368          ptr=&pushedDatas.at(i);
369        }
370  }
371  if(ptr==NULL) {
372      Warn("no match in buffers\n");
373      return;
374  }
375 
376  if(ptr->actual_size==0) {
377      if (IsBigEndian()) {
378        ptr->buf[0] = MULTIPLE_DATA_BIG_ENDIAN;
379      } else {
380        ptr->buf[0]  = MULTIPLE_DATA_LITTLE_ENDIAN;
381      }
382      ptr->actual_size+=sizeof(char);
383      memcpy(ptr->buf+ptr->actual_size,&(ptr->period),sizeof(uint16_t));
384      ptr->actual_size+=sizeof(uint16_t);
385      memcpy(ptr->buf+ptr->actual_size,&(ptr->nb_buffering),sizeof(uint16_t));
386      ptr->actual_size+=sizeof(uint16_t);
387  }
388 
389  dataToSend->CopyDatas(ptr->buf+ptr->actual_size);
390  ptr->actual_size+=dataToSend->SendSize();
391  Printf("nb buffered %i/%i\n",ptr->actual_size,ptr->final_size);
392  Printf("pushed size %i period %i nb buffering %i\n",dataToSend->SendSize(),dataToSend->SendPeriod(),dataToSend->NbBuffering());
393  if(ptr->actual_size>=ptr->final_size) {//par securité on test aussi le cas ou supérieur
394      Printf("ready to send\n");
395      //clean
396      ptr->actual_size=0;
397  }
398}
399
400
401#ifdef __XENO__
402void *ui_com::user_thread(void *arg) {
403  int pipe_fd = -1;
404  string devname;
405  char *buf = NULL;
406  ui_com *caller = (ui_com *)arg;
407  int rv;
408  fd_set set;
409  struct timeval timeout;
410  ssize_t nb_read;
411  int ttl;
412
413  buf = (char *)malloc(NRT_PIPE_SIZE);
414  if (buf == NULL) {
415    caller->Err("malloc error\n");
416  }
417
418  devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
419            caller->ObjectName() + "-pipe";
420  while (pipe_fd < 0) {
421    pipe_fd = open(devname.c_str(), O_RDWR);
422    if (pipe_fd < 0 && errno != ENOENT) {
423                        char errorMsg[256];
424      caller->Err("open pipe_fd error (%s)\n", strerror_r(errno, errorMsg, sizeof(errorMsg)));
425                }
426    usleep(1000);
427  }
428
429  while (1) {
430    FD_ZERO(&set);         // clear the set
431    FD_SET(pipe_fd, &set); // add our file descriptor to the set
432
433    timeout.tv_sec = 0;
434    timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
435
436    rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
437
438    if (rv == -1) {
439      if (caller->is_running == false && UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
440        break; // timeout
441      if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
442        caller->Err("epoll_wait, %s, code %i\n",
443                    UDT::getlasterror().getErrorMessage(), UDT::getlasterror().getErrorCode());
444    } else if (rv == 0) {
445      // printf("timeout\n"); // a timeout occured
446      if (caller->is_running == false)
447        break;
448
449    } else {
450      //read data buf and ttl
451      //RT part has made 2 rt_pipe_write; they should be available in nrt
452      //(select ensure at least one data is ready)
453      nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
454      buf[nb_read] = 0;
455      read(pipe_fd, &ttl, sizeof(ttl));
456// printf("envoi\n%s\n",buf);
457
458      caller->SendNRT(buf, nb_read,ttl);
459    }
460  }
461
462  close(pipe_fd);
463  if (buf != NULL)
464    free(buf);
465  pthread_exit(0);
466}
467#endif
468
469int ui_com::compressBuffer(char *in, ssize_t in_size, char **out,
470                           ssize_t *out_size, int level) {
471  int ret, flush;
472  unsigned have;
473  z_stream strm;
474
475  /* allocate deflate state */
476  strm.zalloc = Z_NULL;
477  strm.zfree = Z_NULL;
478  strm.opaque = Z_NULL;
479  ret = deflateInit(&strm, level);
480  if (ret != Z_OK)
481    return ret;
482
483  *out = (char *)malloc(COMPRESS_CHUNK);
484  if (!(*out))
485    return Z_BUF_ERROR;
486
487  strm.next_in = (unsigned char *)in;
488  strm.avail_out = COMPRESS_CHUNK;
489  strm.next_out = (unsigned char *)*out;
490  strm.avail_in = in_size;
491  flush = Z_FINISH;
492
493  ret = deflate(&strm, flush); /* no bad return value */
494  if (ret == Z_STREAM_ERROR) {
495    free(*out);
496    return ret;
497  }
498
499  have = COMPRESS_CHUNK - strm.avail_out;
500  *out_size = have;
501  // printf("%i -> %i\n",in_size,have);
502  /* clean up and return */
503  (void)deflateEnd(&strm);
504
505  if (strm.avail_out != 0) {
506    return Z_OK;
507  } else {
508    return Z_STREAM_ERROR;
509  }
510}
511
512int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,
513                             unsigned char **out, ssize_t *out_size) {
514  int ret;
515  // unsigned have;
516  z_stream strm;
517
518  /* allocate inflate state */
519  strm.zalloc = Z_NULL;
520  strm.zfree = Z_NULL;
521  strm.opaque = Z_NULL;
522  strm.avail_in = 0;
523  strm.next_in = Z_NULL;
524  ret = inflateInit(&strm);
525  if (ret != Z_OK)
526    return ret;
527
528  *out = (unsigned char *)malloc(COMPRESS_CHUNK);
529  if (!(*out))
530    return Z_BUF_ERROR;
531
532  strm.avail_in = in_size;
533  strm.next_in = in;
534  strm.avail_out = COMPRESS_CHUNK;
535  strm.next_out = *out;
536
537  ret = inflate(&strm, Z_NO_FLUSH);
538  assert(ret != Z_STREAM_ERROR); /* state not clobbered */
539  switch (ret) {
540  case Z_NEED_DICT:
541    ret = Z_DATA_ERROR; /* and fall through */
542  case Z_DATA_ERROR:
543  case Z_MEM_ERROR:
544    (void)inflateEnd(&strm);
545    return ret;
546  }
547  // have = COMPRESS_CHUNK - strm.avail_out;
548
549  /* clean up and return */
550  (void)inflateEnd(&strm);
551  return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
552}
553
554bool ui_com::ConnectionLost() { return connection_lost; }
555
556void ui_com::AddSendData(const SendData *obj) {
557  send_mutex->GetMutex();
558
559  resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
560
561  // on resume en meme temps tout ceux qui ont la meme periode
562  for (size_t i = 0; i < datasToSend.size(); i++) {
563    if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
564      resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i);
565      break;
566    }
567  }
568
569  datasToSend.push_back(obj);
570
571  send_mutex->ReleaseMutex();
572}
573
574//TODO: check if it is RT compatible
575//must be called with mutex locked
576void ui_com::UpdateSendData(const SendData *obj) {
577  // le mutex est deja pris par l'appellant
578  size_t id, i;
579
580  // on recupere l'id
581  for (i = 0; i < datasToSend.size(); i++) {
582    if (datasToSend.at(i) == obj) {
583      id = i;
584      break;
585    }
586  }
587
588  // on resume en meme temps tout ceux qui ont la meme periode
589  for (i = 0; i < datasToSend.size(); i++) {
590    if (i == id)
591      continue;
592    if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
593      resumeTimes.at(id) = resumeTimes.at(i);
594      break;
595    }
596  }
597
598  // si aucun match, on planifie l'execution
599  if (i == datasToSend.size())
600    resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
601
602  if (IsSuspended() == true) Resume();
603
604//update all buffers if necessary, discard datas
605    std::vector<PushedData_t>::iterator pushedDatasIterator = pushedDatas.begin();
606    while(pushedDatasIterator != pushedDatas.end()) {
607        size_t bufSize=0;
608        for (size_t i = 0; i < datasToSend.size(); i++) {
609            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) {
610                bufSize+=datasToSend.at(i)->SendSize();
611            }
612        }
613        bufSize*=(*pushedDatasIterator).nb_buffering;
614        if(bufSize!=0) bufSize+=sizeof(char)+sizeof(uint16_t)+sizeof(uint16_t);//header+period+nb_buffering
615        if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) {
616            Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize);
617            (*pushedDatasIterator).actual_size=0;
618            (*pushedDatasIterator).final_size=bufSize;
619            free((*pushedDatasIterator).buf);
620            (*pushedDatasIterator).buf=(char*)malloc(bufSize);
621        }
622        if(bufSize==0) {
623            Printf("delete buf\n");
624            free((*pushedDatasIterator).buf);
625            pushedDatasIterator = pushedDatas.erase(pushedDatasIterator);
626        } else {
627            ++pushedDatasIterator;
628        }
629    }
630 
631  //check if we need a new buffer for this couple period/nb_buffering
632  if(obj->IsEnabled()) {
633      bool match=false;
634      for (size_t i = 0; i < pushedDatas.size(); i++) {
635          if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
636              Printf("match item %i, period %i nb buff %i\n",i,obj->SendPeriod(),obj->NbBuffering());
637              match=true;
638            }
639      }
640      //create new one
641      if(!match) {
642        printf("no match\n");
643        PushedData_t tmp;
644        tmp.period=obj->SendPeriod();
645        tmp.nb_buffering=obj->NbBuffering();
646        tmp.actual_size=0;
647        tmp.final_size=0;
648        for (size_t i = 0; i < datasToSend.size(); i++) {
649            Printf("data %i/%i %i %i\n",i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering());
650            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) {
651              tmp.final_size+=datasToSend.at(i)->SendSize();
652              Printf("add %i\n",datasToSend.at(i)->SendSize());
653            }
654        }
655        tmp.final_size*=obj->NbBuffering();
656        tmp.final_size+=sizeof(char)+sizeof(uint16_t)+sizeof(uint16_t);//header+period+nb_buffering
657        printf("final size %i\n",tmp.final_size);
658        tmp.buf=(char*)malloc(tmp.final_size);
659        if(tmp.buf!=NULL) {
660            pushedDatas.push_back(tmp);
661        } else {
662            Warn("could not allocate buffer of size %i\n",tmp.final_size);
663        }
664      }
665  }
666  Printf("nb buf %i\n",pushedDatas.size());
667  return;
668}
669
670//allocate a buffer at startup for the worst case
671//(every data send with same pedriod)
672//TODO: dynamic size depending on what gcs asks?
673void ui_com::UpdateDataToSendSize(void) {
674  send_mutex->GetMutex();
675  send_size = 3; // id(1)+period(2)
676  for (size_t i = 0; i < datasToSend.size(); i++) {
677    if (datasToSend[i] != NULL)
678      send_size += datasToSend[i]->SendSize();
679  }
680
681  // send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char));
682  if (send_buffer != NULL)
683    free(send_buffer);
684  send_buffer = (char *)malloc(send_size * sizeof(char));
685  send_mutex->ReleaseMutex();
686}
687
688void ui_com::UpdateDataToSendBufferSize(const SendData *obj) {
689    send_mutex->GetMutex();
690   
691    PushedData_t* ptr=NULL;
692    //get buffer for this couple period/nb_buffering
693    for (size_t i = 0; i < pushedDatas.size(); i++) {
694      if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
695          ptr=&pushedDatas.at(i);
696        }
697    }
698    if(ptr==NULL) {
699        Err("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering());
700        return;
701    }
702   
703    ptr->final_size=sizeof(char)+sizeof(uint16_t)+sizeof(uint16_t);//header+period+nb_buffering
704    ptr->final_size+=obj->SendSize()*obj->NbBuffering();
705    Printf("final %i\n",ptr->final_size);
706    free(ptr->buf);
707    ptr->buf=(char*)malloc(ptr->final_size);
708   
709    send_mutex->ReleaseMutex();
710}
711
712void ui_com::RemoveSendData(const SendData *obj) {
713  // printf("remove_data_to_send %i\n",data_to_send.size());
714
715  send_mutex->GetMutex();
716  // on recupere l'id
717  for (size_t i = 0; i < datasToSend.size(); i++) {
718    if (datasToSend.at(i) == obj) {
719      datasToSend.erase(datasToSend.begin() + i);
720      resumeTimes.erase(resumeTimes.begin() + i);
721      // printf("remove_data_to_send %i ok\n",data_to_send.size());
722      break;
723    }
724  }
725  send_mutex->ReleaseMutex();
726
727  return;
728}
729
730void ui_com::Block(void) { send_mutex->GetMutex(); }
731
732void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note: See TracBrowser for help on using the repository browser.