Changeset 203 in flair-src


Ignore:
Timestamp:
11/09/17 14:00:49 (6 years ago)
Author:
Bayard Gildas
Message:

Added timeout on SharedMem Read

Location:
trunk/lib/FlairCore/src
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/Semaphore.cpp

    r126 r203  
    3131Semaphore::~Semaphore() { delete pimpl_; }
    3232
     33bool Semaphore::TryGetSemaphore() const { return pimpl_->TryGetSemaphore(); }
    3334bool Semaphore::GetSemaphore(Time timeout) const { return pimpl_->GetSemaphore(timeout); }
    3435
  • trunk/lib/FlairCore/src/Semaphore.h

    r126 r203  
    4848
    4949  /*!
     50  * \brief TryGetSemaphore
     51  *
     52  * Lock the semaphore. If not possible immediately returns false
     53  *
     54  */
     55  bool TryGetSemaphore() const;
     56
     57  /*!
    5058  * \brief GetSemaphore
    5159  *
  • trunk/lib/FlairCore/src/Semaphore_impl.cpp

    r133 r203  
    5353}
    5454
     55bool Semaphore_impl::TryGetSemaphore() {
     56#ifdef __XENO__
     57  return !rt_sem_p(&semaphore, TM_NONBLOCK);
     58#else
     59  return !sem_trywait(&semaphore);
     60#endif
     61}
     62
    5563bool Semaphore_impl::GetSemaphore(Time timeout) {
    5664  int status;
     
    7583        if (status != 0) {
    7684                if (errno == ETIMEDOUT) {
    77                         self->Warn("warning : semaphore timedout\n");   
     85#ifdef DEBUG
     86                        self->Warn("warning : semaphore timedout\n");
     87#endif
    7888                } else {
    7989                                        char errorMsg[256];
  • trunk/lib/FlairCore/src/SharedMem.cpp

    r149 r203  
    3535}
    3636
    37 void SharedMem::Read(char *buf, size_t size) const {
    38   pimpl_->Read(buf, size);
     37bool SharedMem::Read(char *buf, size_t size, Time nsTimeout) const {
     38  return pimpl_->Read(buf, size, nsTimeout);
    3939}
    4040
  • trunk/lib/FlairCore/src/SharedMem.h

    r149 r203  
    2727*
    2828* Shared memory is identified by its name so it can be accessed
    29 * by another processus using its name.
     29* by another process using its name.
    3030*/
    3131
     
    6565  * \param size buffer size
    6666  */
    67   void Read(char *buf, size_t size) const;
     67  bool Read(char *buf, size_t size, Time nsTimeout=TIME_INFINITE) const;
    6868
    6969
    7070  /*!
    71   * \brief This function should be called when reader starts (in case of a SharedMem of type producerConsumer)
     71  * \brief This function should be called when reader is ready (in case of a SharedMem of type producerConsumer)
    7272  */
    7373   void ReaderReady();
  • trunk/lib/FlairCore/src/SharedMem_impl.cpp

    r152 r203  
    2626using namespace flair::core;
    2727
    28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name,
    29                                size_t size, SharedMem::Type &type):type(type) {
    30   this->size = size;
    31   this->self = self;
    32         char errorMsg[256];
     28SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size),
     29                               sem(self,1,"/" + name + "_mutex"), sem_producer(self,0,"/" + name + "_producer"),sem_consumer(self,0,"/" + name + "_consumer") {
     30
     31  char errorMsg[256];
    3332
    3433#ifdef __XENO__
     
    5251  mem_segment = (char *)ptr;
    5352
    54   mutex_binded = false;
    55   string mutex_name = "mutex_" + name;
    56   status = rt_mutex_create(&mutex, mutex_name.c_str());
    57   if (status == -EEXIST) {
    58     mutex_binded = true;
    59     status = rt_mutex_bind(&mutex, mutex_name.c_str(), TM_INFINITE);
    60   }
    61   if (status != 0) {
    62     self->Err("rt_mutex_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    63     return;
    64   }
    65 
    6653#else
    6754  shm_name = "/" + name;
     
    7259  ftruncate(fd, size);
    7360
    74   if (type==SharedMem::Type::mutex) {
    75     sem_name="/" + name + "_mutex";
    76     sem=sem_open(sem_name.c_str(), O_CREAT, 0666, 1);
    77     if (sem==SEM_FAILED) {
    78       self->Err("Error creating mutex semaphore\n");
    79     }
    80   } else if (type==SharedMem::Type::producerConsumer) {
    81     sem_name_producer="/" + name + "_producer";
    82     sem_producer=sem_open(sem_name_producer.c_str(), O_CREAT, 0666, 0);
    83     if (sem==SEM_FAILED) {
    84       self->Err("Error creating producer semaphore\n");
    85     }
    86     sem_name_consumer="/" + name + "_consumer";
    87     sem_consumer=sem_open(sem_name_consumer.c_str(), O_CREAT, 0666, 0);
    88     if (sem==SEM_FAILED) {
    89       self->Err("Error creating consumer semaphore\n");
    90     }
    91   }
    92 
    93   mem_segment =
    94       (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
     61  mem_segment = (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    9562  if (mem_segment == MAP_FAILED) {
    9663    self->Err("Failed to map memory\n");
     
    10471       
    10572#ifdef __XENO__
    106   /* unnecessary because heap is opened in H_SINGLE mode
    107   status=rt_heap_free(&heap,mem_segment);
    108   if(status!=0)
    109   {
    110       self->Err("rt_heap_free error (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));
    111   }
    112   */
    11373
    11474  if (heap_binded == false) {
     
    11979  }
    12080
    121   if (mutex_binded == false) {
    122     status = rt_mutex_delete(&mutex);
    123     if (status != 0) {
    124       self->Err("error destroying mutex (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    125     }
    126   }
    12781#else
    12882  status = munmap(mem_segment, size);
     
    14397   }
    14498*/
    145   // do not check errors as it can be done by another process
    146   if (type==SharedMem::Type::mutex) {
    147     status = sem_unlink(sem_name.c_str());
    148     status = sem_close(sem);
    149     if (status != 0) {
    150       self->Err("Failed to close mutex semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    151     }
    152   } else if (type==SharedMem::Type::producerConsumer) {
    153     status = sem_unlink(sem_name_producer.c_str());
    154     status = sem_close(sem_producer);
    155     if (status != 0) {
    156       self->Err("Failed to close producer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    157     }
    158     status = sem_unlink(sem_name_consumer.c_str());
    159     status = sem_close(sem_consumer);
    160     if (status != 0) {
    161       self->Err("Failed to close consumer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    162     }
    163   }
    16499#endif
    165100}
    166101
    167102void SharedMem_impl::ReaderReady() {
    168 #ifdef __XENO__
    169 //TODO
    170 #else
    171   sem_post(sem_consumer);
    172 #endif
     103  sem_consumer.ReleaseSemaphore();
    173104}
    174105
    175106void SharedMem_impl::Write(const char *buf, size_t size) {
    176 #ifdef __XENO__
    177   int status = rt_mutex_acquire(&mutex, TM_INFINITE);
    178   if (status != 0) {
    179                 char errorMsg[256];
    180     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    181         }
    182   memcpy(mem_segment, buf, size);
    183   status = rt_mutex_release(&mutex);
    184   if (status != 0) {
    185                 char errorMsg[256];
    186     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    187         }
    188 #else
    189107  if (type==SharedMem::Type::mutex) {
    190     sem_wait(sem);
     108    sem.GetSemaphore();
    191109    memcpy(mem_segment, buf, size);
    192     sem_post(sem);
     110    sem.ReleaseSemaphore();
    193111  } else if (type==SharedMem::Type::producerConsumer) {
    194112    //wait until consumer took the data away before writting a new one
    195113    //but should not block if nobody's there to read
    196     if (sem_trywait(sem_consumer)==0) {
     114    if (sem_consumer.TryGetSemaphore()) {
    197115      memcpy(mem_segment, buf, size);
    198       sem_post(sem_producer);
     116      sem_producer.ReleaseSemaphore();
    199117    }
    200118  }
    201 #endif
    202119}
    203120
    204 void SharedMem_impl::Read(char *buf, size_t size) {
    205 #ifdef __XENO__
    206   int status = rt_mutex_acquire(&mutex, TM_INFINITE);
    207   if (status != 0) {
    208                 char errorMsg[256];
    209     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    210         }
    211   memcpy(buf, mem_segment, size);
    212   status = rt_mutex_release(&mutex);
    213   if (status != 0) {
    214                 char errorMsg[256];
    215     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    216         }
    217 #else
     121bool SharedMem_impl::Read(char *buf, size_t size, Time nsTimeout) {
    218122  if (type==SharedMem::Type::mutex) {
    219     sem_wait(sem);
    220     memcpy(buf, mem_segment, size);
    221     sem_post(sem);
     123    if (sem.GetSemaphore()) {
     124      memcpy(buf, mem_segment, size);
     125      sem.ReleaseSemaphore();
     126      return true;
     127    }
    222128  } else if (type==SharedMem::Type::producerConsumer) {
    223     sem_wait(sem_producer); //wait until some data is available
    224     memcpy(buf, mem_segment, size);
    225     sem_post(sem_consumer);
     129      if (sem_producer.GetSemaphore(nsTimeout)) {
     130        memcpy(buf, mem_segment, size);
     131        sem_consumer.ReleaseSemaphore();
     132      return true;
     133    }
    226134  }
    227 #endif
     135  return false;
    228136}
  • trunk/lib/FlairCore/src/unexported/Semaphore_impl.h

    r127 r203  
    3131  Semaphore_impl(flair::core::Semaphore *self, uint32_t initialValue);
    3232  ~Semaphore_impl();
     33  bool TryGetSemaphore();
    3334  bool GetSemaphore(flair::core::Time timeout = TIME_INFINITE);
    3435  bool ReleaseSemaphore(void);
  • trunk/lib/FlairCore/src/unexported/SharedMem_impl.h

    r149 r203  
    1818#ifdef __XENO__
    1919#include <native/heap.h>
    20 #include <native/mutex.h>
    21 #else
    22 #include <semaphore.h>
    2320#endif
     21
     22#include <Semaphore.h>
    2423
    2524#include <SharedMem.h>
     
    3736
    3837  void Write(const char *buf, size_t size);
    39   void Read(char *buf, size_t size);
     38  bool Read(char *buf, size_t size, flair::core::Time nsTimeout);
    4039  void ReaderReady();
    4140
     
    4544  size_t size;
    4645  char *mem_segment;
     46  flair::core::Semaphore sem,sem_producer,sem_consumer;
    4747#ifdef __XENO__
    4848  RT_HEAP heap;
    49   RT_MUTEX mutex;
    5049  bool heap_binded;
    51   bool mutex_binded;
    5250#else
    5351  int fd;
    54   sem_t *sem,*sem_producer,*sem_consumer;
    55   std::string sem_name, shm_name;
    56   std::string sem_name_producer,sem_name_consumer;
     52  std::string shm_name;
    5753#endif
    5854};
Note: See TracChangeset for help on using the changeset viewer.