source: flair-src/trunk/lib/FlairCore/src/ui_com.cpp@ 116

Last change on this file since 116 was 15, checked in by Bayard Gildas, 8 years ago

sources reformatted with flair-format-dir script

File size: 15.0 KB
RevLine 
[2]1// %flair:license{
[15]2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
[2]4// %flair:license}
5// created: 2011/05/01
6// filename: ui_com.cpp
7//
8// author: Guillaume Sanahuja
9// Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11// version: $Id: $
12//
13// purpose: classe permettant la lecture et l'ecriture sur socket UDT
14//
15//
16/*********************************************************************/
17
18#include "ui_com.h"
19#include <cstdlib>
20#include <string.h>
21#include <unistd.h>
22#include "Mutex.h"
23#include "SendData.h"
24#include "communication.h"
25#include "FrameworkManager.h"
26#include "zlib.h"
27#include <assert.h>
28#include "config.h"
29
30#ifdef __XENO__
31#include <pthread.h>
32#include <native/task.h>
33#include <native/task.h>
34#include <fcntl.h>
35#endif
36
37using std::string;
38using namespace flair::core;
39using namespace flair::gui;
40
[15]41ui_com::ui_com(const Object *parent, UDTSOCKET sock)
42 : Thread(parent, "send", 2) {
43 // buffer envoi
44 send_buffer = (char *)malloc(3);
45 send_size = 3; // id(1)+period(2)
[2]46
[15]47 // mutex
48 send_mutex = NULL;
49 send_mutex = new Mutex(this, ObjectName());
[2]50
[15]51 socket_fd = sock;
52 connection_lost = false;
[2]53
54#ifdef __XENO__
[15]55 int status;
56 string tmp_name;
[2]57
[15]58 is_running = true;
[2]59
[15]60 // pipe
61 tmp_name = getFrameworkManager()->ObjectName() + "-" + ObjectName() + "-pipe";
62 // xenomai limitation
63 if (tmp_name.size() > 31)
64 Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
[2]65#ifdef RT_PIPE_SIZE
[15]66 status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
[2]67#else
[15]68 status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
[2]69#endif
70
[15]71 if (status != 0) {
72 Err("rt_pipe_create error (%s)\n", strerror(-status));
73 // return -1;
74 }
[2]75
[15]76// start user side thread
[2]77#ifdef NRT_STACK_SIZE
[15]78 // Initialize thread creation attributes
79 pthread_attr_t attr;
80 if (pthread_attr_init(&attr) != 0) {
81 Err("pthread_attr_init error\n");
82 }
[2]83
[15]84 if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
85 Err("pthread_attr_setstacksize error\n");
86 }
[2]87
[15]88 if (pthread_create(&thread, &attr, user_thread, (void *)this) < 0)
89#else // NRT_STACK_SIZE
90 if (pthread_create(&thread, NULL, user_thread, (void *)this) < 0)
91#endif // NRT_STACK_SIZE
92 {
93 Err("pthread_create error\n");
94 // return -1;
95 }
[2]96#ifdef NRT_STACK_SIZE
[15]97 if (pthread_attr_destroy(&attr) != 0) {
98 Err("pthread_attr_destroy error\n");
99 }
[2]100#endif
101
[15]102#endif //__XENO__
103 int timeout = 100;
104 if (UDT::setsockopt(socket_fd, 0, UDT_RCVTIMEO, &timeout, sizeof(int)) != 0)
105 Err("UDT::setsockopt error (UDT_RCVTIMEO)\n");
[2]106
[15]107 bool blocking = true;
108 if (UDT::setsockopt(socket_fd, 0, UDT_SNDSYN, &blocking, sizeof(bool)) != 0)
109 Err("UDT::setsockopt error (UDT_SNDSYN)\n");
[2]110
[15]111 if (UDT::setsockopt(socket_fd, 0, UDT_RCVSYN, &blocking, sizeof(bool)) != 0)
112 Err("UDT::setsockopt error (UDT_RCVSYN)\n");
113 //#endif //__XENO__
[2]114
[15]115 Start();
[2]116}
117
[15]118ui_com::~ui_com() {
119// printf("destruction ui_com\n");
[2]120
121#ifdef __XENO__
[15]122 is_running = false;
[2]123
[15]124 pthread_join(thread, NULL);
[2]125
[15]126 int status = rt_pipe_delete(&pipe);
127 if (status != 0)
128 Err("rt_pipe_delete error (%s)\n", strerror(-status));
[2]129#endif
130
[15]131 SafeStop();
[2]132
[15]133 if (IsSuspended() == true)
134 Resume();
[2]135
[15]136 Join();
[2]137
[15]138 if (send_buffer != NULL)
139 free(send_buffer);
140 send_buffer = NULL;
[2]141
[15]142 // printf("destruction ui_com ok\n");
[2]143}
144
[15]145void ui_com::Send(char *buf, ssize_t size) {
146 if (connection_lost == true)
147 return;
[2]148
[15]149 char *tosend = buf;
150 ssize_t nb_write;
[2]151
[15]152 if (buf[0] == XML_HEADER) {
153 // cut xml header
154 tosend = strstr(buf, "<root");
155 size -= tosend - buf;
156 }
[2]157
158#ifdef __XENO__
[15]159 nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);
[2]160
[15]161 if (nb_write < 0) {
162 Err("rt_pipe_write error (%s)\n", strerror(-nb_write));
163 } else if (nb_write != size) {
164 Err("rt_pipe_write error %i/%i\n", nb_write, size);
165 }
[2]166#else //__XENO__
167
168#ifdef COMPRESS_FRAMES
[15]169 bool sendItCompressed = false;
170 if (buf[0] == XML_HEADER) {
171 sendItCompressed = true;
172 }
[2]173
[15]174 if (sendItCompressed) {
175 char *out;
176 ssize_t out_size;
177 if (compressBuffer(tosend, size, &out, &out_size, 9) == Z_OK) {
178 size = out_size;
179 nb_write = UDT::sendmsg(socket_fd, out, size, -1, true);
180 free(out);
181 } else {
182 Warn("Compress error, sending it uncompressed\n");
183 sendItCompressed = false;
[2]184 }
[15]185 }
[2]186
[15]187 if (!sendItCompressed) {
188 nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
189 }
[2]190
[15]191#else // COMPRESS_FRAMES
192 nb_write = UDT::sendmsg(socket_fd, tosend, size, -1, true);
193#endif // COMPRESS_FRAMES
194 // Printf("write %i %i\n",nb_write,size);
195 if (nb_write < 0) {
196 Err("UDT::sendmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
197 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
198 UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
199 connection_lost = true;
[2]200 }
[15]201 } else if (nb_write != size) {
202 Err("%s, code %i (%ld/%ld)\n", UDT::getlasterror().getErrorMessage(),
203 UDT::getlasterror().getErrorCode(), nb_write, size);
204 }
[2]205#endif //__XENO__
206}
207
[15]208ssize_t ui_com::Receive(char *buf, ssize_t buf_size) {
209 ssize_t bytesRead = UDT::recvmsg(socket_fd, buf, buf_size);
[2]210
[15]211 if (bytesRead < 0) {
212 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST) {
213 Err("UDT::recvmsg error (%s)\n", UDT::getlasterror().getErrorMessage());
214 connection_lost = true;
[2]215 }
[15]216 }
[2]217
[15]218 return bytesRead;
[2]219}
220
[15]221void ui_com::Run(void) {
222 // check endianness
223 char header;
224 if (IsBigEndian()) {
225 header = DATAS_BIG_ENDIAN;
226 Printf("System is big endian\n");
227 } else {
228 header = DATAS_LITTLE_ENDIAN;
229 Printf("System is little endian\n");
230 }
[2]231
232#ifdef __XENO__
[15]233 WarnUponSwitches(true);
234 printf("\n"); // a revoir pourquoi??
235// sans ce printf, dans le simu_roll, le suspend ne fonctionne pas...
[2]236#endif
[15]237 // on attend d'avoir des choses à faire
238 Suspend();
[2]239
[15]240 while (!ToBeStopped()) {
241 size_t resume_id;
242 Time min = 0xffffffffffffffffULL;
[2]243
[15]244 // on recpuere l'id de la prochaine execution
245 send_mutex->GetMutex();
246 resume_id = resume_time.size();
247 for (size_t i = 0; i < resume_time.size(); i++) {
248 if (resume_time.at(i) < min && data_to_send.at(i)->IsEnabled() == true) {
249 min = resume_time.at(i);
250 resume_id = i;
251 }
252 }
[2]253
[15]254 // attente
255 if (resume_id < resume_time.size()) {
256 Time time = resume_time.at(resume_id);
257 uint16_t resume_period = data_to_send.at(resume_id)->SendPeriod();
258 send_mutex->ReleaseMutex();
259 // on dort jusqu'a la prochaine execution
260 SleepUntil(time);
[2]261
[15]262 // envoi des donnees
263 int offset = 3;
264 send_buffer[0] = header;
265 send_mutex->GetMutex();
[2]266
[15]267 for (size_t i = 0; i < data_to_send.size(); i++) {
268 if (data_to_send.at(i)->SendPeriod() == resume_period &&
269 data_to_send.at(i)->IsEnabled() == true) {
270 data_to_send.at(i)->CopyDatas(send_buffer + offset);
271 offset += data_to_send.at(i)->SendSize();
272 }
273 }
274 if (offset != 3) {
275 memcpy(&send_buffer[1], &resume_period, sizeof(uint16_t));
276 // printf("send %i %i %i %x
277 // %x\n",resume_period,offset,sizeof(uint16_t),send_buffer,&send_buffer[1]);
278 // for(int i=0;i<offset;i++) printf("%x ",send_buffer[i]);
279 // printf("\n");
280 Send(send_buffer, offset);
281 }
[2]282
[15]283 // on planifie la prochaine execution
284 for (size_t i = 0; i < data_to_send.size(); i++) {
285 if (data_to_send.at(i)->SendPeriod() == resume_period) {
286 resume_time.at(i) += data_to_send.at(i)->SendPeriod() * 1000000;
[2]287 }
[15]288 }
289 send_mutex->ReleaseMutex();
290 // Printf("%i %lld\n",resume_period,GetTime()/1000000);
291 } else {
292 send_mutex->ReleaseMutex();
293 // rien a faire, suspend
294 // Printf("rien a faire suspend\n");
295 Suspend();
296 // Printf("wake\n");
297 // on planifie la prochaine execution
298 Time time = GetTime();
299 send_mutex->GetMutex();
300 for (size_t i = 0; i < data_to_send.size(); i++) {
301 resume_time.at(i) =
302 time + (Time)data_to_send.at(i)->SendPeriod() * 1000000;
303 }
304 send_mutex->ReleaseMutex();
[2]305 }
[15]306 }
[2]307}
308#ifdef __XENO__
[15]309void *ui_com::user_thread(void *arg) {
310 int pipe_fd = -1;
311 string devname;
312 char *buf = NULL;
313 ui_com *caller = (ui_com *)arg;
314 int rv;
315 fd_set set;
316 struct timeval timeout;
317 ssize_t nb_read, nb_write;
[2]318
[15]319 buf = (char *)malloc(NRT_PIPE_SIZE);
320 if (buf == NULL) {
321 caller->Err("malloc error\n");
322 }
[2]323
[15]324 devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
325 caller->ObjectName() + "-pipe";
326 while (pipe_fd < 0) {
327 pipe_fd = open(devname.c_str(), O_RDWR);
328 if (pipe_fd < 0 && errno != ENOENT)
329 caller->Err("open pipe_fd error (%s)\n", strerror(-errno));
330 usleep(1000);
331 }
[2]332
[15]333 while (1) {
334 FD_ZERO(&set); // clear the set
335 FD_SET(pipe_fd, &set); // add our file descriptor to the set
[2]336
[15]337 timeout.tv_sec = 0;
338 timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
[2]339
[15]340 rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
[2]341
[15]342 if (rv == -1) {
343 if (caller->is_running == false &&
344 UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT)
345 break; // timeout
346 if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT)
347 caller->Err("epoll_wait, %s, code %i\n",
348 UDT::getlasterror().getErrorMessage(),
349 UDT::getlasterror().getErrorCode());
350 } else if (rv == 0) {
351 // printf("timeout\n"); // a timeout occured
352 if (caller->is_running == false)
353 break;
[2]354
[15]355 } else {
356 nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE);
357 buf[nb_read] = 0;
358// printf("envoi\n%s\n",buf);
[2]359
360#ifdef COMPRESS_FRAMES
[15]361 char *out;
362 ssize_t out_size;
[2]363
[15]364 if (buf[0] == XML_HEADER) {
365 if (compressBuffer(buf, nb_read, &out, &out_size, 9) == Z_OK) {
366 nb_read = out_size;
367 nb_write = UDT::sendmsg(caller->socket_fd, out, out_size, -1, true);
368 free(out);
369 } else {
370 caller->Warn("Compress error, sending it uncompressed\n");
371 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
372 }
373 } else {
374 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
375 }
[2]376#else
[15]377 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true);
[2]378#endif
379
[15]380 if (nb_write < 0) {
381 caller->Err("UDT::sendmsg error (%s)\n",
382 UDT::getlasterror().getErrorMessage());
383 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST ||
384 UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) {
385 caller->connection_lost = true;
[2]386 }
[15]387 } else if (nb_write != nb_read) {
388 caller->Err("UDT::sendmsg error %i/%i\n", nb_write, nb_read);
389 }
[2]390 }
[15]391 }
[2]392
[15]393 close(pipe_fd);
394 if (buf != NULL)
395 free(buf);
396 pthread_exit(0);
[2]397}
398#endif
399
[15]400int ui_com::compressBuffer(char *in, ssize_t in_size, char **out,
401 ssize_t *out_size, int level) {
402 int ret, flush;
403 unsigned have;
404 z_stream strm;
[2]405
[15]406 /* allocate deflate state */
407 strm.zalloc = Z_NULL;
408 strm.zfree = Z_NULL;
409 strm.opaque = Z_NULL;
410 ret = deflateInit(&strm, level);
411 if (ret != Z_OK)
412 return ret;
[2]413
[15]414 *out = (char *)malloc(COMPRESS_CHUNK);
415 if (!(*out))
416 return Z_BUF_ERROR;
[2]417
[15]418 strm.next_in = (unsigned char *)in;
419 strm.avail_out = COMPRESS_CHUNK;
420 strm.next_out = (unsigned char *)*out;
421 strm.avail_in = in_size;
422 flush = Z_FINISH;
[2]423
[15]424 ret = deflate(&strm, flush); /* no bad return value */
425 if (ret == Z_STREAM_ERROR) {
426 free(*out);
427 return ret;
428 }
[2]429
[15]430 have = COMPRESS_CHUNK - strm.avail_out;
431 *out_size = have;
432 // printf("%i -> %i\n",in_size,have);
433 /* clean up and return */
434 (void)deflateEnd(&strm);
[2]435
[15]436 if (strm.avail_out != 0) {
437 return Z_OK;
438 } else {
439 return Z_STREAM_ERROR;
440 }
[2]441}
442
[15]443int ui_com::uncompressBuffer(unsigned char *in, ssize_t in_size,
444 unsigned char **out, ssize_t *out_size) {
445 int ret;
446 // unsigned have;
447 z_stream strm;
[2]448
[15]449 /* allocate inflate state */
450 strm.zalloc = Z_NULL;
451 strm.zfree = Z_NULL;
452 strm.opaque = Z_NULL;
453 strm.avail_in = 0;
454 strm.next_in = Z_NULL;
455 ret = inflateInit(&strm);
456 if (ret != Z_OK)
457 return ret;
[2]458
[15]459 *out = (unsigned char *)malloc(COMPRESS_CHUNK);
460 if (!(*out))
461 return Z_BUF_ERROR;
[2]462
[15]463 strm.avail_in = in_size;
464 strm.next_in = in;
465 strm.avail_out = COMPRESS_CHUNK;
466 strm.next_out = *out;
[2]467
[15]468 ret = inflate(&strm, Z_NO_FLUSH);
469 assert(ret != Z_STREAM_ERROR); /* state not clobbered */
470 switch (ret) {
471 case Z_NEED_DICT:
472 ret = Z_DATA_ERROR; /* and fall through */
473 case Z_DATA_ERROR:
474 case Z_MEM_ERROR:
[2]475 (void)inflateEnd(&strm);
[15]476 return ret;
477 }
478 // have = COMPRESS_CHUNK - strm.avail_out;
[2]479
[15]480 /* clean up and return */
481 (void)inflateEnd(&strm);
482 return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
[2]483}
484
[15]485bool ui_com::ConnectionLost() { return connection_lost; }
[2]486
[15]487void ui_com::AddSendData(const SendData *obj) {
488 send_mutex->GetMutex();
[2]489
[15]490 resume_time.push_back(GetTime() + (Time)obj->SendPeriod() * 1000000);
491
492 // on resume en meme temps tout ceux qui ont la meme periode
493 for (size_t i = 0; i < data_to_send.size(); i++) {
494 if (data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
495 resume_time.at(resume_time.size() - 1) = resume_time.at(i);
496 break;
[2]497 }
[15]498 }
[2]499
[15]500 data_to_send.push_back(obj);
[2]501
[15]502 send_mutex->ReleaseMutex();
[2]503}
504
[15]505void ui_com::UpdateSendData(const SendData *obj) {
506 // le mutex est deja pris par l'appellant
507 size_t id, i;
[2]508
[15]509 // on recupere l'id
510 for (i = 0; i < data_to_send.size(); i++) {
511 if (data_to_send.at(i) == obj) {
512 id = i;
513 break;
[2]514 }
[15]515 }
[2]516
[15]517 // on resume en meme temps tout ceux qui ont la meme periode
518 for (i = 0; i < data_to_send.size(); i++) {
519 if (i == id)
520 continue;
521 if (data_to_send.at(i)->IsEnabled() == true &&
522 data_to_send.at(i)->SendPeriod() == obj->SendPeriod()) {
523 resume_time.at(id) = resume_time.at(i);
524 break;
[2]525 }
[15]526 }
[2]527
[15]528 // si aucun match, on planifie l'execution
529 if (i == data_to_send.size())
530 resume_time.at(id) = GetTime() + (Time)obj->SendPeriod() * 1000000;
[2]531
[15]532 if (IsSuspended() == true)
533 Resume();
[2]534
[15]535 return;
[2]536}
537
[15]538void ui_com::UpdateDataToSendSize(void) {
539 send_mutex->GetMutex();
540 send_size = 3; // id(1)+period(2)
541 for (size_t i = 0; i < data_to_send.size(); i++) {
542 if (data_to_send[i] != NULL)
543 send_size += data_to_send[i]->SendSize();
544 }
[2]545
[15]546 // send_buffer=(char*)realloc((void*)send_buffer,send_size*sizeof(char));
547 if (send_buffer != NULL)
548 free(send_buffer);
549 send_buffer = (char *)malloc(send_size * sizeof(char));
550 send_mutex->ReleaseMutex();
[2]551}
552
[15]553void ui_com::RemoveSendData(const SendData *obj) {
554 // printf("remove_data_to_send %i\n",data_to_send.size());
[2]555
[15]556 send_mutex->GetMutex();
557 // on recupere l'id
558 for (size_t i = 0; i < data_to_send.size(); i++) {
559 if (data_to_send.at(i) == obj) {
560 data_to_send.erase(data_to_send.begin() + i);
561 resume_time.erase(resume_time.begin() + i);
562 // printf("remove_data_to_send %i ok\n",data_to_send.size());
563 break;
[2]564 }
[15]565 }
566 send_mutex->ReleaseMutex();
[2]567
[15]568 return;
[2]569}
570
[15]571void ui_com::Block(void) { send_mutex->GetMutex(); }
[2]572
[15]573void ui_com::UnBlock(void) { send_mutex->ReleaseMutex(); }
Note: See TracBrowser for help on using the repository browser.