Changeset 456 in flair-src for trunk/lib/FlairCore/src


Ignore:
Timestamp:
Oct 6, 2021, 11:03:44 AM (3 years ago)
Author:
Sanahuja Guillaume
Message:

refactor nb_buffering

Location:
trunk/lib/FlairCore/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/ui_com.cpp

    r453 r456  
    253253//le nrt recupere et decide quand envoyer :
    254254// -quand toutes les données d'une meme periode sont recues
    255 // -quand on a atteind le nb buffering
     255// -quand on a atteint le nb buffering
    256256// -le nrt alloue dynamiquement le buffer d'envoi
    257 //-> enelver l'alloc du buffer au startup (UpdateDataToSendSize)
    258257void ui_com::Run(void) {
    259258#ifdef __XENO__
     
    290289      send_mutex->GetMutex();
    291290
    292       //mulit buffering
     291      //multi buffering
    293292      for (size_t i = 0; i < datasToSend.size(); i++) {
    294293        if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) {
     
    328327//TODO: RT part has to put datas in pipe, then user thread call this
    329328void ui_com::PushDatasToSend(const SendData *dataToSend) {
    330   PushedData_t* ptr=NULL;
    331   //check if we already have one buffer for this couple period/nb_buffering
    332   for (size_t i = 0; i < pushedDatas.size(); i++) {
    333       if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) {
    334           ptr=&pushedDatas.at(i);
    335         }
    336   }
     329  PushedData_t* ptr=GetCorrespondingPushedData(dataToSend);
     330 
    337331  if(ptr==NULL) {
    338332      Warn("no match in buffers\n");
     333      return;
     334  }
     335 
     336  if(ptr->buf==NULL) {
     337      Warn("buffer not allocated\n");
    339338      return;
    340339  }
     
    539538
    540539void ui_com::AddSendData(const SendData *obj) {
    541   send_mutex->GetMutex();
    542 
    543   resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
    544 
    545   // on resume en meme temps tout ceux qui ont la meme periode
    546   for (size_t i = 0; i < datasToSend.size(); i++) {
    547     if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
    548       resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i);
    549       break;
    550     }
    551   }
    552 
    553   datasToSend.push_back(obj);
    554 
    555   send_mutex->ReleaseMutex();
    556 }
    557 
    558 //TODO: check if it is RT compatible
     540    send_mutex->GetMutex();
     541
     542    resumeTimes.push_back(0);//time will be updated in UpdateResumeTime
     543    datasToSend.push_back(obj);
     544    UpdateResumeTime(obj);
     545   
     546    send_mutex->ReleaseMutex();
     547}
     548
    559549//must be called with mutex locked
    560 void ui_com::UpdateSendData(const SendData *obj) {
    561    // Printf("UpdateSendData %s\n",obj->ObjectName().c_str());
    562   // le mutex est deja pris par l'appellant
    563   size_t id, i;
     550void ui_com::UpdateResumeTime(const SendData *obj) {
     551    size_t id, i;
    564552
    565553  // on recupere l'id
     
    571559  }
    572560
    573   // on resume en meme temps tout ceux qui ont la meme periode
     561  // on resume en meme temps tous ceux qui ont la meme periode
     562    // si quelqu'un a la meme periode on prend son resume time
    574563  for (i = 0; i < datasToSend.size(); i++) {
    575     if (i == id)
    576       continue;
     564    if (i == id) continue;
    577565    if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) {
    578566      resumeTimes.at(id) = resumeTimes.at(i);
     
    584572  if (i == datasToSend.size())
    585573    resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
    586 
    587   if (IsSuspended() == true) Resume();
    588 
    589 //update all buffers if necessary, discard datas
     574   
     575}
     576
     577ui_com::PushedData_t* ui_com::GetCorrespondingPushedData(const SendData *obj) {
     578    PushedData_t* ptr=NULL;
     579    //get buffer for this couple period/nb_buffering
     580    for (size_t i = 0; i < pushedDatas.size(); i++) {
     581        if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
     582            ptr=&pushedDatas.at(i);
     583            break;
     584        }
     585    }
     586    return ptr;
     587}
     588 
     589//TODO: check if it is RT compatible
     590//must be called with mutex locked
     591//called at senddata creation, or when period/nb_buffering has changed on gcs
     592//todo: not ralloc when change in buf size, but delete and create a PushedData_t? permet de factoriser avec les autres methodes?
     593void ui_com::UpdateSendData(const SendData *obj) {
     594   // Printf("UpdateSendData %s\n",obj->ObjectName().c_str());
     595  // le mutex est deja pris par l'appellant
     596 
     597    UpdateResumeTime(obj);
     598
     599    if (IsSuspended() == true) Resume();
     600
     601    //update all buffers if necessary, discard datas
    590602    std::vector<PushedData_t>::iterator pushedDatasIterator = pushedDatas.begin();
    591603    while(pushedDatasIterator != pushedDatas.end()) {
    592         size_t bufSize=0;
    593         for (size_t i = 0; i < datasToSend.size(); i++) {
    594             if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) {
    595                 bufSize+=datasToSend.at(i)->SendSize();
    596             }
    597         }
    598         bufSize*=(*pushedDatasIterator).nb_buffering;
    599         if(bufSize!=0) {
    600             bufSize+=sizeof(char)+sizeof(uint16_t);//header+period
    601             if((*pushedDatasIterator).nb_buffering>1) bufSize+=sizeof(uint16_t);//+nb_buffering
    602         }   
     604        size_t bufSize=CalcBufferSize(&(*pushedDatasIterator));
    603605        if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) {
    604606            //Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize);
     
    617619    }
    618620 
    619   //check if we need a new buffer for this couple period/nb_buffering
    620   if(obj->IsEnabled()) {
    621       bool match=false;
    622       for (size_t i = 0; i < pushedDatas.size(); i++) {
    623           if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
    624               //Printf("match item %i, period %i nb buff %i, size %i\n",i,obj->SendPeriod(),obj->NbBuffering(),pushedDatas.at(i).final_size);
    625               match=true;
    626             }
    627       }
    628       //create new one
    629       if(!match) {
    630         //printf("no match\n");
    631         PushedData_t tmp;
    632         tmp.period=obj->SendPeriod();
    633         tmp.nb_buffering=obj->NbBuffering();
    634         tmp.actual_size=0;
    635         tmp.final_size=0;
    636         for (size_t i = 0; i < datasToSend.size(); i++) {
    637             //Printf("data %s %i/%i %i %i\n",datasToSend.at(i)->ObjectName().c_str(),i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering());
    638             if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) {
    639               tmp.final_size+=datasToSend.at(i)->SendSize();
    640               //Printf("add %i\n",datasToSend.at(i)->SendSize());
     621    //check if we need a new buffer for this couple period/nb_buffering
     622    if(obj->IsEnabled()) {
     623        PushedData_t *ptr=GetCorrespondingPushedData(obj);
     624        //create new one
     625        if(ptr==NULL) AddPushedData(obj);
     626    }
     627    //Printf("nb buf %i\n",pushedDatas.size());
     628    return;
     629}
     630
     631//called for exemple when we add a curve to a graph
     632void ui_com::UpdateDataToSendBufferSize(const SendData *obj) {
     633    if(!obj->IsEnabled()) return;
     634    //Printf("UpdateDataToSendBufferSize %s\n",obj->ObjectName().c_str());
     635    send_mutex->GetMutex();
     636
     637    PushedData_t* ptr=GetCorrespondingPushedData(obj);
     638 
     639    //create new one
     640    if(ptr==NULL) {
     641        AddPushedData(obj);
     642    } else {
     643        ptr->final_size=CalcBufferSize(ptr);
     644        if(ptr->final_size!=0) {
     645            free(ptr->buf);
     646            ptr->buf=(char*)malloc(ptr->final_size);
     647            if(ptr->buf==NULL) {
     648                Err("malloc failed for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering());
    641649            }
    642650        }
    643         tmp.final_size*=obj->NbBuffering();
    644         tmp.final_size+=sizeof(char)+sizeof(uint16_t);//header+period
    645         if(tmp.nb_buffering>1) tmp.final_size+=sizeof(uint16_t);//+nb_buffering
    646         //printf("final size %i\n",tmp.final_size);
     651    }
     652    send_mutex->ReleaseMutex();
     653}
     654
     655void ui_com::AddPushedData(const SendData *obj) {
     656    //Printf("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering());
     657    PushedData_t tmp;
     658    tmp.period=obj->SendPeriod();
     659    tmp.nb_buffering=obj->NbBuffering();
     660    tmp.actual_size=0;
     661    tmp.final_size=CalcBufferSize(&tmp);
     662    if(tmp.final_size!=0) {
    647663        tmp.buf=(char*)malloc(tmp.final_size);
    648664        if(tmp.buf!=NULL) {
     
    651667            Warn("could not allocate buffer of size %i\n",tmp.final_size);
    652668        }
    653       }
    654   }
    655   //Printf("nb buf %i\n",pushedDatas.size());
    656   return;
    657 }
    658 
    659 void ui_com::UpdateDataToSendBufferSize(const SendData *obj) {
    660     if(!obj->IsEnabled()) return;
    661     //Printf("UpdateDataToSendBufferSize %s\n",obj->ObjectName().c_str());
    662     send_mutex->GetMutex();
    663    
    664     PushedData_t* ptr=NULL;
    665     //get buffer for this couple period/nb_buffering
    666     for (size_t i = 0; i < pushedDatas.size(); i++) {
    667       if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) {
    668           ptr=&pushedDatas.at(i);
    669         }
    670     }
    671     if(ptr==NULL) {
    672         Err("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering());
    673         return;
    674     }
    675    
    676     ptr->final_size=0;
     669    }
     670}
     671
     672
     673//must be called with mutex locked
     674size_t ui_com::CalcBufferSize(const PushedData_t* ptr) const {
     675    size_t final_size=0;
    677676    //Printf("actual %i\n",ptr->final_size);
    678677    for (size_t i = 0; i < datasToSend.size(); i++) {
    679678        //Printf("data %s %i/%i %i %i\n",datasToSend.at(i)->ObjectName().c_str(),i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering());
    680         if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) {
    681           ptr->final_size+=datasToSend.at(i)->SendSize();
    682           //Printf("add %i\n",datasToSend.at(i)->SendSize());
     679        if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == ptr->period && datasToSend.at(i)->NbBuffering()==ptr->nb_buffering) {
     680            final_size+=datasToSend.at(i)->SendSize();
     681            //Printf("add %i\n",datasToSend.at(i)->SendSize());
    683682        }
    684683    }
    685     ptr->final_size*=obj->NbBuffering();
    686     ptr->final_size+=sizeof(char)+sizeof(uint16_t);//header+period
    687     if(ptr->nb_buffering>1) ptr->final_size+=sizeof(uint16_t);//+nb_buffering
    688        
    689     //Printf("final %i\n",ptr->final_size);
    690     free(ptr->buf);
    691     ptr->buf=(char*)malloc(ptr->final_size);
     684    final_size*=ptr->nb_buffering;
     685    if(final_size==0) {
     686        return 0;
     687    }
     688    final_size+=sizeof(char)+sizeof(uint16_t);//header+period
     689    if(ptr->nb_buffering>1) final_size+=sizeof(uint16_t);//+nb_buffering
    692690   
    693     send_mutex->ReleaseMutex();
     691    return final_size;
    694692}
    695693
  • trunk/lib/FlairCore/src/unexported/ui_com.h

    r445 r456  
    5151  void CheckConnection(void);
    5252private:
     53    //datasToSend and resumeTimes are pushed/pop together
    5354  std::vector<const flair::gui::SendData *> datasToSend;
    5455  std::vector<flair::core::Time> resumeTimes;
     
    6162  void SendNRT(char *buf, ssize_t size,int ttl);
    6263  void PushDatasToSend(const flair::gui::SendData *dataToSend);
     64  void UpdateResumeTime(const flair::gui::SendData *obj);//must be called with mutex locked
    6365  typedef struct {
    6466    char* buf;
     
    6971  } PushedData_t;
    7072  std::vector<PushedData_t> pushedDatas;
     73  size_t CalcBufferSize(const PushedData_t* ptr) const;//must be called with mutex locked
     74  PushedData_t* GetCorrespondingPushedData(const flair::gui::SendData *dataToSend);//returns null if no corresponding buffer
     75  void AddPushedData(const flair::gui::SendData *obj);
     76
    7177
    7278  static int compressBuffer(char *in, ssize_t in_size, char **out,
Note: See TracChangeset for help on using the changeset viewer.