source: pacpusframework/trunk/include/Pacpus/kernel/InputOutputInterface.h@ 336

Last change on this file since 336 was 293, checked in by Marek Kurdej, 11 years ago

REVERTED: Reworked threading in InputInterfaceBase, each slot has its own processing thread.

  • Property svn:executable set to *
File size: 8.9 KB
RevLine 
[89]1#ifndef IN_OUT_INTERFACE_H
2#define IN_OUT_INTERFACE_H
3
[199]4#include <Pacpus/kernel/InputOutputBase.h>
[95]5#include <Pacpus/kernel/Log.h>
[199]6
[273]7#include <boost/function.hpp>
[153]8#include <QByteArray>
[200]9#include <QCoreApplication>
[293]10//#include <QThread>
[110]11#include <typeinfo>
[89]12
[287]13namespace pacpus
14{
[89]15
[152]16template <typename T, class C>
[148]17class InputInterface
18 : public InputInterfaceBase
[89]19{
20public:
[199]21 typedef T data_type;
22 typedef C component_type;
[269]23 typedef void (C::*process_data_function_type_with_event)(T&, PacpusEvent);
24 typedef void (C::*process_data_function_type)(T&);
25 typedef void (C::*process_const_data_function_type_with_event)(T const&, PacpusEvent);
26 typedef void (C::*process_const_data_function_type)(T const&);
[273]27 //typedef boost::function<void (C*, T&, PacpusEvent)> process_data_function_type_with_event;
28 //typedef boost::function<void (C*, T&)> process_data_function_type;
29 //typedef boost::function<void (C*, T const&, PacpusEvent)> process_const_data_function_type_with_event;
30 //typedef boost::function<void (C*, T const&)> process_const_data_function_type;
[199]31
[287]32 void initialize()
33 {
34 // each input slot will be processed by a separate thread
[293]35 //moveToThread(&mProcessingThread);
36 //mProcessingThread.start();
[287]37 }
38
[269]39 InputInterface(QString name, C* component, process_data_function_type_with_event processingMethod)
[152]40 : InputInterfaceBase(name, component, component)
[206]41 {
[269]42 mProcessingMethod.fe = processingMethod;
[287]43 initialize();
[206]44 }
[269]45
46 InputInterface(QString name, C* component, process_data_function_type processingMethod)
47 : InputInterfaceBase(name, component, component)
48 {
49 mProcessingMethod.f = processingMethod;
[287]50 initialize();
[269]51 }
52
53 InputInterface(QString name, C* component, process_const_data_function_type_with_event processingMethod)
54 : InputInterfaceBase(name, component, component)
55 {
56 mProcessingMethod.cfe = processingMethod;
[287]57 initialize();
[269]58 }
59
60 InputInterface(QString name, C* component, process_const_data_function_type processingMethod)
61 : InputInterfaceBase(name, component, component)
62 {
63 mProcessingMethod.cf = processingMethod;
[287]64 initialize();
[269]65 }
[89]66
[148]67 ~InputInterface()
[206]68 {
69 }
[89]70
[199]71 std::size_t getDataSize() const
[148]72 {
73 return sizeof(T);
74 }
[89]75
[269]76 std::type_info const& getDataType() const
[148]77 {
[199]78 return typeid(T);
[148]79 }
[96]80
[199]81 // FIXME: what's the purpose of this function?
[206]82 //PacpusEvent * getEventTemplate()
83 //{
84 // return new PacpusTypedEvent<T>(TYPED_EVENT);
85 //}
[148]86
[199]87 // FIXME: what's the purpose of this function?
[269]88 void customEvent(QEvent* event)
[148]89 {
[269]90 static road_timerange_t const kMaximumBoundedTimeRange = 500;
91
92 if (!event) {
93 LOG_WARN("null event");
94 return;
95 }
96
[206]97 // check that component has been started
98 if ((NULL == getComponent()) || (!getComponent()->isActive())) {
99 LOG_DEBUG("component is not active");
100 return;
101 }
102
[258]103 LOG_TRACE("Receiver: " << getSignature());
[206]104
[269]105 //PacpusTypedEvent<T>* typedEvent = dynamic_cast<PacpusTypedEvent<T> *>(event);
106 PacpusEvent* pacpusEvent = dynamic_cast<PacpusEvent*>(event);
[206]107 if (!pacpusEvent) {
108 LOG_WARN("dynamic_cast failed: not a PacpusEvent");
109 return;
110 }
[269]111 PacpusTypedEvent<T>* typedEvent = dynamic_cast<PacpusTypedEvent<T>*>(pacpusEvent);
[206]112 if (!typedEvent) {
113 LOG_WARN("dynamic_cast failed: incompatible event types");
114 return;
115 }
116
[89]117 switch (event->type()) {
[148]118 case TYPED_EVENT:
[269]119 if (TimeBounded == readingMode() && typedEvent->timerange() < kMaximumBoundedTimeRange) {
120 LOG_WARN("Incorrect TimeRange (< " << kMaximumBoundedTimeRange << "), switching to NeverSkip mode");
[153]121 readingMode() = NeverSkip;
122 }
[110]123
[148]124 switch (readingMode()) {
[110]125 case TimeBounded:
[148]126 if (road_time() - typedEvent->time() > typedEvent->timerange()) {
[206]127 LOG_TRACE("Data skipped, receiver: " << this->getSignature());
[148]128 break;
129 }
[110]130
[269]131 mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent);
[89]132 break;
[110]133
134 case GetLast:
[269]135 mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent);
136
[153]137 // delete all remaining events
138 QCoreApplication::removePostedEvents(this, TYPED_EVENT);
[110]139 break;
140
141 case NeverSkip:
[269]142 mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent);
[159]143 break;
[110]144
145 default:
[159]146 LOG_WARN("Unknown reading mode " << readingMode());
[206]147 break;
[89]148 }
[110]149 break;
[89]150
[184]151 // Add here new event type if needed
[89]152
[110]153 default:
[153]154 LOG_WARN("Unknown event ID " << event->type());
[110]155 break;
[89]156 }
[206]157 event->accept();
[110]158 }
[89]159
[269]160 // TODO: pull mode (NOT YET IMPLEMENTED!!!)
161 T& getData()
[206]162 {
[110]163 T data;
[269]164 // TODO: ask the output data;
[110]165 return data;
[89]166 }
167
168protected:
[269]169 struct ProcessingMethod
170 {
171 ProcessingMethod()
172 {
173 fe = NULL;
174 f = NULL;
175 cfe = NULL;
176 cf = NULL;
177 }
178
179 void invoke(ComponentBase* component, T& data, PacpusEvent event)
180 {
181 C* comp = dynamic_cast<C*>(component);
182 if (NULL == comp) {
183 LOG_WARN("NULL component");
184 return;
185 }
[273]186 //if (fe) {
187 // fe(comp, data, event);
188 //} else if (f) {
189 // f(comp, data);
190 //} else if (cfe) {
191 // cfe(comp, data, event);
192 //} else if (cf) {
193 // cf(comp, data);
194 //} else {
195 // LOG_WARN("no method to invoke");
196 //}
[269]197 if (fe) {
198 (comp->*fe)(data, event);
199 } else if (f) {
200 (comp->*f)(data);
201 } else if (cfe) {
202 (comp->*cfe)(data, event);
203 } else if (cf) {
204 (comp->*cf)(data);
205 } else {
206 LOG_WARN("no method to invoke");
207 }
208 }
209
210 process_data_function_type_with_event fe;
211 process_data_function_type f;
212 process_const_data_function_type_with_event cfe;
213 process_const_data_function_type cf;
214 };
215
216 ProcessingMethod mProcessingMethod;
[293]217 //QThread mProcessingThread;
[89]218};
219
[152]220template <typename T, class C>
[185]221class OutputInterface
222 : public OutputInterfaceBase
[89]223{
224public:
[199]225 typedef T data_type;
226 typedef C component_type;
227
[269]228 OutputInterface(QString name, C* component)
[185]229 : OutputInterfaceBase(name, component, component)
[184]230 {}
[89]231
[199]232 ~OutputInterface()
233 {}
234
[206]235 /// Send data through a typed output
[269]236 void send(T const& data, road_time_t t = road_time(), road_timerange_t tr = 0);
[184]237
[199]238 std::size_t getDataSize() const
[148]239 {
240 return sizeof(T);
241 }
242
[269]243 std::type_info const& getDataType() const
[148]244 {
[199]245 return typeid(T);
[148]246 }
[89]247};
248
[185]249template <typename T, class C>
[269]250void OutputInterface<T, C>::send(T const& data, road_time_t t, road_timerange_t tr)
[185]251{
[269]252 // FIXME: use shared data
[185]253 //QSharedPointer<T> sharedPointer = new T(data);
254
[200]255 for (QList<ConnectionBase>::iterator it = connections().begin(), itend = connections().end(); it != itend; ++it) {
[269]256 // Qt documentation:
[202]257 // The event must be allocated on the heap since the post event queue will take ownership of the event and delete it once it has been posted.
258 // It is not safe to access the event after it has been posted.
[269]259 QEvent* pacpusEvent = new PacpusTypedEvent<T>(TYPED_EVENT, data, t, tr);
[206]260 QCoreApplication::postEvent(
261 it->getInterface(),
[269]262 pacpusEvent,
[206]263 it->getPriority()
264 );
[290]265 LOG_TRACE("Sending from " << getSignature() << " to " << it->getInterface()->getSignature());
[185]266 }
267}
268
[202]269template <typename T1, typename T2, class C>
[269]270bool checkedSend(OutputInterface<T1, C>* sender, T2 const& data, road_time_t t = road_time(), road_timerange_t tr = 0);
[202]271
272template <typename T1, typename T2, class C>
[269]273bool checkedSend(OutputInterface<T1, C>* sender, T2 const& data, road_time_t t, road_timerange_t tr)
[202]274{
275 if (sender && sender->hasConnection()) {
276 sender->send(data, t, tr);
277 return true;
278 }
279 return false;
280}
281
[290]282//template <typename T1, typename T2, class C>
283//bool checkedSend(boost::shared_ptr< OutputInterface<T1, C> > sender, T2 const& data, road_time_t t = road_time(), road_timerange_t tr = 0);
284//
285//template <typename T1, typename T2, class C>
286//bool checkedSend(boost::shared_ptr< OutputInterface<T1, C> > sender, T2 const& data, road_time_t t, road_timerange_t tr)
287//{
288// return checkedSend(sender.get(), data, t, tr);
289//}
290
[89]291} // namespace pacpus
292
293#endif // IN_OUT_INTERFACE_H
Note: See TracBrowser for help on using the repository browser.