source: flair-src/trunk/lib/FlairCore/src/ui_com.cpp@ 45

Last change on this file since 45 was 15, checked in by Bayard Gildas, 8 years ago

sources reformatted with flair-format-dir script

File size: 15.0 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5// created: 2011/05/01
6// filename: ui_com.cpp
7//
8// author: Guillaume Sanahuja
9// Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11// version: $Id: $
12//
13// purpose: classe permettant la lecture et l'ecriture sur socket UDT
14//
15//
16/*********************************************************************/
17
18#include "ui_com.h"
19#include <cstdlib>
20#include <string.h>
21#include <unistd.h>
22#include "Mutex.h"
23#include "SendData.h"
24#include "communication.h"
25#include "FrameworkManager.h"
26#include "zlib.h"
27#include <assert.h>
28#include "config.h"
29
30#ifdef __XENO__
31#include <pthread.h>
32#include <native/task.h>
33#include <native/task.h>
34#include <fcntl.h>
35#endif
36
37using std::string;
38using namespace flair::core;
39using namespace flair::gui;
40
41ui_com::ui_com(const Object *parent, UDTSOCKET sock)
42 : Thread(parent, "send", 2) {
43 // buffer envoi
44 send_buffer = (char *)malloc(3);
45 send_size = 3; // id(1)+period(2)
46
47 // mutex
48 send_mutex = NULL;
49 send_mutex = new Mutex(this, ObjectName());
50
51 socket_fd = sock;
52 connection_lost = false;
53
54#ifdef __XENO__
55 int status;
56 string tmp_name;
57
58 is_running = true;
59
60 // pipe
61 tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe";
62 // xenomai limitation
63 if (tmp_name.size() > 31)
64 Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
65#ifdef RT_PIPE_SIZE
66 status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
67#else
68 status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
69#endif
70
71 if (status != 0) {
72 Err("rt_pipe_create error (%s)\n", strerror(-status));
73 // return -1;
74 }
75
76// start user side thread
77#ifdef NRT_STACK_SIZE
78 // Initialize thread creation attributes
79 pthread_attr_t attr;
80 if (pthread_attr_init(&attr) != 0) {
81 Err("pthread_attr_init error\n");
82 }
83
84 if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
85 Err("pthread_attr_setstacksize error\n");
86 }
87
88 if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0)
89#else // NRT_STACK_SIZE
90 if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0)
91#endif // NRT_STACK_SIZE
92 {
93 Err("pthread_create error\n");
94 // return -1;
95 }
96#ifdef NRT_STACK_SIZE
97 if (pthread_attr_destroy(&attr) != 0) {
98 Err("pthread_attr_destroy error\n");
99 }
100#endif
101
102#endif //__XENO__
103 int timeout = 100;
104 if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0)
105 Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
106
107 bool blocking = true;
108 if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0)
109 Err("UDT::setsockopt error (UDT_SNDSYN)\n");
110
111 if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
112 Err("UDT::setsockopt error (UDT_RCVSYN)\n");
113 //#endif //__XENO__
114
115 Start();
116}
117
118ui_com::~ui_com() {
119// printf("destruction ui_com\n");
120
121#ifdef __XENO__
122 is_running = false;
123
124 pthread_join(thread, NULL);
125
126 int status = rt_pipe_delete(&pipe);
127 if (status != 0)
128 Err("rt_pipe_delete error (%s)\n", strerror(-status));
129#endif
130
131 SafeStop();
132
133 if (IsSuspended() == true)
134 Resume();
135
136 Join();
137
138 if (send_buffer != NULL)
139 free(send_buffer);
140 send_buffer = NULL;
141
142 // printf("destruction ui_com ok\n");
143}
144
145void ui_com::Send(char *buf, ssize_t size) {
146 if (connection_lost == true)
147 return;
148
149 char *tosend = buf;
150 ssize_t nb_write;
151
152 if (buf[0] == XML_HEADER) {
153 // cut xml header
154 tosend = strstr(buf, "<root");
155 size -= tosend - buf;
156 }
157
158#ifdef __XENO__
159 nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
160
161 if (nb_write < 0) {
162 Err("rt_pipe_write error (%s)\n", strerror(-nb_write));
163 } else if (nb_write != size) {
164 Err("rt_pipe_write error %i/%i\n", nb_write, size);
165 }
166#else //__XENO__
167
168#ifdef COMPRESS_FRAMES
169 bool sendItCompressed = false;
170 if (buf[0] == XML_HEADER) {
171 sendItCompressed = true;
172 }
173
174 if (sendItCompressed) {
175 char *out;
176 ssize_t out_size;
177 if (compressBuffer(tosend, size, &out, &out_size, 9) == Z_OK) {
178 size = out_size;
179 nb_write = UDT::sendmsg(socket_fd, out, size, -1, true);
180 free(out);
181 } else {
182 Warn("Compress error, sending it uncompressed\n");
183 sendItCompressed = false;
184 }
185 }
186
187 if (!sendItCompressed) {
188 nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
189 }
190
191#else // COMPRESS_FRAMES
192 nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
193#endif // COMPRESS_FRAMES
194 // Printf("write %i %i\n",nb_write,size);
195 if (nb_write < 0) {
196 Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
197 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
198 UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
199 connection_lost = true;
200 }
201 } else if (nb_write != size) {
202 Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(),
203 UDT::getlasterror().getErrorCode(), nb_write, size);
204 }
205#endif //__XENO__
206}
207
208ssize_t ui_com::Receive(char *buf, ssize_t buf_size) {
209 ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size);
210
211 if (bytesRead < 0) {
212 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) {
213 Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
214 connection_lost = true;
215 }
216 }
217
218 return bytesRead;
219}
220
221void ui_com::Run(void) {
222 // check endianness
223 char header;
224 if (IsBigEndian()) {
225 header = DATAS_BIG_ENDIAN;
226 Printf("System is big endian\n");
227 } else {
228 header = DATAS_LITTLE_ENDIAN;
229 Printf("System is little endian\n");
230 }
231
232#ifdef __XENO__
233 WarnUponSwitches(true);
234 printf("\n"); // a revoir pourquoi??
235// sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
236#endif
237 // on attend d'avoir des choses à faire
238 Suspend();
239
240 while (!ToBeStopped()) {
241 size_t resume_id;
242 Time min = 0xffffffffffffffffULL;
243
244 // on recpuere l'id de la prochaine execution
245 send_mutex->GetMutex();
246 resume_id = resume_time.size();
247 for (size_t i = 0; i < resume_time.size(); i++) {
248 if (resume_time.at(i) < min && data_to_send.at(i)->IsEnabled() == true) {
249 min = resume_time.at(i);
250 resume_id = i;
251 }
252 }
253
254 // attente
255 if (resume_id < resume_time.size()) {
256 Time time = resume_time.at(resume_id);
257 uint16_t resume_period = data_to_send.at(resume_id)->SendPeriod();
258 send_mutex->ReleaseMutex();
259 // on dort jusqu'a la prochaine execution
260 SleepUntil(time);
261
262 // envoi des donnees
263 int offset = 3;
264 send_buffer[0] = header;
265 send_mutex->GetMutex();
266
267 for (size_t i = 0; i < data_to_send.size(); i++) {
268 if (data_to_send.at(i)->SendPeriod() == resume_period &&
269 data_to_send.at(i)->IsEnabled() == true) {
270 data_to_send.at(i)->CopyDatas(send_buffer + offset);
271 offset += data_to_send.at(i)->SendSize();
272 }
273 }
274 if (offset != 3) {
275 memcpy(&send_buffer[1], &resume_period, sizeof(uint16_t));
276 // printf("send %i %i %i %x
277 // %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]);
278 // for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]);
279 // printf("\n");
280 Send(send_buffer, offset);
281 }
282
283 // on planifie la prochaine execution
284 for (size_t i = 0; i < data_to_send.size(); i++) {
285 if (data_to_send.at(i)->SendPeriod() == resume_period) {
286 resume_time.at(i) += data_to_send.at(i)->SendPeriod() * 1000000;
287 }
288 }
289 send_mutex->ReleaseMutex();
290 // Printf("%i %lld\n",resume_period,GetTime()/1000000);
291 } else {
292 send_mutex->ReleaseMutex();
293 // rien a faire, suspend
294 // Printf("rien a faire suspend\n");
295 Suspend();
296 // Printf("wake\n");
297 // on planifie la prochaine execution
298 Time time = GetTime();
299 send_mutex->GetMutex();
300 for (size_t i = 0; i < data_to_send.size(); i++) {
301 resume_time.at(i) =
302 time + (Time)data_to_send.at(i)->SendPeriod() * 1000000;
303 }
304 send_mutex->ReleaseMutex();
305 }
306 }
307}
308#ifdef __XENO__
309void *ui_com::user_thread(void *arg) {
310 int pipe_fd = -1;
311 string devname;
312 char *buf = NULL;
313 ui_com *caller = (ui_com *)arg;
314 int rv;
315 fd_set set;
316 struct timeval timeout;
317 ssize_t nb_read, nb_write;
318
319 buf = (char *)malloc(NRT_PIPE_SIZE);
320 if (buf == NULL) {
321 caller->Err("malloc error\n");
322 }
323
324 devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
325 caller->ObjectName() + "-pipe";
326 while (pipe_fd < 0) {
327 pipe_fd = open(devname.c_str(), O_RDWR);
328 if (pipe_fd < 0 && errno != ENOENT)
329 caller->Err("open pipe_fd error (%s)\n", strerror(-errno));
330 usleep(1000);
331 }
332
333 while (1) {
334 FD_ZERO(&set); // clear the set
335 FD_SET(pipe_fd, &set); // add our file descriptor to the set
336
337 timeout.tv_sec = 0;
338 timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
339
340 rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
341
342 if (rv == -1) {
343 if (caller->is_running == false &&
344 UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
345 break; // timeout
346 if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
347 caller->Err("epoll_wait, %s, code %i\n",
348 UDT::getlasterror().getErrorMessage(),
349 UDT::getlasterror().getErrorCode());
350 } else if (rv == 0) {
351 // printf("timeout\n"); // a timeout occured
352 if (caller->is_running == false)
353 break;
354
355 } else {
356 nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
357 buf[nb_read] = 0;
358// printf("envoi\n%s\n",buf);
359
360#ifdef COMPRESS_FRAMES
361 char *out;
362 ssize_t out_size;
363
364 if (buf[0] == XML_HEADER) {
365 if (compressBuffer(buf, nb_read, &out, &out_size, 9) == Z_OK) {
366 nb_read = out_size;
367 nb_write = UDT::sendmsg(caller->socket_fd, out, out_size, -1, true);
368 free(out);
369 } else {
370 caller->Warn("Compress error, sending it uncompressed\n");
371 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
372 }
373 } else {
374 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
375 }
376#else
377 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
378#endif
379
380 if (nb_write < 0) {
381 caller->Err("UDT::sendmsg error (%s)\n",
382 UDT::getlasterror().getErrorMessage());
383 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
384 UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
385 caller->connection_lost = true;
386 }
387 } else if (nb_write != nb_read) {
388 caller->Err("UDT::sendmsg error %i/%i\n", nb_write, nb_read);
389 }
390 }
391 }
392
393 close(pipe_fd);
394 if (buf != NULL)
395 free(buf);
396 pthread_exit(0);
397}
398#endif
399
400int ui_com::compressBuffer(char *in, ssize_t in_size, char **out,
401 ssize_t *out_size, int level) {
402 int ret, flush;
403 unsigned have;
404 z_stream strm;
405
406 /* allocate deflate state */
407 strm.zalloc = Z_NULL;
408 strm.zfree = Z_NULL;
409 strm.opaque = Z_NULL;
410 ret = deflateInit(&strm, level);
411 if (ret != Z_OK)
412 return ret;
413
414 *out = (char *)malloc(COMPRESS_CHUNK);
415 if (!(*out))
416 return Z_BUF_ERROR;
417
418 strm.next_in = (unsigned char *)in;
419 strm.avail_out = COMPRESS_CHUNK;
420 strm.next_out = (unsigned char *)*out;
421 strm.avail_in = in_size;
422 flush = Z_FINISH;
423
424 ret = deflate(&strm, flush); /* no bad return value */
425 if (ret == Z_STREAM_ERROR) {
426 free(*out);
427 return ret;
428 }
429
430 have = COMPRESS_CHUNK - strm.avail_out;
431 *out_size = have;
432 // printf("%i -> %i\n",in_size,have);
433 /* clean up and return */
434 (void)deflateEnd(&strm);
435
436 if (strm.avail_out != 0) {
437 return Z_OK;
438 } else {
439 return Z_STREAM_ERROR;
440 }
441}
442
443int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,
444 unsigned char **out, ssize_t *out_size) {
445 int ret;
446 // unsigned have;
447 z_stream strm;
448
449 /* allocate inflate state */
450 strm.zalloc = Z_NULL;
451 strm.zfree = Z_NULL;
452 strm.opaque = Z_NULL;
453 strm.avail_in = 0;
454 strm.next_in = Z_NULL;
455 ret = inflateInit(&strm);
456 if (ret != Z_OK)
457 return ret;
458
459 *out = (unsigned char *)malloc(COMPRESS_CHUNK);
460 if (!(*out))
461 return Z_BUF_ERROR;
462
463 strm.avail_in = in_size;
464 strm.next_in = in;
465 strm.avail_out = COMPRESS_CHUNK;
466 strm.next_out = *out;
467
468 ret = inflate(&strm, Z_NO_FLUSH);
469 assert(ret != Z_STREAM_ERROR); /* state not clobbered */
470 switch (ret) {
471 case Z_NEED_DICT:
472 ret = Z_DATA_ERROR; /* and fall through */
473 case Z_DATA_ERROR:
474 case Z_MEM_ERROR:
475 (void)inflateEnd(&strm);
476 return ret;
477 }
478 // have = COMPRESS_CHUNK - strm.avail_out;
479
480 /* clean up and return */
481 (void)inflateEnd(&strm);
482 return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
483}
484
485bool ui_com::ConnectionLost() { return connection_lost; }
486
487void ui_com::AddSendData(const SendData *obj) {
488 send_mutex->GetMutex();
489
490 resume_time.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
491
492 // on resume en meme temps tout ceux qui ont la meme periode
493 for (size_t i = 0; i < data_to_send.size(); i++) {
494 if (data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
495 resume_time.at(resume_time.size() - 1) = resume_time.at(i);
496 break;
497 }
498 }
499
500 data_to_send.push_back(obj);
501
502 send_mutex->ReleaseMutex();
503}
504
505void ui_com::UpdateSendData(const SendData *obj) {
506 // le mutex est deja pris par l'appellant
507 size_t id, i;
508
509 // on recupere l'id
510 for (i = 0; i < data_to_send.size(); i++) {
511 if (data_to_send.at(i) == obj) {
512 id = i;
513 break;
514 }
515 }
516
517 // on resume en meme temps tout ceux qui ont la meme periode
518 for (i = 0; i < data_to_send.size(); i++) {
519 if (i == id)
520 continue;
521 if (data_to_send.at(i)->IsEnabled() == true &&
522 data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
523 resume_time.at(id) = resume_time.at(i);
524 break;
525 }
526 }
527
528 // si aucun match, on planifie l'execution
529 if (i == data_to_send.size())
530 resume_time.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
531
532 if (IsSuspended() == true)
533 Resume();
534
535 return;
536}
537
538void ui_com::UpdateDataToSendSize(void) {
539 send_mutex->GetMutex();
540 send_size = 3; // id(1)+period(2)
541 for (size_t i = 0; i < data_to_send.size(); i++) {
542 if (data_to_send[i] != NULL)
543 send_size += data_to_send[i]->SendSize();
544 }
545
546 // send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char));
547 if (send_buffer != NULL)
548 free(send_buffer);
549 send_buffer = (char *)malloc(send_size * sizeof(char));
550 send_mutex->ReleaseMutex();
551}
552
553void ui_com::RemoveSendData(const SendData *obj) {
554 // printf("remove_data_to_send %i\n",data_to_send.size());
555
556 send_mutex->GetMutex();
557 // on recupere l'id
558 for (size_t i = 0; i < data_to_send.size(); i++) {
559 if (data_to_send.at(i) == obj) {
560 data_to_send.erase(data_to_send.begin() + i);
561 resume_time.erase(resume_time.begin() + i);
562 // printf("remove_data_to_send %i ok\n",data_to_send.size());
563 break;
564 }
565 }
566 send_mutex->ReleaseMutex();
567
568 return;
569}
570
571void ui_com::Block(void) { send_mutex->GetMutex(); }
572
573void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note: See TracBrowser for help on using the repository browser.