Changeset 456 in flair-src
- Timestamp:
- Oct 6, 2021, 11:03:44 AM (3 years ago)
- Location:
- trunk/lib/FlairCore/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/ui_com.cpp
r453 r456 253 253 //le nrt recupere et decide quand envoyer : 254 254 // -quand toutes les données d'une meme periode sont recues 255 // -quand on a attein dle nb buffering255 // -quand on a atteint le nb buffering 256 256 // -le nrt alloue dynamiquement le buffer d'envoi 257 //-> enelver l'alloc du buffer au startup (UpdateDataToSendSize)258 257 void ui_com::Run(void) { 259 258 #ifdef __XENO__ … … 290 289 send_mutex->GetMutex(); 291 290 292 //mul itbuffering291 //multi buffering 293 292 for (size_t i = 0; i < datasToSend.size(); i++) { 294 293 if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) { … … 328 327 //TODO: RT part has to put datas in pipe, then user thread call this 329 328 void ui_com::PushDatasToSend(const SendData *dataToSend) { 330 PushedData_t* ptr=NULL; 331 //check if we already have one buffer for this couple period/nb_buffering 332 for (size_t i = 0; i < pushedDatas.size(); i++) { 333 if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) { 334 ptr=&pushedDatas.at(i); 335 } 336 } 329 PushedData_t* ptr=GetCorrespondingPushedData(dataToSend); 330 337 331 if(ptr==NULL) { 338 332 Warn("no match in buffers\n"); 333 return; 334 } 335 336 if(ptr->buf==NULL) { 337 Warn("buffer not allocated\n"); 339 338 return; 340 339 } … … 539 538 540 539 void ui_com::AddSendData(const SendData *obj) { 541 send_mutex->GetMutex(); 542 543 resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000); 544 545 // on resume en meme temps tout ceux qui ont la meme periode 546 for (size_t i = 0; i < datasToSend.size(); i++) { 547 if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) { 548 resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i); 549 break; 550 } 551 } 552 553 datasToSend.push_back(obj); 554 555 send_mutex->ReleaseMutex(); 556 } 557 558 //TODO: check if it is RT compatible 540 send_mutex->GetMutex(); 541 542 resumeTimes.push_back(0);//time will be updated in UpdateResumeTime 543 datasToSend.push_back(obj); 544 UpdateResumeTime(obj); 545 546 send_mutex->ReleaseMutex(); 547 } 548 559 549 //must be called with mutex locked 560 void ui_com::UpdateSendData(const SendData *obj) { 561 // Printf("UpdateSendData %s\n",obj->ObjectName().c_str()); 562 // le mutex est deja pris par l'appellant 563 size_t id, i; 550 void ui_com::UpdateResumeTime(const SendData *obj) { 551 size_t id, i; 564 552 565 553 // on recupere l'id … … 571 559 } 572 560 573 // on resume en meme temps tout ceux qui ont la meme periode 561 // on resume en meme temps tous ceux qui ont la meme periode 562 // si quelqu'un a la meme periode on prend son resume time 574 563 for (i = 0; i < datasToSend.size(); i++) { 575 if (i == id) 576 continue; 564 if (i == id) continue; 577 565 if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) { 578 566 resumeTimes.at(id) = resumeTimes.at(i); … … 584 572 if (i == datasToSend.size()) 585 573 resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000; 586 587 if (IsSuspended() == true) Resume(); 588 589 //update all buffers if necessary, discard datas 574 575 } 576 577 ui_com::PushedData_t* ui_com::GetCorrespondingPushedData(const SendData *obj) { 578 PushedData_t* ptr=NULL; 579 //get buffer for this couple period/nb_buffering 580 for (size_t i = 0; i < pushedDatas.size(); i++) { 581 if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) { 582 ptr=&pushedDatas.at(i); 583 break; 584 } 585 } 586 return ptr; 587 } 588 589 //TODO: check if it is RT compatible 590 //must be called with mutex locked 591 //called at senddata creation, or when period/nb_buffering has changed on gcs 592 //todo: not ralloc when change in buf size, but delete and create a PushedData_t? permet de factoriser avec les autres methodes? 593 void ui_com::UpdateSendData(const SendData *obj) { 594 // Printf("UpdateSendData %s\n",obj->ObjectName().c_str()); 595 // le mutex est deja pris par l'appellant 596 597 UpdateResumeTime(obj); 598 599 if (IsSuspended() == true) Resume(); 600 601 //update all buffers if necessary, discard datas 590 602 std::vector<PushedData_t>::iterator pushedDatasIterator = pushedDatas.begin(); 591 603 while(pushedDatasIterator != pushedDatas.end()) { 592 size_t bufSize=0; 593 for (size_t i = 0; i < datasToSend.size(); i++) { 594 if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) { 595 bufSize+=datasToSend.at(i)->SendSize(); 596 } 597 } 598 bufSize*=(*pushedDatasIterator).nb_buffering; 599 if(bufSize!=0) { 600 bufSize+=sizeof(char)+sizeof(uint16_t);//header+period 601 if((*pushedDatasIterator).nb_buffering>1) bufSize+=sizeof(uint16_t);//+nb_buffering 602 } 604 size_t bufSize=CalcBufferSize(&(*pushedDatasIterator)); 603 605 if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) { 604 606 //Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize); … … 617 619 } 618 620 619 //check if we need a new buffer for this couple period/nb_buffering 620 if(obj->IsEnabled()) { 621 bool match=false; 622 for (size_t i = 0; i < pushedDatas.size(); i++) { 623 if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) { 624 //Printf("match item %i, period %i nb buff %i, size %i\n",i,obj->SendPeriod(),obj->NbBuffering(),pushedDatas.at(i).final_size); 625 match=true; 626 } 627 } 628 //create new one 629 if(!match) { 630 //printf("no match\n"); 631 PushedData_t tmp; 632 tmp.period=obj->SendPeriod(); 633 tmp.nb_buffering=obj->NbBuffering(); 634 tmp.actual_size=0; 635 tmp.final_size=0; 636 for (size_t i = 0; i < datasToSend.size(); i++) { 637 //Printf("data %s %i/%i %i %i\n",datasToSend.at(i)->ObjectName().c_str(),i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering()); 638 if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) { 639 tmp.final_size+=datasToSend.at(i)->SendSize(); 640 //Printf("add %i\n",datasToSend.at(i)->SendSize()); 621 //check if we need a new buffer for this couple period/nb_buffering 622 if(obj->IsEnabled()) { 623 PushedData_t *ptr=GetCorrespondingPushedData(obj); 624 //create new one 625 if(ptr==NULL) AddPushedData(obj); 626 } 627 //Printf("nb buf %i\n",pushedDatas.size()); 628 return; 629 } 630 631 //called for exemple when we add a curve to a graph 632 void ui_com::UpdateDataToSendBufferSize(const SendData *obj) { 633 if(!obj->IsEnabled()) return; 634 //Printf("UpdateDataToSendBufferSize %s\n",obj->ObjectName().c_str()); 635 send_mutex->GetMutex(); 636 637 PushedData_t* ptr=GetCorrespondingPushedData(obj); 638 639 //create new one 640 if(ptr==NULL) { 641 AddPushedData(obj); 642 } else { 643 ptr->final_size=CalcBufferSize(ptr); 644 if(ptr->final_size!=0) { 645 free(ptr->buf); 646 ptr->buf=(char*)malloc(ptr->final_size); 647 if(ptr->buf==NULL) { 648 Err("malloc failed for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering()); 641 649 } 642 650 } 643 tmp.final_size*=obj->NbBuffering(); 644 tmp.final_size+=sizeof(char)+sizeof(uint16_t);//header+period 645 if(tmp.nb_buffering>1) tmp.final_size+=sizeof(uint16_t);//+nb_buffering 646 //printf("final size %i\n",tmp.final_size); 651 } 652 send_mutex->ReleaseMutex(); 653 } 654 655 void ui_com::AddPushedData(const SendData *obj) { 656 //Printf("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering()); 657 PushedData_t tmp; 658 tmp.period=obj->SendPeriod(); 659 tmp.nb_buffering=obj->NbBuffering(); 660 tmp.actual_size=0; 661 tmp.final_size=CalcBufferSize(&tmp); 662 if(tmp.final_size!=0) { 647 663 tmp.buf=(char*)malloc(tmp.final_size); 648 664 if(tmp.buf!=NULL) { … … 651 667 Warn("could not allocate buffer of size %i\n",tmp.final_size); 652 668 } 653 } 654 } 655 //Printf("nb buf %i\n",pushedDatas.size()); 656 return; 657 } 658 659 void ui_com::UpdateDataToSendBufferSize(const SendData *obj) { 660 if(!obj->IsEnabled()) return; 661 //Printf("UpdateDataToSendBufferSize %s\n",obj->ObjectName().c_str()); 662 send_mutex->GetMutex(); 663 664 PushedData_t* ptr=NULL; 665 //get buffer for this couple period/nb_buffering 666 for (size_t i = 0; i < pushedDatas.size(); i++) { 667 if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) { 668 ptr=&pushedDatas.at(i); 669 } 670 } 671 if(ptr==NULL) { 672 Err("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering()); 673 return; 674 } 675 676 ptr->final_size=0; 669 } 670 } 671 672 673 //must be called with mutex locked 674 size_t ui_com::CalcBufferSize(const PushedData_t* ptr) const { 675 size_t final_size=0; 677 676 //Printf("actual %i\n",ptr->final_size); 678 677 for (size_t i = 0; i < datasToSend.size(); i++) { 679 678 //Printf("data %s %i/%i %i %i\n",datasToSend.at(i)->ObjectName().c_str(),i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering()); 680 if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) {681 ptr->final_size+=datasToSend.at(i)->SendSize();682 //Printf("add %i\n",datasToSend.at(i)->SendSize());679 if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == ptr->period && datasToSend.at(i)->NbBuffering()==ptr->nb_buffering) { 680 final_size+=datasToSend.at(i)->SendSize(); 681 //Printf("add %i\n",datasToSend.at(i)->SendSize()); 683 682 } 684 683 } 685 ptr->final_size*=obj->NbBuffering(); 686 ptr->final_size+=sizeof(char)+sizeof(uint16_t);//header+period 687 if(ptr->nb_buffering>1) ptr->final_size+=sizeof(uint16_t);//+nb_buffering 688 689 //Printf("final %i\n",ptr->final_size); 690 free(ptr->buf); 691 ptr->buf=(char*)malloc(ptr->final_size); 684 final_size*=ptr->nb_buffering; 685 if(final_size==0) { 686 return 0; 687 } 688 final_size+=sizeof(char)+sizeof(uint16_t);//header+period 689 if(ptr->nb_buffering>1) final_size+=sizeof(uint16_t);//+nb_buffering 692 690 693 send_mutex->ReleaseMutex();691 return final_size; 694 692 } 695 693 -
trunk/lib/FlairCore/src/unexported/ui_com.h
r445 r456 51 51 void CheckConnection(void); 52 52 private: 53 //datasToSend and resumeTimes are pushed/pop together 53 54 std::vector<const flair::gui::SendData *> datasToSend; 54 55 std::vector<flair::core::Time> resumeTimes; … … 61 62 void SendNRT(char *buf, ssize_t size,int ttl); 62 63 void PushDatasToSend(const flair::gui::SendData *dataToSend); 64 void UpdateResumeTime(const flair::gui::SendData *obj);//must be called with mutex locked 63 65 typedef struct { 64 66 char* buf; … … 69 71 } PushedData_t; 70 72 std::vector<PushedData_t> pushedDatas; 73 size_t CalcBufferSize(const PushedData_t* ptr) const;//must be called with mutex locked 74 PushedData_t* GetCorrespondingPushedData(const flair::gui::SendData *dataToSend);//returns null if no corresponding buffer 75 void AddPushedData(const flair::gui::SendData *obj); 76 71 77 72 78 static int compressBuffer(char *in, ssize_t in_size, char **out,
Note:
See TracChangeset
for help on using the changeset viewer.