Changeset 439 in flair-src for trunk/lib/FlairCore/src/ui_com.cpp
- Timestamp:
- 08/25/21 13:13:36 (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/ui_com.cpp
r253 r439 153 153 } 154 154 155 //send, public part; dispatch to SendNRT (nrt) or rt_pipe (rt) 156 //rt_pipe is read by user thread which calls SendNRT 155 157 void ui_com::Send(char *buf, ssize_t size,int ttl) { 156 158 if (connection_lost == true) … … 158 160 159 161 char *tosend = buf; 160 ssize_t nb_write;161 162 162 if (buf[0] == XML_HEADER) { 163 163 // cut xml header … … 167 167 168 168 #ifdef __XENO__ 169 nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL);170 169 //data buf 170 ssize_t nb_write = rt_pipe_write(&pipe, tosend, size, P_NORMAL); 171 171 if (nb_write < 0) { 172 172 char errorMsg[256]; … … 175 175 Err("rt_pipe_write error %i/%i\n", nb_write, size); 176 176 } 177 //ttl 178 nb_write = rt_pipe_write(&pipe, &ttl, sizeof(ttl), P_NORMAL); 179 if (nb_write < 0) { 180 char errorMsg[256]; 181 Err("rt_pipe_write error (%s)\n", strerror_r(-nb_write, errorMsg, sizeof(errorMsg))); 182 } else if (nb_write != sizeof(ttl)) { 183 Err("rt_pipe_write error %i/%i\n", nb_write, size); 184 } 177 185 #else //__XENO__ 178 186 SendNRT(tosend,size,ttl); 187 #endif //__XENO__ 188 } 189 190 //send, private part; called to effectively send to udt 191 //this is always in nrt 192 void ui_com::SendNRT(char *buf, ssize_t size,int ttl) { 193 ssize_t nb_write; 194 bool sendItCompressed = false; 195 179 196 #ifdef COMPRESS_FRAMES 180 bool sendItCompressed = false;181 197 if (buf[0] == XML_HEADER) { 182 198 sendItCompressed = true; 183 199 } 200 #endif // COMPRESS_FRAMES 184 201 185 202 if (sendItCompressed) { 186 203 char *out; 187 204 ssize_t out_size; 188 if (compressBuffer( tosend, size, &out, &out_size, 9) == Z_OK) {205 if (compressBuffer(buf, size, &out, &out_size, 9) == Z_OK) { 189 206 size = out_size; 190 207 nb_write = UDT::sendmsg(socket_fd, out, size,ttl, true); … … 197 214 198 215 if (!sendItCompressed) { 199 nb_write = UDT::sendmsg(socket_fd, tosend, size, ttl, true); 200 } 201 202 #else // COMPRESS_FRAMES 203 nb_write = UDT::sendmsg(socket_fd, tosend, size, ttl, true); 204 #endif // COMPRESS_FRAMES 216 nb_write = UDT::sendmsg(socket_fd, buf, size, ttl, true); 217 } 218 205 219 //Printf("write %i %i\n",nb_write,size); 206 220 if (nb_write < 0) { … … 214 228 UDT::getlasterror().getErrorCode(), nb_write, size); 215 229 } 216 #endif //__XENO__217 230 } 218 231 … … 240 253 } 241 254 255 //todo: le run (rt) push les données une par une 256 //le nrt recupere et decide quand envoyer : 257 // -quand toutes les données d'une meme periode sont recues 258 // -quand on a atteind le nb buffering 259 // -le nrt alloue dynamiquement le buffer d'envoi 260 //-> enelver l'alloc du buffer au startup (UpdateDataToSendSize) 242 261 void ui_com::Run(void) { 243 262 // check endianness … … 287 306 288 307 for (size_t i = 0; i < data_to_send.size(); i++) { 289 if (data_to_send.at(i)->SendPeriod() == resume_period && 290 data_to_send.at(i)->IsEnabled() == true) { 308 if (data_to_send.at(i)->SendPeriod() == resume_period && data_to_send.at(i)->IsEnabled() == true) { 291 309 data_to_send.at(i)->CopyDatas(send_buffer + offset); 292 310 offset += data_to_send.at(i)->SendSize(); … … 335 353 fd_set set; 336 354 struct timeval timeout; 337 ssize_t nb_read, nb_write; 355 ssize_t nb_read; 356 int ttl; 338 357 339 358 buf = (char *)malloc(NRT_PIPE_SIZE); … … 363 382 364 383 if (rv == -1) { 365 if (caller->is_running == false && 366 UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT) 384 if (caller->is_running == false && UDT::getlasterror().getErrorCode() == CUDTException::ETIMEOUT) 367 385 break; // timeout 368 386 if (UDT::getlasterror().getErrorCode() != CUDTException::ETIMEOUT) 369 387 caller->Err("epoll_wait, %s, code %i\n", 370 UDT::getlasterror().getErrorMessage(), 371 UDT::getlasterror().getErrorCode()); 388 UDT::getlasterror().getErrorMessage(), UDT::getlasterror().getErrorCode()); 372 389 } else if (rv == 0) { 373 390 // printf("timeout\n"); // a timeout occured … … 376 393 377 394 } else { 395 //read data buf and ttl 396 //RT part has made 2 rt_pipe_write; they should be available in nrt 397 //(select ensure at least one data is ready) 378 398 nb_read = read(pipe_fd, buf, NRT_PIPE_SIZE); 379 399 buf[nb_read] = 0; 400 read(pipe_fd, &ttl, sizeof(ttl)); 380 401 // printf("envoi\n%s\n",buf); 381 402 382 #ifdef COMPRESS_FRAMES 383 char *out; 384 ssize_t out_size; 385 386 if (buf[0] == XML_HEADER) { 387 if (compressBuffer(buf, nb_read, &out, &out_size, 9) == Z_OK) { 388 nb_read = out_size; 389 nb_write = UDT::sendmsg(caller->socket_fd, out, out_size, -1, true); 390 free(out); 391 } else { 392 caller->Warn("Compress error, sending it uncompressed\n"); 393 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true); 394 } 395 } else { 396 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true); 397 } 398 #else 399 nb_write = UDT::sendmsg(caller->socket_fd, buf, nb_read, -1, true); 400 #endif 401 402 if (nb_write < 0) { 403 caller->Err("UDT::sendmsg error (%s)\n", 404 UDT::getlasterror().getErrorMessage()); 405 if (UDT::getlasterror().getErrorCode() == CUDTException::ECONNLOST || 406 UDT::getlasterror().getErrorCode() == CUDTException::EINVSOCK) { 407 caller->connection_lost = true; 408 } 409 } else if (nb_write != nb_read) { 410 caller->Err("UDT::sendmsg error %i/%i\n", nb_write, nb_read); 411 } 403 caller->SendNRT(buf, nb_read,ttl); 412 404 } 413 405 } … … 558 550 } 559 551 552 //allocate a buffer at startup for the worst case 553 //(every data send with same pedriod) 554 //TODO: dynamic size depending on what gcs asks? 560 555 void ui_com::UpdateDataToSendSize(void) { 561 556 send_mutex->GetMutex();
Note:
See TracChangeset
for help on using the changeset viewer.