Changeset 238 in flair-src for trunk/lib/FlairCore/src
- Timestamp:
- May 15, 2018, 4:41:02 PM (6 years ago)
- Location:
- trunk/lib/FlairCore/src
- Files:
-
- 12 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/ConnectedSocket.cpp
r15 r238 32 32 char buffer[sizeof(uint16_t)]; 33 33 size_t alreadyReceived = 0; 34 Time remainingTimeout = timeout; // ms34 Time remainingTimeout = timeout; // ns 35 35 do { 36 36 Time beforeTime = GetTime(); // ns … … 38 38 RecvMessage(buffer + alreadyReceived, 39 39 sizeof(uint16_t) - alreadyReceived, remainingTimeout); 40 remainingTimeout -= (GetTime() - beforeTime) / 100000; 41 if ((received < 0) || (remainingTimeout < 0)) 42 throw std::runtime_error("Timeout"); 40 remainingTimeout -= GetTime() - beforeTime; 41 if ((received < 0) || (remainingTimeout < 0)) throw std::runtime_error("Timeout"); 43 42 alreadyReceived += received; 44 43 } while (alreadyReceived != sizeof(uint16_t)); … … 52 51 char *buffer = (char *)&dataInNetworkEndianness; 53 52 size_t alreadySent = 0; 54 Time remainingTimeout = timeout; // ms53 Time remainingTimeout = timeout; // ns 55 54 do { 56 55 Time beforeTime = GetTime(); // ns 57 56 ssize_t sent = SendMessage( 58 57 buffer + alreadySent, sizeof(uint16_t) - alreadySent, remainingTimeout); 59 remainingTimeout -= (GetTime() - beforeTime) / 100000;58 remainingTimeout -= GetTime() - beforeTime; 60 59 if ((sent < 0) || (remainingTimeout < 0)) 61 60 throw std::runtime_error("Timeout"); … … 67 66 char buffer[sizeof(uint32_t)]; 68 67 size_t alreadyReceived = 0; 69 Time remainingTimeout = timeout; // ms68 Time remainingTimeout = timeout; // ns 70 69 do { 71 70 Time beforeTime = GetTime(); // ns … … 73 72 RecvMessage(buffer + alreadyReceived, 74 73 sizeof(uint32_t) - alreadyReceived, remainingTimeout); 75 remainingTimeout -= (GetTime() - beforeTime) / 100000;74 remainingTimeout -= GetTime() - beforeTime; 76 75 if ((received < 0) || (remainingTimeout < 0)) 77 76 throw std::runtime_error("Timeout"); … … 87 86 char *buffer = (char *)&dataInNetworkEndianness; 88 87 size_t alreadySent = 0; 89 Time remainingTimeout = timeout; // ms88 Time remainingTimeout = timeout; // ns 90 89 do { 91 90 Time beforeTime = GetTime(); // ns 92 91 ssize_t sent = SendMessage( 93 92 buffer + alreadySent, sizeof(uint32_t) - alreadySent, remainingTimeout); 94 remainingTimeout -= (GetTime() - beforeTime) / 100000;93 remainingTimeout -= GetTime() - beforeTime; 95 94 if ((sent < 0) || (remainingTimeout < 0)) 96 95 throw std::runtime_error("Timeout"); … … 108 107 RecvMessage(buffer + alreadyReceived, stringSize - alreadyReceived, 109 108 remainingTimeout); 110 remainingTimeout -= (GetTime() - beforeTime) / 100000;109 remainingTimeout -= GetTime() - beforeTime; 111 110 if ((received < 0) || (remainingTimeout < 0)) 112 111 throw std::runtime_error("Timeout"); -
trunk/lib/FlairCore/src/ConnectedSocket.h
r16 r238 53 53 * \brief Returns a socket on a new incoming connexion 54 54 * 55 * \param ConnectedSocket &listeningSocket55 * \param timeout timeout (in nanoseconds) 56 56 */ 57 57 virtual ConnectedSocket *Accept( … … 65 65 * \param unsigned int port 66 66 * \param const distantAddress 67 * \param timeout timeout (in milliseconds)67 * \param timeout timeout (in nanoseconds) 68 68 */ 69 69 virtual bool Connect(const unsigned int port, … … 71 71 72 72 /*! 73 * \brief Send a message 73 * \brief Send a message waiting up to timeout ns 74 74 * 75 75 * \param message message 76 76 * \param message_len message length 77 * \param timeout timeout (in milliseconds)77 * \param timeout timeout (in nanoseconds) 78 78 */ 79 79 virtual ssize_t SendMessage(const char *message, size_t message_len, … … 83 83 * \brief Receive a message 84 84 * 85 * Receive a message and wait up to timeout . \n85 * Receive a message and wait up to timeout ns. \n 86 86 * 87 87 * \param buf buffer to put the message 88 88 * \param buf_len buffer length 89 * \param timeout timeout (in milliseconds)89 * \param timeout timeout (in nanoseconds) 90 90 * 91 91 * \return size of the received message -
trunk/lib/FlairCore/src/IODevice.h
r157 r238 151 151 void SetIsReady(bool status); 152 152 153 private:154 153 /*! 155 154 * \brief Update using provided datas … … 164 163 virtual void UpdateFrom(const io_data *data) = 0; 165 164 165 private: 166 166 class IODevice_impl *pimpl_; 167 167 }; -
trunk/lib/FlairCore/src/Semaphore.cpp
r203 r238 24 24 namespace core { 25 25 26 Semaphore::Semaphore(const Object *parent, uint32_t initialValue, string name )27 : Object(parent, name, "semaphore") {28 pimpl_ = new Semaphore_impl(this, initialValue);26 Semaphore::Semaphore(const Object *parent, uint32_t initialValue, string name, Type type) 27 : Object(parent, name, "semaphore"),type(type) { 28 pimpl_ = new Semaphore_impl(this, name, initialValue,type); 29 29 } 30 30 -
trunk/lib/FlairCore/src/Semaphore.h
r203 r238 31 31 32 32 public: 33 enum class Type { anonymous, named }; 33 34 /*! 34 35 * \brief Constructor … … 39 40 * \param name name 40 41 */ 41 Semaphore(const Object *parent, uint32_t initialValue, std::string name = "" );42 Semaphore(const Object *parent, uint32_t initialValue, std::string name = "", Type type=Type::anonymous); 42 43 43 44 /*! … … 74 75 private: 75 76 class Semaphore_impl *pimpl_; 77 Type type; 76 78 }; 77 79 -
trunk/lib/FlairCore/src/Semaphore_impl.cpp
r213 r238 25 25 using namespace flair::core; 26 26 27 Semaphore_impl::Semaphore_impl(Semaphore *self, uint32_t initialValue) {27 Semaphore_impl::Semaphore_impl(Semaphore *self, string name, uint32_t initialValue, Semaphore::Type &type):type(type) { 28 28 this->self = self; 29 29 int status; … … 32 32 status = rt_sem_create(&semaphore, NULL, initialValue, S_FIFO); 33 33 #else 34 status = sem_init(&semaphore, 0, initialValue); 34 if (type==Semaphore::Type::named) { 35 sem_name = "/" + name; 36 semaphore=sem_open(sem_name.c_str(),O_CREAT, 0666, initialValue); 37 if (semaphore==SEM_FAILED) status=errno; else status=0; 38 } else { //anonymous 39 semaphore=new sem_t; 40 status = sem_init(semaphore, 0, initialValue); 41 } 35 42 #endif 36 43 if (status != 0) { … … 46 53 status = rt_sem_delete(&semaphore); 47 54 #else 48 status = sem_destroy(&semaphore); 55 if (type==Semaphore::Type::named) { 56 status=sem_close(semaphore); 57 sem_unlink(sem_name.c_str()); 58 } else { //anonymous 59 status = sem_destroy(semaphore); 60 } 49 61 #endif 50 62 if (status != 0) { … … 58 70 return !rt_sem_p(&semaphore, TM_NONBLOCK); 59 71 #else 60 return !sem_trywait( &semaphore);72 return !sem_trywait(semaphore); 61 73 #endif 62 74 } … … 77 89 semTimeout.tv_nsec -= 1000000000ULL; 78 90 } 79 status = sem_timedwait( &semaphore, &semTimeout);91 status = sem_timedwait(semaphore, &semTimeout); 80 92 } else { 81 status = sem_wait( &semaphore);93 status = sem_wait(semaphore); 82 94 } 83 95 #endif … … 102 114 status = rt_sem_v(&semaphore); 103 115 #else 104 status = sem_post( &semaphore);116 status = sem_post(semaphore); 105 117 #endif 106 118 if (status != 0) { -
trunk/lib/FlairCore/src/SharedMem.h
r203 r238 40 40 * \param parent parent 41 41 * \param name name 42 * \param size size of the shared memory 43 * \param blockOnRead if true reading will block if nothing written 42 * \param size size of the shared memory:w 43 * 44 * \param type mutex or producerConsumer. Mutex type is for symmetrical use (default) 44 45 */ 45 46 SharedMem(const Object *parent, std::string name, size_t size, Type type=Type::mutex); -
trunk/lib/FlairCore/src/SharedMem_impl.cpp
r213 r238 26 26 using namespace flair::core; 27 27 28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size), 29 sem(self,1,"/" + name + "_mutex"), sem_producer(self,0,"/" + name + "_producer"),sem_consumer(self,0,"/" + name + "_consumer") { 30 28 SharedMem_impl::SharedMem_impl(const SharedMem *self, string name, size_t size, SharedMem::Type &type):self(self),type(type),size(size) { 31 29 #ifdef __XENO__ 32 30 heap_binded = false; … … 52 50 53 51 #else 52 if (type==SharedMem::Type::mutex) { 53 sem = new Semaphore(self,1,"/" + name + "_mutex",Semaphore::Type::named); 54 } else { //producerConsumer 55 sem_producer=new Semaphore(self,0,"/" + name + "_producer", Semaphore::Type::named); 56 sem_consumer=new Semaphore(self,0,"/" + name + "_consumer", Semaphore::Type::named); 57 } 54 58 shm_name = "/" + name; 55 59 fd = shm_open(shm_name.c_str(), O_RDWR | O_CREAT, 0666); … … 100 104 } 101 105 */ 106 if (type==SharedMem::Type::mutex) { 107 delete sem; 108 } else { 109 delete sem_producer; 110 delete sem_consumer; 111 } 102 112 #endif 103 113 } 104 114 105 115 void SharedMem_impl::ReaderReady() { 106 sem_consumer.ReleaseSemaphore(); 116 if (type==SharedMem::Type::producerConsumer) { 117 sem_consumer->ReleaseSemaphore(); 118 } else { 119 self->Warn("Called on a non producerConsumer type of shared memory! (this is seriously wrong)"); 120 } 107 121 } 108 122 109 123 void SharedMem_impl::Write(const char *buf, size_t size) { 110 124 if (type==SharedMem::Type::mutex) { 111 sem .GetSemaphore();125 sem->GetSemaphore(); 112 126 memcpy(mem_segment, buf, size); 113 sem .ReleaseSemaphore();127 sem->ReleaseSemaphore(); 114 128 } else if (type==SharedMem::Type::producerConsumer) { 115 //wait until consumer took the data away before writ ting a new one129 //wait until consumer took the data away before writing a new one 116 130 //but should not block if nobody's there to read 117 if (sem_consumer .TryGetSemaphore()) {131 if (sem_consumer->TryGetSemaphore()) { 118 132 memcpy(mem_segment, buf, size); 119 sem_producer .ReleaseSemaphore();133 sem_producer->ReleaseSemaphore(); 120 134 } 121 135 } … … 124 138 bool SharedMem_impl::Read(char *buf, size_t size, Time nsTimeout) { 125 139 if (type==SharedMem::Type::mutex) { 126 if (sem .GetSemaphore()) {140 if (sem->GetSemaphore()) { //may block for ever 127 141 memcpy(buf, mem_segment, size); 128 sem .ReleaseSemaphore();142 sem->ReleaseSemaphore(); 129 143 return true; 130 144 } 131 145 } else if (type==SharedMem::Type::producerConsumer) { 132 if (sem_producer .GetSemaphore(nsTimeout)) {146 if (sem_producer->GetSemaphore(nsTimeout)) { //if nobody ever writes we need to exit this after a while 133 147 memcpy(buf, mem_segment, size); 134 sem_consumer .ReleaseSemaphore();148 sem_consumer->ReleaseSemaphore(); 135 149 return true; 136 } 150 }; 137 151 } 138 152 return false; -
trunk/lib/FlairCore/src/TcpSocket.cpp
r231 r238 22 22 #include <fcntl.h> 23 23 #include <string.h> 24 #include <system_error> 24 25 25 26 using std::string; … … 30 31 TcpSocket::TcpSocket(const Object *parent, const std::string name, 31 32 bool _blockOnSend, bool _blockOnReceive) 32 : ConnectedSocket(parent, name), isConnected(false) {33 : ConnectedSocket(parent, name), socket(0), isConnected(false), isListening(false), distantPort(0), distantAddress() { 33 34 blockOnSend = _blockOnSend; 34 35 blockOnReceive = _blockOnReceive; … … 37 38 TcpSocket::~TcpSocket() { 38 39 // Info("Debug: destroying TCP socket %s", ObjectName().c_str()); 39 close(socket);40 if (socket) close(socket); 40 41 } 41 42 … … 59 60 } 60 61 61 listen(socket, 1); 62 int ret=listen(socket, 1); 63 if (ret < 0) { 64 char errorMsg[256]; 65 Err("select: %s\n", strerror_r(errno, errorMsg, sizeof(errorMsg))); 66 } else { 67 isListening=true; 68 } 62 69 } 63 70 64 71 TcpSocket *TcpSocket::Accept(Time timeout) { 72 if (!isListening) throw std::logic_error("Can't call Accept on a non listening socket"); 73 65 74 TcpSocket *acceptedSocket = nullptr; 66 75 67 76 struct timeval tv; 68 77 if (timeout != 0) { 69 tv.tv_sec = timeout / 1000 ; // timeout is in ms70 tv.tv_usec = (timeout % 1000 ) *1000;78 tv.tv_sec = timeout / 1000000000; 79 tv.tv_usec = (timeout % 1000000000) / 1000; 71 80 } 72 81 fd_set rset; … … 81 90 if (ret == 0) { 82 91 // timeout reached 83 // Err("timeout reached\n");92 throw std::runtime_error("Timeout"); 84 93 } else { 85 94 // our socket is readable, a new connection can be accepted … … 88 97 sockaddr_in their_addr; 89 98 socklen_t namelen = sizeof(their_addr); 90 if ((acceptedSocket->socket = 91 accept(socket, (sockaddr *)&their_addr, &namelen)) < 0) { 92 char errorMsg[256]; 93 Err("error: %s\n", strerror_r(errno, errorMsg, sizeof(errorMsg))); 99 ret = accept(socket, (sockaddr *)&their_addr, &namelen); 100 if (ret < 0) { 94 101 delete acceptedSocket; 95 acceptedSocket = nullptr; 102 throw std::system_error(std::error_code(ret, std::generic_category())); 103 } else { 104 acceptedSocket->socket = ret; 96 105 } 97 106 } … … 101 110 } 102 111 103 bool TcpSocket::Connect(const unsigned int distantPort,104 const std::string distantAddress, Time timeout) {112 bool TcpSocket::Connect(const unsigned int _distantPort, 113 const std::string _distantAddress, Time timeout) { 105 114 bool success = false; 106 115 bool criticalError=false; //meaning: if true then this socket can't be re-used 116 117 //should we open a new socket, close it, or re-use it? 107 118 if (isConnected) { 108 if (this->distantPort == distantPort &&109 this->distantAddress ==distantAddress) {119 // if already connected to the same host, just return true 120 if (distantPort == _distantPort && distantAddress == _distantAddress) { 110 121 return true; 111 } else { 122 } 123 // if already connected, but to a different host, close the current socket 124 else { 125 isConnected=false; 112 126 close(socket); 113 } 114 } 115 socket = ::socket(AF_INET, SOCK_STREAM, 0); 116 if (socket == -1) 117 return false; 127 socket=0; 128 } 129 } 130 if (!socket) { 131 socket = ::socket(AF_INET, SOCK_STREAM, 0); 132 if (socket == -1) 133 return false; 134 } 118 135 119 136 sockaddr_in serv_addr; … … 123 140 printf("incorrect network address."); 124 141 close(socket); 142 socket=0; 125 143 return false; 126 144 } … … 135 153 Err("socket connect: %s\n", 136 154 strerror_r(errno, errorMsg, sizeof(errorMsg))); 137 success = false;155 criticalError = true; 138 156 } else { 139 157 // now block with a timeout … … 152 170 char errorMsg[256]; 153 171 Err("select: %s\n", strerror_r(errno, errorMsg, sizeof(errorMsg))); 154 success = false;172 criticalError = true; 155 173 } else { 156 if (ret == 0) { 157 // timeout reached 158 // Err("timeout reached\n"); 159 success = false; 160 } else { 161 // something happened on our socket. Check if an error occured 174 if (ret != 0) { //if ret==0 we're on timeout (nothing to do since success is already set to false) 175 // something happened on our socket. Check if an error occurred 162 176 int error; 163 177 socklen_t len = sizeof(error); … … 165 179 //char errorMsg[256]; 166 180 // Err("getsockopt: %s\n",strerror_r(errno,errorMsg,sizeof(errorMsg))); 167 success = false;181 criticalError = true; 168 182 } else if (error != 0) { 169 183 //char errorMsg[256]; 170 184 // Err("socket error: %d(%s)\n",error,strerror_r(error,errorMsg,sizeof(errorMsg))); 171 success = false;185 criticalError = true; 172 186 } else { 173 if (connect(socket, (sockaddr *)&serv_addr, sizeof(serv_addr)) == 174 -1) { 175 success = false; 176 } else { 187 if (connect(socket, (sockaddr *)&serv_addr, sizeof(serv_addr)) != -1) { 177 188 // Info("connected indeed ^^\n"); 178 189 success = true; 190 } else { 191 //connect failed 192 if ((errno != EINPROGRESS) && (errno != EAGAIN)) { 193 criticalError = true; 194 } 179 195 } 180 196 } … … 183 199 } 184 200 } else { 185 success = true; // never reached suprisingly...201 success = true; // should never happen since we go non blocking and connect can't be immediate 186 202 } 187 203 // switch back to blocking mode (default) … … 189 205 190 206 if (!success) { 191 close(socket); 207 if (criticalError) { 208 close(socket); 209 socket=0; 210 } 211 isConnected=false; 192 212 //Info("Debug: Connect to %s:%d failed\n", distantAddress.c_str(), distantPort); 193 213 return false; 194 214 } else { 195 215 isConnected = true; 196 this->distantPort =distantPort;197 this->distantAddress =distantAddress;216 distantPort = _distantPort; 217 distantAddress = _distantAddress; 198 218 //Info("Debug: Connect to %s:%d succeeded\n", distantAddress.c_str(), distantPort); 199 219 return true; … … 208 228 } else { 209 229 struct timeval tv; 210 tv.tv_sec = timeout / 1000 ; // timeout is in ms211 tv.tv_usec = (timeout % 1000 ) *1000;230 tv.tv_sec = timeout / 1000000000; // timeout is in ns 231 tv.tv_usec = (timeout % 1000000000) / 1000; 212 232 213 233 setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, … … 225 245 } else { 226 246 struct timeval tv; 227 tv.tv_sec = timeout / 1000 ; // timeout is in ms228 tv.tv_usec = (timeout % 1000 ) *1000;247 tv.tv_sec = timeout / 1000000000; // timeout is in ns 248 tv.tv_usec = (timeout % 1000000000) / 1000; 229 249 230 250 setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, -
trunk/lib/FlairCore/src/TcpSocket.h
r15 r238 30 30 void Listen(const unsigned int port, const std::string localAddress = "ANY"); 31 31 TcpSocket *Accept( 32 Time timeout = 0); // should throw an exception if not a listening socket32 Time timeout = TIME_INFINITE); 33 33 bool Connect(const unsigned int distantPort, const std::string distantAddress, 34 Time timeout = 0); // timeout in milliseconds 35 ssize_t SendMessage(const char *message, size_t message_len, 36 Time timeout = 0); // timeout in milliseconds 37 ssize_t RecvMessage(char *buf, size_t buf_len, 38 Time timeout = 0); // timeout in milliseconds 34 Time timeout = TIME_INFINITE); 35 ssize_t SendMessage(const char *message, size_t message_len, Time timeout = TIME_INFINITE); 36 ssize_t RecvMessage(char *buf, size_t buf_len, Time timeout = TIME_INFINITE); 39 37 40 38 uint16_t NetworkToHost16(uint16_t data); … … 48 46 bool blockOnReceive; 49 47 bool isConnected; 48 bool isListening; 50 49 unsigned int distantPort; 51 50 std::string distantAddress; 51 char *sendRingBuffer, *recvRingBuffer; 52 52 }; 53 53 -
trunk/lib/FlairCore/src/unexported/Semaphore_impl.h
r203 r238 17 17 #include <native/sem.h> 18 18 #else 19 #include <fcntl.h> 19 20 #include <semaphore.h> 20 21 #endif … … 29 30 class Semaphore_impl { 30 31 public: 31 Semaphore_impl(flair::core::Semaphore *self, uint32_t initialValue);32 Semaphore_impl(flair::core::Semaphore *self, std::string name, uint32_t initialValue, flair::core::Semaphore::Type &type); 32 33 ~Semaphore_impl(); 33 34 bool TryGetSemaphore(); … … 37 38 RT_SEM semaphore; 38 39 #else 39 sem_t semaphore; 40 std::string sem_name; 41 sem_t *semaphore; 40 42 #endif 41 43 42 44 private: 43 45 flair::core::Semaphore *self; 46 flair::core::Semaphore::Type type; 44 47 }; 45 48 -
trunk/lib/FlairCore/src/unexported/SharedMem_impl.h
r203 r238 44 44 size_t size; 45 45 char *mem_segment; 46 flair::core::Semaphore sem,sem_producer,sem_consumer; 46 flair::core::Semaphore *sem; //for mutex type shared memory 47 flair::core::Semaphore *sem_producer,*sem_consumer; //sem_[produc|consum]er meaning = is [produc|consum]er ready? 47 48 #ifdef __XENO__ 48 49 RT_HEAP heap;
Note:
See TracChangeset
for help on using the changeset viewer.