Changeset 149 in flair-src for trunk/lib/FlairCore


Ignore:
Timestamp:
Mar 1, 2017, 2:36:10 PM (8 years ago)
Author:
Bayard Gildas
Message:

SharedMem in producerConsumer mode

Location:
trunk/lib/FlairCore/src
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/SharedMem.cpp

    r15 r149  
    2424namespace core {
    2525
    26 SharedMem::SharedMem(const Object *parent, string name, size_t size)
    27     : Object(parent, name, "shmem") {
    28   pimpl_ = new SharedMem_impl(this, name, size);
     26SharedMem::SharedMem(const Object *parent, string name, size_t size, Type type)
     27    : Object(parent, name, "shmem"), type(type) {
     28  pimpl_ = new SharedMem_impl(this, name, size, type);
    2929}
    3030
     
    3535}
    3636
    37 void SharedMem::Read(char *buf, size_t size) const { pimpl_->Read(buf, size); }
     37void SharedMem::Read(char *buf, size_t size) const {
     38  pimpl_->Read(buf, size);
     39}
     40
     41void SharedMem::ReaderReady() {
     42  if (type==Type::producerConsumer) pimpl_->ReaderReady();
     43  else Warn("Function called for a non producerConsumer\n");
     44}
    3845
    3946} // end namespace core
  • trunk/lib/FlairCore/src/SharedMem.h

    r15 r149  
    3232class SharedMem : public Object {
    3333public:
     34  enum class Type { mutex, producerConsumer };
    3435  /*!
    3536  * \brief Constructor
     
    4041  * \param name name
    4142  * \param size size of the shared memory
     43  * \param blockOnRead if true reading will block if nothing written
    4244  */
    43   SharedMem(const Object *parent, std::string name, size_t size);
     45  SharedMem(const Object *parent, std::string name, size_t size, Type type=Type::mutex);
    4446
    4547  /*!
     
    6567  void Read(char *buf, size_t size) const;
    6668
     69
     70  /*!
     71  * \brief This function should be called when reader starts (in case of a SharedMem of type producerConsumer)
     72  */
     73   void ReaderReady();
    6774private:
    6875  SharedMem_impl *pimpl_;
     76  Type type;
    6977};
    7078
  • trunk/lib/FlairCore/src/SharedMem_impl.cpp

    r133 r149  
    2727
    2828SharedMem_impl::SharedMem_impl(const SharedMem *self, string name,
    29                                size_t size) {
     29                               size_t size, SharedMem::Type &type):type(type) {
    3030  this->size = size;
    3131  this->self = self;
     
    7272  ftruncate(fd, size);
    7373
    74   sem_name = "/" + name;
    75   sem = sem_open(sem_name.c_str(), O_CREAT, 0666, 1);
    76   if (sem == SEM_FAILED) {
    77     self->Err("Error creating semaphore\n");
     74  if (type==SharedMem::Type::mutex) {
     75    sem_name="/" + name + "_mutex";
     76    sem=sem_open(sem_name.c_str(), O_CREAT, 0666, 1);
     77    if (sem==SEM_FAILED) {
     78      self->Err("Error creating mutex semaphore\n");
     79    }
     80  } else if (type==SharedMem::Type::producerConsumer) {
     81    sem_name_producer="/" + name + "_producer";
     82    sem_producer=sem_open(sem_name_producer.c_str(), O_CREAT, 0666, 0);
     83    if (sem==SEM_FAILED) {
     84      self->Err("Error creating producer semaphore\n");
     85    }
     86Printf("semaphore %s opened\n",sem_name_producer.c_str());
     87    sem_name_consumer="/" + name + "_consumer";
     88    sem_consumer=sem_open(sem_name_consumer.c_str(), O_CREAT, 0666, 0);
     89    if (sem==SEM_FAILED) {
     90      self->Err("Error creating consumer semaphore\n");
     91    }
     92Printf("semaphore %s opened\n",sem_name_consumer.c_str());
    7893  }
    7994
     
    123138  }
    124139
    125   // do not check erros as it can be done by another process
     140  // do not check errors as it can be done by another process
    126141  status = shm_unlink(shm_name.c_str()); /*
    127142   if(status!=0)
     
    130145   }
    131146*/
    132   // do not check erros as it can be done by another process
    133   status = sem_unlink(sem_name.c_str()); /*
    134    if(status!=0)
    135    {
    136        self->Err("Failed to unlink semaphore (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));
    137    }*/
    138 
    139   status = sem_close(sem);
    140   if (status != 0) {
    141     self->Err("Failed to close semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    142   }
     147  // do not check errors as it can be done by another process
     148  if (type==SharedMem::Type::mutex) {
     149    status = sem_unlink(sem_name.c_str());
     150    status = sem_close(sem);
     151    if (status != 0) {
     152      self->Err("Failed to close mutex semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
     153    }
     154  } else if (type==SharedMem::Type::producerConsumer) {
     155    status = sem_unlink(sem_name_producer.c_str());
     156    status = sem_close(sem_producer);
     157    if (status != 0) {
     158      self->Err("Failed to close producer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
     159    }
     160    status = sem_unlink(sem_name_consumer.c_str());
     161    status = sem_close(sem_consumer);
     162    if (status != 0) {
     163      self->Err("Failed to close consumer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
     164    }
     165  }
     166#endif
     167}
     168
     169void SharedMem_impl::ReaderReady() {
     170#ifdef __XENO__
     171//TODO
     172#else
     173  sem_post(sem_consumer);
    143174#endif
    144175}
     
    158189        }
    159190#else
    160   sem_wait(sem);
    161   memcpy(mem_segment, buf, size);
    162   sem_post(sem);
     191  if (type==SharedMem::Type::mutex) {
     192    sem_wait(sem);
     193    memcpy(mem_segment, buf, size);
     194    sem_post(sem);
     195  } else if (type==SharedMem::Type::producerConsumer) {
     196    //wait until consumer took the data away before writting a new one
     197    //but should not block if nobody's there to read
     198    if (sem_trywait(sem_consumer)==0) {
     199      memcpy(mem_segment, buf, size);
     200      sem_post(sem_producer);
     201    }
     202  }
    163203#endif
    164204}
     
    178218        }
    179219#else
    180   sem_wait(sem);
    181   memcpy(buf, mem_segment, size);
    182   sem_post(sem);
    183 #endif
    184 }
     220  if (type==SharedMem::Type::mutex) {
     221    sem_wait(sem);
     222    memcpy(buf, mem_segment, size);
     223    sem_post(sem);
     224  } else if (type==SharedMem::Type::producerConsumer) {
     225    sem_wait(sem_producer); //wait until some data is available
     226    memcpy(buf, mem_segment, size);
     227    sem_post(sem_consumer);
     228  }
     229#endif
     230}
  • trunk/lib/FlairCore/src/unexported/SharedMem_impl.h

    r15 r149  
    2323#endif
    2424
    25 namespace flair {
    26 namespace core {
    27 class SharedMem;
    28 }
    29 }
     25#include <SharedMem.h>
    3026
    3127/*! \class SharedMem_impl
     
    3733public:
    3834  SharedMem_impl(const flair::core::SharedMem *self, std::string name,
    39                  size_t size);
     35                 size_t size, flair::core::SharedMem::Type &type);
    4036  ~SharedMem_impl();
    4137
    4238  void Write(const char *buf, size_t size);
    4339  void Read(char *buf, size_t size);
     40  void ReaderReady();
    4441
    4542private:
    4643  const flair::core::SharedMem *self;
     44  flair::core::SharedMem::Type type;
    4745  size_t size;
    4846  char *mem_segment;
     
    5452#else
    5553  int fd;
    56   sem_t *sem;
     54  sem_t *sem,*sem_producer,*sem_consumer;
    5755  std::string sem_name, shm_name;
     56  std::string sem_name_producer,sem_name_consumer;
    5857#endif
    5958};
Note: See TracChangeset for help on using the changeset viewer.