// %flair:license{ // This file is part of the Flair framework distributed under the // CECILL-C License, Version 1.0. // %flair:license} // created: 2011/05/01 // filename: ui_com.cpp // // author: Guillaume Sanahuja // Copyright Heudiasyc UMR UTC/CNRS 7253 // // version: $Id: $ // // purpose: classe permettant la lecture et l'ecriture sur socket UDT // // /*********************************************************************/ #include "ui_com.h" #include #include #include #include "Mutex.h" #include "SendData.h" #include "communication.h" #include "FrameworkManager.h" #include "zlib.h" #include #include "config.h" #ifdef __XENO__ #include #include #include #include #endif using std::string; using namespace flair::core; using namespace flair::gui; ui_com::ui_com(const Object *parent, UDTSOCKET sock) : Thread(parent, "send", 2,16384*2) { // mutex send_mutex = new Mutex(this, ObjectName()); socket_fd = sock; connection_lost = false; #ifdef __XENO__ Err("multi buffering is not well implemented in RT\n"); #endif #ifdef __XENO__ int status; string tmp_name; is_running = true; // pipe tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe"; // xenomai limitation if (tmp_name.size() > 31) Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str()); #ifdef RT_PIPE_SIZE status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE); #else status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0); #endif if (status != 0) { char errorMsg[256]; Err("rt_pipe_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg))); // return -1; } // start user side thread #ifdef NRT_STACK_SIZE // Initialize thread creation attributes pthread_attr_t attr; if (pthread_attr_init(&attr) != 0) { Err("pthread_attr_init error\n"); } if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) { Err("pthread_attr_setstacksize error\n"); } if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0) #else // NRT_STACK_SIZE if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0) #endif // NRT_STACK_SIZE { Err("pthread_create error\n"); // return -1; } #ifdef NRT_STACK_SIZE if (pthread_attr_destroy(&attr) != 0) { Err("pthread_attr_destroy error\n"); } #endif #endif //__XENO__ int timeout = 100; if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0) Err("UDT::setsockopt error (UDT_RCVTIMEO)\n"); /* timeout=-1; if (UDT::setsockopt(socket_fd, 0, UDT_SNDTIMEO, &timeout, sizeof(int)) != 0) Err("UDT::setsockopt error (UDT_SNDTIMEO)\n"); */ bool blocking = true; if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0) Err("UDT::setsockopt error (UDT_SNDSYN)\n"); if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0) Err("UDT::setsockopt error (UDT_RCVSYN)\n"); //#endif //__XENO__ Start(); } ui_com::~ui_com() { //Printf("destruction ui_com\n"); #ifdef __XENO__ is_running = false; pthread_join(thread, NULL); int status = rt_pipe_delete(&pipe); if (status != 0) { char errorMsg[256]; Err("rt_pipe_delete error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg))); } #endif SafeStop(); if (IsSuspended() == true) Resume(); Join(); char buf=CLOSING_CONNECTION; Send(&buf,1); for (size_t i = 0; i < pushedDatas.size(); i++) { free(pushedDatas.at(i).buf); } //Printf("destruction ui_com ok\n"); } //send, public part; dispatch to SendNRT (nrt) or rt_pipe (rt) //rt_pipe is read by user thread which calls SendNRT void ui_com::Send(char *buf, ssize_t size,int ttl) { if (connection_lost == true) return; char *tosend = buf; if (buf[0] == XML_HEADER) { // cut xml header tosend = strstr(buf, " enelver l'alloc du buffer au startup (UpdateDataToSendSize) void ui_com::Run(void) { #ifdef __XENO__ WarnUponSwitches(true); printf("\n"); // a revoir pourquoi?? // sans ce printf, dans le simu_roll, le suspend ne fonctionne pas... #endif // on attend d'avoir des choses à faire Suspend(); while (!ToBeStopped()) { size_t resume_id; Time min = 0xffffffffffffffffULL; // on recpuere l'id de la prochaine execution send_mutex->GetMutex(); resume_id = resumeTimes.size(); for (size_t i = 0; i < resumeTimes.size(); i++) { if (resumeTimes.at(i) < min && datasToSend.at(i)->IsEnabled() == true) { min = resumeTimes.at(i); resume_id = i; } } // attente if (resume_id < resumeTimes.size()) { Time time = resumeTimes.at(resume_id); uint16_t resume_period = datasToSend.at(resume_id)->SendPeriod(); send_mutex->ReleaseMutex(); // on dort jusqu'a la prochaine execution SleepUntil(time); // envoi des donnees send_mutex->GetMutex(); //mulit buffering for (size_t i = 0; i < datasToSend.size(); i++) { if (datasToSend.at(i)->SendPeriod() == resume_period && datasToSend.at(i)->IsEnabled() == true) { PushDatasToSend(datasToSend.at(i)); } } // on planifie la prochaine execution for (size_t i = 0; i < datasToSend.size(); i++) { if (datasToSend.at(i)->SendPeriod() == resume_period) { resumeTimes.at(i) += datasToSend.at(i)->SendPeriod() * 1000000; } } send_mutex->ReleaseMutex(); //Printf("%i %lld\n",resume_period,GetTime()/1000000); } else { send_mutex->ReleaseMutex(); // rien a faire, suspend //Printf("rien a faire suspend\n"); Suspend(); //Printf("wake\n"); // on planifie la prochaine execution Time time = GetTime(); send_mutex->GetMutex(); for (size_t i = 0; i < datasToSend.size(); i++) { resumeTimes.at(i) = time + (Time)datasToSend.at(i)->SendPeriod() * 1000000; } send_mutex->ReleaseMutex(); } } } //called with send_mutex locked //in nrt //TODO: RT part has to put datas in pipe, then user thread call this void ui_com::PushDatasToSend(const SendData *dataToSend) { PushedData_t* ptr=NULL; //check if we already have one buffer for this couple period/nb_buffering for (size_t i = 0; i < pushedDatas.size(); i++) { if (pushedDatas.at(i).period==dataToSend->SendPeriod() && pushedDatas.at(i).nb_buffering==dataToSend->NbBuffering()) { ptr=&pushedDatas.at(i); } } if(ptr==NULL) { Warn("no match in buffers\n"); return; } if(ptr->actual_size==0) { if(ptr->nb_buffering>1) { if (IsBigEndian()) { ptr->buf[0] = MULTIPLE_DATA_BIG_ENDIAN; } else { ptr->buf[0] = MULTIPLE_DATA_LITTLE_ENDIAN; } ptr->actual_size+=sizeof(char); memcpy(ptr->buf+ptr->actual_size,&(ptr->period),sizeof(uint16_t)); ptr->actual_size+=sizeof(uint16_t); memcpy(ptr->buf+ptr->actual_size,&(ptr->nb_buffering),sizeof(uint16_t)); ptr->actual_size+=sizeof(uint16_t); } else { if (IsBigEndian()) { ptr->buf[0] = DATA_BIG_ENDIAN; } else { ptr->buf[0] = DATA_LITTLE_ENDIAN; } ptr->actual_size+=sizeof(char); memcpy(ptr->buf+ptr->actual_size,&(ptr->period),sizeof(uint16_t)); ptr->actual_size+=sizeof(uint16_t); } } dataToSend->CopyDatas(ptr->buf+ptr->actual_size); ptr->actual_size+=dataToSend->SendSize(); //Printf("nb buffered %i/%i\n",ptr->actual_size,ptr->final_size); //Printf("pushed size %i period %i nb buffering %i\n",dataToSend->SendSize(),dataToSend->SendPeriod(),dataToSend->NbBuffering()); if(ptr->actual_size>=ptr->final_size) {//par securité on test aussi le cas ou supérieur //Printf("ready to send\n"); SendNRT(ptr->buf, ptr->actual_size,ptr->period); //clean ptr->actual_size=0; } } #ifdef __XENO__ void *ui_com::user_thread(void *arg) { int pipe_fd = -1; string devname; char *buf = NULL; ui_com *caller = (ui_com *)arg; int rv; fd_set set; struct timeval timeout; ssize_t nb_read; int ttl; buf = (char *)malloc(NRT_PIPE_SIZE); if (buf == NULL) { caller->Err("malloc error\n"); } devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" + caller->ObjectName() + "-pipe"; while (pipe_fd < 0) { pipe_fd = open(devname.c_str(), O_RDWR); if (pipe_fd < 0 && errno != ENOENT) { char errorMsg[256]; caller->Err("open pipe_fd error (%s)\n", strerror_r(errno, errorMsg, sizeof(errorMsg))); } usleep(1000); } while (1) { FD_ZERO(&set); // clear the set FD_SET(pipe_fd, &set); // add our file descriptor to the set timeout.tv_sec = 0; timeout.tv_usec = SELECT_TIMEOUT_MS * 1000; rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout); if (rv == -1) { if (caller->is_running == false && UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT) break; // timeout if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT) caller->Err("epoll_wait, %s, code %i\n", UDT::getlasterror().getErrorMessage(), UDT::getlasterror().getErrorCode()); } else if (rv == 0) { // printf("timeout\n"); // a timeout occured if (caller->is_running == false) break; } else { //read data buf and ttl //RT part has made 2 rt_pipe_write; they should be available in nrt //(select ensure at least one data is ready) nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE); buf[nb_read] = 0; read(pipe_fd, &ttl, sizeof(ttl)); // printf("envoi\n%s\n",buf); caller->SendNRT(buf, nb_read,ttl); } } close(pipe_fd); if (buf != NULL) free(buf); pthread_exit(0); } #endif int ui_com::compressBuffer(char *in, ssize_t in_size, char **out, ssize_t *out_size, int level) { int ret, flush; unsigned have; z_stream strm; /* allocate deflate state */ strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; ret = deflateInit(&strm, level); if (ret != Z_OK) return ret; *out = (char *)malloc(COMPRESS_CHUNK); if (!(*out)) return Z_BUF_ERROR; strm.next_in = (unsigned char *)in; strm.avail_out = COMPRESS_CHUNK; strm.next_out = (unsigned char *)*out; strm.avail_in = in_size; flush = Z_FINISH; ret = deflate(&strm, flush); /* no bad return value */ if (ret == Z_STREAM_ERROR) { free(*out); return ret; } have = COMPRESS_CHUNK - strm.avail_out; *out_size = have; // printf("%i -> %i\n",in_size,have); /* clean up and return */ (void)deflateEnd(&strm); if (strm.avail_out != 0) { return Z_OK; } else { return Z_STREAM_ERROR; } } int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size, unsigned char **out, ssize_t *out_size) { int ret; // unsigned have; z_stream strm; /* allocate inflate state */ strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; strm.avail_in = 0; strm.next_in = Z_NULL; ret = inflateInit(&strm); if (ret != Z_OK) return ret; *out = (unsigned char *)malloc(COMPRESS_CHUNK); if (!(*out)) return Z_BUF_ERROR; strm.avail_in = in_size; strm.next_in = in; strm.avail_out = COMPRESS_CHUNK; strm.next_out = *out; ret = inflate(&strm, Z_NO_FLUSH); assert(ret != Z_STREAM_ERROR); /* state not clobbered */ switch (ret) { case Z_NEED_DICT: ret = Z_DATA_ERROR; /* and fall through */ case Z_DATA_ERROR: case Z_MEM_ERROR: (void)inflateEnd(&strm); return ret; } // have = COMPRESS_CHUNK - strm.avail_out; /* clean up and return */ (void)inflateEnd(&strm); return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR; } bool ui_com::ConnectionLost() { return connection_lost; } void ui_com::AddSendData(const SendData *obj) { send_mutex->GetMutex(); resumeTimes.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000); // on resume en meme temps tout ceux qui ont la meme periode for (size_t i = 0; i < datasToSend.size(); i++) { if (datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) { resumeTimes.at(resumeTimes.size() - 1) = resumeTimes.at(i); break; } } datasToSend.push_back(obj); send_mutex->ReleaseMutex(); } //TODO: check if it is RT compatible //must be called with mutex locked void ui_com::UpdateSendData(const SendData *obj) { // Printf("UpdateSendData %s\n",obj->ObjectName().c_str()); // le mutex est deja pris par l'appellant size_t id, i; // on recupere l'id for (i = 0; i < datasToSend.size(); i++) { if (datasToSend.at(i) == obj) { id = i; break; } } // on resume en meme temps tout ceux qui ont la meme periode for (i = 0; i < datasToSend.size(); i++) { if (i == id) continue; if (datasToSend.at(i)->IsEnabled() == true && datasToSend.at(i)->SendPeriod() == obj->SendPeriod()) { resumeTimes.at(id) = resumeTimes.at(i); break; } } // si aucun match, on planifie l'execution if (i == datasToSend.size()) resumeTimes.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000; if (IsSuspended() == true) Resume(); //update all buffers if necessary, discard datas std::vector::iterator pushedDatasIterator = pushedDatas.begin(); while(pushedDatasIterator != pushedDatas.end()) { size_t bufSize=0; for (size_t i = 0; i < datasToSend.size(); i++) { if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == (*pushedDatasIterator).period && datasToSend.at(i)->NbBuffering()==(*pushedDatasIterator).nb_buffering) { bufSize+=datasToSend.at(i)->SendSize(); } } bufSize*=(*pushedDatasIterator).nb_buffering; if(bufSize!=0) { bufSize+=sizeof(char)+sizeof(uint16_t);//header+period if((*pushedDatasIterator).nb_buffering>1) bufSize+=sizeof(uint16_t);//+nb_buffering } if(bufSize!=(*pushedDatasIterator).final_size && bufSize!=0) { //Printf("change buf size %i->%i\n",(*pushedDatasIterator).final_size,bufSize); (*pushedDatasIterator).actual_size=0; (*pushedDatasIterator).final_size=bufSize; free((*pushedDatasIterator).buf); (*pushedDatasIterator).buf=(char*)malloc(bufSize); } if(bufSize==0) { //Printf("delete buf\n"); free((*pushedDatasIterator).buf); pushedDatasIterator = pushedDatas.erase(pushedDatasIterator); } else { ++pushedDatasIterator; } } //check if we need a new buffer for this couple period/nb_buffering if(obj->IsEnabled()) { bool match=false; for (size_t i = 0; i < pushedDatas.size(); i++) { if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) { //Printf("match item %i, period %i nb buff %i, size %i\n",i,obj->SendPeriod(),obj->NbBuffering(),pushedDatas.at(i).final_size); match=true; } } //create new one if(!match) { //printf("no match\n"); PushedData_t tmp; tmp.period=obj->SendPeriod(); tmp.nb_buffering=obj->NbBuffering(); tmp.actual_size=0; tmp.final_size=0; for (size_t i = 0; i < datasToSend.size(); i++) { //Printf("data %s %i/%i %i %i\n",datasToSend.at(i)->ObjectName().c_str(),i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering()); if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) { tmp.final_size+=datasToSend.at(i)->SendSize(); //Printf("add %i\n",datasToSend.at(i)->SendSize()); } } tmp.final_size*=obj->NbBuffering(); tmp.final_size+=sizeof(char)+sizeof(uint16_t);//header+period if(tmp.nb_buffering>1) tmp.final_size+=sizeof(uint16_t);//+nb_buffering //printf("final size %i\n",tmp.final_size); tmp.buf=(char*)malloc(tmp.final_size); if(tmp.buf!=NULL) { pushedDatas.push_back(tmp); } else { Warn("could not allocate buffer of size %i\n",tmp.final_size); } } } //Printf("nb buf %i\n",pushedDatas.size()); return; } void ui_com::UpdateDataToSendBufferSize(const SendData *obj) { if(!obj->IsEnabled()) return; //Printf("UpdateDataToSendBufferSize %s\n",obj->ObjectName().c_str()); send_mutex->GetMutex(); PushedData_t* ptr=NULL; //get buffer for this couple period/nb_buffering for (size_t i = 0; i < pushedDatas.size(); i++) { if (pushedDatas.at(i).period==obj->SendPeriod() && pushedDatas.at(i).nb_buffering==obj->NbBuffering()) { ptr=&pushedDatas.at(i); } } if(ptr==NULL) { Err("no corresponding match for couple %i %i\n",obj->SendPeriod(),obj->NbBuffering()); return; } ptr->final_size=0; //Printf("actual %i\n",ptr->final_size); for (size_t i = 0; i < datasToSend.size(); i++) { //Printf("data %s %i/%i %i %i\n",datasToSend.at(i)->ObjectName().c_str(),i,datasToSend.size(),datasToSend.at(i)->SendPeriod() ,datasToSend.at(i)->NbBuffering()); if (datasToSend.at(i)->IsEnabled() && datasToSend.at(i)->SendPeriod() == obj->SendPeriod() && datasToSend.at(i)->NbBuffering()==obj->NbBuffering()) { ptr->final_size+=datasToSend.at(i)->SendSize(); //Printf("add %i\n",datasToSend.at(i)->SendSize()); } } ptr->final_size*=obj->NbBuffering(); ptr->final_size+=sizeof(char)+sizeof(uint16_t);//header+period if(ptr->nb_buffering>1) ptr->final_size+=sizeof(uint16_t);//+nb_buffering //Printf("final %i\n",ptr->final_size); free(ptr->buf); ptr->buf=(char*)malloc(ptr->final_size); send_mutex->ReleaseMutex(); } void ui_com::RemoveSendData(const SendData *obj) { // printf("remove_data_to_send %i\n",data_to_send.size()); send_mutex->GetMutex(); // on recupere l'id for (size_t i = 0; i < datasToSend.size(); i++) { if (datasToSend.at(i) == obj) { datasToSend.erase(datasToSend.begin() + i); resumeTimes.erase(resumeTimes.begin() + i); // printf("remove_data_to_send %i ok\n",data_to_send.size()); break; } } send_mutex->ReleaseMutex(); return; } void ui_com::Block(void) { send_mutex->GetMutex(); } void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }