Changeset 15 in flair-src for trunk/lib/FlairCore/src/ui_com.cpp
- Timestamp:
- Apr 8, 2016, 3:40:57 PM (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/ui_com.cpp
r2 r15 39 39 using namespace flair::gui; 40 40 41 ui_com::ui_com(const Object *parent, UDTSOCKET sock): Thread(parent,"send",2)42 {43 //buffer envoi44 send_buffer=(char*)malloc(3);45 send_size=3;//id(1)+period(2)46 47 //mutex48 send_mutex=NULL;49 send_mutex=new Mutex(this,ObjectName());50 51 socket_fd=sock;52 connection_lost=false;41 ui_com::ui_com(const Object *parent, UDTSOCKET sock) 42 : Thread(parent, "send", 2) { 43 // buffer envoi 44 send_buffer = (char *)malloc(3); 45 send_size = 3; // id(1)+period(2) 46 47 // mutex 48 send_mutex = NULL; 49 send_mutex = new Mutex(this, ObjectName()); 50 51 socket_fd = sock; 52 connection_lost = false; 53 53 54 54 #ifdef __XENO__ 55 int status; 56 string tmp_name; 57 58 is_running=true; 59 60 //pipe 61 tmp_name= getFrameworkManager()->ObjectName() + "-" + ObjectName()+ "-pipe"; 62 //xenomai limitation 63 if(tmp_name.size()>31) Err("rt_pipe_create error (%s is too long)\n",tmp_name.c_str()); 55 int status; 56 string tmp_name; 57 58 is_running = true; 59 60 // pipe 61 tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe"; 62 // xenomai limitation 63 if (tmp_name.size() > 31) 64 Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str()); 64 65 #ifdef RT_PIPE_SIZE 65 status=rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO,RT_PIPE_SIZE);66 status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE); 66 67 #else 67 status=rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO,0); 68 #endif 69 70 if(status!=0) 71 { 72 Err("rt_pipe_create error (%s)\n",strerror(-status)); 73 //return -1; 74 } 75 76 //start user side thread 68 status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0); 69 #endif 70 71 if (status != 0) { 72 Err("rt_pipe_create error (%s)\n", strerror(-status)); 73 // return -1; 74 } 75 76 // start user side thread 77 77 #ifdef NRT_STACK_SIZE 78 // Initialize thread creation attributes 79 pthread_attr_t attr; 80 if(pthread_attr_init(&attr) != 0) 81 { 82 Err("pthread_attr_init error\n"); 83 } 84 85 if(pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) 86 { 87 Err("pthread_attr_setstacksize error\n"); 88 } 89 90 if(pthread_create(&thread, &attr, user_thread, (void*)this) < 0) 91 #else //NRT_STACK_SIZE 92 if(pthread_create(&thread, NULL, user_thread, (void*)this) < 0) 93 #endif //NRT_STACK_SIZE 94 { 95 Err("pthread_create error\n"); 96 //return -1; 97 } 78 // Initialize thread creation attributes 79 pthread_attr_t attr; 80 if (pthread_attr_init(&attr) != 0) { 81 Err("pthread_attr_init error\n"); 82 } 83 84 if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) { 85 Err("pthread_attr_setstacksize error\n"); 86 } 87 88 if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0) 89 #else // NRT_STACK_SIZE 90 if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0) 91 #endif // NRT_STACK_SIZE 92 { 93 Err("pthread_create error\n"); 94 // return -1; 95 } 98 96 #ifdef NRT_STACK_SIZE 99 if(pthread_attr_destroy(&attr) != 0)100 {101 Err("pthread_attr_destroy error\n");102 } 103 #endif 104 105 #endif//__XENO__ 106 int timeout=100;107 if(UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int))!=0)Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");108 109 110 if(UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool))!=0) Err("UDT::setsockopt error (UDT_SNDSYN)\n");111 112 if(UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool))!=0) Err("UDT::setsockopt error (UDT_RCVSYN)\n"); 113 //#endif //__XENO__ 114 115 Start();116 } 117 118 119 ui_com::~ui_com() 120 {121 //printf("destruction ui_com\n");97 if (pthread_attr_destroy(&attr) != 0) { 98 Err("pthread_attr_destroy error\n"); 99 } 100 #endif 101 102 #endif //__XENO__ 103 int timeout = 100; 104 if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0) 105 Err("UDT::setsockopt error (UDT_RCVTIMEO)\n"); 106 107 bool blocking = true; 108 if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0) 109 Err("UDT::setsockopt error (UDT_SNDSYN)\n"); 110 111 if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0) 112 Err("UDT::setsockopt error (UDT_RCVSYN)\n"); 113 //#endif //__XENO__ 114 115 Start(); 116 } 117 118 ui_com::~ui_com() { 119 // printf("destruction ui_com\n"); 122 120 123 121 #ifdef __XENO__ 124 is_running=false; 125 126 pthread_join(thread, NULL); 127 128 int status=rt_pipe_delete(&pipe); 129 if(status!=0) Err("rt_pipe_delete error (%s)\n",strerror(-status)); 130 #endif 131 132 SafeStop(); 133 134 if(IsSuspended()==true) Resume(); 135 136 Join(); 137 138 if(send_buffer!=NULL) free(send_buffer); 139 send_buffer=NULL; 140 141 //printf("destruction ui_com ok\n"); 142 } 143 144 void ui_com::Send(char* buf,ssize_t size) { 145 if(connection_lost==true) return; 146 147 char* tosend=buf; 148 ssize_t nb_write; 149 150 if(buf[0]==XML_HEADER) { 151 //cut xml header 152 tosend=strstr(buf,"<root"); 153 size-=tosend-buf; 154 } 122 is_running = false; 123 124 pthread_join(thread, NULL); 125 126 int status = rt_pipe_delete(&pipe); 127 if (status != 0) 128 Err("rt_pipe_delete error (%s)\n", strerror(-status)); 129 #endif 130 131 SafeStop(); 132 133 if (IsSuspended() == true) 134 Resume(); 135 136 Join(); 137 138 if (send_buffer != NULL) 139 free(send_buffer); 140 send_buffer = NULL; 141 142 // printf("destruction ui_com ok\n"); 143 } 144 145 void ui_com::Send(char *buf, ssize_t size) { 146 if (connection_lost == true) 147 return; 148 149 char *tosend = buf; 150 ssize_t nb_write; 151 152 if (buf[0] == XML_HEADER) { 153 // cut xml header 154 tosend = strstr(buf, "<root"); 155 size -= tosend - buf; 156 } 155 157 156 158 #ifdef __XENO__ 157 nb_write=rt_pipe_write(&pipe,tosend,size,P_NORMAL); 158 159 if(nb_write<0) 160 { 161 Err("rt_pipe_write error (%s)\n",strerror(-nb_write)); 162 } 163 else if (nb_write != size) 164 { 165 Err("rt_pipe_write error %i/%i\n",nb_write,size); 166 } 159 nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL); 160 161 if (nb_write < 0) { 162 Err("rt_pipe_write error (%s)\n", strerror(-nb_write)); 163 } else if (nb_write != size) { 164 Err("rt_pipe_write error %i/%i\n", nb_write, size); 165 } 167 166 #else //__XENO__ 168 167 169 168 #ifdef COMPRESS_FRAMES 170 bool sendItCompressed=false; 171 if(buf[0]==XML_HEADER) { 172 sendItCompressed=true; 173 } 174 175 if(sendItCompressed) { 176 char *out; 177 ssize_t out_size; 178 if(compressBuffer(tosend, size,&out,&out_size, 9)==Z_OK) { 179 size=out_size; 180 nb_write = UDT::sendmsg(socket_fd,out,size, -1, true); 181 free(out); 169 bool sendItCompressed = false; 170 if (buf[0] == XML_HEADER) { 171 sendItCompressed = true; 172 } 173 174 if (sendItCompressed) { 175 char *out; 176 ssize_t out_size; 177 if (compressBuffer(tosend, size, &out, &out_size, 9) == Z_OK) { 178 size = out_size; 179 nb_write = UDT::sendmsg(socket_fd, out, size, -1, true); 180 free(out); 181 } else { 182 Warn("Compress error, sending it uncompressed\n"); 183 sendItCompressed = false; 184 } 185 } 186 187 if (!sendItCompressed) { 188 nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true); 189 } 190 191 #else // COMPRESS_FRAMES 192 nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true); 193 #endif // COMPRESS_FRAMES 194 // Printf("write %i %i\n",nb_write,size); 195 if (nb_write < 0) { 196 Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage()); 197 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST || 198 UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) { 199 connection_lost = true; 200 } 201 } else if (nb_write != size) { 202 Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(), 203 UDT::getlasterror().getErrorCode(), nb_write, size); 204 } 205 #endif //__XENO__ 206 } 207 208 ssize_t ui_com::Receive(char *buf, ssize_t buf_size) { 209 ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size); 210 211 if (bytesRead < 0) { 212 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) { 213 Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage()); 214 connection_lost = true; 215 } 216 } 217 218 return bytesRead; 219 } 220 221 void ui_com::Run(void) { 222 // check endianness 223 char header; 224 if (IsBigEndian()) { 225 header = DATAS_BIG_ENDIAN; 226 Printf("System is big endian\n"); 227 } else { 228 header = DATAS_LITTLE_ENDIAN; 229 Printf("System is little endian\n"); 230 } 231 232 #ifdef __XENO__ 233 WarnUponSwitches(true); 234 printf("\n"); // a revoir pourquoi?? 235 // sans ce printf, dans le simu_roll, le suspend ne fonctionne pas... 236 #endif 237 // on attend d'avoir des choses à faire 238 Suspend(); 239 240 while (!ToBeStopped()) { 241 size_t resume_id; 242 Time min = 0xffffffffffffffffULL; 243 244 // on recpuere l'id de la prochaine execution 245 send_mutex->GetMutex(); 246 resume_id = resume_time.size(); 247 for (size_t i = 0; i < resume_time.size(); i++) { 248 if (resume_time.at(i) < min && data_to_send.at(i)->IsEnabled() == true) { 249 min = resume_time.at(i); 250 resume_id = i; 251 } 252 } 253 254 // attente 255 if (resume_id < resume_time.size()) { 256 Time time = resume_time.at(resume_id); 257 uint16_t resume_period = data_to_send.at(resume_id)->SendPeriod(); 258 send_mutex->ReleaseMutex(); 259 // on dort jusqu'a la prochaine execution 260 SleepUntil(time); 261 262 // envoi des donnees 263 int offset = 3; 264 send_buffer[0] = header; 265 send_mutex->GetMutex(); 266 267 for (size_t i = 0; i < data_to_send.size(); i++) { 268 if (data_to_send.at(i)->SendPeriod() == resume_period && 269 data_to_send.at(i)->IsEnabled() == true) { 270 data_to_send.at(i)->CopyDatas(send_buffer + offset); 271 offset += data_to_send.at(i)->SendSize(); 272 } 273 } 274 if (offset != 3) { 275 memcpy(&send_buffer[1], &resume_period, sizeof(uint16_t)); 276 // printf("send %i %i %i %x 277 // %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]); 278 // for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]); 279 // printf("\n"); 280 Send(send_buffer, offset); 281 } 282 283 // on planifie la prochaine execution 284 for (size_t i = 0; i < data_to_send.size(); i++) { 285 if (data_to_send.at(i)->SendPeriod() == resume_period) { 286 resume_time.at(i) += data_to_send.at(i)->SendPeriod() * 1000000; 287 } 288 } 289 send_mutex->ReleaseMutex(); 290 // Printf("%i %lld\n",resume_period,GetTime()/1000000); 291 } else { 292 send_mutex->ReleaseMutex(); 293 // rien a faire, suspend 294 // Printf("rien a faire suspend\n"); 295 Suspend(); 296 // Printf("wake\n"); 297 // on planifie la prochaine execution 298 Time time = GetTime(); 299 send_mutex->GetMutex(); 300 for (size_t i = 0; i < data_to_send.size(); i++) { 301 resume_time.at(i) = 302 time + (Time)data_to_send.at(i)->SendPeriod() * 1000000; 303 } 304 send_mutex->ReleaseMutex(); 305 } 306 } 307 } 308 #ifdef __XENO__ 309 void *ui_com::user_thread(void *arg) { 310 int pipe_fd = -1; 311 string devname; 312 char *buf = NULL; 313 ui_com *caller = (ui_com *)arg; 314 int rv; 315 fd_set set; 316 struct timeval timeout; 317 ssize_t nb_read, nb_write; 318 319 buf = (char *)malloc(NRT_PIPE_SIZE); 320 if (buf == NULL) { 321 caller->Err("malloc error\n"); 322 } 323 324 devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" + 325 caller->ObjectName() + "-pipe"; 326 while (pipe_fd < 0) { 327 pipe_fd = open(devname.c_str(), O_RDWR); 328 if (pipe_fd < 0 && errno != ENOENT) 329 caller->Err("open pipe_fd error (%s)\n", strerror(-errno)); 330 usleep(1000); 331 } 332 333 while (1) { 334 FD_ZERO(&set); // clear the set 335 FD_SET(pipe_fd, &set); // add our file descriptor to the set 336 337 timeout.tv_sec = 0; 338 timeout.tv_usec = SELECT_TIMEOUT_MS * 1000; 339 340 rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout); 341 342 if (rv == -1) { 343 if (caller->is_running == false && 344 UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT) 345 break; // timeout 346 if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT) 347 caller->Err("epoll_wait, %s, code %i\n", 348 UDT::getlasterror().getErrorMessage(), 349 UDT::getlasterror().getErrorCode()); 350 } else if (rv == 0) { 351 // printf("timeout\n"); // a timeout occured 352 if (caller->is_running == false) 353 break; 354 355 } else { 356 nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE); 357 buf[nb_read] = 0; 358 // printf("envoi\n%s\n",buf); 359 360 #ifdef COMPRESS_FRAMES 361 char *out; 362 ssize_t out_size; 363 364 if (buf[0] == XML_HEADER) { 365 if (compressBuffer(buf, nb_read, &out, &out_size, 9) == Z_OK) { 366 nb_read = out_size; 367 nb_write = UDT::sendmsg(caller->socket_fd, out, out_size, -1, true); 368 free(out); 182 369 } else { 183 184 sendItCompressed=false;370 caller->Warn("Compress error, sending it uncompressed\n"); 371 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true); 185 372 } 186 } 187 188 if(!sendItCompressed) { 189 nb_write = UDT::sendmsg(socket_fd,tosend,size, -1, true); 190 } 191 192 #else //COMPRESS_FRAMES 193 nb_write = UDT::sendmsg(socket_fd,tosend,size, -1, true); 194 #endif //COMPRESS_FRAMES 195 //Printf("write %i %i\n",nb_write,size); 196 if(nb_write<0) { 197 Err("UDT::sendmsg error (%s)\n",UDT::getlasterror().getErrorMessage()); 198 if(UDT::getlasterror().getErrorCode()==CUDTException::ECONNLOST || UDT::getlasterror().getErrorCode()==CUDTException::EINVSOCK) { 199 connection_lost=true; 373 } else { 374 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true); 375 } 376 #else 377 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true); 378 #endif 379 380 if (nb_write < 0) { 381 caller->Err("UDT::sendmsg error (%s)\n", 382 UDT::getlasterror().getErrorMessage()); 383 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST || 384 UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) { 385 caller->connection_lost = true; 200 386 } 201 } else if (nb_write != size) { 202 Err("%s, code %i (%ld/%ld)\n",UDT::getlasterror().getErrorMessage(),UDT::getlasterror().getErrorCode(),nb_write,size); 203 } 204 #endif //__XENO__ 205 } 206 207 ssize_t ui_com::Receive(char* buf,ssize_t buf_size) { 208 ssize_t bytesRead= UDT::recvmsg(socket_fd,buf,buf_size); 209 210 if(bytesRead<0) { 211 if(UDT::getlasterror().getErrorCode()==CUDTException::ECONNLOST) { 212 Err("UDT::recvmsg error (%s)\n",UDT::getlasterror().getErrorMessage()); 213 connection_lost=true; 214 } 215 } 216 217 return bytesRead; 218 } 219 220 void ui_com::Run(void) 221 { 222 //check endianness 223 char header; 224 if(IsBigEndian()) 225 { 226 header=DATAS_BIG_ENDIAN; 227 Printf("System is big endian\n"); 228 } 229 else 230 { 231 header=DATAS_LITTLE_ENDIAN; 232 Printf("System is little endian\n"); 233 } 234 235 #ifdef __XENO__ 236 WarnUponSwitches(true); 237 printf("\n");//a revoir pourquoi?? 238 //sans ce printf, dans le simu_roll, le suspend ne fonctionne pas... 239 #endif 240 //on attend d'avoir des choses à faire 241 Suspend(); 242 243 while(!ToBeStopped()) 244 { 245 size_t resume_id; 246 Time min=0xffffffffffffffffULL; 247 248 //on recpuere l'id de la prochaine execution 249 send_mutex->GetMutex(); 250 resume_id=resume_time.size(); 251 for(size_t i=0;i<resume_time.size();i++) 252 { 253 if(resume_time.at(i)<min && data_to_send.at(i)->IsEnabled()==true) 254 { 255 min=resume_time.at(i); 256 resume_id=i; 257 } 258 } 259 260 //attente 261 if(resume_id<resume_time.size()) 262 { 263 Time time=resume_time.at(resume_id); 264 uint16_t resume_period=data_to_send.at(resume_id)->SendPeriod(); 265 send_mutex->ReleaseMutex(); 266 //on dort jusqu'a la prochaine execution 267 SleepUntil(time); 268 269 //envoi des donnees 270 int offset=3; 271 send_buffer[0]=header; 272 send_mutex->GetMutex(); 273 274 for(size_t i=0;i<data_to_send.size();i++) { 275 if(data_to_send.at(i)->SendPeriod()==resume_period && data_to_send.at(i)->IsEnabled()==true) { 276 data_to_send.at(i)->CopyDatas(send_buffer+offset); 277 offset+=data_to_send.at(i)->SendSize(); 278 } 279 } 280 if(offset!=3) { 281 memcpy(&send_buffer[1],&resume_period,sizeof(uint16_t)); 282 //printf("send %i %i %i %x %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]); 283 //for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]); 284 //printf("\n"); 285 Send(send_buffer,offset); 286 } 287 288 //on planifie la prochaine execution 289 for(size_t i=0;i<data_to_send.size();i++) 290 { 291 if(data_to_send.at(i)->SendPeriod()==resume_period) 292 { 293 resume_time.at(i)+=data_to_send.at(i)->SendPeriod()*1000000; 294 } 295 } 296 send_mutex->ReleaseMutex(); 297 //Printf("%i %lld\n",resume_period,GetTime()/1000000); 298 } 299 else 300 { 301 send_mutex->ReleaseMutex(); 302 //rien a faire, suspend 303 //Printf("rien a faire suspend\n"); 304 Suspend(); 305 //Printf("wake\n"); 306 //on planifie la prochaine execution 307 Time time=GetTime(); 308 send_mutex->GetMutex(); 309 for(size_t i=0;i<data_to_send.size();i++) 310 { 311 resume_time.at(i)=time+(Time)data_to_send.at(i)->SendPeriod()*1000000; 312 } 313 send_mutex->ReleaseMutex(); 314 } 315 } 316 } 317 #ifdef __XENO__ 318 void* ui_com::user_thread(void * arg) 319 { 320 int pipe_fd=-1; 321 string devname; 322 char* buf=NULL; 323 ui_com *caller = (ui_com*)arg; 324 int rv; 325 fd_set set; 326 struct timeval timeout; 327 ssize_t nb_read,nb_write; 328 329 buf=(char*)malloc(NRT_PIPE_SIZE); 330 if(buf==NULL) 331 { 332 caller->Err("malloc error\n"); 333 } 334 335 devname= NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" + caller->ObjectName()+ "-pipe"; 336 while(pipe_fd<0) 337 { 338 pipe_fd = open(devname.c_str(), O_RDWR); 339 if (pipe_fd < 0 && errno!=ENOENT) caller->Err("open pipe_fd error (%s)\n",strerror(-errno)); 340 usleep(1000); 341 } 342 343 while(1) 344 { 345 FD_ZERO(&set); // clear the set 346 FD_SET(pipe_fd, &set); // add our file descriptor to the set 347 348 timeout.tv_sec = 0; 349 timeout.tv_usec = SELECT_TIMEOUT_MS*1000; 350 351 rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout); 352 353 if(rv == -1) 354 { 355 if(caller->is_running==false && UDT::getlasterror().getErrorCode()==CUDTException::ETIMEOUT) break;//timeout 356 if(UDT::getlasterror().getErrorCode()!=CUDTException::ETIMEOUT) caller->Err("epoll_wait, %s, code %i\n",UDT::getlasterror().getErrorMessage(),UDT::getlasterror().getErrorCode()); 357 } 358 else if(rv == 0) 359 { 360 //printf("timeout\n"); // a timeout occured 361 if(caller->is_running==false) break; 362 363 } 364 else 365 { 366 nb_read=read(pipe_fd,buf,NRT_PIPE_SIZE); 367 buf[nb_read]=0; 368 //printf("envoi\n%s\n",buf); 369 370 #ifdef COMPRESS_FRAMES 371 char *out; 372 ssize_t out_size; 373 374 if(buf[0]==XML_HEADER) { 375 if(compressBuffer(buf, nb_read,&out,&out_size, 9)==Z_OK) { 376 nb_read=out_size; 377 nb_write = UDT::sendmsg(caller->socket_fd,out,out_size, -1, true); 378 free(out); 379 } else { 380 caller->Warn("Compress error, sending it uncompressed\n"); 381 nb_write = UDT::sendmsg(caller->socket_fd, buf,nb_read, -1, true); 382 } 383 } else { 384 nb_write = UDT::sendmsg(caller->socket_fd, buf,nb_read, -1, true); 385 } 386 #else 387 nb_write = UDT::sendmsg(caller->socket_fd, buf,nb_read, -1, true); 388 #endif 389 390 if(nb_write<0) 391 { 392 caller->Err("UDT::sendmsg error (%s)\n",UDT::getlasterror().getErrorMessage()); 393 if(UDT::getlasterror().getErrorCode()==CUDTException::ECONNLOST || UDT::getlasterror().getErrorCode()==CUDTException::EINVSOCK) 394 { 395 caller->connection_lost=true; 396 } 397 } 398 else if (nb_write != nb_read) 399 { 400 caller->Err("UDT::sendmsg error %i/%i\n",nb_write,nb_read); 401 } 402 } 403 } 404 405 close(pipe_fd); 406 if(buf!=NULL) free(buf); 407 pthread_exit(0); 408 } 409 #endif 410 411 int ui_com::compressBuffer(char *in, ssize_t in_size,char **out,ssize_t *out_size, int level) 412 { 413 int ret, flush; 414 unsigned have; 415 z_stream strm; 416 417 /* allocate deflate state */ 418 strm.zalloc = Z_NULL; 419 strm.zfree = Z_NULL; 420 strm.opaque = Z_NULL; 421 ret = deflateInit(&strm, level); 422 if (ret != Z_OK) 423 return ret; 424 425 *out=(char*)malloc(COMPRESS_CHUNK); 426 if(!(*out)) 427 return Z_BUF_ERROR; 428 429 strm.next_in = (unsigned char*)in; 430 strm.avail_out = COMPRESS_CHUNK; 431 strm.next_out = (unsigned char*)*out; 432 strm.avail_in=in_size; 433 flush=Z_FINISH; 434 435 ret = deflate(&strm, flush); /* no bad return value */ 436 if (ret == Z_STREAM_ERROR) { 437 free(*out); 438 return ret; 439 } 440 441 have = COMPRESS_CHUNK - strm.avail_out; 442 *out_size=have; 443 //printf("%i -> %i\n",in_size,have); 444 /* clean up and return */ 445 (void)deflateEnd(&strm); 446 447 if(strm.avail_out!=0) { 448 return Z_OK; 449 } else { 450 return Z_STREAM_ERROR; 451 } 452 } 453 454 int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,unsigned char **out,ssize_t *out_size) 455 { 456 int ret; 457 //unsigned have; 458 z_stream strm; 459 460 /* allocate inflate state */ 461 strm.zalloc = Z_NULL; 462 strm.zfree = Z_NULL; 463 strm.opaque = Z_NULL; 464 strm.avail_in = 0; 465 strm.next_in = Z_NULL; 466 ret = inflateInit(&strm); 467 if (ret != Z_OK) 468 return ret; 469 470 *out=(unsigned char*)malloc(COMPRESS_CHUNK); 471 if(!(*out)) 472 return Z_BUF_ERROR; 473 474 strm.avail_in = in_size; 475 strm.next_in = in; 476 strm.avail_out = COMPRESS_CHUNK; 477 strm.next_out = *out; 478 479 ret = inflate(&strm, Z_NO_FLUSH); 480 assert(ret != Z_STREAM_ERROR); /* state not clobbered */ 481 switch (ret) { 482 case Z_NEED_DICT: 483 ret = Z_DATA_ERROR; /* and fall through */ 484 case Z_DATA_ERROR: 485 case Z_MEM_ERROR: 486 (void)inflateEnd(&strm); 487 return ret; 488 } 489 //have = COMPRESS_CHUNK - strm.avail_out; 490 491 /* clean up and return */ 387 } else if (nb_write != nb_read) { 388 caller->Err("UDT::sendmsg error %i/%i\n", nb_write, nb_read); 389 } 390 } 391 } 392 393 close(pipe_fd); 394 if (buf != NULL) 395 free(buf); 396 pthread_exit(0); 397 } 398 #endif 399 400 int ui_com::compressBuffer(char *in, ssize_t in_size, char **out, 401 ssize_t *out_size, int level) { 402 int ret, flush; 403 unsigned have; 404 z_stream strm; 405 406 /* allocate deflate state */ 407 strm.zalloc = Z_NULL; 408 strm.zfree = Z_NULL; 409 strm.opaque = Z_NULL; 410 ret = deflateInit(&strm, level); 411 if (ret != Z_OK) 412 return ret; 413 414 *out = (char *)malloc(COMPRESS_CHUNK); 415 if (!(*out)) 416 return Z_BUF_ERROR; 417 418 strm.next_in = (unsigned char *)in; 419 strm.avail_out = COMPRESS_CHUNK; 420 strm.next_out = (unsigned char *)*out; 421 strm.avail_in = in_size; 422 flush = Z_FINISH; 423 424 ret = deflate(&strm, flush); /* no bad return value */ 425 if (ret == Z_STREAM_ERROR) { 426 free(*out); 427 return ret; 428 } 429 430 have = COMPRESS_CHUNK - strm.avail_out; 431 *out_size = have; 432 // printf("%i -> %i\n",in_size,have); 433 /* clean up and return */ 434 (void)deflateEnd(&strm); 435 436 if (strm.avail_out != 0) { 437 return Z_OK; 438 } else { 439 return Z_STREAM_ERROR; 440 } 441 } 442 443 int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size, 444 unsigned char **out, ssize_t *out_size) { 445 int ret; 446 // unsigned have; 447 z_stream strm; 448 449 /* allocate inflate state */ 450 strm.zalloc = Z_NULL; 451 strm.zfree = Z_NULL; 452 strm.opaque = Z_NULL; 453 strm.avail_in = 0; 454 strm.next_in = Z_NULL; 455 ret = inflateInit(&strm); 456 if (ret != Z_OK) 457 return ret; 458 459 *out = (unsigned char *)malloc(COMPRESS_CHUNK); 460 if (!(*out)) 461 return Z_BUF_ERROR; 462 463 strm.avail_in = in_size; 464 strm.next_in = in; 465 strm.avail_out = COMPRESS_CHUNK; 466 strm.next_out = *out; 467 468 ret = inflate(&strm, Z_NO_FLUSH); 469 assert(ret != Z_STREAM_ERROR); /* state not clobbered */ 470 switch (ret) { 471 case Z_NEED_DICT: 472 ret = Z_DATA_ERROR; /* and fall through */ 473 case Z_DATA_ERROR: 474 case Z_MEM_ERROR: 492 475 (void)inflateEnd(&strm); 493 return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR; 494 } 495 496 bool ui_com::ConnectionLost() 497 { 498 return connection_lost; 499 } 500 501 void ui_com::AddSendData(const SendData *obj) 502 { 503 send_mutex->GetMutex(); 504 505 resume_time.push_back(GetTime()+(Time)obj->SendPeriod()*1000000); 506 507 //on resume en meme temps tout ceux qui ont la meme periode 508 for(size_t i=0;i<data_to_send.size();i++) 509 { 510 if(data_to_send.at(i)->SendPeriod()==obj->SendPeriod()) 511 { 512 resume_time.at(resume_time.size()-1)=resume_time.at(i); 513 break; 514 } 515 } 516 517 data_to_send.push_back(obj); 518 519 send_mutex->ReleaseMutex(); 520 } 521 522 void ui_com::UpdateSendData(const SendData *obj) 523 { 524 //le mutex est deja pris par l'appellant 525 size_t id,i; 526 527 //on recupere l'id 528 for(i=0;i<data_to_send.size();i++) 529 { 530 if(data_to_send.at(i)==obj) 531 { 532 id=i; 533 break; 534 } 535 } 536 537 //on resume en meme temps tout ceux qui ont la meme periode 538 for(i=0;i<data_to_send.size();i++) 539 { 540 if(i==id) continue; 541 if(data_to_send.at(i)->IsEnabled()==true && data_to_send.at(i)->SendPeriod()==obj->SendPeriod()) 542 { 543 resume_time.at(id)=resume_time.at(i); 544 break; 545 } 546 } 547 548 //si aucun match, on planifie l'execution 549 if(i==data_to_send.size()) resume_time.at(id)=GetTime()+(Time)obj->SendPeriod()*1000000; 550 551 if(IsSuspended()==true) Resume(); 552 553 return; 554 } 555 556 void ui_com::UpdateDataToSendSize(void) 557 { 558 send_mutex->GetMutex(); 559 send_size=3;//id(1)+period(2) 560 for(size_t i=0;i<data_to_send.size();i++) 561 { 562 if(data_to_send[i]!=NULL) send_size+=data_to_send[i]->SendSize(); 563 } 564 565 //send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char)); 566 if(send_buffer!=NULL) free(send_buffer); 567 send_buffer=(char*)malloc(send_size*sizeof(char)); 568 send_mutex->ReleaseMutex(); 569 } 570 571 void ui_com::RemoveSendData(const SendData *obj) 572 { 573 //printf("remove_data_to_send %i\n",data_to_send.size()); 574 575 send_mutex->GetMutex(); 576 //on recupere l'id 577 for(size_t i=0;i<data_to_send.size();i++) 578 { 579 if(data_to_send.at(i)==obj) 580 { 581 data_to_send.erase (data_to_send.begin()+i); 582 resume_time.erase (resume_time.begin()+i); 583 //printf("remove_data_to_send %i ok\n",data_to_send.size()); 584 break; 585 } 586 } 587 send_mutex->ReleaseMutex(); 588 589 return; 590 } 591 592 void ui_com::Block(void) 593 { 594 send_mutex->GetMutex(); 595 } 596 597 void ui_com::UnBlock(void) 598 { 599 send_mutex->ReleaseMutex(); 600 } 476 return ret; 477 } 478 // have = COMPRESS_CHUNK - strm.avail_out; 479 480 /* clean up and return */ 481 (void)inflateEnd(&strm); 482 return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR; 483 } 484 485 bool ui_com::ConnectionLost() { return connection_lost; } 486 487 void ui_com::AddSendData(const SendData *obj) { 488 send_mutex->GetMutex(); 489 490 resume_time.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000); 491 492 // on resume en meme temps tout ceux qui ont la meme periode 493 for (size_t i = 0; i < data_to_send.size(); i++) { 494 if (data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) { 495 resume_time.at(resume_time.size() - 1) = resume_time.at(i); 496 break; 497 } 498 } 499 500 data_to_send.push_back(obj); 501 502 send_mutex->ReleaseMutex(); 503 } 504 505 void ui_com::UpdateSendData(const SendData *obj) { 506 // le mutex est deja pris par l'appellant 507 size_t id, i; 508 509 // on recupere l'id 510 for (i = 0; i < data_to_send.size(); i++) { 511 if (data_to_send.at(i) == obj) { 512 id = i; 513 break; 514 } 515 } 516 517 // on resume en meme temps tout ceux qui ont la meme periode 518 for (i = 0; i < data_to_send.size(); i++) { 519 if (i == id) 520 continue; 521 if (data_to_send.at(i)->IsEnabled() == true && 522 data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) { 523 resume_time.at(id) = resume_time.at(i); 524 break; 525 } 526 } 527 528 // si aucun match, on planifie l'execution 529 if (i == data_to_send.size()) 530 resume_time.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000; 531 532 if (IsSuspended() == true) 533 Resume(); 534 535 return; 536 } 537 538 void ui_com::UpdateDataToSendSize(void) { 539 send_mutex->GetMutex(); 540 send_size = 3; // id(1)+period(2) 541 for (size_t i = 0; i < data_to_send.size(); i++) { 542 if (data_to_send[i] != NULL) 543 send_size += data_to_send[i]->SendSize(); 544 } 545 546 // send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char)); 547 if (send_buffer != NULL) 548 free(send_buffer); 549 send_buffer = (char *)malloc(send_size * sizeof(char)); 550 send_mutex->ReleaseMutex(); 551 } 552 553 void ui_com::RemoveSendData(const SendData *obj) { 554 // printf("remove_data_to_send %i\n",data_to_send.size()); 555 556 send_mutex->GetMutex(); 557 // on recupere l'id 558 for (size_t i = 0; i < data_to_send.size(); i++) { 559 if (data_to_send.at(i) == obj) { 560 data_to_send.erase(data_to_send.begin() + i); 561 resume_time.erase(resume_time.begin() + i); 562 // printf("remove_data_to_send %i ok\n",data_to_send.size()); 563 break; 564 } 565 } 566 send_mutex->ReleaseMutex(); 567 568 return; 569 } 570 571 void ui_com::Block(void) { send_mutex->GetMutex(); } 572 573 void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note:
See TracChangeset
for help on using the changeset viewer.