Changeset 441 in flair-src for trunk/lib/FlairCore/src


Ignore:
Timestamp:
Aug 31, 2021, 4:15:03 PM (3 years ago)
Author:
Sanahuja Guillaume
Message:

update buffering

Location:
trunk/lib/FlairCore/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/ui_com.cpp

    r440 r441  
    284284    // on recpuere l'id de la prochaine execution
    285285    send_mutex->GetMutex();
    286     resume_id = resume_time.size();
    287     for (size_t i = 0; i < resume_time.size(); i++) {
    288       if (resume_time.at(i) < min && data_to_send.at(i)->IsEnabled() == true) {
    289         min = resume_time.at(i);
     286    resume_id = resumeTimes.size();
     287    for (size_t i = 0; i < resumeTimes.size(); i++) {
     288      if (resumeTimes.at(i) < min && datasToSend.at(i)->IsEnabled() == true) {
     289        min = resumeTimes.at(i);
    290290        resume_id = i;
    291291      }
     
    293293
    294294    // attente
    295     if (resume_id < resume_time.size()) {
    296       Time time = resume_time.at(resume_id);
    297       uint16_t resume_period = data_to_send.at(resume_id)->SendPeriod();
     295    if (resume_id < resumeTimes.size()) {
     296      Time time = resumeTimes.at(resume_id);
     297      uint16_t resume_period = datasToSend.at(resume_id)->SendPeriod();
    298298      send_mutex->ReleaseMutex();
    299299      // on dort jusqu'a la prochaine execution
     
    305305      send_mutex->GetMutex();
    306306
    307       for (size_t i = 0; i < data_to_send.size(); i++) {
    308         if (data_to_send.at(i)->SendPeriod() == resume_period && data_to_send.at(i)->IsEnabled() == true) {
    309           data_to_send.at(i)->CopyDatas(send_buffer + offset);
    310           offset += data_to_send.at(i)->SendSize();
     307      for (size_t i = 0; i < datasToSend.size(); i++) {
     308        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
     309          datasToSend.at(i)->CopyDatas(send_buffer + offset);
     310          offset += datasToSend.at(i)->SendSize();
    311311        }
    312312      }
     
    320320
    321321      //test to push datas
    322       for (size_t i = 0; i < data_to_send.size(); i++) {
    323         if (data_to_send.at(i)->SendPeriod() == resume_period && data_to_send.at(i)->IsEnabled() == true) {
    324           PushDatasToSend(data_to_send.at(i));
     322      for (size_t i = 0; i < datasToSend.size(); i++) {
     323        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
     324          PushDatasToSend(datasToSend.at(i));
    325325        }
    326326      }
     
    328328     
    329329      // on planifie la prochaine execution
    330       for (size_t i = 0; i < data_to_send.size(); i++) {
    331         if (data_to_send.at(i)->SendPeriod() == resume_period) {
    332           resume_time.at(i) += data_to_send.at(i)->SendPeriod() * 1000000;
     330      for (size_t i = 0; i < datasToSend.size(); i++) {
     331        if (datasToSend.at(i)->SendPeriod() == resume_period) {
     332          resumeTimes.at(i) += datasToSend.at(i)->SendPeriod() * 1000000;
    333333        }
    334334      }
     
    344344      Time time = GetTime();
    345345      send_mutex->GetMutex();
    346       for (size_t i = 0; i < data_to_send.size(); i++) {
    347         resume_time.at(i) =
    348             time + (Time)data_to_send.at(i)->SendPeriod() * 1000000;
     346      for (size_t i = 0; i < datasToSend.size(); i++) {
     347        resumeTimes.at(i) =
     348            time + (Time)datasToSend.at(i)->SendPeriod() * 1000000;
    349349      }
    350350      send_mutex->ReleaseMutex();
     
    356356//called with send_mutex locked
    357357//in nrt
    358 //TODO: RT part has to put datas in pipe, then useer thread call this
    359 void ui_com::PushDatasToSend(const SendData *data_to_send) {
     358//TODO: RT part has to put datas in pipe, then user thread call this
     359void ui_com::PushDatasToSend(const SendData *dataToSend) {
    360360  PushedData_t* ptr=NULL;
    361361  //check if we already have one buffer for this couple period/nb_buffering
    362   for (size_t i = 0; i < PushedDatas.size(); i++) {
    363       if (PushedDatas.at(i).period==data_to_send->SendPeriod() && PushedDatas.at(i).nb_buffering==data_to_send->NbBuffering()) {
    364           Printf("match item %i, period %i nb buff %i\n",i,data_to_send->SendPeriod(),data_to_send->NbBuffering());
    365           ptr=&PushedDatas.at(i);
    366         }
    367   }
    368   //otherwire create new one
     362  for (size_t i = 0; i < pushedDatas.size(); i++) {
     363      if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) {
     364          ptr=&pushedDatas.at(i);
     365        }
     366  }
    369367  if(ptr==NULL) {
    370     printf("no match\n");
    371     PushedData_t tmp;
    372     tmp.period=data_to_send->SendPeriod();
    373     tmp.nb_buffering=data_to_send->NbBuffering();
    374     tmp.size=0;
    375     tmp.buf=NULL;
    376     PushedDatas.push_back(tmp);
    377     ptr=&PushedDatas.back();
    378   }
    379    
    380   ptr->buf=(char*)realloc(ptr->buf,ptr->size+data_to_send->SendSize());
     368      Warn("no match in buffers\n");
     369      return;
     370  }
    381371 
    382   data_to_send->CopyDatas(ptr->buf+ptr->size);
    383   ptr->size+=data_to_send->SendSize();
    384   Printf("nb buffered %i\n",ptr->size);
    385 //TODO: savoir quand a fait tous les nb buffers
    386   Printf("pushed size %i period %i nb buffering %i\n",data_to_send->SendSize(),data_to_send->SendPeriod(),data_to_send->NbBuffering());
     372  dataToSend->CopyDatas(ptr->buf+ptr->actual_size);
     373  ptr->actual_size+=dataToSend->SendSize();
     374  Printf("nb buffered %i\n",ptr->actual_size);
     375  Printf("pushed size %i period %i nb buffering %i\n",dataToSend->SendSize(),dataToSend->SendPeriod(),dataToSend->NbBuffering());
     376  if(ptr->actual_size==ptr->final_size) {
     377      Printf("ready to send\n");
     378      //clean
     379      ptr->actual_size=0;
     380  }
    387381}
    388382
     
    546540  send_mutex->GetMutex();
    547541
    548   resume_time.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
     542  resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
    549543
    550544  // on resume en meme temps tout ceux qui ont la meme periode
    551   for (size_t i = 0; i < data_to_send.size(); i++) {
    552     if (data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
    553       resume_time.at(resume_time.size() - 1) = resume_time.at(i);
     545  for (size_t i = 0; i < datasToSend.size(); i++) {
     546    if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
     547      resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i);
    554548      break;
    555549    }
    556550  }
    557551
    558   data_to_send.push_back(obj);
     552  datasToSend.push_back(obj);
    559553
    560554  send_mutex->ReleaseMutex();
    561555}
    562556
     557//TODO: check if it is RT compatible
    563558void ui_com::UpdateSendData(const SendData *obj) {
    564559  // le mutex est deja pris par l'appellant
     
    566561
    567562  // on recupere l'id
    568   for (i = 0; i < data_to_send.size(); i++) {
    569     if (data_to_send.at(i) == obj) {
     563  for (i = 0; i < datasToSend.size(); i++) {
     564    if (datasToSend.at(i) == obj) {
    570565      id = i;
    571566      break;
     
    574569
    575570  // on resume en meme temps tout ceux qui ont la meme periode
    576   for (i = 0; i < data_to_send.size(); i++) {
     571  for (i = 0; i < datasToSend.size(); i++) {
    577572    if (i == id)
    578573      continue;
    579     if (data_to_send.at(i)->IsEnabled() == true &&
    580         data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
    581       resume_time.at(id) = resume_time.at(i);
     574    if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
     575      resumeTimes.at(id) = resumeTimes.at(i);
    582576      break;
    583577    }
     
    585579
    586580  // si aucun match, on planifie l'execution
    587   if (i == data_to_send.size())
    588     resume_time.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
    589 
    590   if (IsSuspended() == true)
    591     Resume();
    592 
     581  if (i == datasToSend.size())
     582    resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
     583
     584  if (IsSuspended() == true) Resume();
     585
     586//update all buffers if necessary, discard datas
     587    std::vector<PushedData_t>::iterator pushedDatasIterator = pushedDatas.begin();
     588    while(pushedDatasIterator != pushedDatas.end()) {
     589        size_t bufSize=0;
     590        for (size_t i = 0; i < datasToSend.size(); i++) {
     591            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) {
     592                bufSize+=datasToSend.at(i)->SendSize();
     593            }
     594        }
     595        bufSize*=(*pushedDatasIterator).nb_buffering;
     596        if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) {
     597            Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize);
     598            (*pushedDatasIterator).actual_size=0;
     599            (*pushedDatasIterator).final_size=bufSize;
     600            free((*pushedDatasIterator).buf);
     601            (*pushedDatasIterator).buf=(char*)malloc(bufSize);
     602        }
     603        if(bufSize==0) {
     604            Printf("delete buf\n");
     605            free((*pushedDatasIterator).buf);
     606            pushedDatasIterator = pushedDatas.erase(pushedDatasIterator);
     607        } else {
     608            ++pushedDatasIterator;
     609        }
     610    }
     611 
     612  //check if we need a new buffer for this couple period/nb_buffering
     613  if(obj->IsEnabled()) {
     614      bool match=false;
     615      for (size_t i = 0; i < pushedDatas.size(); i++) {
     616          if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
     617              Printf("match item %i, period %i nb buff %i\n",i,obj->SendPeriod(),obj->NbBuffering());
     618              match=true;
     619            }
     620      }
     621      //create new one
     622      if(!match) {
     623        printf("no match\n");
     624        PushedData_t tmp;
     625        tmp.period=obj->SendPeriod();
     626        tmp.nb_buffering=obj->NbBuffering();
     627        tmp.actual_size=0;
     628        tmp.final_size=0;
     629        for (size_t i = 0; i < datasToSend.size(); i++) {
     630            if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) {
     631              tmp.final_size+=datasToSend.at(i)->SendSize();
     632            }
     633        }
     634        tmp.final_size*=obj->NbBuffering();
     635        printf("final size %i\n",tmp.final_size);
     636        tmp.buf=(char*)malloc(tmp.final_size);
     637        if(tmp.buf!=NULL) {
     638            pushedDatas.push_back(tmp);
     639        } else {
     640            Warn("could not allocate buffer of size %i\n",tmp.final_size);
     641        }
     642      }
     643  }
     644  Printf("nb buf %i\n",pushedDatas.size());
    593645  return;
    594646}
     
    600652  send_mutex->GetMutex();
    601653  send_size = 3; // id(1)+period(2)
    602   for (size_t i = 0; i < data_to_send.size(); i++) {
    603     if (data_to_send[i] != NULL)
    604       send_size += data_to_send[i]->SendSize();
     654  for (size_t i = 0; i < datasToSend.size(); i++) {
     655    if (datasToSend[i] != NULL)
     656      send_size += datasToSend[i]->SendSize();
    605657  }
    606658
     
    617669  send_mutex->GetMutex();
    618670  // on recupere l'id
    619   for (size_t i = 0; i < data_to_send.size(); i++) {
    620     if (data_to_send.at(i) == obj) {
    621       data_to_send.erase(data_to_send.begin() + i);
    622       resume_time.erase(resume_time.begin() + i);
     671  for (size_t i = 0; i < datasToSend.size(); i++) {
     672    if (datasToSend.at(i) == obj) {
     673      datasToSend.erase(datasToSend.begin() + i);
     674      resumeTimes.erase(resumeTimes.begin() + i);
    623675      // printf("remove_data_to_send %i ok\n",data_to_send.size());
    624676      break;
  • trunk/lib/FlairCore/src/unexported/ui_com.h

    r440 r441  
    4343  ssize_t Receive(char *buf, ssize_t buf_size);
    4444  void AddSendData(const flair::gui::SendData *obj);
    45   void UpdateSendData(const flair::gui::SendData *obj);
     45  void UpdateSendData(const flair::gui::SendData *obj);//must be called with mutex locked
    4646  void RemoveSendData(const flair::gui::SendData *obj);
    4747  void UpdateDataToSendSize(void);
     
    5353  ssize_t send_size;
    5454  char *send_buffer;
    55   std::vector<const flair::gui::SendData *> data_to_send;
    56   std::vector<flair::core::Time> resume_time;
     55  std::vector<const flair::gui::SendData *> datasToSend;
     56  std::vector<flair::core::Time> resumeTimes;
    5757  flair::core::Mutex *send_mutex;
    5858  UDTSOCKET socket_fd;
     
    6262  //private part, called to effectively send to udt
    6363  void SendNRT(char *buf, ssize_t size,int ttl);
    64   void PushDatasToSend(const flair::gui::SendData *data_to_send);
     64  void PushDatasToSend(const flair::gui::SendData *dataToSend);
    6565  typedef struct {
    6666    char* buf;
    67     ssize_t size;
     67    size_t actual_size;
     68    size_t final_size;
    6869    uint16_t period;
    6970    uint16_t nb_buffering;
    7071  } PushedData_t;
    71   std::vector<PushedData_t> PushedDatas;
     72  std::vector<PushedData_t> pushedDatas;
    7273
    7374  static int compressBuffer(char *in, ssize_t in_size, char **out,
Note: See TracChangeset for help on using the changeset viewer.