source: flair-src/trunk/lib/FlairCore/src/Thread_impl.cpp@ 256

Last change on this file since 256 was 250, checked in by Bayard Gildas, 6 years ago

Fixed a small problem with threads (the delay for the first iteration of a periodic task was improperly calculated in some cases)

File size: 10.3 KB
RevLine 
[2]1// %flair:license{
[15]2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
[2]4// %flair:license}
5// created: 2012/10/04
6// filename: Thread_impl.cpp
7//
8// author: Guillaume Sanahuja
9// Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11// version: $Id: $
12//
13// purpose: classe implementant un thread rt ou non
14//
15//
16/*********************************************************************/
17
18#include "Thread_impl.h"
19#include "Thread.h"
20#include "IODevice.h"
21#include "IODevice_impl.h"
22#include "ConditionVariable.h"
23#include "config.h"
24#include "FrameworkManager.h"
25#include <string.h>
26#ifdef __XENO__
27#include <rtdk.h>
[118]28#define TH_NAME getFrameworkManager()->ObjectName() + "-" + self->ObjectName()
29#else//__XENO__
[2]30#include <sys/resource.h>
[38]31#include <unistd.h>
32#include <sys/syscall.h>
[118]33#endif//__XENO__
[2]34
35using std::string;
36using namespace flair::core;
37
[213]38Thread_impl::Thread_impl(Thread *self, uint8_t priority,uint32_t stackSize) {
[15]39 isRunning = false;
40 tobestopped = false;
41 is_suspended = false;
42 period_set = false;
[213]43 this->stackSize = stackSize;
[2]44
[15]45 this->self = self;
46 cond = new ConditionVariable(self, self->ObjectName());
[2]47
[15]48 if (priority < MIN_THREAD_PRIORITY) {
49 priority = MIN_THREAD_PRIORITY;
50 // printf("Thread::Thread, %s: priority set to
51 // %i\n",self->ObjectName().c_str(), priority);
52 }
53 if (priority > MAX_THREAD_PRIORITY) {
54 priority = MAX_THREAD_PRIORITY;
55 self->Warn("priority set to %i\n", MAX_THREAD_PRIORITY);
56 }
[2]57
[15]58 this->priority = priority;
59 period = 100 * 1000 * 1000; // 100ms par defaut
[186]60 min_latency = 1000 * 1000 * 1000;
61 max_latency = 0;
62 mean_latency = 0;
[15]63 last = 0;
64 cpt = 0;
[118]65
66#ifdef __XENO__
67 // Perform auto-init of rt_print buffers if the task doesn't do so
68 rt_print_auto_init(1);
69 // Initialise the rt_print buffer for this task explicitly
70 rt_print_init(512, (TH_NAME).c_str());
71#endif //__XENO__
[2]72}
73
[15]74Thread_impl::~Thread_impl() {
75 SafeStop();
76 Join();
[2]77}
78
[15]79void Thread_impl::Start(void) {
80 int status;
81 isRunning = true;
[20]82 tobestopped = false;
83 is_suspended = false;
[2]84
[213]85 Printf("Starting thread %s (priority=%i, stack size=%i bytes)\n",self->ObjectName().c_str(), priority,stackSize);
[2]86#ifdef __XENO__
[118]87 string th_name =TH_NAME;
[2]88
[213]89 status = rt_task_create(&task_rt, th_name.c_str(), stackSize,
[15]90 (int)priority, T_JOINABLE);
[213]91
[15]92 if (status != 0) {
[213]93 char errorMsg[256];
[133]94 self->Err("rt_task_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
[15]95 } else {
96 //_printf("rt_task_create ok %s\n",th_name);
97 }
[2]98
[15]99 status = rt_task_start(&task_rt, &main_rt, (void *)this);
100 if (status != 0) {
[213]101 char errorMsg[256];
[133]102 self->Err("rt_task_start error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
[15]103 } else {
104 //_printf("rt_task_start ok %s\n",th_name);
105 }
[2]106
107#else //__XENO__
108
[15]109 // Initialize thread creation attributes
110 pthread_attr_t attr;
111 if (pthread_attr_init(&attr) != 0) {
112 self->Err("Error pthread_attr_init\n");
113 }
[2]114
[213]115 if (pthread_attr_setstacksize(&attr, stackSize) != 0) {
[15]116 self->Err("Error pthread_attr_setstacksize\n");
117 }
[2]118
[15]119 if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
120 self->Err("Error pthread_attr_setinheritsched\n");
121 }
[2]122
[15]123 if (pthread_attr_setschedpolicy(&attr, SCHED_FIFO) != 0) {
124 self->Err("Error pthread_attr_setschedpolicy\n");
125 }
[2]126
[15]127 struct sched_param parm;
128 parm.sched_priority = priority;
129 if (pthread_attr_setschedparam(&attr, &parm) != 0) {
130 self->Err("Error pthread_attr_setschedparam\n");
131 }
[2]132
[15]133 next_time = GetTime() + period;
[2]134
[15]135 if (pthread_create(&task_nrt, &attr, main_nrt, (void *)this) != 0) {
136 self->Err("pthread_create error\n");
137 }
[2]138
[15]139 if (pthread_attr_destroy(&attr) != 0) {
140 self->Err("Error pthread_attr_destroy\n");
141 }
[2]142
143#endif //__XENO__
144}
145
146#ifdef __XENO__
[15]147void Thread_impl::main_rt(void *arg) {
148 Thread_impl *caller = (Thread_impl *)arg;
[2]149
[15]150 caller->self->Run();
[243]151 Printf("Stopping thread %s\n", caller->self->ObjectName().c_str());
[15]152 caller->PrintStats();
[2]153}
154#else
[38]155void* Thread_impl::main_nrt(void * arg)
156{
157 Thread_impl *caller = (Thread_impl*)arg;
158/*string th_name=getFrameworkManager()->ObjectName()+ "-" + caller->self->ObjectName();
159caller->self->Info("pthread '%s' created with TID %x\n",th_name.c_str(),(pid_t)syscall(SYS_gettid));*/
[2]160
[15]161 caller->self->Run();
[243]162 Printf("Stopping thread %s\n", caller->self->ObjectName().c_str());
[15]163 caller->PrintStats();
[2]164
[15]165 pthread_exit(0);
[2]166}
167#endif //__XENO__
168
169void Thread_impl::SetPeriodUS(uint32_t period) {
[15]170 if (period == 0) {
171 self->Err("Period must be>0\n");
172 return;
173 }
[2]174
175#ifdef __XENO__
[15]176 int status = rt_task_set_periodic(&task_rt, TM_NOW, period * 1000);
[133]177 if (status != 0) {
178 char errorMsg[256];
179 self->Err("Error rt_task_set_periodic %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
180 }
[2]181#else
[250]182 if (period_set) {
183 next_time -= period;
184 next_time += period * 1000;
185 } else next_time=GetTime()+period*1000;
[2]186#endif
[15]187 this->period = period * 1000;
188 period_set = true;
[2]189}
190
[15]191uint32_t Thread_impl::GetPeriodUS() const { return this->period / 1000; }
[2]192
193void Thread_impl::SetPeriodMS(uint32_t period) {
[15]194 if (period == 0) {
195 self->Err("Period must be>0\n");
196 return;
197 }
[2]198
199#ifdef __XENO__
[15]200 int status = rt_task_set_periodic(&task_rt, TM_NOW, period * 1000 * 1000);
[133]201 if (status != 0) {
202 char errorMsg[256];
203 self->Err("Error rt_task_set_periodic %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
204 }
[2]205#else
[250]206 if (period_set) {
207 next_time -= period;
208 next_time += period * 1000 * 1000;
209 } else next_time=GetTime()+period*1000*1000;
[2]210#endif
[15]211 this->period = period * 1000 * 1000;
212 period_set = true;
[2]213}
214
[15]215uint32_t Thread_impl::GetPeriodMS() const { return this->period / 1000 / 1000; }
[2]216
[15]217void Thread_impl::WaitPeriod(void) {
218 if (period_set == false) {
219 self->Err("Period must be set befaore calling this method\n");
220 return;
221 }
[2]222#ifdef __XENO__
[15]223 unsigned long overruns_r;
224 int status = rt_task_wait_period(&overruns_r);
[133]225 if (status != 0) {
226 char errorMsg[256];
227 self->Err("Error rt_task_wait_period %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
228 }
[206]229//avoid spamming messages
230/*
[133]231 if (status == -ETIMEDOUT) {
[15]232 self->Err("overrun: %lld\n", overruns_r);
[206]233 }*/
[2]234#else
[15]235 self->SleepUntil(next_time);
[202]236 Time current = GetTime();
[206]237//avoid spamming messages
238 //if(current>next_time+period) self->Err("overrun of %lld\n", current-next_time);
[202]239 while (next_time < current) {
240 next_time += period;
241 }
[2]242#endif
[186]243 ComputeLatency(GetTime());
[2]244}
245
246void Thread_impl::Suspend(void) {
[15]247 if (isRunning == false) {
248 self->Err("thread is not started\n");
249 return;
250 }
[2]251
[15]252 cond->GetMutex(), is_suspended = true;
253 cond->CondWait();
254 is_suspended = false;
255 cond->ReleaseMutex();
[2]256}
257
258bool Thread_impl::SuspendUntil(Time date) {
[15]259 if (isRunning == false) {
260 self->Err("thread is not started\n");
261 return false;
262 }
[2]263
[15]264 cond->GetMutex(), is_suspended = true;
265 bool success = cond->CondWaitUntil(date);
266 is_suspended = false;
267 cond->ReleaseMutex();
268 return success;
[2]269}
270
[15]271bool Thread_impl::IsSuspended(void) {
272 bool result;
[2]273
[15]274 cond->GetMutex();
275 result = is_suspended;
276 cond->ReleaseMutex();
[2]277
[15]278 return result;
[2]279}
280
[15]281void Thread_impl::Resume(void) {
282 if (isRunning == false) {
283 self->Err("thread is not started\n");
284 return;
285 }
[2]286
[15]287 cond->GetMutex();
288 if (is_suspended == true) {
289 cond->CondSignal();
290 } else {
291 self->Err("thread is not suspended\n");
292 }
293 cond->ReleaseMutex();
[2]294}
295
[15]296int Thread_impl::WaitUpdate(const IODevice *device) {
297 int status = 0;
[2]298
[15]299 if (IsSuspended() == true) {
300 self->Err("thread is already supended\n");
301 status = -1;
302 } else {
303 cond->GetMutex();
304
305 if (device->pimpl_->SetToWake(self) == 0) {
306 is_suspended = true;
307 cond->CondWait();
308 is_suspended = false;
309 } else {
310 self->Err("%s is already waiting an update\n",
311 device->ObjectName().c_str());
312 status = -1;
[2]313 }
314
[15]315 cond->ReleaseMutex();
316 }
[2]317
[15]318 return status;
[2]319}
320
[15]321void Thread_impl::PrintStats(void) {
[2]322#ifdef __XENO__
[15]323 RT_TASK_INFO info;
[2]324
[15]325 int status = rt_task_inquire(NULL, &info);
326 if (status != 0) {
[133]327 char errorMsg[256];
328 self->Err("Error rt_task_inquire %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
[15]329 } else
[2]330#endif
[15]331 {
[2]332#ifndef __XENO__
[186]333 if(last!=0)
[2]334#endif
[213]335 {
[202]336 if(period_set) {Printf(" period (ns): %lld\n", period);}}
[2]337#ifdef __XENO__
[15]338 Printf(" number of context switches: %i\n", info.ctxswitches);
339 Printf(" number of primary->secondary mode switch: %i\n",
340 info.modeswitches);
341 // printf("number of page faults: %i\n",info.pagefaults);
342 Printf(" execution time (ms) in primary mode: %lld\n",
343 info.exectime / 1000000);
[2]344#else
345/*
346 struct rusage r_usage;
347 getrusage(RUSAGE_THREAD,&r_usage);
348 printf(" memory usage = %ld\n",r_usage.ru_maxrss);
[15]349 printf("RUSAGE :ru_utime => %lld [sec] : %lld [usec], :ru_stime => %lld
350 [sec] : %lld [usec] \n",
[2]351 (int64_t)r_usage.ru_utime.tv_sec, (int64_t)r_usage.ru_utime.tv_usec,
[15]352 (int64_t)r_usage.ru_stime.tv_sec,
353 (int64_t)r_usage.ru_stime.tv_usec);*/
[2]354#endif
[15]355 if (last != 0) {
[186]356 Printf(" min latency (ns): %lld\n", min_latency);
357 Printf(" max latency (ns): %lld\n", max_latency);
[250]358 if (cpt) {
359 Printf(" latency moy (ns): %lld\n", mean_latency / cpt);
360 Printf(" iterations: %lld\n", cpt);
361 }
[2]362 }
[15]363 }
[2]364}
365
[15]366void Thread_impl::Join(void) {
367 if (isRunning == true) {
368 int status;
[213]369
[2]370#ifdef __XENO__
[15]371 status = rt_task_join(&task_rt);
[2]372#else
[15]373 status = pthread_join(task_nrt, NULL);
[2]374#endif
[213]375 if (status != 0) {
376 char errorMsg[256];
[133]377 self->Err("error %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
[213]378 }
[15]379 isRunning = false;
380 }
[2]381}
382
[186]383void Thread_impl::ComputeLatency(Time time) {
[15]384 Time diff, delta;
385 diff = time - last;
[2]386
[15]387 if (diff >= period) {
388 delta = diff - period;
389 } else {
390 delta = period - diff;
391 }
392 // if(delta==0) rt_printf("%lld %lld\n",time,last);
393 last = time;
394 if (diff == time)
395 return;
[2]396
[186]397 if (delta > max_latency)
398 max_latency = delta;
399 if (delta < min_latency)
400 min_latency = delta;
401 mean_latency += delta;
[15]402 cpt++;
[2]403
[186]404 // Printf("Thread::%s latency moy (ns):
405 // %lld\n",self->ObjectName().c_str(),mean_latency/cpt);
[2]406}
407
[15]408void Thread_impl::SafeStop(void) {
409 tobestopped = true;
410 if (IsSuspended())
411 Resume();
[2]412}
413
[15]414bool Thread_impl::ToBeStopped(void) { return tobestopped; }
Note: See TracBrowser for help on using the repository browser.