Changeset 15 in flair-src for trunk/lib/FlairCore/src/Socket_impl.cpp


Ignore:
Timestamp:
04/08/16 15:40:57 (8 years ago)
Author:
Bayard Gildas
Message:

sources reformatted with flair-format-dir script

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/Socket_impl.cpp

    r2 r15  
    2929using namespace flair::core;
    3030
    31 Socket_impl::Socket_impl(const Socket* self,string name,uint16_t port)
    32 {
    33     this->self=self;
    34     this->port=port;
    35     this->address="";
    36     this->broadcast=false;
    37     Init();
    38 }
    39 
    40 Socket_impl::Socket_impl(const Socket* self,string name,string address,bool broadcast)
    41 {
    42     this->self=self;
    43     int pos = address.find(":");
    44     this->address=address.substr (0,pos);
    45     port=atoi(address.substr(pos+1).c_str());
    46     this->broadcast=broadcast;
    47 
    48     if(pos==0 || address=="")
    49     {
    50         self->Err("address %s is not correct\n",address.c_str());
    51     }
    52     Init();
    53 
    54 }
    55 
    56 void Socket_impl::Init(void)
    57 {
    58         int yes=1;
    59 
    60     fd = socket(AF_INET, SOCK_DGRAM, 0); //UDP
    61 
    62     if(broadcast==true)
    63     {
    64         if(setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(int) )!=0)
    65             self->Err("Setsockopt error\n");
    66     }
    67 
    68     if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int) )!=0)
    69         self->Err("Setsockopt error\n");
    70 
    71     if(address=="" || broadcast==true)
    72     {
    73         sockaddr_in sin = { 0 };
    74 
    75         if(broadcast==true)
    76         {
    77             struct hostent *hostinfo;
    78 
    79             hostinfo = gethostbyname(this->address.c_str());
    80             if (hostinfo == NULL)
    81             {
    82                 self->Err("hostinfo error\n");
    83             }
    84             sin.sin_addr = *(in_addr *) hostinfo->h_addr;
    85         }
    86         else
    87         {
    88             sin.sin_addr.s_addr=INADDR_ANY;
    89         }
    90 
    91         sin.sin_port = htons(port);
    92         sin.sin_family = AF_INET;
    93         if(bind(fd,(sockaddr *) &sin, sizeof sin) == -1)
    94         {
    95             self->Err("bind error\n");
    96         }
    97     }
    98 
    99 
    100 #ifdef __XENO__
    101     string tmp_name;
    102     int status;
    103     is_running=true;
    104 
    105     //pipe
    106     tmp_name= getFrameworkManager()->ObjectName() + "-" + self->ObjectName()+ "-pipe";
    107     //xenomai limitation
    108     if(tmp_name.size()>31) self->Err("rt_pipe_create error (%s is too long)\n",tmp_name.c_str());
     31Socket_impl::Socket_impl(const Socket *self, string name, uint16_t port) {
     32  this->self = self;
     33  this->port = port;
     34  this->address = "";
     35  this->broadcast = false;
     36  Init();
     37}
     38
     39Socket_impl::Socket_impl(const Socket *self, string name, string address,
     40                         bool broadcast) {
     41  this->self = self;
     42  int pos = address.find(":");
     43  this->address = address.substr(0, pos);
     44  port = atoi(address.substr(pos + 1).c_str());
     45  this->broadcast = broadcast;
     46
     47  if (pos == 0 || address == "") {
     48    self->Err("address %s is not correct\n", address.c_str());
     49  }
     50  Init();
     51}
     52
     53void Socket_impl::Init(void) {
     54  int yes = 1;
     55
     56  fd = socket(AF_INET, SOCK_DGRAM, 0); // UDP
     57
     58  if (broadcast == true) {
     59    if (setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(int)) != 0)
     60      self->Err("Setsockopt error\n");
     61  }
     62
     63  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) != 0)
     64    self->Err("Setsockopt error\n");
     65
     66  if (address == "" || broadcast == true) {
     67    sockaddr_in sin = {0};
     68
     69    if (broadcast == true) {
     70      struct hostent *hostinfo;
     71
     72      hostinfo = gethostbyname(this->address.c_str());
     73      if (hostinfo == NULL) {
     74        self->Err("hostinfo error\n");
     75      }
     76      sin.sin_addr = *(in_addr *)hostinfo->h_addr;
     77    } else {
     78      sin.sin_addr.s_addr = INADDR_ANY;
     79    }
     80
     81    sin.sin_port = htons(port);
     82    sin.sin_family = AF_INET;
     83    if (bind(fd, (sockaddr *)&sin, sizeof sin) == -1) {
     84      self->Err("bind error\n");
     85    }
     86  }
     87
     88#ifdef __XENO__
     89  string tmp_name;
     90  int status;
     91  is_running = true;
     92
     93  // pipe
     94  tmp_name =
     95      getFrameworkManager()->ObjectName() + "-" + self->ObjectName() + "-pipe";
     96  // xenomai limitation
     97  if (tmp_name.size() > 31)
     98    self->Err("rt_pipe_create error (%s is too long)\n", tmp_name.c_str());
    10999#ifdef RT_PIPE_SIZE
    110     status=rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO,RT_PIPE_SIZE);
     100  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, RT_PIPE_SIZE);
    111101#else
    112     status=rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO,0);
    113 #endif
    114     if(status!=0)
    115     {
    116         self->Err("rt_pipe_create error (%s)\n",strerror(-status));
    117     }
    118 
    119     //start user side thread
     102  status = rt_pipe_create(&pipe, tmp_name.c_str(), P_MINOR_AUTO, 0);
     103#endif
     104  if (status != 0) {
     105    self->Err("rt_pipe_create error (%s)\n", strerror(-status));
     106  }
     107
     108// start user side thread
    120109#ifdef NRT_STACK_SIZE
    121     // Initialize thread creation attributes
    122     pthread_attr_t attr;
    123     if(pthread_attr_init(&attr) != 0)
    124     {
    125         self->Err("pthread_attr_init error\n");
    126     }
    127     if(pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0)
    128     {
    129         self->Err("pthread_attr_setstacksize error\n");
    130     }
    131     if(pthread_create(&user_thread,  &attr, user, (void*)this) < 0)
    132     {
    133         self->Err("pthread_create error\n");
    134     }
    135     if(pthread_attr_destroy(&attr) != 0)
    136     {
    137         self->Err("pthread_attr_destroy error\n");
    138     }
    139 #else //NRT_STACK_SIZE
    140     if(pthread_create(&user_thread, NULL, user, (void*)this) < 0)
    141     {
    142         self->Err("pthread_create error\n");
    143     }
    144 #endif //NRT_STACK_SIZE
     110  // Initialize thread creation attributes
     111  pthread_attr_t attr;
     112  if (pthread_attr_init(&attr) != 0) {
     113    self->Err("pthread_attr_init error\n");
     114  }
     115  if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) {
     116    self->Err("pthread_attr_setstacksize error\n");
     117  }
     118  if (pthread_create(&user_thread, &attr, user, (void *)this) < 0) {
     119    self->Err("pthread_create error\n");
     120  }
     121  if (pthread_attr_destroy(&attr) != 0) {
     122    self->Err("pthread_attr_destroy error\n");
     123  }
     124#else  // NRT_STACK_SIZE
     125  if (pthread_create(&user_thread, NULL, user, (void *)this) < 0) {
     126    self->Err("pthread_create error\n");
     127  }
     128#endif // NRT_STACK_SIZE
    145129#endif //__XENO__
    146130
    147     if(address!="")
    148     {
    149         struct hostent *hostinfo;
    150         hostinfo = gethostbyname(address.c_str());
    151         if (hostinfo == NULL)
    152         {
    153             self->Err("gethostbyname\n");
    154         }
    155 
    156         sock_in.sin_addr=*(in_addr *) hostinfo->h_addr;
    157         sock_in.sin_port = htons(port);
    158         sock_in.sin_family = AF_INET;
    159     }
    160 }
    161 
    162 Socket_impl::~Socket_impl()
    163 {
    164 #ifdef __XENO__
    165     is_running=false;
    166 
    167     pthread_join(user_thread,NULL);
    168     int status=rt_pipe_delete(&pipe);
    169     if(status!=0) self->Err("rt_pipe_delete error (%s)\n",strerror(-status));
    170 
    171 #endif
    172     close(fd);
    173 }
    174 
    175 void Socket_impl::SendMessage(const char* src,size_t src_len)
    176 {
    177     ssize_t written;
    178     string to_send;
    179 
    180     if(broadcast==true)
    181     {
    182         to_send=getFrameworkManager()->ObjectName()+ ":" + string(src, src_len);
    183         src_len=to_send.size();
    184         src=(char*)to_send.c_str();
    185     }
    186 
    187 #ifdef __XENO__
    188 //Printf("send pipe %s\n",src);
    189     written=rt_pipe_write(&pipe,src,src_len,P_NORMAL);
    190 
    191     if(written<0)
    192     {
    193         self->Err("rt_pipe_write error (%s)\n",strerror(-written));
    194     }
    195     else if (written != (ssize_t)src_len)
    196     {
    197         self->Err("rt_pipe_write error %i/%i\n",written,to_send.size());
    198     }
     131  if (address != "") {
     132    struct hostent *hostinfo;
     133    hostinfo = gethostbyname(address.c_str());
     134    if (hostinfo == NULL) {
     135      self->Err("gethostbyname\n");
     136    }
     137
     138    sock_in.sin_addr = *(in_addr *)hostinfo->h_addr;
     139    sock_in.sin_port = htons(port);
     140    sock_in.sin_family = AF_INET;
     141  }
     142}
     143
     144Socket_impl::~Socket_impl() {
     145#ifdef __XENO__
     146  is_running = false;
     147
     148  pthread_join(user_thread, NULL);
     149  int status = rt_pipe_delete(&pipe);
     150  if (status != 0)
     151    self->Err("rt_pipe_delete error (%s)\n", strerror(-status));
     152
     153#endif
     154  close(fd);
     155}
     156
     157void Socket_impl::SendMessage(const char *src, size_t src_len) {
     158  ssize_t written;
     159  string to_send;
     160
     161  if (broadcast == true) {
     162    to_send = getFrameworkManager()->ObjectName() + ":" + string(src, src_len);
     163    src_len = to_send.size();
     164    src = (char *)to_send.c_str();
     165  }
     166
     167#ifdef __XENO__
     168  // Printf("send pipe %s\n",src);
     169  written = rt_pipe_write(&pipe, src, src_len, P_NORMAL);
     170
     171  if (written < 0) {
     172    self->Err("rt_pipe_write error (%s)\n", strerror(-written));
     173  } else if (written != (ssize_t)src_len) {
     174    self->Err("rt_pipe_write error %i/%i\n", written, to_send.size());
     175  }
    199176#else
    200     written=sendto(fd,src,src_len, 0, (struct sockaddr *) &sock_in, sizeof(sock_in));
    201 
    202     if(written!=(ssize_t)src_len)
    203     {
    204         self->Err("sendto error\n");
    205     }
    206 #endif
    207 }
    208 
    209 void Socket_impl::SendMessage(string message)
    210 {
    211     ssize_t written;
    212 
    213     if(broadcast==true) message=self->Parent()->ObjectName()+ ":" + message;
    214     //Printf("SendMessage %s\n",message.c_str());
    215 #ifdef __XENO__
    216     written=rt_pipe_write(&pipe,message.c_str(),message.size(),P_NORMAL);
    217 
    218     if(written<0)
    219     {
    220         self->Err("rt_pipe_write error (%s)\n",strerror(-written));
    221     }
    222     else if (written != (ssize_t)message.size())
    223     {
    224         self->Err("rt_pipe_write error %i/%i\n",written,message.size());
    225     }
     177  written =
     178      sendto(fd, src, src_len, 0, (struct sockaddr *)&sock_in, sizeof(sock_in));
     179
     180  if (written != (ssize_t)src_len) {
     181    self->Err("sendto error\n");
     182  }
     183#endif
     184}
     185
     186void Socket_impl::SendMessage(string message) {
     187  ssize_t written;
     188
     189  if (broadcast == true)
     190    message = self->Parent()->ObjectName() + ":" + message;
     191// Printf("SendMessage %s\n",message.c_str());
     192#ifdef __XENO__
     193  written = rt_pipe_write(&pipe, message.c_str(), message.size(), P_NORMAL);
     194
     195  if (written < 0) {
     196    self->Err("rt_pipe_write error (%s)\n", strerror(-written));
     197  } else if (written != (ssize_t)message.size()) {
     198    self->Err("rt_pipe_write error %i/%i\n", written, message.size());
     199  }
    226200#else
    227     written=sendto(fd,message.c_str(),message.size(), 0, (struct sockaddr *) &sock_in, sizeof(sock_in));
    228     if(written!=(ssize_t)message.size())
    229     {
    230         self->Err("sendto error\n");
    231     }
    232 
    233 #endif
    234 }
    235 
    236 ssize_t Socket_impl::RecvMessage(char* msg,size_t msg_len,Time timeout,char* src,size_t* src_len)
    237 {
    238     ssize_t nb_read;
    239     char buffer[128];
    240 #ifdef __XENO__
    241     nb_read=rt_pipe_read(&pipe,&buffer,sizeof(buffer),timeout);
     201  written = sendto(fd, message.c_str(), message.size(), 0,
     202                   (struct sockaddr *)&sock_in, sizeof(sock_in));
     203  if (written != (ssize_t)message.size()) {
     204    self->Err("sendto error\n");
     205  }
     206
     207#endif
     208}
     209
     210ssize_t Socket_impl::RecvMessage(char *msg, size_t msg_len, Time timeout,
     211                                 char *src, size_t *src_len) {
     212  ssize_t nb_read;
     213  char buffer[128];
     214#ifdef __XENO__
     215  nb_read = rt_pipe_read(&pipe, &buffer, sizeof(buffer), timeout);
    242216#else
    243     socklen_t sinsize = sizeof(sock_in);
    244     struct timeval tv;
    245 
    246     if(timeout!=TIME_NONBLOCK) {
    247         int attr=fcntl(fd, F_GETFL,0);
    248         fcntl(fd, F_SETFL, attr & (~O_NONBLOCK));
    249 
    250         tv.tv_sec = timeout/1000000000;
    251         timeout=timeout-(timeout/1000000000)*1000000000;
    252         tv.tv_usec = timeout/1000;
    253         if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,&tv,sizeof(tv)) < 0) {
    254             self->Err("setsockopt SO_RCVTIMEO failed\n");
    255         }
     217  socklen_t sinsize = sizeof(sock_in);
     218  struct timeval tv;
     219
     220  if (timeout != TIME_NONBLOCK) {
     221    int attr = fcntl(fd, F_GETFL, 0);
     222    fcntl(fd, F_SETFL, attr & (~O_NONBLOCK));
     223
     224    tv.tv_sec = timeout / 1000000000;
     225    timeout = timeout - (timeout / 1000000000) * 1000000000;
     226    tv.tv_usec = timeout / 1000;
     227    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
     228      self->Err("setsockopt SO_RCVTIMEO failed\n");
     229    }
     230  } else {
     231    fcntl(fd, F_SETFL, O_NONBLOCK);
     232  }
     233
     234  if (broadcast == false) {
     235    nb_read =
     236        recvfrom(fd, buffer, sizeof(buffer), 0, (sockaddr *)&sock_in, &sinsize);
     237  } else {
     238    nb_read = recvfrom(fd, buffer, sizeof(buffer), 0, NULL, NULL);
     239  }
     240#endif
     241  if (nb_read <= 0) {
     242    return nb_read;
     243  } else {
     244    // printf("%s\n",buffer);
     245    if (broadcast == true) {
     246      int index = -1;
     247      for (int i = 0; i < nb_read; i++) {
     248        if (buffer[i] == ':') {
     249          index = i;
     250          break;
     251        }
     252      }
     253      if (index < 0) {
     254        self->Warn("Malformed message\n");
     255        return -1;
     256      } else if (src_len != NULL && src != NULL) {
     257        if (index + 1 > (int)(*src_len) &&
     258            src != NULL) { //+1 pour inserer un 0)
     259          self->Warn("insufficent src size\n");
     260          return -1;
     261        }
     262      } else if (nb_read - index - 1 + 1 >
     263                 (int)msg_len) { //+1 pour inserer un 0
     264        self->Warn("insufficent msg size (%i/%i)\n", nb_read - index - 1 + 1,
     265                   msg_len);
     266        return -1;
     267      }
     268      if (src != NULL) {
     269        memcpy(src, buffer, index);
     270        src[index] = 0;
     271      }
     272      memcpy(msg, &buffer[index + 1], nb_read - index - 1);
     273      msg[nb_read - index - 1] = 0;
     274      return nb_read - index;
    256275    } else {
    257         fcntl(fd, F_SETFL, O_NONBLOCK);
    258     }
    259 
    260     if(broadcast==false) {
    261         nb_read = recvfrom(fd, buffer, sizeof(buffer), 0, (sockaddr *) &sock_in, &sinsize);
     276      if (nb_read > (int)msg_len) {
     277        self->Warn("insufficent msg size (%i/%i)\n", nb_read, msg_len);
     278        return -1;
     279      }
     280      memcpy(msg, buffer, nb_read);
     281      return nb_read;
     282    }
     283  }
     284}
     285
     286#ifdef __XENO__
     287void *Socket_impl::user(void *arg) {
     288  Socket_impl *caller = (Socket_impl *)arg;
     289  int pipe_fd = -1;
     290  string devname;
     291
     292  devname = NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" +
     293            caller->self->ObjectName() + "-pipe";
     294  while (pipe_fd < 0) {
     295    pipe_fd = open(devname.c_str(), O_RDWR);
     296    if (pipe_fd < 0 && errno != ENOENT)
     297      caller->self->Err("open pipe_fd error (%s)\n", strerror(-errno));
     298    usleep(1000);
     299  }
     300
     301  while (caller->is_running == true) {
     302    fd_set set;
     303    struct timeval timeout;
     304    int rv;
     305
     306    FD_ZERO(&set);            // clear the set
     307    FD_SET(caller->fd, &set); // add our file descriptor to the set
     308    FD_SET(pipe_fd, &set);    // add our file descriptor to the set
     309
     310    timeout.tv_sec = 0;
     311    timeout.tv_usec = SELECT_TIMEOUT_MS * 1000;
     312    rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
     313
     314    if (rv == -1) {
     315      caller->self->Err("select error\n"); // an error occured
     316    } else if (rv == 0) {
     317      // printf("timeout\n");
    262318    } else {
    263         nb_read = recvfrom(fd, buffer, sizeof(buffer), 0, NULL,NULL);
    264     }
    265 #endif
    266     if(nb_read<=0) {
    267         return nb_read;
    268     } else {
    269         //printf("%s\n",buffer);
    270         if(broadcast==true) {
    271             int index=-1;
    272             for(int i=0;i<nb_read;i++) {
    273                 if(buffer[i]==':') {
    274                     index=i;
    275                     break;
    276                 }
    277             }
    278             if(index<0) {
    279                 self->Warn("Malformed message\n");
    280                 return -1;
    281             } else if(src_len!=NULL && src!=NULL) {
    282                 if(index+1>(int)(*src_len) && src!=NULL) { //+1 pour inserer un 0)
    283                     self->Warn("insufficent src size\n");
    284                     return -1;
    285                 }
    286             } else if(nb_read-index-1+1>(int)msg_len) { //+1 pour inserer un 0
    287                 self->Warn("insufficent msg size (%i/%i)\n",nb_read-index-1+1,msg_len);
    288                 return -1;
    289             }
    290             if(src!=NULL) {
    291                 memcpy(src,buffer,index);
    292                 src[index]=0;
    293             }
    294             memcpy(msg,&buffer[index+1],nb_read-index-1);
    295             msg[nb_read-index-1]=0;
    296             return nb_read-index;
     319      ssize_t nb_read, nb_write;
     320      char buffer[1024];
     321
     322      if (FD_ISSET(caller->fd, &set)) {
     323        socklen_t sinsize = sizeof(caller->sock_in);
     324        if (caller->broadcast == false) {
     325          nb_read = recvfrom(caller->fd, buffer, sizeof(buffer), 0,
     326                             (sockaddr *)&(caller->sock_in), &sinsize);
    297327        } else {
    298             if(nb_read>(int)msg_len) {
    299                 self->Warn("insufficent msg size (%i/%i)\n",nb_read,msg_len);
    300                 return -1;
    301             }
    302             memcpy(msg,buffer,nb_read);
    303             return nb_read;
    304         }
    305     }
    306 }
    307 
    308 #ifdef __XENO__
    309 void* Socket_impl::user(void * arg)
    310 {
    311     Socket_impl *caller = (Socket_impl*)arg;
    312     int pipe_fd=-1;
    313     string devname;
    314 
    315     devname= NRT_PIPE_PATH + getFrameworkManager()->ObjectName() + "-" + caller->self->ObjectName()+ "-pipe";
    316     while(pipe_fd<0)
    317     {
    318         pipe_fd = open(devname.c_str(), O_RDWR);
    319         if (pipe_fd < 0 && errno!=ENOENT) caller->self->Err("open pipe_fd error (%s)\n",strerror(-errno));
    320         usleep(1000);
    321     }
    322 
    323     while(caller->is_running==true)
    324     {
    325         fd_set set;
    326         struct timeval timeout;
    327         int rv;
    328 
    329         FD_ZERO(&set); // clear the set
    330         FD_SET(caller->fd, &set); // add our file descriptor to the set
    331         FD_SET(pipe_fd, &set); // add our file descriptor to the set
    332 
    333         timeout.tv_sec = 0;
    334         timeout.tv_usec = SELECT_TIMEOUT_MS*1000;
    335         rv = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
    336 
    337         if(rv == -1)
    338         {
    339             caller->self->Err("select error\n"); // an error occured
    340         }
    341         else if(rv == 0)
    342         {
    343             //printf("timeout\n");
    344         }
    345         else
    346         {
    347             ssize_t nb_read,nb_write;
    348             char buffer[1024];
    349 
    350             if(FD_ISSET(caller->fd, &set))
    351             {
    352                 socklen_t sinsize = sizeof(caller->sock_in);
    353                 if (caller->broadcast==false)
    354                 {
    355                     nb_read = recvfrom(caller->fd, buffer, sizeof(buffer), 0, (sockaddr *) &(caller->sock_in), &sinsize);
    356                 }
    357                 else
    358                 {
    359                     nb_read = recvfrom(caller->fd, buffer, sizeof(buffer), 0, NULL,NULL);
    360                 }
    361                 if(nb_read  < 0)
    362                 {
    363                     caller->self->Err("recvfrom error\n");
    364                 }
    365 //printf("user %s\n",buffer);
    366                 //on ne garde que les messages venant pas de soi meme
    367                 if (caller->broadcast==false || (caller->broadcast==true && getFrameworkManager()->ObjectName().compare(0,getFrameworkManager()->ObjectName().size(),buffer,getFrameworkManager()->ObjectName().size()) != 0))
    368                 {
    369                     nb_write=write(pipe_fd,buffer,nb_read);
    370                     if(nb_write!=nb_read)
    371                     {
    372                         caller->self->Err("write error\n");
    373                     }
    374                 }
    375                 else
    376                 {
    377                     //printf("self %s\n",buffer);
    378                 }
    379             }
    380             if(FD_ISSET(pipe_fd, &set))
    381             {
    382                 nb_read=read(pipe_fd,buffer,sizeof(buffer));
    383                 //printf("read pipe %i %s\n",nb_read,buffer);
    384                 if(nb_read>0)
    385                 {
    386                     //printf("send %s\n",buffer);
    387                     nb_write=sendto(caller->fd,buffer,nb_read, 0, (struct sockaddr *) &(caller->sock_in), sizeof(caller->sock_in));
    388                     if(nb_write!=nb_read)
    389                     {
    390                         caller->self->Err("sendto error\n");
    391                     }
    392                 }
    393             }
    394         }
    395     }
    396 
    397     close(pipe_fd);
    398     pthread_exit(0);
    399 }
    400 #endif
     328          nb_read = recvfrom(caller->fd, buffer, sizeof(buffer), 0, NULL, NULL);
     329        }
     330        if (nb_read < 0) {
     331          caller->self->Err("recvfrom error\n");
     332        }
     333        // printf("user %s\n",buffer);
     334        // on ne garde que les messages venant pas de soi meme
     335        if (caller->broadcast == false ||
     336            (caller->broadcast == true &&
     337             getFrameworkManager()->ObjectName().compare(
     338                 0, getFrameworkManager()->ObjectName().size(), buffer,
     339                 getFrameworkManager()->ObjectName().size()) != 0)) {
     340          nb_write = write(pipe_fd, buffer, nb_read);
     341          if (nb_write != nb_read) {
     342            caller->self->Err("write error\n");
     343          }
     344        } else {
     345          // printf("self %s\n",buffer);
     346        }
     347      }
     348      if (FD_ISSET(pipe_fd, &set)) {
     349        nb_read = read(pipe_fd, buffer, sizeof(buffer));
     350        // printf("read pipe %i %s\n",nb_read,buffer);
     351        if (nb_read > 0) {
     352          // printf("send %s\n",buffer);
     353          nb_write = sendto(caller->fd, buffer, nb_read, 0,
     354                            (struct sockaddr *)&(caller->sock_in),
     355                            sizeof(caller->sock_in));
     356          if (nb_write != nb_read) {
     357            caller->self->Err("sendto error\n");
     358          }
     359        }
     360      }
     361    }
     362  }
     363
     364  close(pipe_fd);
     365  pthread_exit(0);
     366}
     367#endif
Note: See TracChangeset for help on using the changeset viewer.