source: flair-src/trunk/lib/FlairCore/src/SharedMem_impl.cpp@ 149

Last change on this file since 149 was 149, checked in by Bayard Gildas, 5 years ago

SharedMem in producerConsumer mode

File size: 6.7 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5// created: 2014/02/10
6// filename: SharedMem_impl.cpp
7//
8// author: Guillaume Sanahuja
9// Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11// version: $Id: $
12//
13// purpose: Class defining a shared memory
14//
15//
16/*********************************************************************/
17
18#include "SharedMem_impl.h"
19#include "SharedMem.h"
20#include <fcntl.h>
21#include <unistd.h>
22#include <sys/mman.h>
23#include <string.h>
24
25using std::string;
26using namespace flair::core;
27
28SharedMem_impl::SharedMem_impl(const SharedMem *self, string name,
29 size_t size, SharedMem::Type &type):type(type) {
30 this->size = size;
31 this->self = self;
32 char errorMsg[256];
33
34#ifdef __XENO__
35 heap_binded = false;
36 int status = rt_heap_create(&heap, name.c_str(), size,
37 H_SHARED | H_FIFO | H_NONCACHED);
38 if (status == -EEXIST) {
39 heap_binded = true;
40 status = rt_heap_bind(&heap, name.c_str(), TM_INFINITE);
41 }
42 if (status != 0) {
43 self->Err("rt_heap_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
44 return;
45 }
46
47 void *ptr;
48 status = rt_heap_alloc(&heap, 0, TM_NONBLOCK, &ptr);
49 if (status != 0) {
50 self->Err("rt_heap_alloc error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
51 }
52 mem_segment = (char *)ptr;
53
54 mutex_binded = false;
55 string mutex_name = "mutex_" + name;
56 status = rt_mutex_create(&mutex, mutex_name.c_str());
57 if (status == -EEXIST) {
58 mutex_binded = true;
59 status = rt_mutex_bind(&mutex, mutex_name.c_str(), TM_INFINITE);
60 }
61 if (status != 0) {
62 self->Err("rt_mutex_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
63 return;
64 }
65
66#else
67 shm_name = "/" + name;
68 fd = shm_open(shm_name.c_str(), O_RDWR | O_CREAT, 0666);
69 if (fd == -1) {
70 self->Err("Error creating shared memory\n");
71 }
72 ftruncate(fd, size);
73
74 if (type==SharedMem::Type::mutex) {
75 sem_name="/" + name + "_mutex";
76 sem=sem_open(sem_name.c_str(), O_CREAT, 0666, 1);
77 if (sem==SEM_FAILED) {
78 self->Err("Error creating mutex semaphore\n");
79 }
80 } else if (type==SharedMem::Type::producerConsumer) {
81 sem_name_producer="/" + name + "_producer";
82 sem_producer=sem_open(sem_name_producer.c_str(), O_CREAT, 0666, 0);
83 if (sem==SEM_FAILED) {
84 self->Err("Error creating producer semaphore\n");
85 }
86Printf("semaphore %s opened\n",sem_name_producer.c_str());
87 sem_name_consumer="/" + name + "_consumer";
88 sem_consumer=sem_open(sem_name_consumer.c_str(), O_CREAT, 0666, 0);
89 if (sem==SEM_FAILED) {
90 self->Err("Error creating consumer semaphore\n");
91 }
92Printf("semaphore %s opened\n",sem_name_consumer.c_str());
93 }
94
95 mem_segment =
96 (char *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
97 if (mem_segment == MAP_FAILED) {
98 self->Err("Failed to map memory\n");
99 }
100#endif
101}
102
103SharedMem_impl::~SharedMem_impl() {
104 int status;
105 char errorMsg[256];
106
107#ifdef __XENO__
108 /* unnecessary because heap is opened in H_SINGLE mode
109 status=rt_heap_free(&heap,mem_segment);
110 if(status!=0)
111 {
112 self->Err("rt_heap_free error (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));
113 }
114 */
115
116 if (heap_binded == false) {
117 status = rt_heap_delete(&heap);
118 if (status != 0) {
119 self->Err("rt_heap_delete error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
120 }
121 }
122
123 if (mutex_binded == false) {
124 status = rt_mutex_delete(&mutex);
125 if (status != 0) {
126 self->Err("error destroying mutex (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
127 }
128 }
129#else
130 status = munmap(mem_segment, size);
131 if (status != 0) {
132 self->Err("Failed to unmap memory (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
133 }
134
135 status = close(fd);
136 if (status != 0) {
137 self->Err("Failed to close file (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
138 }
139
140 // do not check errors as it can be done by another process
141 status = shm_unlink(shm_name.c_str()); /*
142 if(status!=0)
143 {
144 self->Err("Failed to unlink memory (%s)\n",strerror_r(-status, errorMsg, sizeof(errorMsg)));
145 }
146*/
147 // do not check errors as it can be done by another process
148 if (type==SharedMem::Type::mutex) {
149 status = sem_unlink(sem_name.c_str());
150 status = sem_close(sem);
151 if (status != 0) {
152 self->Err("Failed to close mutex semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
153 }
154 } else if (type==SharedMem::Type::producerConsumer) {
155 status = sem_unlink(sem_name_producer.c_str());
156 status = sem_close(sem_producer);
157 if (status != 0) {
158 self->Err("Failed to close producer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
159 }
160 status = sem_unlink(sem_name_consumer.c_str());
161 status = sem_close(sem_consumer);
162 if (status != 0) {
163 self->Err("Failed to close consumer semaphore (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
164 }
165 }
166#endif
167}
168
169void SharedMem_impl::ReaderReady() {
170#ifdef __XENO__
171//TODO
172#else
173 sem_post(sem_consumer);
174#endif
175}
176
177void SharedMem_impl::Write(const char *buf, size_t size) {
178#ifdef __XENO__
179 int status = rt_mutex_acquire(&mutex, TM_INFINITE);
180 if (status != 0) {
181 char errorMsg[256];
182 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
183 }
184 memcpy(mem_segment, buf, size);
185 status = rt_mutex_release(&mutex);
186 if (status != 0) {
187 char errorMsg[256];
188 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
189 }
190#else
191 if (type==SharedMem::Type::mutex) {
192 sem_wait(sem);
193 memcpy(mem_segment, buf, size);
194 sem_post(sem);
195 } else if (type==SharedMem::Type::producerConsumer) {
196 //wait until consumer took the data away before writting a new one
197 //but should not block if nobody's there to read
198 if (sem_trywait(sem_consumer)==0) {
199 memcpy(mem_segment, buf, size);
200 sem_post(sem_producer);
201 }
202 }
203#endif
204}
205
206void SharedMem_impl::Read(char *buf, size_t size) {
207#ifdef __XENO__
208 int status = rt_mutex_acquire(&mutex, TM_INFINITE);
209 if (status != 0) {
210 char errorMsg[256];
211 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
212 }
213 memcpy(buf, mem_segment, size);
214 status = rt_mutex_release(&mutex);
215 if (status != 0) {
216 char errorMsg[256];
217 self->Err("error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
218 }
219#else
220 if (type==SharedMem::Type::mutex) {
221 sem_wait(sem);
222 memcpy(buf, mem_segment, size);
223 sem_post(sem);
224 } else if (type==SharedMem::Type::producerConsumer) {
225 sem_wait(sem_producer); //wait until some data is available
226 memcpy(buf, mem_segment, size);
227 sem_post(sem_consumer);
228 }
229#endif
230}
Note: See TracBrowser for help on using the repository browser.