Changeset 203 in flair-src for trunk/lib/FlairCore/src/SharedMem_impl.cpp
- Timestamp:
- Nov 9, 2017, 2:00:49 PM (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/SharedMem_impl.cpp
r152 r203 26 26 using namespace flair::core; 27 27 28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, 29 size_t size, SharedMem::Type &type):type(type) { 30 this->size = size; 31 this->self = self; 32 char errorMsg[256]; 28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size), 29 sem(self,1,"/" + name + "_mutex"), sem_producer(self,0,"/" + name + "_producer"),sem_consumer(self,0,"/" + name + "_consumer") { 30 31 char errorMsg[256]; 33 32 34 33 #ifdef __XENO__ … … 52 51 mem_segment = (char *)ptr; 53 52 54 mutex_binded = false;55 string mutex_name = "mutex_" + name;56 status = rt_mutex_create(&mutex, mutex_name.c_str());57 if (status == -EEXIST) {58 mutex_binded = true;59 status = rt_mutex_bind(&mutex, mutex_name.c_str(), TM_INFINITE);60 }61 if (status != 0) {62 self->Err("rt_mutex_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));63 return;64 }65 66 53 #else 67 54 shm_name = "/" + name; … … 72 59 ftruncate(fd, size); 73 60 74 if (type==SharedMem::Type::mutex) { 75 sem_name="/" + name + "_mutex"; 76 sem=sem_open(sem_name.c_str(), O_CREAT, 0666, 1); 77 if (sem==SEM_FAILED) { 78 self->Err("Error creating mutex semaphore\n"); 79 } 80 } else if (type==SharedMem::Type::producerConsumer) { 81 sem_name_producer="/" + name + "_producer"; 82 sem_producer=sem_open(sem_name_producer.c_str(), O_CREAT, 0666, 0); 83 if (sem==SEM_FAILED) { 84 self->Err("Error creating producer semaphore\n"); 85 } 86 sem_name_consumer="/" + name + "_consumer"; 87 sem_consumer=sem_open(sem_name_consumer.c_str(), O_CREAT, 0666, 0); 88 if (sem==SEM_FAILED) { 89 self->Err("Error creating consumer semaphore\n"); 90 } 91 } 92 93 mem_segment = 94 (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 61 mem_segment = (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 95 62 if (mem_segment == MAP_FAILED) { 96 63 self->Err("Failed to map memory\n"); … … 104 71 105 72 #ifdef __XENO__ 106 /* unnecessary because heap is opened in H_SINGLE mode107 status=rt_heap_free(&heap,mem_segment);108 if(status!=0)109 {110 self->Err("rt_heap_free error (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));111 }112 */113 73 114 74 if (heap_binded == false) { … … 119 79 } 120 80 121 if (mutex_binded == false) {122 status = rt_mutex_delete(&mutex);123 if (status != 0) {124 self->Err("error destroying mutex (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));125 }126 }127 81 #else 128 82 status = munmap(mem_segment, size); … … 143 97 } 144 98 */ 145 // do not check errors as it can be done by another process146 if (type==SharedMem::Type::mutex) {147 status = sem_unlink(sem_name.c_str());148 status = sem_close(sem);149 if (status != 0) {150 self->Err("Failed to close mutex semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));151 }152 } else if (type==SharedMem::Type::producerConsumer) {153 status = sem_unlink(sem_name_producer.c_str());154 status = sem_close(sem_producer);155 if (status != 0) {156 self->Err("Failed to close producer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));157 }158 status = sem_unlink(sem_name_consumer.c_str());159 status = sem_close(sem_consumer);160 if (status != 0) {161 self->Err("Failed to close consumer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));162 }163 }164 99 #endif 165 100 } 166 101 167 102 void SharedMem_impl::ReaderReady() { 168 #ifdef __XENO__ 169 //TODO 170 #else 171 sem_post(sem_consumer); 172 #endif 103 sem_consumer.ReleaseSemaphore(); 173 104 } 174 105 175 106 void SharedMem_impl::Write(const char *buf, size_t size) { 176 #ifdef __XENO__177 int status = rt_mutex_acquire(&mutex, TM_INFINITE);178 if (status != 0) {179 char errorMsg[256];180 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));181 }182 memcpy(mem_segment, buf, size);183 status = rt_mutex_release(&mutex);184 if (status != 0) {185 char errorMsg[256];186 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));187 }188 #else189 107 if (type==SharedMem::Type::mutex) { 190 sem _wait(sem);108 sem.GetSemaphore(); 191 109 memcpy(mem_segment, buf, size); 192 sem _post(sem);110 sem.ReleaseSemaphore(); 193 111 } else if (type==SharedMem::Type::producerConsumer) { 194 112 //wait until consumer took the data away before writting a new one 195 113 //but should not block if nobody's there to read 196 if (sem_ trywait(sem_consumer)==0) {114 if (sem_consumer.TryGetSemaphore()) { 197 115 memcpy(mem_segment, buf, size); 198 sem_p ost(sem_producer);116 sem_producer.ReleaseSemaphore(); 199 117 } 200 118 } 201 #endif202 119 } 203 120 204 void SharedMem_impl::Read(char *buf, size_t size) { 205 #ifdef __XENO__ 206 int status = rt_mutex_acquire(&mutex, TM_INFINITE); 207 if (status != 0) { 208 char errorMsg[256]; 209 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg))); 210 } 211 memcpy(buf, mem_segment, size); 212 status = rt_mutex_release(&mutex); 213 if (status != 0) { 214 char errorMsg[256]; 215 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg))); 216 } 217 #else 121 bool SharedMem_impl::Read(char *buf, size_t size, Time nsTimeout) { 218 122 if (type==SharedMem::Type::mutex) { 219 sem_wait(sem); 220 memcpy(buf, mem_segment, size); 221 sem_post(sem); 123 if (sem.GetSemaphore()) { 124 memcpy(buf, mem_segment, size); 125 sem.ReleaseSemaphore(); 126 return true; 127 } 222 128 } else if (type==SharedMem::Type::producerConsumer) { 223 sem_wait(sem_producer); //wait until some data is available 224 memcpy(buf, mem_segment, size); 225 sem_post(sem_consumer); 129 if (sem_producer.GetSemaphore(nsTimeout)) { 130 memcpy(buf, mem_segment, size); 131 sem_consumer.ReleaseSemaphore(); 132 return true; 133 } 226 134 } 227 #endif 135 return false; 228 136 }
Note:
See TracChangeset
for help on using the changeset viewer.