Changeset 203 in flair-src for trunk/lib/FlairCore/src/SharedMem_impl.cpp


Ignore:
Timestamp:
Nov 9, 2017, 2:00:49 PM (7 years ago)
Author:
Bayard Gildas
Message:

Added timeout on SharedMem Read

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/SharedMem_impl.cpp

    r152 r203  
    2626using namespace flair::core;
    2727
    28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name,
    29                                size_t size, SharedMem::Type &type):type(type) {
    30   this->size = size;
    31   this->self = self;
    32         char errorMsg[256];
     28SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size),
     29                               sem(self,1,"/" + name + "_mutex"), sem_producer(self,0,"/" + name + "_producer"),sem_consumer(self,0,"/" + name + "_consumer") {
     30
     31  char errorMsg[256];
    3332
    3433#ifdef __XENO__
     
    5251  mem_segment = (char *)ptr;
    5352
    54   mutex_binded = false;
    55   string mutex_name = "mutex_" + name;
    56   status = rt_mutex_create(&mutex, mutex_name.c_str());
    57   if (status == -EEXIST) {
    58     mutex_binded = true;
    59     status = rt_mutex_bind(&mutex, mutex_name.c_str(), TM_INFINITE);
    60   }
    61   if (status != 0) {
    62     self->Err("rt_mutex_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    63     return;
    64   }
    65 
    6653#else
    6754  shm_name = "/" + name;
     
    7259  ftruncate(fd, size);
    7360
    74   if (type==SharedMem::Type::mutex) {
    75     sem_name="/" + name + "_mutex";
    76     sem=sem_open(sem_name.c_str(), O_CREAT, 0666, 1);
    77     if (sem==SEM_FAILED) {
    78       self->Err("Error creating mutex semaphore\n");
    79     }
    80   } else if (type==SharedMem::Type::producerConsumer) {
    81     sem_name_producer="/" + name + "_producer";
    82     sem_producer=sem_open(sem_name_producer.c_str(), O_CREAT, 0666, 0);
    83     if (sem==SEM_FAILED) {
    84       self->Err("Error creating producer semaphore\n");
    85     }
    86     sem_name_consumer="/" + name + "_consumer";
    87     sem_consumer=sem_open(sem_name_consumer.c_str(), O_CREAT, 0666, 0);
    88     if (sem==SEM_FAILED) {
    89       self->Err("Error creating consumer semaphore\n");
    90     }
    91   }
    92 
    93   mem_segment =
    94       (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
     61  mem_segment = (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    9562  if (mem_segment == MAP_FAILED) {
    9663    self->Err("Failed to map memory\n");
     
    10471       
    10572#ifdef __XENO__
    106   /* unnecessary because heap is opened in H_SINGLE mode
    107   status=rt_heap_free(&heap,mem_segment);
    108   if(status!=0)
    109   {
    110       self->Err("rt_heap_free error (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));
    111   }
    112   */
    11373
    11474  if (heap_binded == false) {
     
    11979  }
    12080
    121   if (mutex_binded == false) {
    122     status = rt_mutex_delete(&mutex);
    123     if (status != 0) {
    124       self->Err("error destroying mutex (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    125     }
    126   }
    12781#else
    12882  status = munmap(mem_segment, size);
     
    14397   }
    14498*/
    145   // do not check errors as it can be done by another process
    146   if (type==SharedMem::Type::mutex) {
    147     status = sem_unlink(sem_name.c_str());
    148     status = sem_close(sem);
    149     if (status != 0) {
    150       self->Err("Failed to close mutex semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    151     }
    152   } else if (type==SharedMem::Type::producerConsumer) {
    153     status = sem_unlink(sem_name_producer.c_str());
    154     status = sem_close(sem_producer);
    155     if (status != 0) {
    156       self->Err("Failed to close producer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    157     }
    158     status = sem_unlink(sem_name_consumer.c_str());
    159     status = sem_close(sem_consumer);
    160     if (status != 0) {
    161       self->Err("Failed to close consumer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    162     }
    163   }
    16499#endif
    165100}
    166101
    167102void SharedMem_impl::ReaderReady() {
    168 #ifdef __XENO__
    169 //TODO
    170 #else
    171   sem_post(sem_consumer);
    172 #endif
     103  sem_consumer.ReleaseSemaphore();
    173104}
    174105
    175106void SharedMem_impl::Write(const char *buf, size_t size) {
    176 #ifdef __XENO__
    177   int status = rt_mutex_acquire(&mutex, TM_INFINITE);
    178   if (status != 0) {
    179                 char errorMsg[256];
    180     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    181         }
    182   memcpy(mem_segment, buf, size);
    183   status = rt_mutex_release(&mutex);
    184   if (status != 0) {
    185                 char errorMsg[256];
    186     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    187         }
    188 #else
    189107  if (type==SharedMem::Type::mutex) {
    190     sem_wait(sem);
     108    sem.GetSemaphore();
    191109    memcpy(mem_segment, buf, size);
    192     sem_post(sem);
     110    sem.ReleaseSemaphore();
    193111  } else if (type==SharedMem::Type::producerConsumer) {
    194112    //wait until consumer took the data away before writting a new one
    195113    //but should not block if nobody's there to read
    196     if (sem_trywait(sem_consumer)==0) {
     114    if (sem_consumer.TryGetSemaphore()) {
    197115      memcpy(mem_segment, buf, size);
    198       sem_post(sem_producer);
     116      sem_producer.ReleaseSemaphore();
    199117    }
    200118  }
    201 #endif
    202119}
    203120
    204 void SharedMem_impl::Read(char *buf, size_t size) {
    205 #ifdef __XENO__
    206   int status = rt_mutex_acquire(&mutex, TM_INFINITE);
    207   if (status != 0) {
    208                 char errorMsg[256];
    209     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    210         }
    211   memcpy(buf, mem_segment, size);
    212   status = rt_mutex_release(&mutex);
    213   if (status != 0) {
    214                 char errorMsg[256];
    215     self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    216         }
    217 #else
     121bool SharedMem_impl::Read(char *buf, size_t size, Time nsTimeout) {
    218122  if (type==SharedMem::Type::mutex) {
    219     sem_wait(sem);
    220     memcpy(buf, mem_segment, size);
    221     sem_post(sem);
     123    if (sem.GetSemaphore()) {
     124      memcpy(buf, mem_segment, size);
     125      sem.ReleaseSemaphore();
     126      return true;
     127    }
    222128  } else if (type==SharedMem::Type::producerConsumer) {
    223     sem_wait(sem_producer); //wait until some data is available
    224     memcpy(buf, mem_segment, size);
    225     sem_post(sem_consumer);
     129      if (sem_producer.GetSemaphore(nsTimeout)) {
     130        memcpy(buf, mem_segment, size);
     131        sem_consumer.ReleaseSemaphore();
     132      return true;
     133    }
    226134  }
    227 #endif
     135  return false;
    228136}
Note: See TracChangeset for help on using the changeset viewer.