Changeset 203 in flair-src
- Timestamp:
- Nov 9, 2017, 2:00:49 PM (7 years ago)
- Location:
- trunk/lib/FlairCore/src
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/Semaphore.cpp
r126 r203 31 31 Semaphore::~Semaphore() { delete pimpl_; } 32 32 33 bool Semaphore::TryGetSemaphore() const { return pimpl_->TryGetSemaphore(); } 33 34 bool Semaphore::GetSemaphore(Time timeout) const { return pimpl_->GetSemaphore(timeout); } 34 35 -
trunk/lib/FlairCore/src/Semaphore.h
r126 r203 48 48 49 49 /*! 50 * \brief TryGetSemaphore 51 * 52 * Lock the semaphore. If not possible immediately returns false 53 * 54 */ 55 bool TryGetSemaphore() const; 56 57 /*! 50 58 * \brief GetSemaphore 51 59 * -
trunk/lib/FlairCore/src/Semaphore_impl.cpp
r133 r203 53 53 } 54 54 55 bool Semaphore_impl::TryGetSemaphore() { 56 #ifdef __XENO__ 57 return !rt_sem_p(&semaphore, TM_NONBLOCK); 58 #else 59 return !sem_trywait(&semaphore); 60 #endif 61 } 62 55 63 bool Semaphore_impl::GetSemaphore(Time timeout) { 56 64 int status; … … 75 83 if (status != 0) { 76 84 if (errno == ETIMEDOUT) { 77 self->Warn("warning : semaphore timedout\n"); 85 #ifdef DEBUG 86 self->Warn("warning : semaphore timedout\n"); 87 #endif 78 88 } else { 79 89 char errorMsg[256]; -
trunk/lib/FlairCore/src/SharedMem.cpp
r149 r203 35 35 } 36 36 37 void SharedMem::Read(char *buf, size_t size) const {38 pimpl_->Read(buf, size);37 bool SharedMem::Read(char *buf, size_t size, Time nsTimeout) const { 38 return pimpl_->Read(buf, size, nsTimeout); 39 39 } 40 40 -
trunk/lib/FlairCore/src/SharedMem.h
r149 r203 27 27 * 28 28 * Shared memory is identified by its name so it can be accessed 29 * by another process ususing its name.29 * by another process using its name. 30 30 */ 31 31 … … 65 65 * \param size buffer size 66 66 */ 67 void Read(char *buf, size_t size) const;67 bool Read(char *buf, size_t size, Time nsTimeout=TIME_INFINITE) const; 68 68 69 69 70 70 /*! 71 * \brief This function should be called when reader starts(in case of a SharedMem of type producerConsumer)71 * \brief This function should be called when reader is ready (in case of a SharedMem of type producerConsumer) 72 72 */ 73 73 void ReaderReady(); -
trunk/lib/FlairCore/src/SharedMem_impl.cpp
r152 r203 26 26 using namespace flair::core; 27 27 28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, 29 size_t size, SharedMem::Type &type):type(type) { 30 this->size = size; 31 this->self = self; 32 char errorMsg[256]; 28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size), 29 sem(self,1,"/" + name + "_mutex"), sem_producer(self,0,"/" + name + "_producer"),sem_consumer(self,0,"/" + name + "_consumer") { 30 31 char errorMsg[256]; 33 32 34 33 #ifdef __XENO__ … … 52 51 mem_segment = (char *)ptr; 53 52 54 mutex_binded = false;55 string mutex_name = "mutex_" + name;56 status = rt_mutex_create(&mutex, mutex_name.c_str());57 if (status == -EEXIST) {58 mutex_binded = true;59 status = rt_mutex_bind(&mutex, mutex_name.c_str(), TM_INFINITE);60 }61 if (status != 0) {62 self->Err("rt_mutex_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));63 return;64 }65 66 53 #else 67 54 shm_name = "/" + name; … … 72 59 ftruncate(fd, size); 73 60 74 if (type==SharedMem::Type::mutex) { 75 sem_name="/" + name + "_mutex"; 76 sem=sem_open(sem_name.c_str(), O_CREAT, 0666, 1); 77 if (sem==SEM_FAILED) { 78 self->Err("Error creating mutex semaphore\n"); 79 } 80 } else if (type==SharedMem::Type::producerConsumer) { 81 sem_name_producer="/" + name + "_producer"; 82 sem_producer=sem_open(sem_name_producer.c_str(), O_CREAT, 0666, 0); 83 if (sem==SEM_FAILED) { 84 self->Err("Error creating producer semaphore\n"); 85 } 86 sem_name_consumer="/" + name + "_consumer"; 87 sem_consumer=sem_open(sem_name_consumer.c_str(), O_CREAT, 0666, 0); 88 if (sem==SEM_FAILED) { 89 self->Err("Error creating consumer semaphore\n"); 90 } 91 } 92 93 mem_segment = 94 (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 61 mem_segment = (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 95 62 if (mem_segment == MAP_FAILED) { 96 63 self->Err("Failed to map memory\n"); … … 104 71 105 72 #ifdef __XENO__ 106 /* unnecessary because heap is opened in H_SINGLE mode107 status=rt_heap_free(&heap,mem_segment);108 if(status!=0)109 {110 self->Err("rt_heap_free error (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));111 }112 */113 73 114 74 if (heap_binded == false) { … … 119 79 } 120 80 121 if (mutex_binded == false) {122 status = rt_mutex_delete(&mutex);123 if (status != 0) {124 self->Err("error destroying mutex (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));125 }126 }127 81 #else 128 82 status = munmap(mem_segment, size); … … 143 97 } 144 98 */ 145 // do not check errors as it can be done by another process146 if (type==SharedMem::Type::mutex) {147 status = sem_unlink(sem_name.c_str());148 status = sem_close(sem);149 if (status != 0) {150 self->Err("Failed to close mutex semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));151 }152 } else if (type==SharedMem::Type::producerConsumer) {153 status = sem_unlink(sem_name_producer.c_str());154 status = sem_close(sem_producer);155 if (status != 0) {156 self->Err("Failed to close producer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));157 }158 status = sem_unlink(sem_name_consumer.c_str());159 status = sem_close(sem_consumer);160 if (status != 0) {161 self->Err("Failed to close consumer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));162 }163 }164 99 #endif 165 100 } 166 101 167 102 void SharedMem_impl::ReaderReady() { 168 #ifdef __XENO__ 169 //TODO 170 #else 171 sem_post(sem_consumer); 172 #endif 103 sem_consumer.ReleaseSemaphore(); 173 104 } 174 105 175 106 void SharedMem_impl::Write(const char *buf, size_t size) { 176 #ifdef __XENO__177 int status = rt_mutex_acquire(&mutex, TM_INFINITE);178 if (status != 0) {179 char errorMsg[256];180 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));181 }182 memcpy(mem_segment, buf, size);183 status = rt_mutex_release(&mutex);184 if (status != 0) {185 char errorMsg[256];186 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));187 }188 #else189 107 if (type==SharedMem::Type::mutex) { 190 sem _wait(sem);108 sem.GetSemaphore(); 191 109 memcpy(mem_segment, buf, size); 192 sem _post(sem);110 sem.ReleaseSemaphore(); 193 111 } else if (type==SharedMem::Type::producerConsumer) { 194 112 //wait until consumer took the data away before writting a new one 195 113 //but should not block if nobody's there to read 196 if (sem_ trywait(sem_consumer)==0) {114 if (sem_consumer.TryGetSemaphore()) { 197 115 memcpy(mem_segment, buf, size); 198 sem_p ost(sem_producer);116 sem_producer.ReleaseSemaphore(); 199 117 } 200 118 } 201 #endif202 119 } 203 120 204 void SharedMem_impl::Read(char *buf, size_t size) { 205 #ifdef __XENO__ 206 int status = rt_mutex_acquire(&mutex, TM_INFINITE); 207 if (status != 0) { 208 char errorMsg[256]; 209 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg))); 210 } 211 memcpy(buf, mem_segment, size); 212 status = rt_mutex_release(&mutex); 213 if (status != 0) { 214 char errorMsg[256]; 215 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg))); 216 } 217 #else 121 bool SharedMem_impl::Read(char *buf, size_t size, Time nsTimeout) { 218 122 if (type==SharedMem::Type::mutex) { 219 sem_wait(sem); 220 memcpy(buf, mem_segment, size); 221 sem_post(sem); 123 if (sem.GetSemaphore()) { 124 memcpy(buf, mem_segment, size); 125 sem.ReleaseSemaphore(); 126 return true; 127 } 222 128 } else if (type==SharedMem::Type::producerConsumer) { 223 sem_wait(sem_producer); //wait until some data is available 224 memcpy(buf, mem_segment, size); 225 sem_post(sem_consumer); 129 if (sem_producer.GetSemaphore(nsTimeout)) { 130 memcpy(buf, mem_segment, size); 131 sem_consumer.ReleaseSemaphore(); 132 return true; 133 } 226 134 } 227 #endif 135 return false; 228 136 } -
trunk/lib/FlairCore/src/unexported/Semaphore_impl.h
r127 r203 31 31 Semaphore_impl(flair::core::Semaphore *self, uint32_t initialValue); 32 32 ~Semaphore_impl(); 33 bool TryGetSemaphore(); 33 34 bool GetSemaphore(flair::core::Time timeout = TIME_INFINITE); 34 35 bool ReleaseSemaphore(void); -
trunk/lib/FlairCore/src/unexported/SharedMem_impl.h
r149 r203 18 18 #ifdef __XENO__ 19 19 #include <native/heap.h> 20 #include <native/mutex.h>21 #else22 #include <semaphore.h>23 20 #endif 21 22 #include <Semaphore.h> 24 23 25 24 #include <SharedMem.h> … … 37 36 38 37 void Write(const char *buf, size_t size); 39 void Read(char *buf, size_t size);38 bool Read(char *buf, size_t size, flair::core::Time nsTimeout); 40 39 void ReaderReady(); 41 40 … … 45 44 size_t size; 46 45 char *mem_segment; 46 flair::core::Semaphore sem,sem_producer,sem_consumer; 47 47 #ifdef __XENO__ 48 48 RT_HEAP heap; 49 RT_MUTEX mutex;50 49 bool heap_binded; 51 bool mutex_binded;52 50 #else 53 51 int fd; 54 sem_t *sem,*sem_producer,*sem_consumer; 55 std::string sem_name, shm_name; 56 std::string sem_name_producer,sem_name_consumer; 52 std::string shm_name; 57 53 #endif 58 54 };
Note:
See TracChangeset
for help on using the changeset viewer.