source: flair-src/trunk/lib/FlairCore/src/Thread_impl.cpp

Last change on this file was 330, checked in by Sanahuja Guillaume, 2 years ago

use less bandwidth in vprnlite

File size: 9.9 KB
Line 
1// %flair:license{
2// This file is part of the Flair framework distributed under the
3// CECILL-C License, Version 1.0.
4// %flair:license}
5//  created:    2012/10/04
6//  filename:   Thread_impl.cpp
7//
8//  author:     Guillaume Sanahuja
9//              Copyright Heudiasyc UMR UTC/CNRS 7253
10//
11//  version:    $Id: $
12//
13//  purpose:    classe implementant un thread rt ou non
14//
15//
16/*********************************************************************/
17
18#include "Thread_impl.h"
19#include "Thread.h"
20#include "IODevice.h"
21#include "IODevice_impl.h"
22#include "ConditionVariable.h"
23#include "config.h"
24#include "FrameworkManager.h"
25#include <string.h>
26#ifdef __XENO__
27#include <rtdk.h>
28#define TH_NAME getFrameworkManager()->ObjectName() + "-" + self->ObjectName()
29#else//__XENO__
30#include <sys/resource.h>
31#include <unistd.h>
32#include <sys/syscall.h>
33#endif//__XENO__
34
35using std::string;
36using namespace flair::core;
37
38Thread_impl::Thread_impl(Thread *self, uint8_t priority,uint32_t stackSize) {
39  isRunning = false;
40  tobestopped = false;
41  is_suspended = false;
42  period_set = false;
43  this->stackSize = stackSize;
44
45  this->self = self;
46  cond = new ConditionVariable(self, self->ObjectName());
47
48  if (priority < MIN_THREAD_PRIORITY) {
49    priority = MIN_THREAD_PRIORITY;
50    //        printf("Thread::Thread, %s: priority set to
51    //        %i\n",self->ObjectName().c_str(), priority);
52  }
53  if (priority > MAX_THREAD_PRIORITY) {
54    priority = MAX_THREAD_PRIORITY;
55    self->Warn("priority set to %i\n", MAX_THREAD_PRIORITY);
56  }
57
58  this->priority = priority;
59  period = 100 * 1000 * 1000; // 100ms par defaut
60  min_latency = 1000 * 1000 * 1000;
61  max_latency = 0;
62  mean_latency = 0;
63  last = 0;
64  cpt = 0;
65
66#ifdef __XENO__
67  // Perform auto-init of rt_print buffers if the task doesn't do so
68  rt_print_auto_init(1);
69    // Initialise the rt_print buffer for this task explicitly
70  rt_print_init(512, (TH_NAME).c_str());
71#endif //__XENO__
72}
73
74Thread_impl::~Thread_impl() {
75  SafeStop();
76  Join();
77}
78
79void Thread_impl::Start(void) {
80  int status;
81  isRunning = true;
82  tobestopped = false;
83  is_suspended = false;
84
85 Printf("Starting thread %s (priority=%i, stack size=%i bytes)\n",self->ObjectName().c_str(), priority,stackSize);
86#ifdef __XENO__
87  string th_name =TH_NAME;
88
89  status = rt_task_create(&task_rt, th_name.c_str(), stackSize,
90                          (int)priority, T_JOINABLE);
91
92  if (status != 0) {
93    char errorMsg[256];
94    self->Err("rt_task_create error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
95  } else {
96    //_printf("rt_task_create ok %s\n",th_name);
97  }
98
99  status = rt_task_start(&task_rt, &main_rt, (void *)this);
100  if (status != 0) {
101    char errorMsg[256];
102    self->Err("rt_task_start error (%s)\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
103  } else {
104    //_printf("rt_task_start ok %s\n",th_name);
105  }
106
107#else //__XENO__
108
109  // Initialize thread creation attributes
110  pthread_attr_t attr;
111  if (pthread_attr_init(&attr) != 0) {
112    self->Err("Error pthread_attr_init\n");
113  }
114
115  if (pthread_attr_setstacksize(&attr, stackSize) != 0) {
116    self->Err("Error pthread_attr_setstacksize\n");
117  }
118
119  if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
120    self->Err("Error pthread_attr_setinheritsched\n");
121  }
122
123  if (pthread_attr_setschedpolicy(&attr, SCHED_FIFO) != 0) {
124    self->Err("Error pthread_attr_setschedpolicy\n");
125  }
126
127  struct sched_param parm;
128  parm.sched_priority = priority;
129  if (pthread_attr_setschedparam(&attr, &parm) != 0) {
130    self->Err("Error pthread_attr_setschedparam\n");
131  }
132
133  next_time = GetTime() + period;
134
135  if (pthread_create(&task_nrt, &attr, main_nrt, (void *)this) != 0) {
136    self->Err("pthread_create error\n");
137  }
138
139  if (pthread_attr_destroy(&attr) != 0) {
140    self->Err("Error pthread_attr_destroy\n");
141  }
142
143#endif //__XENO__
144}
145
146#ifdef __XENO__
147void Thread_impl::main_rt(void *arg) {
148  Thread_impl *caller = (Thread_impl *)arg;
149
150  caller->self->Run();
151  Printf("Stopping thread %s\n", caller->self->ObjectName().c_str()); 
152  caller->PrintStats();
153}
154#else
155void* Thread_impl::main_nrt(void * arg)
156{
157    Thread_impl *caller = (Thread_impl*)arg;
158/*string th_name=getFrameworkManager()->ObjectName()+ "-" + caller->self->ObjectName();
159caller->self->Info("pthread '%s' created with TID %x\n",th_name.c_str(),(pid_t)syscall(SYS_gettid));*/
160
161  caller->self->Run();
162  Printf("Stopping thread %s\n", caller->self->ObjectName().c_str()); 
163  caller->PrintStats();
164
165  pthread_exit(0);
166}
167#endif //__XENO__
168
169void Thread_impl::SetPeriodUS(uint32_t period) {
170  if (period == 0) {
171    self->Err("Period must be>0\n");
172    return;
173  }
174
175#ifdef __XENO__
176  int status = rt_task_set_periodic(&task_rt, TM_NOW, (Time)period * (Time)1000);
177  if (status != 0) {
178                char errorMsg[256];
179    self->Err("Error rt_task_set_periodic %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
180        }
181#else
182  if (period_set) {
183    next_time -= (Time)period;
184    next_time += (Time)period * (Time)1000;
185  } else next_time=GetTime()+(Time)period*(Time)1000;
186#endif
187  this->period = (Time)period * (Time)1000;
188  period_set = true;
189}
190
191uint32_t Thread_impl::GetPeriodUS() const { return this->period / 1000; }
192
193void Thread_impl::SetPeriodMS(uint32_t period) {
194  SetPeriodUS(period*1000);
195}
196
197uint32_t Thread_impl::GetPeriodMS() const { return this->period / 1000 / 1000; }
198
199void Thread_impl::WaitPeriod(void) {
200  if (period_set == false) {
201    self->Err("Period must be set befaore calling this method\n");
202    return;
203  }
204#ifdef __XENO__
205  unsigned long overruns_r;
206  int status = rt_task_wait_period(&overruns_r);
207  if (status != 0) {
208                char errorMsg[256];
209    self->Err("Error rt_task_wait_period %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
210        }
211//avoid spamming messages
212/*
213  if (status == -ETIMEDOUT) {
214    self->Err("overrun: %lld\n", overruns_r);
215        }*/
216#else
217  self->SleepUntil(next_time);
218  Time current = GetTime();
219//avoid spamming messages
220  //if(current>next_time+period) self->Err("overrun of %lld\n", current-next_time);
221  while (next_time < current) {
222    next_time += period;
223  }
224#endif
225  ComputeLatency(GetTime());
226}
227
228void Thread_impl::Suspend(void) {
229  if (isRunning == false) {
230    self->Err("thread is not started\n");
231    return;
232  }
233
234  cond->GetMutex(), is_suspended = true;
235  cond->CondWait();
236  is_suspended = false;
237  cond->ReleaseMutex();
238}
239
240bool Thread_impl::SuspendUntil(Time date) {
241  if (isRunning == false) {
242    self->Err("thread is not started\n");
243    return false;
244  }
245
246  cond->GetMutex(), is_suspended = true;
247  bool success = cond->CondWaitUntil(date);
248  is_suspended = false;
249  cond->ReleaseMutex();
250  return success;
251}
252
253bool Thread_impl::IsSuspended(void) {
254  bool result;
255
256  cond->GetMutex();
257  result = is_suspended;
258  cond->ReleaseMutex();
259
260  return result;
261}
262
263void Thread_impl::Resume(void) {
264  if (isRunning == false) {
265    self->Err("thread is not started\n");
266    return;
267  }
268
269  cond->GetMutex();
270  if (is_suspended == true) {
271    cond->CondSignal();
272  } else {
273    self->Err("thread is not suspended\n");
274  }
275  cond->ReleaseMutex();
276}
277
278bool Thread_impl::WaitUpdate(const IODevice *device,Time timeout) {
279  bool status = true;
280
281  if (IsSuspended() == true) {
282    self->Err("thread is already supended\n");
283    status = false;
284  } else {
285    cond->GetMutex();
286
287    if (device->pimpl_->SetToWake(self) == 0) {
288      is_suspended = true;
289      status=cond->CondWait(timeout);
290      if(status==false) device->pimpl_->SetToWake(NULL);//condwait timedout
291      is_suspended = false;
292    } else {
293      self->Err("%s is already waiting an update\n", device->ObjectName().c_str());
294      status = false;
295    }
296
297    cond->ReleaseMutex();
298  }
299
300  return status;
301}
302
303void Thread_impl::PrintStats(void) {
304#ifdef __XENO__
305  RT_TASK_INFO info;
306
307  int status = rt_task_inquire(NULL, &info);
308  if (status != 0) {
309                char errorMsg[256];
310    self->Err("Error rt_task_inquire %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
311  } else
312#endif
313  {
314#ifndef __XENO__
315  if(last!=0)
316#endif
317    {
318    if(period_set) {Printf("    period (ns): %lld\n", period);}}
319#ifdef __XENO__
320    Printf("    number of context switches: %i\n", info.ctxswitches);
321    Printf("    number of primary->secondary mode switch: %i\n",
322           info.modeswitches);
323    // printf("number of page faults: %i\n",info.pagefaults);
324    Printf("    execution time (ms) in primary mode: %lld\n",
325           info.exectime / 1000000);
326#else
327/*
328        struct rusage r_usage;
329        getrusage(RUSAGE_THREAD,&r_usage);
330        printf("    memory usage = %ld\n",r_usage.ru_maxrss);
331        printf("RUSAGE :ru_utime => %lld [sec] : %lld [usec], :ru_stime => %lld
332   [sec] : %lld [usec] \n",
333           (int64_t)r_usage.ru_utime.tv_sec, (int64_t)r_usage.ru_utime.tv_usec,
334           (int64_t)r_usage.ru_stime.tv_sec,
335   (int64_t)r_usage.ru_stime.tv_usec);*/
336#endif
337    if (last != 0) {
338      Printf("    min latency (ns): %lld\n", min_latency);
339      Printf("    max latency (ns): %lld\n", max_latency);
340      if (cpt) {
341        Printf("    latency moy (ns): %lld\n", mean_latency / cpt);
342        Printf("    iterations: %lld\n", cpt);
343      }
344    }
345  }
346}
347
348void Thread_impl::Join(void) {
349  if (isRunning == true) {
350    int status;
351 
352#ifdef __XENO__
353    status = rt_task_join(&task_rt);
354#else
355    status = pthread_join(task_nrt, NULL);
356#endif
357    if (status != 0) {
358      char errorMsg[256];
359      self->Err("error %s\n", strerror_r(-status, errorMsg, sizeof(errorMsg)));
360    }
361    isRunning = false;
362  }
363}
364
365void Thread_impl::ComputeLatency(Time time) {
366  Time diff, delta;
367  diff = time - last;
368
369  if (diff >= period) {
370    delta = diff - period;
371  } else {
372    delta = period - diff;
373  }
374  // if(delta==0) rt_printf("%lld %lld\n",time,last);
375  last = time;
376  if (diff == time)
377    return;
378
379  if (delta > max_latency)
380    max_latency = delta;
381  if (delta < min_latency)
382    min_latency = delta;
383  mean_latency += delta;
384  cpt++;
385
386  // Printf("Thread::%s latency moy (ns):
387  // %lld\n",self->ObjectName().c_str(),mean_latency/cpt);
388}
389
390void Thread_impl::SafeStop(void) {
391  tobestopped = true;
392  if (IsSuspended())
393    Resume();
394}
395
396bool Thread_impl::ToBeStopped(void) { return tobestopped; }
Note: See TracBrowser for help on using the repository browser.