Changeset 238 in flair-src for trunk/lib/FlairCore/src


Ignore:
Timestamp:
May 15, 2018, 4:41:02 PM (6 years ago)
Author:
Bayard Gildas
Message:

correction sémaphore. bloquant tout ça...

Location:
trunk/lib/FlairCore/src
Files:
12 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/ConnectedSocket.cpp

    r15 r238  
    3232  char buffer[sizeof(uint16_t)];
    3333  size_t alreadyReceived = 0;
    34   Time remainingTimeout = timeout; // ms
     34  Time remainingTimeout = timeout; // ns
    3535  do {
    3636    Time beforeTime = GetTime(); // ns
     
    3838        RecvMessage(buffer + alreadyReceived,
    3939                    sizeof(uint16_t) - alreadyReceived, remainingTimeout);
    40     remainingTimeout -= (GetTime() - beforeTime) / 100000;
    41     if ((received < 0) || (remainingTimeout < 0))
    42       throw std::runtime_error("Timeout");
     40    remainingTimeout -= GetTime() - beforeTime;
     41    if ((received < 0) || (remainingTimeout < 0)) throw std::runtime_error("Timeout");
    4342    alreadyReceived += received;
    4443  } while (alreadyReceived != sizeof(uint16_t));
     
    5251  char *buffer = (char *)&dataInNetworkEndianness;
    5352  size_t alreadySent = 0;
    54   Time remainingTimeout = timeout; // ms
     53  Time remainingTimeout = timeout; // ns
    5554  do {
    5655    Time beforeTime = GetTime(); // ns
    5756    ssize_t sent = SendMessage(
    5857        buffer + alreadySent, sizeof(uint16_t) - alreadySent, remainingTimeout);
    59     remainingTimeout -= (GetTime() - beforeTime) / 100000;
     58    remainingTimeout -= GetTime() - beforeTime;
    6059    if ((sent < 0) || (remainingTimeout < 0))
    6160      throw std::runtime_error("Timeout");
     
    6766  char buffer[sizeof(uint32_t)];
    6867  size_t alreadyReceived = 0;
    69   Time remainingTimeout = timeout; // ms
     68  Time remainingTimeout = timeout; // ns
    7069  do {
    7170    Time beforeTime = GetTime(); // ns
     
    7372        RecvMessage(buffer + alreadyReceived,
    7473                    sizeof(uint32_t) - alreadyReceived, remainingTimeout);
    75     remainingTimeout -= (GetTime() - beforeTime) / 100000;
     74    remainingTimeout -= GetTime() - beforeTime;
    7675    if ((received < 0) || (remainingTimeout < 0))
    7776      throw std::runtime_error("Timeout");
     
    8786  char *buffer = (char *)&dataInNetworkEndianness;
    8887  size_t alreadySent = 0;
    89   Time remainingTimeout = timeout; // ms
     88  Time remainingTimeout = timeout; // ns
    9089  do {
    9190    Time beforeTime = GetTime(); // ns
    9291    ssize_t sent = SendMessage(
    9392        buffer + alreadySent, sizeof(uint32_t) - alreadySent, remainingTimeout);
    94     remainingTimeout -= (GetTime() - beforeTime) / 100000;
     93    remainingTimeout -= GetTime() - beforeTime;
    9594    if ((sent < 0) || (remainingTimeout < 0))
    9695      throw std::runtime_error("Timeout");
     
    108107        RecvMessage(buffer + alreadyReceived, stringSize - alreadyReceived,
    109108                    remainingTimeout);
    110     remainingTimeout -= (GetTime() - beforeTime) / 100000;
     109    remainingTimeout -= GetTime() - beforeTime;
    111110    if ((received < 0) || (remainingTimeout < 0))
    112111      throw std::runtime_error("Timeout");
  • trunk/lib/FlairCore/src/ConnectedSocket.h

    r16 r238  
    5353  * \brief Returns a socket on a new incoming connexion
    5454  *
    55   * \param ConnectedSocket &listeningSocket
     55  * \param timeout timeout (in nanoseconds)
    5656  */
    5757  virtual ConnectedSocket *Accept(
     
    6565  * \param unsigned int port
    6666  * \param const distantAddress
    67   * \param timeout timeout (in milliseconds)
     67  * \param timeout timeout (in nanoseconds)
    6868  */
    6969  virtual bool Connect(const unsigned int port,
     
    7171
    7272  /*!
    73   * \brief Send a message
     73  * \brief Send a message waiting up to timeout ns
    7474  *
    7575  * \param message message
    7676  * \param message_len message length
    77   * \param timeout timeout (in milliseconds)
     77  * \param timeout timeout (in nanoseconds)
    7878  */
    7979  virtual ssize_t SendMessage(const char *message, size_t message_len,
     
    8383  * \brief Receive a message
    8484  *
    85   * Receive a message and wait up to timeout. \n
     85  * Receive a message and wait up to timeout ns. \n
    8686  *
    8787  * \param buf buffer to put the message
    8888  * \param buf_len buffer length
    89   * \param timeout timeout (in milliseconds)
     89  * \param timeout timeout (in nanoseconds)
    9090  *
    9191  * \return size of the received message
  • trunk/lib/FlairCore/src/IODevice.h

    r157 r238  
    151151    void SetIsReady(bool status);
    152152
    153 private:
    154153  /*!
    155154  * \brief Update using provided datas
     
    164163  virtual void UpdateFrom(const io_data *data) = 0;
    165164
     165  private:
    166166  class IODevice_impl *pimpl_;
    167167};
  • trunk/lib/FlairCore/src/Semaphore.cpp

    r203 r238  
    2424namespace core {
    2525
    26 Semaphore::Semaphore(const Object *parent, uint32_t initialValue, string name)
    27     : Object(parent, name, "semaphore") {
    28   pimpl_ = new Semaphore_impl(this, initialValue);
     26Semaphore::Semaphore(const Object *parent, uint32_t initialValue, string name, Type type)
     27    : Object(parent, name, "semaphore"),type(type) {
     28  pimpl_ = new Semaphore_impl(this, name, initialValue,type);
    2929}
    3030
  • trunk/lib/FlairCore/src/Semaphore.h

    r203 r238  
    3131
    3232public:
     33  enum class Type { anonymous, named };
    3334  /*!
    3435  * \brief Constructor
     
    3940  * \param name name
    4041  */
    41   Semaphore(const Object *parent, uint32_t initialValue, std::string name = "");
     42  Semaphore(const Object *parent, uint32_t initialValue, std::string name = "", Type type=Type::anonymous);
    4243
    4344  /*!
     
    7475private:
    7576  class Semaphore_impl *pimpl_;
     77  Type type;
    7678};
    7779
  • trunk/lib/FlairCore/src/Semaphore_impl.cpp

    r213 r238  
    2525using namespace flair::core;
    2626
    27 Semaphore_impl::Semaphore_impl(Semaphore *self, uint32_t initialValue) {
     27Semaphore_impl::Semaphore_impl(Semaphore *self, string name, uint32_t initialValue, Semaphore::Type &type):type(type) {
    2828  this->self = self;
    2929  int status;
     
    3232  status = rt_sem_create(&semaphore, NULL, initialValue, S_FIFO);
    3333#else
    34   status = sem_init(&semaphore, 0, initialValue);
     34  if (type==Semaphore::Type::named) {
     35    sem_name = "/" + name;
     36    semaphore=sem_open(sem_name.c_str(),O_CREAT, 0666, initialValue);
     37    if (semaphore==SEM_FAILED) status=errno; else status=0;
     38  } else { //anonymous
     39    semaphore=new sem_t;
     40    status = sem_init(semaphore, 0, initialValue);
     41  }
    3542#endif
    3643  if (status != 0) {
     
    4653  status = rt_sem_delete(&semaphore);
    4754#else
    48   status = sem_destroy(&semaphore);
     55  if (type==Semaphore::Type::named) {
     56    status=sem_close(semaphore);
     57    sem_unlink(sem_name.c_str());
     58  } else { //anonymous
     59    status = sem_destroy(semaphore);
     60  }
    4961#endif
    5062  if (status != 0) {
     
    5870  return !rt_sem_p(&semaphore, TM_NONBLOCK);
    5971#else
    60   return !sem_trywait(&semaphore);
     72  return !sem_trywait(semaphore);
    6173#endif
    6274}
     
    7789        semTimeout.tv_nsec -= 1000000000ULL;
    7890    }
    79     status = sem_timedwait(&semaphore, &semTimeout);
     91    status = sem_timedwait(semaphore, &semTimeout);
    8092  } else {
    81     status = sem_wait(&semaphore);
     93    status = sem_wait(semaphore);
    8294  }
    8395#endif
     
    102114  status = rt_sem_v(&semaphore);
    103115#else
    104   status = sem_post(&semaphore);
     116  status = sem_post(semaphore);
    105117#endif
    106118  if (status != 0) {
  • trunk/lib/FlairCore/src/SharedMem.h

    r203 r238  
    4040  * \param parent parent
    4141  * \param name name
    42   * \param size size of the shared memory
    43   * \param blockOnRead if true reading will block if nothing written
     42  * \param size size of the shared memory:w
     43   *
     44  * \param type mutex or producerConsumer. Mutex type is for symmetrical use (default)
    4445  */
    4546  SharedMem(const Object *parent, std::string name, size_t size, Type type=Type::mutex);
  • trunk/lib/FlairCore/src/SharedMem_impl.cpp

    r213 r238  
    2626using namespace flair::core;
    2727
    28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size),
    29                                sem(self,1,"/" + name + "_mutex"), sem_producer(self,0,"/" + name + "_producer"),sem_consumer(self,0,"/" + name + "_consumer") {
    30 
     28SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size) {
    3129#ifdef __XENO__
    3230  heap_binded = false;
     
    5250
    5351#else
     52  if (type==SharedMem::Type::mutex) {
     53      sem = new Semaphore(self,1,"/" + name + "_mutex",Semaphore::Type::named);
     54  } else { //producerConsumer
     55      sem_producer=new Semaphore(self,0,"/" + name + "_producer", Semaphore::Type::named);
     56      sem_consumer=new Semaphore(self,0,"/" + name + "_consumer", Semaphore::Type::named);
     57  }
    5458  shm_name = "/" + name;
    5559  fd = shm_open(shm_name.c_str(), O_RDWR | O_CREAT, 0666);
     
    100104   }
    101105*/
     106    if (type==SharedMem::Type::mutex) {
     107      delete sem;
     108    } else {
     109      delete sem_producer;
     110      delete sem_consumer;
     111    }
    102112#endif
    103113}
    104114
    105115void SharedMem_impl::ReaderReady() {
    106   sem_consumer.ReleaseSemaphore();
     116  if (type==SharedMem::Type::producerConsumer) {
     117    sem_consumer->ReleaseSemaphore();
     118  } else {
     119    self->Warn("Called on a non producerConsumer type of shared memory! (this is seriously wrong)");
     120  }
    107121}
    108122
    109123void SharedMem_impl::Write(const char *buf, size_t size) {
    110124  if (type==SharedMem::Type::mutex) {
    111     sem.GetSemaphore();
     125    sem->GetSemaphore();
    112126    memcpy(mem_segment, buf, size);
    113     sem.ReleaseSemaphore();
     127    sem->ReleaseSemaphore();
    114128  } else if (type==SharedMem::Type::producerConsumer) {
    115     //wait until consumer took the data away before writting a new one
     129    //wait until consumer took the data away before writing a new one
    116130    //but should not block if nobody's there to read
    117     if (sem_consumer.TryGetSemaphore()) {
     131    if (sem_consumer->TryGetSemaphore()) {
    118132      memcpy(mem_segment, buf, size);
    119       sem_producer.ReleaseSemaphore();
     133      sem_producer->ReleaseSemaphore();
    120134    }
    121135  }
     
    124138bool SharedMem_impl::Read(char *buf, size_t size, Time nsTimeout) {
    125139  if (type==SharedMem::Type::mutex) {
    126     if (sem.GetSemaphore()) {
     140    if (sem->GetSemaphore()) { //may block for ever
    127141      memcpy(buf, mem_segment, size);
    128       sem.ReleaseSemaphore();
     142      sem->ReleaseSemaphore();
    129143      return true;
    130144    }
    131145  } else if (type==SharedMem::Type::producerConsumer) {
    132     if (sem_producer.GetSemaphore(nsTimeout)) {
     146    if (sem_producer->GetSemaphore(nsTimeout)) { //if nobody ever writes we need to exit this after a while
    133147      memcpy(buf, mem_segment, size);
    134       sem_consumer.ReleaseSemaphore();
     148      sem_consumer->ReleaseSemaphore();
    135149      return true;
    136     }
     150    };
    137151  }
    138152  return false;
  • trunk/lib/FlairCore/src/TcpSocket.cpp

    r231 r238  
    2222#include <fcntl.h>
    2323#include <string.h>
     24#include <system_error>
    2425
    2526using std::string;
     
    3031TcpSocket::TcpSocket(const Object *parent, const std::string name,
    3132                     bool _blockOnSend, bool _blockOnReceive)
    32     : ConnectedSocket(parent, name), isConnected(false) {
     33    : ConnectedSocket(parent, name), socket(0), isConnected(false), isListening(false), distantPort(0), distantAddress() {
    3334  blockOnSend = _blockOnSend;
    3435  blockOnReceive = _blockOnReceive;
     
    3738TcpSocket::~TcpSocket() {
    3839  // Info("Debug: destroying TCP socket %s", ObjectName().c_str());
    39   close(socket);
     40  if (socket) close(socket);
    4041}
    4142
     
    5960  }
    6061
    61   listen(socket, 1);
     62  int ret=listen(socket, 1);
     63  if (ret < 0) {
     64    char errorMsg[256];
     65    Err("select: %s\n", strerror_r(errno, errorMsg, sizeof(errorMsg)));
     66  } else {
     67    isListening=true;
     68  }
    6269}
    6370
    6471TcpSocket *TcpSocket::Accept(Time timeout) {
     72  if (!isListening) throw std::logic_error("Can't call Accept on a non listening socket");
     73
    6574  TcpSocket *acceptedSocket = nullptr;
    6675
    6776  struct timeval tv;
    6877  if (timeout != 0) {
    69     tv.tv_sec = timeout / 1000; // timeout is in ms
    70     tv.tv_usec = (timeout % 1000) * 1000;
     78    tv.tv_sec = timeout / 1000000000;
     79    tv.tv_usec = (timeout % 1000000000) / 1000;
    7180  }
    7281  fd_set rset;
     
    8190    if (ret == 0) {
    8291      // timeout reached
    83       // Err("timeout reached\n");
     92      throw std::runtime_error("Timeout");
    8493    } else {
    8594      // our socket is readable, a new connection can be accepted
     
    8897      sockaddr_in their_addr;
    8998      socklen_t namelen = sizeof(their_addr);
    90       if ((acceptedSocket->socket =
    91                accept(socket, (sockaddr *)&their_addr, &namelen)) < 0) {
    92         char errorMsg[256];
    93         Err("error: %s\n", strerror_r(errno, errorMsg, sizeof(errorMsg)));
     99      ret = accept(socket, (sockaddr *)&their_addr, &namelen);
     100      if (ret < 0) {
    94101        delete acceptedSocket;
    95         acceptedSocket = nullptr;
     102        throw std::system_error(std::error_code(ret, std::generic_category()));
     103      } else {
     104        acceptedSocket->socket = ret;
    96105      }
    97106    }
     
    101110}
    102111
    103 bool TcpSocket::Connect(const unsigned int distantPort,
    104                         const std::string distantAddress, Time timeout) {
     112bool TcpSocket::Connect(const unsigned int _distantPort,
     113                        const std::string _distantAddress, Time timeout) {
    105114  bool success = false;
    106 
     115  bool criticalError=false; //meaning: if true then this socket can't be re-used
     116
     117  //should we open a new socket, close it, or re-use it?
    107118  if (isConnected) {
    108     if (this->distantPort == distantPort &&
    109         this->distantAddress == distantAddress) {
     119    // if already connected to the same host, just return true
     120    if (distantPort == _distantPort && distantAddress == _distantAddress) {
    110121      return true;
    111     } else {
     122    }
     123    // if already connected, but to a different host, close the current socket
     124    else {
     125      isConnected=false;
    112126      close(socket);
    113     }
    114   }
    115   socket = ::socket(AF_INET, SOCK_STREAM, 0);
    116   if (socket == -1)
    117     return false;
     127      socket=0;
     128    }
     129  }
     130  if (!socket) {
     131    socket = ::socket(AF_INET, SOCK_STREAM, 0);
     132    if (socket == -1)
     133      return false;
     134  }
    118135
    119136  sockaddr_in serv_addr;
     
    123140    printf("incorrect network address.");
    124141    close(socket);
     142    socket=0;
    125143    return false;
    126144  }
     
    135153      Err("socket connect: %s\n",
    136154          strerror_r(errno, errorMsg, sizeof(errorMsg)));
    137       success = false;
     155      criticalError = true;
    138156    } else {
    139157      // now block with a timeout
     
    152170        char errorMsg[256];
    153171        Err("select: %s\n", strerror_r(errno, errorMsg, sizeof(errorMsg)));
    154         success = false;
     172        criticalError = true;
    155173      } else {
    156         if (ret == 0) {
    157           // timeout reached
    158           // Err("timeout reached\n");
    159           success = false;
    160         } else {
    161           // something happened on our socket. Check if an error occured
     174        if (ret != 0) { //if ret==0 we're on timeout (nothing to do since success is already set to false)
     175          // something happened on our socket. Check if an error occurred
    162176          int error;
    163177          socklen_t len = sizeof(error);
     
    165179            //char errorMsg[256];
    166180            // Err("getsockopt: %s\n",strerror_r(errno,errorMsg,sizeof(errorMsg)));
    167             success = false;
     181            criticalError = true;
    168182          } else if (error != 0) {
    169183            //char errorMsg[256];
    170184            // Err("socket error: %d(%s)\n",error,strerror_r(error,errorMsg,sizeof(errorMsg)));
    171             success = false;
     185            criticalError = true;
    172186          } else {
    173             if (connect(socket, (sockaddr *)&serv_addr, sizeof(serv_addr)) ==
    174                 -1) {
    175               success = false;
    176             } else {
     187            if (connect(socket, (sockaddr *)&serv_addr, sizeof(serv_addr)) != -1) {
    177188              // Info("connected indeed ^^\n");
    178189              success = true;
     190            } else {
     191              //connect failed
     192              if ((errno != EINPROGRESS) && (errno != EAGAIN)) {
     193                criticalError = true;
     194              }
    179195            }
    180196          }
     
    183199    }
    184200  } else {
    185     success = true; // never reached suprisingly...
     201    success = true; // should never happen since we go non blocking and connect can't be immediate
    186202  }
    187203  // switch back to blocking mode (default)
     
    189205
    190206  if (!success) {
    191     close(socket);
     207    if (criticalError) {
     208      close(socket);
     209      socket=0;
     210    }
     211    isConnected=false;
    192212     //Info("Debug: Connect to %s:%d failed\n", distantAddress.c_str(), distantPort);
    193213    return false;
    194214  } else {
    195215    isConnected = true;
    196     this->distantPort = distantPort;
    197     this->distantAddress = distantAddress;
     216    distantPort = _distantPort;
     217    distantAddress = _distantAddress;
    198218     //Info("Debug: Connect to %s:%d succeeded\n", distantAddress.c_str(), distantPort);
    199219    return true;
     
    208228  } else {
    209229    struct timeval tv;
    210     tv.tv_sec = timeout / 1000; // timeout is in ms
    211     tv.tv_usec = (timeout % 1000) * 1000;
     230    tv.tv_sec = timeout / 1000000000; // timeout is in ns
     231    tv.tv_usec = (timeout % 1000000000) / 1000;
    212232
    213233    setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
     
    225245  } else {
    226246    struct timeval tv;
    227     tv.tv_sec = timeout / 1000; // timeout is in ms
    228     tv.tv_usec = (timeout % 1000) * 1000;
     247    tv.tv_sec = timeout / 1000000000; // timeout is in ns
     248    tv.tv_usec = (timeout % 1000000000) / 1000;
    229249
    230250    setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,
  • trunk/lib/FlairCore/src/TcpSocket.h

    r15 r238  
    3030  void Listen(const unsigned int port, const std::string localAddress = "ANY");
    3131  TcpSocket *Accept(
    32       Time timeout = 0); // should throw an exception if not a listening socket
     32      Time timeout = TIME_INFINITE);
    3333  bool Connect(const unsigned int distantPort, const std::string distantAddress,
    34                Time timeout = 0); // timeout in milliseconds
    35   ssize_t SendMessage(const char *message, size_t message_len,
    36                       Time timeout = 0); // timeout in milliseconds
    37   ssize_t RecvMessage(char *buf, size_t buf_len,
    38                       Time timeout = 0); // timeout in milliseconds
     34               Time timeout = TIME_INFINITE);
     35  ssize_t SendMessage(const char *message, size_t message_len, Time timeout = TIME_INFINITE);
     36  ssize_t RecvMessage(char *buf, size_t buf_len, Time timeout = TIME_INFINITE);
    3937
    4038  uint16_t NetworkToHost16(uint16_t data);
     
    4846  bool blockOnReceive;
    4947  bool isConnected;
     48  bool isListening;
    5049  unsigned int distantPort;
    5150  std::string distantAddress;
     51  char *sendRingBuffer, *recvRingBuffer;
    5252};
    5353
  • trunk/lib/FlairCore/src/unexported/Semaphore_impl.h

    r203 r238  
    1717#include <native/sem.h>
    1818#else
     19#include <fcntl.h>
    1920#include <semaphore.h>
    2021#endif
     
    2930class Semaphore_impl {
    3031public:
    31   Semaphore_impl(flair::core::Semaphore *self, uint32_t initialValue);
     32  Semaphore_impl(flair::core::Semaphore *self, std::string name, uint32_t initialValue, flair::core::Semaphore::Type &type);
    3233  ~Semaphore_impl();
    3334  bool TryGetSemaphore();
     
    3738  RT_SEM semaphore;
    3839#else
    39   sem_t semaphore;
     40  std::string sem_name;
     41  sem_t *semaphore;
    4042#endif
    4143
    4244private:
    4345  flair::core::Semaphore *self;
     46  flair::core::Semaphore::Type type;
    4447};
    4548
  • trunk/lib/FlairCore/src/unexported/SharedMem_impl.h

    r203 r238  
    4444  size_t size;
    4545  char *mem_segment;
    46   flair::core::Semaphore sem,sem_producer,sem_consumer;
     46  flair::core::Semaphore *sem; //for mutex type shared memory
     47  flair::core::Semaphore *sem_producer,*sem_consumer; //sem_[produc|consum]er meaning = is [produc|consum]er ready?
    4748#ifdef __XENO__
    4849  RT_HEAP heap;
Note: See TracChangeset for help on using the changeset viewer.