source: flair-src/trunk/lib/FlairCore/src/ui_com.cpp @ 447

Last change on this file since 447 was 445, checked in by Sanahuja Guillaume, 3 months ago

update buffering

File size: 20.4 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5//  created:    2011/05/01
6//  filename:   ui_com.cpp
7//
8//  author:     Guillaume Sanahuja
9//              Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11//  version:    $Id: $
12//
13//  purpose:    classe permettant la lecture et l'ecriture sur socket UDT
14//
15//
16/*********************************************************************/
17
18#include "ui_com.h"
19#include <cstdlib>
20#include <string.h>
21#include <unistd.h>
22#include "Mutex.h"
23#include "SendData.h"
24#include "communication.h"
25#include "FrameworkManager.h"
26#include "zlib.h"
27#include <assert.h>
28#include "config.h"
29
30#ifdef __XENO__
31#include <pthread.h>
32#include <native/task.h>
33#include <native/task.h>
34#include <fcntl.h>
35#endif
36
37using std::string;
38using namespace flair::core;
39using namespace flair::gui;
40
41ui_com::ui_com(const Object *parent, UDTSOCKET sock) : Thread(parent, "send", 2,16384*2) {
42  // mutex
43  send_mutex = new Mutex(this, ObjectName());
44
45  socket_fd = sock;
46  connection_lost = false;
47
48#ifdef __XENO__
49 Err("multi buffering is not well implemented in RT\n");
50#endif
51
52#ifdef __XENO__
53  int status;
54  string tmp_name;
55
56  is_running = true;
57
58  // pipe
59  tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe";
60  // xenomai limitation
61  if (tmp_name.size() > 31)
62    Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
63#ifdef RT_PIPE_SIZE
64  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
65#else
66  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
67#endif
68
69  if (status != 0) {
70                char errorMsg[256];
71    Err("rt_pipe_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
72    // return -1;
73  }
74
75// start user side thread
76#ifdef NRT_STACK_SIZE
77  // Initialize thread creation attributes
78  pthread_attr_t attr;
79  if (pthread_attr_init(&attr) != 0) {
80    Err("pthread_attr_init error\n");
81  }
82
83  if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
84    Err("pthread_attr_setstacksize error\n");
85  }
86
87  if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0)
88#else  // NRT_STACK_SIZE
89  if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0)
90#endif // NRT_STACK_SIZE
91  {
92    Err("pthread_create error\n");
93    // return -1;
94  }
95#ifdef NRT_STACK_SIZE
96  if (pthread_attr_destroy(&attr) != 0) {
97    Err("pthread_attr_destroy error\n");
98  }
99#endif
100
101#endif //__XENO__
102  int timeout = 100;
103  if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0)
104    Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
105    /*
106  timeout=-1;
107  if (UDT::setsockopt(socket_fd, 0, UDT_SNDTIMEO, &timeout, sizeof(int)) != 0)
108    Err("UDT::setsockopt error (UDT_SNDTIMEO)\n");
109*/
110  bool blocking = true;
111  if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0)
112    Err("UDT::setsockopt error (UDT_SNDSYN)\n");
113
114  if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
115    Err("UDT::setsockopt error (UDT_RCVSYN)\n");
116  //#endif //__XENO__
117
118  Start();
119}
120
121ui_com::~ui_com() {
122//Printf("destruction ui_com\n");
123
124#ifdef __XENO__
125  is_running = false;
126
127  pthread_join(thread, NULL);
128
129  int status = rt_pipe_delete(&pipe);
130  if (status != 0) {
131                char errorMsg[256];
132    Err("rt_pipe_delete error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
133        }
134#endif
135
136  SafeStop();
137
138  if (IsSuspended() == true)
139    Resume();
140
141  Join();
142 
143  char buf=CLOSING_CONNECTION;
144  Send(&buf,1);
145 
146    for (size_t i = 0; i < pushedDatas.size(); i++) {
147      free(pushedDatas.at(i).buf);
148    }
149  //Printf("destruction ui_com ok\n");
150}
151
152//send, public part; dispatch to SendNRT (nrt) or rt_pipe (rt)
153//rt_pipe is read by user thread which calls SendNRT
154void ui_com::Send(char *buf, ssize_t size,int ttl) {
155  if (connection_lost == true)
156    return;
157
158  char *tosend = buf;
159  if (buf[0] == XML_HEADER) {
160    // cut xml header
161    tosend = strstr(buf, "<root");
162    size -= tosend - buf;
163  }
164
165#ifdef __XENO__
166  //data buf
167  ssize_t nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
168  if (nb_write < 0) {
169                char errorMsg[256];
170    Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg)));
171  } else if (nb_write != size) {
172    Err("rt_pipe_write error %i/%i\n", nb_write, size);
173  }
174  //ttl
175  nb_write = rt_pipe_write(&pipe, &ttl, sizeof(ttl), P_NORMAL);
176  if (nb_write < 0) {
177                char errorMsg[256];
178    Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg)));
179  } else if (nb_write != sizeof(ttl)) {
180    Err("rt_pipe_write error %i/%i\n", nb_write, size);
181  }
182#else //__XENO__
183  SendNRT(tosend,size,ttl);
184#endif //__XENO__
185}
186
187//send, private part; called to effectively send to udt
188//this is always in nrt
189void ui_com::SendNRT(char *buf, ssize_t size,int ttl) {
190  ssize_t nb_write;
191  bool sendItCompressed = false;
192 
193#ifdef COMPRESS_FRAMES
194  if (buf[0] == XML_HEADER) {
195    sendItCompressed = true;
196  }
197#endif // COMPRESS_FRAMES
198
199  if (sendItCompressed) {
200    char *out;
201    ssize_t out_size;
202    if (compressBuffer(buf, size, &out, &out_size, 9) == Z_OK) {
203      size = out_size;
204      nb_write = UDT::sendmsg(socket_fd, out, size,ttl, true);
205      free(out);
206    } else {
207      Warn("Compress error, sending it uncompressed\n");
208      sendItCompressed = false;
209    }
210  }
211
212  if (!sendItCompressed) {
213    nb_write = UDT::sendmsg(socket_fd, buf, size, ttl, true);
214  }
215
216   //Printf("write %i %i\n",nb_write,size);
217  if (nb_write < 0) {
218    Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
219    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
220        UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
221      connection_lost = true;
222    }
223  } else if (nb_write != size) {
224    Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(),
225        UDT::getlasterror().getErrorCode(), nb_write, size);
226  }
227}
228
229ssize_t ui_com::Receive(char *buf, ssize_t buf_size) {
230  ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size);
231
232  if (bytesRead < 0) {
233    if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) {
234      Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
235      connection_lost = true;
236    }
237  }
238
239  return bytesRead;
240}
241
242void ui_com::CheckConnection(void) {
243  int32_t val,len;
244 
245  int result=UDT::getsockopt(socket_fd,0,UDT_STATE,&val,&len); 
246   if (result < 0) {
247    Printf("UDT::getsockopt error (%s)\n", UDT::getlasterror().getErrorMessage());
248   }
249 // printf("opt: %i %i %i\n",result,val,len);
250}
251
252//todo: le run (rt) push les données une par une
253//le nrt recupere et decide quand envoyer :
254// -quand toutes les données d'une meme periode sont recues
255// -quand on a atteind le nb buffering
256// -le nrt alloue dynamiquement le buffer d'envoi
257//-> enelver l'alloc du buffer au startup (UpdateDataToSendSize)
258void ui_com::Run(void) {
259#ifdef __XENO__
260  WarnUponSwitches(true);
261  printf("\n"); // a revoir pourquoi??
262// sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
263#endif
264  // on attend d'avoir des choses à faire
265  Suspend();
266
267  while (!ToBeStopped()) {
268    size_t resume_id;
269    Time min = 0xffffffffffffffffULL;
270
271    // on recpuere l'id de la prochaine execution
272    send_mutex->GetMutex();
273    resume_id = resumeTimes.size();
274    for (size_t i = 0; i < resumeTimes.size(); i++) {
275      if (resumeTimes.at(i) < min && datasToSend.at(i)->IsEnabled() == true) {
276        min = resumeTimes.at(i);
277        resume_id = i;
278      }
279    }
280
281    // attente
282    if (resume_id < resumeTimes.size()) {
283      Time time = resumeTimes.at(resume_id);
284      uint16_t resume_period = datasToSend.at(resume_id)->SendPeriod();
285      send_mutex->ReleaseMutex();
286      // on dort jusqu'a la prochaine execution
287      SleepUntil(time);
288
289      // envoi des donnees
290      send_mutex->GetMutex();
291
292      //mulit buffering
293      for (size_t i = 0; i < datasToSend.size(); i++) {
294        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
295          PushDatasToSend(datasToSend.at(i));
296        }
297      }
298     
299      // on planifie la prochaine execution
300      for (size_t i = 0; i < datasToSend.size(); i++) {
301        if (datasToSend.at(i)->SendPeriod() == resume_period) {
302          resumeTimes.at(i) += datasToSend.at(i)->SendPeriod() * 1000000;
303        }
304      }
305      send_mutex->ReleaseMutex();
306      //Printf("%i %lld\n",resume_period,GetTime()/1000000);
307    } else {
308      send_mutex->ReleaseMutex();
309      // rien a faire, suspend
310      //Printf("rien a faire suspend\n");
311      Suspend();
312      //Printf("wake\n");
313      // on planifie la prochaine execution
314      Time time = GetTime();
315      send_mutex->GetMutex();
316      for (size_t i = 0; i < datasToSend.size(); i++) {
317        resumeTimes.at(i) =
318            time + (Time)datasToSend.at(i)->SendPeriod() * 1000000;
319      }
320      send_mutex->ReleaseMutex();
321    }
322  }
323}
324
325
326//called with send_mutex locked
327//in nrt
328//TODO: RT part has to put datas in pipe, then user thread call this
329void ui_com::PushDatasToSend(const SendData *dataToSend) {
330  PushedData_t* ptr=NULL;
331  //check if we already have one buffer for this couple period/nb_buffering
332  for (size_t i = 0; i < pushedDatas.size(); i++) {
333      if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) {
334          ptr=&pushedDatas.at(i);
335        }
336  }
337  if(ptr==NULL) {
338      Warn("no match in buffers\n");
339      return;
340  }
341 
342  if(ptr->actual_size==0) {
343      if(ptr->nb_buffering>1) {
344          if (IsBigEndian()) {
345            ptr->buf[0] = MULTIPLE_DATA_BIG_ENDIAN;
346          } else {
347            ptr->buf[0]  = MULTIPLE_DATA_LITTLE_ENDIAN;
348          }
349          ptr->actual_size+=sizeof(char);
350          memcpy(ptr->buf+ptr->actual_size,&(ptr->period),sizeof(uint16_t));
351          ptr->actual_size+=sizeof(uint16_t);
352          memcpy(ptr->buf+ptr->actual_size,&(ptr->nb_buffering),sizeof(uint16_t));
353          ptr->actual_size+=sizeof(uint16_t);
354      } else {
355          if (IsBigEndian()) {
356            ptr->buf[0] = DATA_BIG_ENDIAN;
357          } else {
358            ptr->buf[0]  = DATA_LITTLE_ENDIAN;
359          }
360          ptr->actual_size+=sizeof(char);
361          memcpy(ptr->buf+ptr->actual_size,&(ptr->period),sizeof(uint16_t));
362          ptr->actual_size+=sizeof(uint16_t);
363      }
364  }
365 
366  dataToSend->CopyDatas(ptr->buf+ptr->actual_size);
367  ptr->actual_size+=dataToSend->SendSize();
368  //Printf("nb buffered %i/%i\n",ptr->actual_size,ptr->final_size);
369  //Printf("pushed size %i period %i nb buffering %i\n",dataToSend->SendSize(),dataToSend->SendPeriod(),dataToSend->NbBuffering());
370  if(ptr->actual_size>=ptr->final_size) {//par securité on test aussi le cas ou supérieur
371      //Printf("ready to send\n");
372      SendNRT(ptr->buf, ptr->actual_size,ptr->period);
373      //clean
374      ptr->actual_size=0;
375  }
376}
377
378
379#ifdef __XENO__
380void *ui_com::user_thread(void *arg) {
381  int pipe_fd = -1;
382  string devname;
383  char *buf = NULL;
384  ui_com *caller = (ui_com *)arg;
385  int rv;
386  fd_set set;
387  struct timeval timeout;
388  ssize_t nb_read;
389  int ttl;
390
391  buf = (char *)malloc(NRT_PIPE_SIZE);
392  if (buf == NULL) {
393    caller->Err("malloc error\n");
394  }
395
396  devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
397            caller->ObjectName() + "-pipe";
398  while (pipe_fd < 0) {
399    pipe_fd = open(devname.c_str(), O_RDWR);
400    if (pipe_fd < 0 && errno != ENOENT) {
401                        char errorMsg[256];
402      caller->Err("open pipe_fd error (%s)\n", strerror_r(errno, errorMsg, sizeof(errorMsg)));
403                }
404    usleep(1000);
405  }
406
407  while (1) {
408    FD_ZERO(&set);         // clear the set
409    FD_SET(pipe_fd, &set); // add our file descriptor to the set
410
411    timeout.tv_sec = 0;
412    timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
413
414    rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
415
416    if (rv == -1) {
417      if (caller->is_running == false && UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
418        break; // timeout
419      if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
420        caller->Err("epoll_wait, %s, code %i\n",
421                    UDT::getlasterror().getErrorMessage(), UDT::getlasterror().getErrorCode());
422    } else if (rv == 0) {
423      // printf("timeout\n"); // a timeout occured
424      if (caller->is_running == false)
425        break;
426
427    } else {
428      //read data buf and ttl
429      //RT part has made 2 rt_pipe_write; they should be available in nrt
430      //(select ensure at least one data is ready)
431      nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
432      buf[nb_read] = 0;
433      read(pipe_fd, &ttl, sizeof(ttl));
434// printf("envoi\n%s\n",buf);
435
436      caller->SendNRT(buf, nb_read,ttl);
437    }
438  }
439
440  close(pipe_fd);
441  if (buf != NULL)
442    free(buf);
443  pthread_exit(0);
444}
445#endif
446
447int ui_com::compressBuffer(char *in, ssize_t in_size, char **out,
448                           ssize_t *out_size, int level) {
449  int ret, flush;
450  unsigned have;
451  z_stream strm;
452
453  /* allocate deflate state */
454  strm.zalloc = Z_NULL;
455  strm.zfree = Z_NULL;
456  strm.opaque = Z_NULL;
457  ret = deflateInit(&strm, level);
458  if (ret != Z_OK)
459    return ret;
460
461  *out = (char *)malloc(COMPRESS_CHUNK);
462  if (!(*out))
463    return Z_BUF_ERROR;
464
465  strm.next_in = (unsigned char *)in;
466  strm.avail_out = COMPRESS_CHUNK;
467  strm.next_out = (unsigned char *)*out;
468  strm.avail_in = in_size;
469  flush = Z_FINISH;
470
471  ret = deflate(&strm, flush); /* no bad return value */
472  if (ret == Z_STREAM_ERROR) {
473    free(*out);
474    return ret;
475  }
476
477  have = COMPRESS_CHUNK - strm.avail_out;
478  *out_size = have;
479  // printf("%i -> %i\n",in_size,have);
480  /* clean up and return */
481  (void)deflateEnd(&strm);
482
483  if (strm.avail_out != 0) {
484    return Z_OK;
485  } else {
486    return Z_STREAM_ERROR;
487  }
488}
489
490int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,
491                             unsigned char **out, ssize_t *out_size) {
492  int ret;
493  // unsigned have;
494  z_stream strm;
495
496  /* allocate inflate state */
497  strm.zalloc = Z_NULL;
498  strm.zfree = Z_NULL;
499  strm.opaque = Z_NULL;
500  strm.avail_in = 0;
501  strm.next_in = Z_NULL;
502  ret = inflateInit(&strm);
503  if (ret != Z_OK)
504    return ret;
505
506  *out = (unsigned char *)malloc(COMPRESS_CHUNK);
507  if (!(*out))
508    return Z_BUF_ERROR;
509
510  strm.avail_in = in_size;
511  strm.next_in = in;
512  strm.avail_out = COMPRESS_CHUNK;
513  strm.next_out = *out;
514
515  ret = inflate(&strm, Z_NO_FLUSH);
516  assert(ret != Z_STREAM_ERROR); /* state not clobbered */
517  switch (ret) {
518  case Z_NEED_DICT:
519    ret = Z_DATA_ERROR; /* and fall through */
520  case Z_DATA_ERROR:
521  case Z_MEM_ERROR:
522    (void)inflateEnd(&strm);
523    return ret;
524  }
525  // have = COMPRESS_CHUNK - strm.avail_out;
526
527  /* clean up and return */
528  (void)inflateEnd(&strm);
529  return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
530}
531
532bool ui_com::ConnectionLost() { return connection_lost; }
533
534void ui_com::AddSendData(const SendData *obj) {
535  send_mutex->GetMutex();
536
537  resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
538
539  // on resume en meme temps tout ceux qui ont la meme periode
540  for (size_t i = 0; i < datasToSend.size(); i++) {
541    if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
542      resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i);
543      break;
544    }
545  }
546
547  datasToSend.push_back(obj);
548
549  send_mutex->ReleaseMutex();
550}
551
552//TODO: check if it is RT compatible
553//must be called with mutex locked
554void ui_com::UpdateSendData(const SendData *obj) {
555  // le mutex est deja pris par l'appellant
556  size_t id, i;
557
558  // on recupere l'id
559  for (i = 0; i < datasToSend.size(); i++) {
560    if (datasToSend.at(i) == obj) {
561      id = i;
562      break;
563    }
564  }
565
566  // on resume en meme temps tout ceux qui ont la meme periode
567  for (i = 0; i < datasToSend.size(); i++) {
568    if (i == id)
569      continue;
570    if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
571      resumeTimes.at(id) = resumeTimes.at(i);
572      break;
573    }
574  }
575
576  // si aucun match, on planifie l'execution
577  if (i == datasToSend.size())
578    resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
579
580  if (IsSuspended() == true) Resume();
581
582//update all buffers if necessary, discard datas
583    std::vector<PushedData_t>::iterator pushedDatasIterator = pushedDatas.begin();
584    while(pushedDatasIterator != pushedDatas.end()) {
585        size_t bufSize=0;
586        for (size_t i = 0; i < datasToSend.size(); i++) {
587            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) {
588                bufSize+=datasToSend.at(i)->SendSize();
589            }
590        }
591        bufSize*=(*pushedDatasIterator).nb_buffering;
592        if(bufSize!=0) {
593            bufSize+=sizeof(char)+sizeof(uint16_t);//header+period
594            if((*pushedDatasIterator).nb_buffering>1) bufSize+=sizeof(uint16_t);//+nb_buffering
595        }   
596        if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) {
597            //Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize);
598            (*pushedDatasIterator).actual_size=0;
599            (*pushedDatasIterator).final_size=bufSize;
600            free((*pushedDatasIterator).buf);
601            (*pushedDatasIterator).buf=(char*)malloc(bufSize);
602        }
603        if(bufSize==0) {
604            //Printf("delete buf\n");
605            free((*pushedDatasIterator).buf);
606            pushedDatasIterator = pushedDatas.erase(pushedDatasIterator);
607        } else {
608            ++pushedDatasIterator;
609        }
610    }
611 
612  //check if we need a new buffer for this couple period/nb_buffering
613  if(obj->IsEnabled()) {
614      bool match=false;
615      for (size_t i = 0; i < pushedDatas.size(); i++) {
616          if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
617              //Printf("match item %i, period %i nb buff %i\n",i,obj->SendPeriod(),obj->NbBuffering());
618              match=true;
619            }
620      }
621      //create new one
622      if(!match) {
623        //printf("no match\n");
624        PushedData_t tmp;
625        tmp.period=obj->SendPeriod();
626        tmp.nb_buffering=obj->NbBuffering();
627        tmp.actual_size=0;
628        tmp.final_size=0;
629        for (size_t i = 0; i < datasToSend.size(); i++) {
630            //Printf("data %i/%i %i %i\n",i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering());
631            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) {
632              tmp.final_size+=datasToSend.at(i)->SendSize();
633              //Printf("add %i\n",datasToSend.at(i)->SendSize());
634            }
635        }
636        tmp.final_size*=obj->NbBuffering();
637        tmp.final_size+=sizeof(char)+sizeof(uint16_t);//header+period
638        if(tmp.nb_buffering>1) tmp.final_size+=sizeof(uint16_t);//+nb_buffering
639        //printf("final size %i\n",tmp.final_size);
640        tmp.buf=(char*)malloc(tmp.final_size);
641        if(tmp.buf!=NULL) {
642            pushedDatas.push_back(tmp);
643        } else {
644            Warn("could not allocate buffer of size %i\n",tmp.final_size);
645        }
646      }
647  }
648  //Printf("nb buf %i\n",pushedDatas.size());
649  return;
650}
651
652void ui_com::UpdateDataToSendBufferSize(const SendData *obj) {
653    if(!obj->IsEnabled()) return;
654   
655    send_mutex->GetMutex();
656   
657    PushedData_t* ptr=NULL;
658    //get buffer for this couple period/nb_buffering
659    for (size_t i = 0; i < pushedDatas.size(); i++) {
660      if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
661          ptr=&pushedDatas.at(i);
662        }
663    }
664    if(ptr==NULL) {
665        Err("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering());
666        return;
667    }
668   
669    ptr->final_size=sizeof(char)+sizeof(uint16_t);//header+period
670    if(ptr->nb_buffering>1) ptr->final_size+=sizeof(uint16_t);//+nb_buffering
671    ptr->final_size+=obj->SendSize()*obj->NbBuffering();
672    //Printf("final %i\n",ptr->final_size);
673    free(ptr->buf);
674    ptr->buf=(char*)malloc(ptr->final_size);
675   
676    send_mutex->ReleaseMutex();
677}
678
679void ui_com::RemoveSendData(const SendData *obj) {
680  // printf("remove_data_to_send %i\n",data_to_send.size());
681
682  send_mutex->GetMutex();
683  // on recupere l'id
684  for (size_t i = 0; i < datasToSend.size(); i++) {
685    if (datasToSend.at(i) == obj) {
686      datasToSend.erase(datasToSend.begin() + i);
687      resumeTimes.erase(resumeTimes.begin() + i);
688      // printf("remove_data_to_send %i ok\n",data_to_send.size());
689      break;
690    }
691  }
692  send_mutex->ReleaseMutex();
693
694  return;
695}
696
697void ui_com::Block(void) { send_mutex->GetMutex(); }
698
699void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note: See TracBrowser for help on using the repository browser.