source: flair-src/trunk/lib/FlairCore/src/Thread_impl.cpp@ 243

Last change on this file since 243 was 243, checked in by Sanahuja Guillaume, 4 years ago

resolve some bugs when closing connection with gcs

File size: 10.1 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5// created: 2012/10/04
6// filename: Thread_impl.cpp
7//
8// author: Guillaume Sanahuja
9// Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11// version: $Id: $
12//
13// purpose: classe implementant un thread rt ou non
14//
15//
16/*********************************************************************/
17
18#include "Thread_impl.h"
19#include "Thread.h"
20#include "IODevice.h"
21#include "IODevice_impl.h"
22#include "ConditionVariable.h"
23#include "config.h"
24#include "FrameworkManager.h"
25#include <string.h>
26#ifdef __XENO__
27#include <rtdk.h>
28#define TH_NAME getFrameworkManager()->ObjectName() + "-" + self->ObjectName()
29#else//__XENO__
30#include <sys/resource.h>
31#include <unistd.h>
32#include <sys/syscall.h>
33#endif//__XENO__
34
35using std::string;
36using namespace flair::core;
37
38Thread_impl::Thread_impl(Thread *self, uint8_t priority,uint32_t stackSize) {
39 isRunning = false;
40 tobestopped = false;
41 is_suspended = false;
42 period_set = false;
43 this->stackSize = stackSize;
44
45 this->self = self;
46 cond = new ConditionVariable(self, self->ObjectName());
47
48 if (priority < MIN_THREAD_PRIORITY) {
49 priority = MIN_THREAD_PRIORITY;
50 // printf("Thread::Thread, %s: priority set to
51 // %i\n",self->ObjectName().c_str(), priority);
52 }
53 if (priority > MAX_THREAD_PRIORITY) {
54 priority = MAX_THREAD_PRIORITY;
55 self->Warn("priority set to %i\n", MAX_THREAD_PRIORITY);
56 }
57
58 this->priority = priority;
59 period = 100 * 1000 * 1000; // 100ms par defaut
60 min_latency = 1000 * 1000 * 1000;
61 max_latency = 0;
62 mean_latency = 0;
63 last = 0;
64 cpt = 0;
65
66#ifdef __XENO__
67 // Perform auto-init of rt_print buffers if the task doesn't do so
68 rt_print_auto_init(1);
69 // Initialise the rt_print buffer for this task explicitly
70 rt_print_init(512, (TH_NAME).c_str());
71#endif //__XENO__
72}
73
74Thread_impl::~Thread_impl() {
75 SafeStop();
76 Join();
77}
78
79void Thread_impl::Start(void) {
80 int status;
81 isRunning = true;
82 tobestopped = false;
83 is_suspended = false;
84
85 Printf("Starting thread %s (priority=%i, stack size=%i bytes)\n",self->ObjectName().c_str(), priority,stackSize);
86#ifdef __XENO__
87 string th_name =TH_NAME;
88
89 status = rt_task_create(&task_rt, th_name.c_str(), stackSize,
90 (int)priority, T_JOINABLE);
91
92 if (status != 0) {
93 char errorMsg[256];
94 self->Err("rt_task_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
95 } else {
96 //_printf("rt_task_create ok %s\n",th_name);
97 }
98
99 status = rt_task_start(&task_rt, &main_rt, (void *)this);
100 if (status != 0) {
101 char errorMsg[256];
102 self->Err("rt_task_start error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
103 } else {
104 //_printf("rt_task_start ok %s\n",th_name);
105 }
106
107#else //__XENO__
108
109 // Initialize thread creation attributes
110 pthread_attr_t attr;
111 if (pthread_attr_init(&attr) != 0) {
112 self->Err("Error pthread_attr_init\n");
113 }
114
115 if (pthread_attr_setstacksize(&attr, stackSize) != 0) {
116 self->Err("Error pthread_attr_setstacksize\n");
117 }
118
119 if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
120 self->Err("Error pthread_attr_setinheritsched\n");
121 }
122
123 if (pthread_attr_setschedpolicy(&attr, SCHED_FIFO) != 0) {
124 self->Err("Error pthread_attr_setschedpolicy\n");
125 }
126
127 struct sched_param parm;
128 parm.sched_priority = priority;
129 if (pthread_attr_setschedparam(&attr, &parm) != 0) {
130 self->Err("Error pthread_attr_setschedparam\n");
131 }
132
133 next_time = GetTime() + period;
134
135 if (pthread_create(&task_nrt, &attr, main_nrt, (void *)this) != 0) {
136 self->Err("pthread_create error\n");
137 }
138
139 if (pthread_attr_destroy(&attr) != 0) {
140 self->Err("Error pthread_attr_destroy\n");
141 }
142
143#endif //__XENO__
144}
145
146#ifdef __XENO__
147void Thread_impl::main_rt(void *arg) {
148 Thread_impl *caller = (Thread_impl *)arg;
149
150 caller->self->Run();
151 Printf("Stopping thread %s\n", caller->self->ObjectName().c_str());
152 caller->PrintStats();
153}
154#else
155void* Thread_impl::main_nrt(void * arg)
156{
157 Thread_impl *caller = (Thread_impl*)arg;
158/*string th_name=getFrameworkManager()->ObjectName()+ "-" + caller->self->ObjectName();
159caller->self->Info("pthread '%s' created with TID %x\n",th_name.c_str(),(pid_t)syscall(SYS_gettid));*/
160
161 caller->self->Run();
162 Printf("Stopping thread %s\n", caller->self->ObjectName().c_str());
163 caller->PrintStats();
164
165 pthread_exit(0);
166}
167#endif //__XENO__
168
169void Thread_impl::SetPeriodUS(uint32_t period) {
170 if (period == 0) {
171 self->Err("Period must be>0\n");
172 return;
173 }
174
175#ifdef __XENO__
176 int status = rt_task_set_periodic(&task_rt, TM_NOW, period * 1000);
177 if (status != 0) {
178 char errorMsg[256];
179 self->Err("Error rt_task_set_periodic %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
180 }
181#else
182 next_time -= period;
183 next_time += period * 1000;
184#endif
185 this->period = period * 1000;
186 period_set = true;
187}
188
189uint32_t Thread_impl::GetPeriodUS() const { return this->period / 1000; }
190
191void Thread_impl::SetPeriodMS(uint32_t period) {
192 if (period == 0) {
193 self->Err("Period must be>0\n");
194 return;
195 }
196
197#ifdef __XENO__
198 int status = rt_task_set_periodic(&task_rt, TM_NOW, period * 1000 * 1000);
199 if (status != 0) {
200 char errorMsg[256];
201 self->Err("Error rt_task_set_periodic %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
202 }
203#else
204 next_time -= period;
205 next_time += period * 1000 * 1000;
206#endif
207 this->period = period * 1000 * 1000;
208 period_set = true;
209}
210
211uint32_t Thread_impl::GetPeriodMS() const { return this->period / 1000 / 1000; }
212
213void Thread_impl::WaitPeriod(void) {
214 if (period_set == false) {
215 self->Err("Period must be set befaore calling this method\n");
216 return;
217 }
218#ifdef __XENO__
219 unsigned long overruns_r;
220 int status = rt_task_wait_period(&overruns_r);
221 if (status != 0) {
222 char errorMsg[256];
223 self->Err("Error rt_task_wait_period %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
224 }
225//avoid spamming messages
226/*
227 if (status == -ETIMEDOUT) {
228 self->Err("overrun: %lld\n", overruns_r);
229 }*/
230#else
231 self->SleepUntil(next_time);
232 Time current = GetTime();
233//avoid spamming messages
234 //if(current>next_time+period) self->Err("overrun of %lld\n", current-next_time);
235 while (next_time < current) {
236 next_time += period;
237 }
238#endif
239 ComputeLatency(GetTime());
240}
241
242void Thread_impl::Suspend(void) {
243 if (isRunning == false) {
244 self->Err("thread is not started\n");
245 return;
246 }
247
248 cond->GetMutex(), is_suspended = true;
249 cond->CondWait();
250 is_suspended = false;
251 cond->ReleaseMutex();
252}
253
254bool Thread_impl::SuspendUntil(Time date) {
255 if (isRunning == false) {
256 self->Err("thread is not started\n");
257 return false;
258 }
259
260 cond->GetMutex(), is_suspended = true;
261 bool success = cond->CondWaitUntil(date);
262 is_suspended = false;
263 cond->ReleaseMutex();
264 return success;
265}
266
267bool Thread_impl::IsSuspended(void) {
268 bool result;
269
270 cond->GetMutex();
271 result = is_suspended;
272 cond->ReleaseMutex();
273
274 return result;
275}
276
277void Thread_impl::Resume(void) {
278 if (isRunning == false) {
279 self->Err("thread is not started\n");
280 return;
281 }
282
283 cond->GetMutex();
284 if (is_suspended == true) {
285 cond->CondSignal();
286 } else {
287 self->Err("thread is not suspended\n");
288 }
289 cond->ReleaseMutex();
290}
291
292int Thread_impl::WaitUpdate(const IODevice *device) {
293 int status = 0;
294
295 if (IsSuspended() == true) {
296 self->Err("thread is already supended\n");
297 status = -1;
298 } else {
299 cond->GetMutex();
300
301 if (device->pimpl_->SetToWake(self) == 0) {
302 is_suspended = true;
303 cond->CondWait();
304 is_suspended = false;
305 } else {
306 self->Err("%s is already waiting an update\n",
307 device->ObjectName().c_str());
308 status = -1;
309 }
310
311 cond->ReleaseMutex();
312 }
313
314 return status;
315}
316
317void Thread_impl::PrintStats(void) {
318#ifdef __XENO__
319 RT_TASK_INFO info;
320
321 int status = rt_task_inquire(NULL, &info);
322 if (status != 0) {
323 char errorMsg[256];
324 self->Err("Error rt_task_inquire %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
325 } else
326#endif
327 {
328#ifndef __XENO__
329 if(last!=0)
330#endif
331 {
332 if(period_set) {Printf(" period (ns): %lld\n", period);}}
333#ifdef __XENO__
334 Printf(" number of context switches: %i\n", info.ctxswitches);
335 Printf(" number of primary->secondary mode switch: %i\n",
336 info.modeswitches);
337 // printf("number of page faults: %i\n",info.pagefaults);
338 Printf(" execution time (ms) in primary mode: %lld\n",
339 info.exectime / 1000000);
340#else
341/*
342 struct rusage r_usage;
343 getrusage(RUSAGE_THREAD,&r_usage);
344 printf(" memory usage = %ld\n",r_usage.ru_maxrss);
345 printf("RUSAGE :ru_utime => %lld [sec] : %lld [usec], :ru_stime => %lld
346 [sec] : %lld [usec] \n",
347 (int64_t)r_usage.ru_utime.tv_sec, (int64_t)r_usage.ru_utime.tv_usec,
348 (int64_t)r_usage.ru_stime.tv_sec,
349 (int64_t)r_usage.ru_stime.tv_usec);*/
350#endif
351 if (last != 0) {
352 Printf(" min latency (ns): %lld\n", min_latency);
353 Printf(" max latency (ns): %lld\n", max_latency);
354 Printf(" latency moy (ns): %lld\n", mean_latency / cpt);
355 Printf(" iterations: %lld\n", cpt);
356 }
357 }
358}
359
360void Thread_impl::Join(void) {
361 if (isRunning == true) {
362 int status;
363
364#ifdef __XENO__
365 status = rt_task_join(&task_rt);
366#else
367 status = pthread_join(task_nrt, NULL);
368#endif
369 if (status != 0) {
370 char errorMsg[256];
371 self->Err("error %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
372 }
373 isRunning = false;
374 }
375}
376
377void Thread_impl::ComputeLatency(Time time) {
378 Time diff, delta;
379 diff = time - last;
380
381 if (diff >= period) {
382 delta = diff - period;
383 } else {
384 delta = period - diff;
385 }
386 // if(delta==0) rt_printf("%lld %lld\n",time,last);
387 last = time;
388 if (diff == time)
389 return;
390
391 if (delta > max_latency)
392 max_latency = delta;
393 if (delta < min_latency)
394 min_latency = delta;
395 mean_latency += delta;
396 cpt++;
397
398 // Printf("Thread::%s latency moy (ns):
399 // %lld\n",self->ObjectName().c_str(),mean_latency/cpt);
400}
401
402void Thread_impl::SafeStop(void) {
403 tobestopped = true;
404 if (IsSuspended())
405 Resume();
406}
407
408bool Thread_impl::ToBeStopped(void) { return tobestopped; }
Note: See TracBrowser for help on using the repository browser.