Changeset 441 in flair-src
- Timestamp:
- Aug 31, 2021, 4:15:03 PM (3 years ago)
- Location:
- trunk/lib/FlairCore/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/ui_com.cpp
r440 r441 284 284 // on recpuere l'id de la prochaine execution 285 285 send_mutex->GetMutex(); 286 resume_id = resume _time.size();287 for (size_t i = 0; i < resume _time.size(); i++) {288 if (resume _time.at(i) < min && data_to_send.at(i)->IsEnabled() == true) {289 min = resume _time.at(i);286 resume_id = resumeTimes.size(); 287 for (size_t i = 0; i < resumeTimes.size(); i++) { 288 if (resumeTimes.at(i) < min && datasToSend.at(i)->IsEnabled() == true) { 289 min = resumeTimes.at(i); 290 290 resume_id = i; 291 291 } … … 293 293 294 294 // attente 295 if (resume_id < resume _time.size()) {296 Time time = resume _time.at(resume_id);297 uint16_t resume_period = data _to_send.at(resume_id)->SendPeriod();295 if (resume_id < resumeTimes.size()) { 296 Time time = resumeTimes.at(resume_id); 297 uint16_t resume_period = datasToSend.at(resume_id)->SendPeriod(); 298 298 send_mutex->ReleaseMutex(); 299 299 // on dort jusqu'a la prochaine execution … … 305 305 send_mutex->GetMutex(); 306 306 307 for (size_t i = 0; i < data _to_send.size(); i++) {308 if (data _to_send.at(i)->SendPeriod() == resume_period && data_to_send.at(i)->IsEnabled() == true) {309 data _to_send.at(i)->CopyDatas(send_buffer + offset);310 offset += data _to_send.at(i)->SendSize();307 for (size_t i = 0; i < datasToSend.size(); i++) { 308 if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) { 309 datasToSend.at(i)->CopyDatas(send_buffer + offset); 310 offset += datasToSend.at(i)->SendSize(); 311 311 } 312 312 } … … 320 320 321 321 //test to push datas 322 for (size_t i = 0; i < data _to_send.size(); i++) {323 if (data _to_send.at(i)->SendPeriod() == resume_period && data_to_send.at(i)->IsEnabled() == true) {324 PushDatasToSend(data _to_send.at(i));322 for (size_t i = 0; i < datasToSend.size(); i++) { 323 if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) { 324 PushDatasToSend(datasToSend.at(i)); 325 325 } 326 326 } … … 328 328 329 329 // on planifie la prochaine execution 330 for (size_t i = 0; i < data _to_send.size(); i++) {331 if (data _to_send.at(i)->SendPeriod() == resume_period) {332 resume _time.at(i) += data_to_send.at(i)->SendPeriod() * 1000000;330 for (size_t i = 0; i < datasToSend.size(); i++) { 331 if (datasToSend.at(i)->SendPeriod() == resume_period) { 332 resumeTimes.at(i) += datasToSend.at(i)->SendPeriod() * 1000000; 333 333 } 334 334 } … … 344 344 Time time = GetTime(); 345 345 send_mutex->GetMutex(); 346 for (size_t i = 0; i < data _to_send.size(); i++) {347 resume _time.at(i) =348 time + (Time)data _to_send.at(i)->SendPeriod() * 1000000;346 for (size_t i = 0; i < datasToSend.size(); i++) { 347 resumeTimes.at(i) = 348 time + (Time)datasToSend.at(i)->SendPeriod() * 1000000; 349 349 } 350 350 send_mutex->ReleaseMutex(); … … 356 356 //called with send_mutex locked 357 357 //in nrt 358 //TODO: RT part has to put datas in pipe, then use er thread call this359 void ui_com::PushDatasToSend(const SendData *data _to_send) {358 //TODO: RT part has to put datas in pipe, then user thread call this 359 void ui_com::PushDatasToSend(const SendData *dataToSend) { 360 360 PushedData_t* ptr=NULL; 361 361 //check if we already have one buffer for this couple period/nb_buffering 362 for (size_t i = 0; i < PushedDatas.size(); i++) { 363 if (PushedDatas.at(i).period==data_to_send->SendPeriod() && PushedDatas.at(i).nb_buffering==data_to_send->NbBuffering()) { 364 Printf("match item %i, period %i nb buff %i\n",i,data_to_send->SendPeriod(),data_to_send->NbBuffering()); 365 ptr=&PushedDatas.at(i); 366 } 367 } 368 //otherwire create new one 362 for (size_t i = 0; i < pushedDatas.size(); i++) { 363 if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) { 364 ptr=&pushedDatas.at(i); 365 } 366 } 369 367 if(ptr==NULL) { 370 printf("no match\n"); 371 PushedData_t tmp; 372 tmp.period=data_to_send->SendPeriod(); 373 tmp.nb_buffering=data_to_send->NbBuffering(); 374 tmp.size=0; 375 tmp.buf=NULL; 376 PushedDatas.push_back(tmp); 377 ptr=&PushedDatas.back(); 378 } 379 380 ptr->buf=(char*)realloc(ptr->buf,ptr->size+data_to_send->SendSize()); 368 Warn("no match in buffers\n"); 369 return; 370 } 381 371 382 data_to_send->CopyDatas(ptr->buf+ptr->size); 383 ptr->size+=data_to_send->SendSize(); 384 Printf("nb buffered %i\n",ptr->size); 385 //TODO: savoir quand a fait tous les nb buffers 386 Printf("pushed size %i period %i nb buffering %i\n",data_to_send->SendSize(),data_to_send->SendPeriod(),data_to_send->NbBuffering()); 372 dataToSend->CopyDatas(ptr->buf+ptr->actual_size); 373 ptr->actual_size+=dataToSend->SendSize(); 374 Printf("nb buffered %i\n",ptr->actual_size); 375 Printf("pushed size %i period %i nb buffering %i\n",dataToSend->SendSize(),dataToSend->SendPeriod(),dataToSend->NbBuffering()); 376 if(ptr->actual_size==ptr->final_size) { 377 Printf("ready to send\n"); 378 //clean 379 ptr->actual_size=0; 380 } 387 381 } 388 382 … … 546 540 send_mutex->GetMutex(); 547 541 548 resume _time.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);542 resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000); 549 543 550 544 // on resume en meme temps tout ceux qui ont la meme periode 551 for (size_t i = 0; i < data _to_send.size(); i++) {552 if (data _to_send.at(i)->SendPeriod() == obj->SendPeriod()) {553 resume _time.at(resume_time.size() - 1) = resume_time.at(i);545 for (size_t i = 0; i < datasToSend.size(); i++) { 546 if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) { 547 resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i); 554 548 break; 555 549 } 556 550 } 557 551 558 data _to_send.push_back(obj);552 datasToSend.push_back(obj); 559 553 560 554 send_mutex->ReleaseMutex(); 561 555 } 562 556 557 //TODO: check if it is RT compatible 563 558 void ui_com::UpdateSendData(const SendData *obj) { 564 559 // le mutex est deja pris par l'appellant … … 566 561 567 562 // on recupere l'id 568 for (i = 0; i < data _to_send.size(); i++) {569 if (data _to_send.at(i) == obj) {563 for (i = 0; i < datasToSend.size(); i++) { 564 if (datasToSend.at(i) == obj) { 570 565 id = i; 571 566 break; … … 574 569 575 570 // on resume en meme temps tout ceux qui ont la meme periode 576 for (i = 0; i < data _to_send.size(); i++) {571 for (i = 0; i < datasToSend.size(); i++) { 577 572 if (i == id) 578 573 continue; 579 if (data_to_send.at(i)->IsEnabled() == true && 580 data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) { 581 resume_time.at(id) = resume_time.at(i); 574 if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) { 575 resumeTimes.at(id) = resumeTimes.at(i); 582 576 break; 583 577 } … … 585 579 586 580 // si aucun match, on planifie l'execution 587 if (i == data_to_send.size()) 588 resume_time.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000; 589 590 if (IsSuspended() == true) 591 Resume(); 592 581 if (i == datasToSend.size()) 582 resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000; 583 584 if (IsSuspended() == true) Resume(); 585 586 //update all buffers if necessary, discard datas 587 std::vector<PushedData_t>::iterator pushedDatasIterator = pushedDatas.begin(); 588 while(pushedDatasIterator != pushedDatas.end()) { 589 size_t bufSize=0; 590 for (size_t i = 0; i < datasToSend.size(); i++) { 591 if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) { 592 bufSize+=datasToSend.at(i)->SendSize(); 593 } 594 } 595 bufSize*=(*pushedDatasIterator).nb_buffering; 596 if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) { 597 Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize); 598 (*pushedDatasIterator).actual_size=0; 599 (*pushedDatasIterator).final_size=bufSize; 600 free((*pushedDatasIterator).buf); 601 (*pushedDatasIterator).buf=(char*)malloc(bufSize); 602 } 603 if(bufSize==0) { 604 Printf("delete buf\n"); 605 free((*pushedDatasIterator).buf); 606 pushedDatasIterator = pushedDatas.erase(pushedDatasIterator); 607 } else { 608 ++pushedDatasIterator; 609 } 610 } 611 612 //check if we need a new buffer for this couple period/nb_buffering 613 if(obj->IsEnabled()) { 614 bool match=false; 615 for (size_t i = 0; i < pushedDatas.size(); i++) { 616 if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) { 617 Printf("match item %i, period %i nb buff %i\n",i,obj->SendPeriod(),obj->NbBuffering()); 618 match=true; 619 } 620 } 621 //create new one 622 if(!match) { 623 printf("no match\n"); 624 PushedData_t tmp; 625 tmp.period=obj->SendPeriod(); 626 tmp.nb_buffering=obj->NbBuffering(); 627 tmp.actual_size=0; 628 tmp.final_size=0; 629 for (size_t i = 0; i < datasToSend.size(); i++) { 630 if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) { 631 tmp.final_size+=datasToSend.at(i)->SendSize(); 632 } 633 } 634 tmp.final_size*=obj->NbBuffering(); 635 printf("final size %i\n",tmp.final_size); 636 tmp.buf=(char*)malloc(tmp.final_size); 637 if(tmp.buf!=NULL) { 638 pushedDatas.push_back(tmp); 639 } else { 640 Warn("could not allocate buffer of size %i\n",tmp.final_size); 641 } 642 } 643 } 644 Printf("nb buf %i\n",pushedDatas.size()); 593 645 return; 594 646 } … … 600 652 send_mutex->GetMutex(); 601 653 send_size = 3; // id(1)+period(2) 602 for (size_t i = 0; i < data _to_send.size(); i++) {603 if (data _to_send[i] != NULL)604 send_size += data _to_send[i]->SendSize();654 for (size_t i = 0; i < datasToSend.size(); i++) { 655 if (datasToSend[i] != NULL) 656 send_size += datasToSend[i]->SendSize(); 605 657 } 606 658 … … 617 669 send_mutex->GetMutex(); 618 670 // on recupere l'id 619 for (size_t i = 0; i < data _to_send.size(); i++) {620 if (data _to_send.at(i) == obj) {621 data _to_send.erase(data_to_send.begin() + i);622 resume _time.erase(resume_time.begin() + i);671 for (size_t i = 0; i < datasToSend.size(); i++) { 672 if (datasToSend.at(i) == obj) { 673 datasToSend.erase(datasToSend.begin() + i); 674 resumeTimes.erase(resumeTimes.begin() + i); 623 675 // printf("remove_data_to_send %i ok\n",data_to_send.size()); 624 676 break; -
trunk/lib/FlairCore/src/unexported/ui_com.h
r440 r441 43 43 ssize_t Receive(char *buf, ssize_t buf_size); 44 44 void AddSendData(const flair::gui::SendData *obj); 45 void UpdateSendData(const flair::gui::SendData *obj); 45 void UpdateSendData(const flair::gui::SendData *obj);//must be called with mutex locked 46 46 void RemoveSendData(const flair::gui::SendData *obj); 47 47 void UpdateDataToSendSize(void); … … 53 53 ssize_t send_size; 54 54 char *send_buffer; 55 std::vector<const flair::gui::SendData *> data _to_send;56 std::vector<flair::core::Time> resume _time;55 std::vector<const flair::gui::SendData *> datasToSend; 56 std::vector<flair::core::Time> resumeTimes; 57 57 flair::core::Mutex *send_mutex; 58 58 UDTSOCKET socket_fd; … … 62 62 //private part, called to effectively send to udt 63 63 void SendNRT(char *buf, ssize_t size,int ttl); 64 void PushDatasToSend(const flair::gui::SendData *data _to_send);64 void PushDatasToSend(const flair::gui::SendData *dataToSend); 65 65 typedef struct { 66 66 char* buf; 67 ssize_t size; 67 size_t actual_size; 68 size_t final_size; 68 69 uint16_t period; 69 70 uint16_t nb_buffering; 70 71 } PushedData_t; 71 std::vector<PushedData_t> PushedDatas;72 std::vector<PushedData_t> pushedDatas; 72 73 73 74 static int compressBuffer(char *in, ssize_t in_size, char **out,
Note:
See TracChangeset
for help on using the changeset viewer.