Changeset 15 in flair-src for trunk/lib/FlairCore/src/Thread_impl.cpp
- Timestamp:
- 04/08/16 15:40:57 (8 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/FlairCore/src/Thread_impl.cpp
r2 r15 33 33 using namespace flair::core; 34 34 35 Thread_impl::Thread_impl(Thread* self,uint8_t priority) 36 { 37 isRunning=false; 38 tobestopped=false; 39 is_suspended=false; 40 period_set=false; 41 42 this->self=self; 43 cond=new ConditionVariable(self,self->ObjectName()); 44 45 if(priority<MIN_THREAD_PRIORITY) 46 { 47 priority=MIN_THREAD_PRIORITY; 48 // printf("Thread::Thread, %s: priority set to %i\n",self->ObjectName().c_str(), priority); 35 Thread_impl::Thread_impl(Thread *self, uint8_t priority) { 36 isRunning = false; 37 tobestopped = false; 38 is_suspended = false; 39 period_set = false; 40 41 this->self = self; 42 cond = new ConditionVariable(self, self->ObjectName()); 43 44 if (priority < MIN_THREAD_PRIORITY) { 45 priority = MIN_THREAD_PRIORITY; 46 // printf("Thread::Thread, %s: priority set to 47 // %i\n",self->ObjectName().c_str(), priority); 48 } 49 if (priority > MAX_THREAD_PRIORITY) { 50 priority = MAX_THREAD_PRIORITY; 51 self->Warn("priority set to %i\n", MAX_THREAD_PRIORITY); 52 } 53 54 this->priority = priority; 55 period = 100 * 1000 * 1000; // 100ms par defaut 56 min_jitter = 1000 * 1000 * 1000; 57 max_jitter = 0; 58 mean_jitter = 0; 59 last = 0; 60 cpt = 0; 61 } 62 63 Thread_impl::~Thread_impl() { 64 SafeStop(); 65 Join(); 66 } 67 68 void Thread_impl::Start(void) { 69 int status; 70 71 isRunning = true; 72 73 #ifdef __XENO__ 74 string th_name = 75 getFrameworkManager()->ObjectName() + "-" + self->ObjectName(); 76 77 #ifdef RT_STACK_SIZE 78 status = rt_task_create(&task_rt, th_name.c_str(), RT_STACK_SIZE, 79 (int)priority, T_JOINABLE); 80 #else 81 status = 82 rt_task_create(&task_rt, th_name.c_str(), 0, (int)priority, T_JOINABLE); 83 #endif // RT_STACK_SIZE 84 if (status != 0) { 85 self->Err("rt_task_create error (%s)\n", strerror(-status)); 86 } else { 87 //_printf("rt_task_create ok %s\n",th_name); 88 } 89 90 status = rt_task_start(&task_rt, &main_rt, (void *)this); 91 if (status != 0) { 92 self->Err("rt_task_start error (%s)\n", strerror(-status)); 93 } else { 94 //_printf("rt_task_start ok %s\n",th_name); 95 } 96 97 // Initialise the rt_print buffer for this task explicitly 98 rt_print_init(512, th_name.c_str()); 99 100 #else //__XENO__ 101 102 // Initialize thread creation attributes 103 pthread_attr_t attr; 104 if (pthread_attr_init(&attr) != 0) { 105 self->Err("Error pthread_attr_init\n"); 106 } 107 108 #ifdef NRT_STACK_SIZE 109 if (pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) { 110 self->Err("Error pthread_attr_setstacksize\n"); 111 } 112 #endif // NRT_STACK_SIZE 113 114 if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) { 115 self->Err("Error pthread_attr_setinheritsched\n"); 116 } 117 118 if (pthread_attr_setschedpolicy(&attr, SCHED_FIFO) != 0) { 119 self->Err("Error pthread_attr_setschedpolicy\n"); 120 } 121 122 struct sched_param parm; 123 parm.sched_priority = priority; 124 if (pthread_attr_setschedparam(&attr, &parm) != 0) { 125 self->Err("Error pthread_attr_setschedparam\n"); 126 } 127 128 next_time = GetTime() + period; 129 130 if (pthread_create(&task_nrt, &attr, main_nrt, (void *)this) != 0) { 131 self->Err("pthread_create error\n"); 132 } 133 134 if (pthread_attr_destroy(&attr) != 0) { 135 self->Err("Error pthread_attr_destroy\n"); 136 } 137 138 #endif //__XENO__ 139 } 140 141 #ifdef __XENO__ 142 void Thread_impl::main_rt(void *arg) { 143 Thread_impl *caller = (Thread_impl *)arg; 144 145 // Perform auto-init of rt_print buffers if the task doesn't do so 146 rt_print_auto_init(1); 147 148 caller->self->Run(); 149 150 caller->PrintStats(); 151 } 152 #else 153 void *Thread_impl::main_nrt(void *arg) { 154 Thread_impl *caller = (Thread_impl *)arg; 155 156 caller->self->Run(); 157 158 caller->PrintStats(); 159 160 pthread_exit(0); 161 } 162 #endif //__XENO__ 163 164 void Thread_impl::SetPeriodUS(uint32_t period) { 165 if (period == 0) { 166 self->Err("Period must be>0\n"); 167 return; 168 } 169 170 #ifdef __XENO__ 171 int status = rt_task_set_periodic(&task_rt, TM_NOW, period * 1000); 172 if (status != 0) 173 self->Err("Error rt_task_set_periodic %s\n", strerror(-status)); 174 #else 175 next_time -= period; 176 next_time += period * 1000; 177 #endif 178 this->period = period * 1000; 179 period_set = true; 180 } 181 182 uint32_t Thread_impl::GetPeriodUS() const { return this->period / 1000; } 183 184 void Thread_impl::SetPeriodMS(uint32_t period) { 185 if (period == 0) { 186 self->Err("Period must be>0\n"); 187 return; 188 } 189 190 #ifdef __XENO__ 191 int status = rt_task_set_periodic(&task_rt, TM_NOW, period * 1000 * 1000); 192 if (status != 0) 193 self->Err("Error rt_task_set_periodic %s\n", strerror(-status)); 194 #else 195 next_time -= period; 196 next_time += period * 1000 * 1000; 197 #endif 198 this->period = period * 1000 * 1000; 199 period_set = true; 200 } 201 202 uint32_t Thread_impl::GetPeriodMS() const { return this->period / 1000 / 1000; } 203 204 void Thread_impl::WaitPeriod(void) { 205 if (period_set == false) { 206 self->Err("Period must be set befaore calling this method\n"); 207 return; 208 } 209 #ifdef __XENO__ 210 unsigned long overruns_r; 211 int status = rt_task_wait_period(&overruns_r); 212 if (status != 0) 213 self->Err("Error rt_task_wait_period %s\n", strerror(-status)); 214 if (status == -ETIMEDOUT) 215 self->Err("overrun: %lld\n", overruns_r); 216 #else 217 self->SleepUntil(next_time); 218 next_time += period; 219 #endif 220 ComputeJitter(GetTime()); 221 } 222 223 void Thread_impl::Suspend(void) { 224 if (isRunning == false) { 225 self->Err("thread is not started\n"); 226 return; 227 } 228 229 cond->GetMutex(), is_suspended = true; 230 cond->CondWait(); 231 is_suspended = false; 232 cond->ReleaseMutex(); 233 } 234 235 bool Thread_impl::SuspendUntil(Time date) { 236 if (isRunning == false) { 237 self->Err("thread is not started\n"); 238 return false; 239 } 240 241 cond->GetMutex(), is_suspended = true; 242 bool success = cond->CondWaitUntil(date); 243 is_suspended = false; 244 cond->ReleaseMutex(); 245 return success; 246 } 247 248 bool Thread_impl::IsSuspended(void) { 249 bool result; 250 251 cond->GetMutex(); 252 result = is_suspended; 253 cond->ReleaseMutex(); 254 255 return result; 256 } 257 258 void Thread_impl::Resume(void) { 259 if (isRunning == false) { 260 self->Err("thread is not started\n"); 261 return; 262 } 263 264 cond->GetMutex(); 265 if (is_suspended == true) { 266 cond->CondSignal(); 267 } else { 268 self->Err("thread is not suspended\n"); 269 } 270 cond->ReleaseMutex(); 271 } 272 273 int Thread_impl::WaitUpdate(const IODevice *device) { 274 int status = 0; 275 276 if (IsSuspended() == true) { 277 self->Err("thread is already supended\n"); 278 status = -1; 279 } else { 280 cond->GetMutex(); 281 282 if (device->pimpl_->SetToWake(self) == 0) { 283 is_suspended = true; 284 cond->CondWait(); 285 is_suspended = false; 286 } else { 287 self->Err("%s is already waiting an update\n", 288 device->ObjectName().c_str()); 289 status = -1; 49 290 } 50 if(priority>MAX_THREAD_PRIORITY) 51 { 52 priority=MAX_THREAD_PRIORITY; 53 self->Warn("priority set to %i\n",MAX_THREAD_PRIORITY); 54 } 55 56 this->priority=priority; 57 period=100*1000*1000;//100ms par defaut 58 min_jitter=1000*1000*1000; 59 max_jitter=0; 60 mean_jitter=0; 61 last=0; 62 cpt=0; 63 } 64 65 Thread_impl::~Thread_impl() 66 { 67 SafeStop(); 68 Join(); 69 } 70 71 void Thread_impl::Start(void) 72 { 73 int status; 74 75 isRunning=true; 76 77 #ifdef __XENO__ 78 string th_name=getFrameworkManager()->ObjectName()+ "-" + self->ObjectName(); 79 80 #ifdef RT_STACK_SIZE 81 status=rt_task_create(&task_rt, th_name.c_str(), RT_STACK_SIZE, (int)priority, T_JOINABLE); 82 #else 83 status=rt_task_create(&task_rt, th_name.c_str(), 0,(int)priority, T_JOINABLE); 84 #endif //RT_STACK_SIZE 85 if(status!=0) 86 { 87 self->Err("rt_task_create error (%s)\n",strerror(-status)); 88 } 89 else 90 { 91 //_printf("rt_task_create ok %s\n",th_name); 92 } 93 94 status=rt_task_start(&task_rt, &main_rt, (void*)this); 95 if(status!=0) 96 { 97 self->Err("rt_task_start error (%s)\n",strerror(-status)); 98 } 99 else 100 { 101 //_printf("rt_task_start ok %s\n",th_name); 102 } 103 104 // Initialise the rt_print buffer for this task explicitly 105 rt_print_init(512, th_name.c_str()); 106 107 #else //__XENO__ 108 109 // Initialize thread creation attributes 110 pthread_attr_t attr; 111 if(pthread_attr_init(&attr) != 0) 112 { 113 self->Err("Error pthread_attr_init\n"); 114 } 115 116 #ifdef NRT_STACK_SIZE 117 if(pthread_attr_setstacksize(&attr, NRT_STACK_SIZE) != 0) 118 { 119 self->Err("Error pthread_attr_setstacksize\n"); 120 } 121 #endif //NRT_STACK_SIZE 122 123 if(pthread_attr_setinheritsched(&attr,PTHREAD_EXPLICIT_SCHED)!=0) 124 { 125 self->Err("Error pthread_attr_setinheritsched\n"); 126 } 127 128 if(pthread_attr_setschedpolicy(&attr, SCHED_FIFO)!=0) 129 { 130 self->Err("Error pthread_attr_setschedpolicy\n"); 131 } 132 133 struct sched_param parm; 134 parm.sched_priority = priority; 135 if(pthread_attr_setschedparam(&attr,&parm)!=0) 136 { 137 self->Err("Error pthread_attr_setschedparam\n"); 138 } 139 140 next_time=GetTime()+period; 141 142 if(pthread_create(&task_nrt, &attr, main_nrt, (void*)this) != 0) 143 { 144 self->Err("pthread_create error\n"); 145 } 146 147 if(pthread_attr_destroy(&attr) != 0) 148 { 149 self->Err("Error pthread_attr_destroy\n"); 150 } 151 152 #endif //__XENO__ 153 } 154 155 #ifdef __XENO__ 156 void Thread_impl::main_rt(void * arg) 157 { 158 Thread_impl *caller = (Thread_impl*)arg; 159 160 // Perform auto-init of rt_print buffers if the task doesn't do so 161 rt_print_auto_init(1); 162 163 caller->self->Run(); 164 165 caller->PrintStats(); 166 } 167 #else 168 void* Thread_impl::main_nrt(void * arg) 169 { 170 Thread_impl *caller = (Thread_impl*)arg; 171 172 caller->self->Run(); 173 174 caller->PrintStats(); 175 176 pthread_exit(0); 177 } 178 #endif //__XENO__ 179 180 void Thread_impl::SetPeriodUS(uint32_t period) { 181 if(period==0) { 182 self->Err("Period must be>0\n"); 183 return; 184 } 185 186 #ifdef __XENO__ 187 int status=rt_task_set_periodic(&task_rt, TM_NOW, period*1000); 188 if(status!=0) self->Err("Error rt_task_set_periodic %s\n",strerror(-status)); 189 #else 190 next_time-=period; 191 next_time+=period*1000; 192 #endif 193 this->period=period*1000; 194 period_set=true; 195 } 196 197 uint32_t Thread_impl::GetPeriodUS() const { 198 return this->period/1000; 199 } 200 201 void Thread_impl::SetPeriodMS(uint32_t period) { 202 if(period==0) { 203 self->Err("Period must be>0\n"); 204 return; 205 } 206 207 #ifdef __XENO__ 208 int status=rt_task_set_periodic(&task_rt, TM_NOW, period*1000*1000); 209 if(status!=0) self->Err("Error rt_task_set_periodic %s\n",strerror(-status)); 210 #else 211 next_time-=period; 212 next_time+=period*1000*1000; 213 #endif 214 this->period=period*1000*1000; 215 period_set=true; 216 } 217 218 uint32_t Thread_impl::GetPeriodMS() const { 219 return this->period/1000/1000; 220 } 221 222 void Thread_impl::WaitPeriod(void) 223 { 224 if(period_set==false) 225 { 226 self->Err("Period must be set befaore calling this method\n"); 227 return; 228 } 229 #ifdef __XENO__ 230 unsigned long overruns_r; 231 int status=rt_task_wait_period(&overruns_r); 232 if(status!=0) self->Err("Error rt_task_wait_period %s\n",strerror(-status)); 233 if(status==-ETIMEDOUT) self->Err("overrun: %lld\n",overruns_r); 234 #else 235 self->SleepUntil(next_time); 236 next_time+=period; 237 #endif 238 ComputeJitter(GetTime()); 239 } 240 241 242 void Thread_impl::Suspend(void) { 243 if(isRunning==false) { 244 self->Err("thread is not started\n"); 245 return; 246 } 247 248 cond->GetMutex(), 249 is_suspended=true; 250 cond->CondWait(); 251 is_suspended=false; 291 252 292 cond->ReleaseMutex(); 253 } 254 255 bool Thread_impl::SuspendUntil(Time date) { 256 if(isRunning==false) { 257 self->Err("thread is not started\n"); 258 return false; 259 } 260 261 cond->GetMutex(), 262 is_suspended=true; 263 bool success=cond->CondWaitUntil(date); 264 is_suspended=false; 265 cond->ReleaseMutex(); 266 return success; 267 } 268 269 bool Thread_impl::IsSuspended(void) 270 { 271 bool result; 272 273 cond->GetMutex(); 274 result=is_suspended; 275 cond->ReleaseMutex(); 276 277 return result; 278 } 279 280 void Thread_impl::Resume(void) 281 { 282 if(isRunning==false) 283 { 284 self->Err("thread is not started\n"); 285 return; 286 } 287 288 cond->GetMutex(); 289 if(is_suspended==true) 290 { 291 cond->CondSignal(); 292 } 293 else 294 { 295 self->Err("thread is not suspended\n"); 296 } 297 cond->ReleaseMutex(); 298 } 299 300 int Thread_impl::WaitUpdate(const IODevice* device) 301 { 302 int status=0; 303 304 if(IsSuspended()==true) 305 { 306 self->Err("thread is already supended\n"); 307 status=-1; 308 } 309 else 310 { 311 cond->GetMutex(); 312 313 if(device->pimpl_->SetToWake(self)==0) 314 { 315 is_suspended=true; 316 cond->CondWait(); 317 is_suspended=false; 318 } 319 else 320 { 321 self->Err("%s is already waiting an update\n",device->ObjectName().c_str()); 322 status=-1; 323 } 324 325 cond->ReleaseMutex(); 326 } 327 328 return status; 329 } 330 331 void Thread_impl::PrintStats(void) 332 { 333 #ifdef __XENO__ 334 RT_TASK_INFO info; 335 336 int status=rt_task_inquire(NULL, &info); 337 if(status!=0) 338 { 339 self->Err("Error rt_task_inquire %s\n",strerror(-status)); 340 } 341 else 342 #endif 343 { 293 } 294 295 return status; 296 } 297 298 void Thread_impl::PrintStats(void) { 299 #ifdef __XENO__ 300 RT_TASK_INFO info; 301 302 int status = rt_task_inquire(NULL, &info); 303 if (status != 0) { 304 self->Err("Error rt_task_inquire %s\n", strerror(-status)); 305 } else 306 #endif 307 { 344 308 #ifndef __XENO__ 345 //if(last!=0)346 #endif 347 {348 Printf("Thread::%s :\n",self->ObjectName().c_str()); 349 }350 #ifdef __XENO__ 351 Printf(" number of context switches: %i\n",info.ctxswitches);352 Printf(" number of primary->secondary mode switch: %i\n",info.modeswitches);353 //printf("number of page faults: %i\n",info.pagefaults);354 Printf(" execution time (ms) in primary mode: %lld\n",info.exectime/1000000);309 // if(last!=0) 310 #endif 311 { Printf("Thread::%s :\n", self->ObjectName().c_str()); } 312 #ifdef __XENO__ 313 Printf(" number of context switches: %i\n", info.ctxswitches); 314 Printf(" number of primary->secondary mode switch: %i\n", 315 info.modeswitches); 316 // printf("number of page faults: %i\n",info.pagefaults); 317 Printf(" execution time (ms) in primary mode: %lld\n", 318 info.exectime / 1000000); 355 319 #else 356 320 /* … … 358 322 getrusage(RUSAGE_THREAD,&r_usage); 359 323 printf(" memory usage = %ld\n",r_usage.ru_maxrss); 360 printf("RUSAGE :ru_utime => %lld [sec] : %lld [usec], :ru_stime => %lld [sec] : %lld [usec] \n", 324 printf("RUSAGE :ru_utime => %lld [sec] : %lld [usec], :ru_stime => %lld 325 [sec] : %lld [usec] \n", 361 326 (int64_t)r_usage.ru_utime.tv_sec, (int64_t)r_usage.ru_utime.tv_usec, 362 (int64_t)r_usage.ru_stime.tv_sec, (int64_t)r_usage.ru_stime.tv_usec);*/ 363 #endif 364 if(last!=0) 365 { 366 Printf(" min jitter (ns): %lld\n",min_jitter); 367 Printf(" max jitter (ns): %lld\n",max_jitter); 368 Printf(" jitter moy (ns): %lld\n",mean_jitter/cpt); 369 Printf(" itertions: %lld\n",cpt); 370 } 327 (int64_t)r_usage.ru_stime.tv_sec, 328 (int64_t)r_usage.ru_stime.tv_usec);*/ 329 #endif 330 if (last != 0) { 331 Printf(" min jitter (ns): %lld\n", min_jitter); 332 Printf(" max jitter (ns): %lld\n", max_jitter); 333 Printf(" jitter moy (ns): %lld\n", mean_jitter / cpt); 334 Printf(" itertions: %lld\n", cpt); 371 335 } 372 } 373 374 void Thread_impl::Join(void) 375 { 376 if(isRunning==true) 377 { 378 int status; 379 380 #ifdef __XENO__ 381 status=rt_task_join(&task_rt); 382 #else 383 status=pthread_join(task_nrt,NULL); 384 #endif 385 if(status!=0) self->Err("error %s\n",strerror(-status)); 386 isRunning=false; 387 } 388 } 389 390 void Thread_impl::ComputeJitter(Time time) 391 { 392 Time diff,delta; 393 diff=time-last; 394 395 if(diff>=period) 396 { 397 delta=diff-period; 398 } 399 else 400 { 401 delta=period-diff; 402 } 403 //if(delta==0) rt_printf("%lld %lld\n",time,last); 404 last=time; 405 if(diff==time) return; 406 407 if(delta>max_jitter) max_jitter=delta; 408 if(delta<min_jitter) min_jitter=delta; 409 mean_jitter+=delta; 410 cpt++; 411 412 //Printf("Thread::%s jitter moy (ns): %lld\n",self->ObjectName().c_str(),mean_jitter/cpt); 413 414 } 415 416 void Thread_impl::SafeStop(void) 417 { 418 tobestopped=true; 419 if(IsSuspended()) Resume(); 420 } 421 422 bool Thread_impl::ToBeStopped(void) 423 { 424 return tobestopped; 425 } 336 } 337 } 338 339 void Thread_impl::Join(void) { 340 if (isRunning == true) { 341 int status; 342 343 #ifdef __XENO__ 344 status = rt_task_join(&task_rt); 345 #else 346 status = pthread_join(task_nrt, NULL); 347 #endif 348 if (status != 0) 349 self->Err("error %s\n", strerror(-status)); 350 isRunning = false; 351 } 352 } 353 354 void Thread_impl::ComputeJitter(Time time) { 355 Time diff, delta; 356 diff = time - last; 357 358 if (diff >= period) { 359 delta = diff - period; 360 } else { 361 delta = period - diff; 362 } 363 // if(delta==0) rt_printf("%lld %lld\n",time,last); 364 last = time; 365 if (diff == time) 366 return; 367 368 if (delta > max_jitter) 369 max_jitter = delta; 370 if (delta < min_jitter) 371 min_jitter = delta; 372 mean_jitter += delta; 373 cpt++; 374 375 // Printf("Thread::%s jitter moy (ns): 376 // %lld\n",self->ObjectName().c_str(),mean_jitter/cpt); 377 } 378 379 void Thread_impl::SafeStop(void) { 380 tobestopped = true; 381 if (IsSuspended()) 382 Resume(); 383 } 384 385 bool Thread_impl::ToBeStopped(void) { return tobestopped; }
Note:
See TracChangeset
for help on using the changeset viewer.