Changeset 149 in flair-src for trunk/lib/FlairCore/src/SharedMem_impl.cpp


Ignore:
Timestamp:
03/01/17 14:36:10 (7 years ago)
Author:
Bayard Gildas
Message:

SharedMem in producerConsumer mode

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/FlairCore/src/SharedMem_impl.cpp

    r133 r149  
    2727
    2828SharedMem_impl::SharedMem_impl(const SharedMem *self, string name,
    29                                size_t size) {
     29                               size_t size, SharedMem::Type &type):type(type) {
    3030  this->size = size;
    3131  this->self = self;
     
    7272  ftruncate(fd, size);
    7373
    74   sem_name = "/" + name;
    75   sem = sem_open(sem_name.c_str(), O_CREAT, 0666, 1);
    76   if (sem == SEM_FAILED) {
    77     self->Err("Error creating semaphore\n");
     74  if (type==SharedMem::Type::mutex) {
     75    sem_name="/" + name + "_mutex";
     76    sem=sem_open(sem_name.c_str(), O_CREAT, 0666, 1);
     77    if (sem==SEM_FAILED) {
     78      self->Err("Error creating mutex semaphore\n");
     79    }
     80  } else if (type==SharedMem::Type::producerConsumer) {
     81    sem_name_producer="/" + name + "_producer";
     82    sem_producer=sem_open(sem_name_producer.c_str(), O_CREAT, 0666, 0);
     83    if (sem==SEM_FAILED) {
     84      self->Err("Error creating producer semaphore\n");
     85    }
     86Printf("semaphore %s opened\n",sem_name_producer.c_str());
     87    sem_name_consumer="/" + name + "_consumer";
     88    sem_consumer=sem_open(sem_name_consumer.c_str(), O_CREAT, 0666, 0);
     89    if (sem==SEM_FAILED) {
     90      self->Err("Error creating consumer semaphore\n");
     91    }
     92Printf("semaphore %s opened\n",sem_name_consumer.c_str());
    7893  }
    7994
     
    123138  }
    124139
    125   // do not check erros as it can be done by another process
     140  // do not check errors as it can be done by another process
    126141  status = shm_unlink(shm_name.c_str()); /*
    127142   if(status!=0)
     
    130145   }
    131146*/
    132   // do not check erros as it can be done by another process
    133   status = sem_unlink(sem_name.c_str()); /*
    134    if(status!=0)
    135    {
    136        self->Err("Failed to unlink semaphore (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));
    137    }*/
    138 
    139   status = sem_close(sem);
    140   if (status != 0) {
    141     self->Err("Failed to close semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
    142   }
     147  // do not check errors as it can be done by another process
     148  if (type==SharedMem::Type::mutex) {
     149    status = sem_unlink(sem_name.c_str());
     150    status = sem_close(sem);
     151    if (status != 0) {
     152      self->Err("Failed to close mutex semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
     153    }
     154  } else if (type==SharedMem::Type::producerConsumer) {
     155    status = sem_unlink(sem_name_producer.c_str());
     156    status = sem_close(sem_producer);
     157    if (status != 0) {
     158      self->Err("Failed to close producer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
     159    }
     160    status = sem_unlink(sem_name_consumer.c_str());
     161    status = sem_close(sem_consumer);
     162    if (status != 0) {
     163      self->Err("Failed to close consumer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
     164    }
     165  }
     166#endif
     167}
     168
     169void SharedMem_impl::ReaderReady() {
     170#ifdef __XENO__
     171//TODO
     172#else
     173  sem_post(sem_consumer);
    143174#endif
    144175}
     
    158189        }
    159190#else
    160   sem_wait(sem);
    161   memcpy(mem_segment, buf, size);
    162   sem_post(sem);
     191  if (type==SharedMem::Type::mutex) {
     192    sem_wait(sem);
     193    memcpy(mem_segment, buf, size);
     194    sem_post(sem);
     195  } else if (type==SharedMem::Type::producerConsumer) {
     196    //wait until consumer took the data away before writting a new one
     197    //but should not block if nobody's there to read
     198    if (sem_trywait(sem_consumer)==0) {
     199      memcpy(mem_segment, buf, size);
     200      sem_post(sem_producer);
     201    }
     202  }
    163203#endif
    164204}
     
    178218        }
    179219#else
    180   sem_wait(sem);
    181   memcpy(buf, mem_segment, size);
    182   sem_post(sem);
    183 #endif
    184 }
     220  if (type==SharedMem::Type::mutex) {
     221    sem_wait(sem);
     222    memcpy(buf, mem_segment, size);
     223    sem_post(sem);
     224  } else if (type==SharedMem::Type::producerConsumer) {
     225    sem_wait(sem_producer); //wait until some data is available
     226    memcpy(buf, mem_segment, size);
     227    sem_post(sem_consumer);
     228  }
     229#endif
     230}
Note: See TracChangeset for help on using the changeset viewer.