source: pacpusframework/trunk/include/Pacpus/kernel/InputOutputInterface.h@ 287

Last change on this file since 287 was 287, checked in by Marek Kurdej, 10 years ago

MAJOR: InputOutputInterface has a separate thread for each input slot.

  • Property svn:executable set to *
File size: 8.4 KB
Line 
1#ifndef IN_OUT_INTERFACE_H
2#define IN_OUT_INTERFACE_H
3
4#include <Pacpus/kernel/InputOutputBase.h>
5#include <Pacpus/kernel/Log.h>
6
7#include <boost/function.hpp>
8#include <QByteArray>
9#include <QCoreApplication>
10#include <QThread>
11#include <typeinfo>
12
13namespace pacpus
14{
15
16template <typename T, class C>
17class InputInterface
18 : public InputInterfaceBase
19{
20public:
21 typedef T data_type;
22 typedef C component_type;
23 typedef void (C::*process_data_function_type_with_event)(T&, PacpusEvent);
24 typedef void (C::*process_data_function_type)(T&);
25 typedef void (C::*process_const_data_function_type_with_event)(T const&, PacpusEvent);
26 typedef void (C::*process_const_data_function_type)(T const&);
27 //typedef boost::function<void (C*, T&, PacpusEvent)> process_data_function_type_with_event;
28 //typedef boost::function<void (C*, T&)> process_data_function_type;
29 //typedef boost::function<void (C*, T const&, PacpusEvent)> process_const_data_function_type_with_event;
30 //typedef boost::function<void (C*, T const&)> process_const_data_function_type;
31
32 void initialize()
33 {
34 // each input slot will be processed by a separate thread
35 moveToThread(&mProcessingThread);
36 mProcessingThread.start();
37 }
38
39 InputInterface(QString name, C* component, process_data_function_type_with_event processingMethod)
40 : InputInterfaceBase(name, component, component)
41 {
42 mProcessingMethod.fe = processingMethod;
43 initialize();
44 }
45
46 InputInterface(QString name, C* component, process_data_function_type processingMethod)
47 : InputInterfaceBase(name, component, component)
48 {
49 mProcessingMethod.f = processingMethod;
50 initialize();
51 }
52
53 InputInterface(QString name, C* component, process_const_data_function_type_with_event processingMethod)
54 : InputInterfaceBase(name, component, component)
55 {
56 mProcessingMethod.cfe = processingMethod;
57 initialize();
58 }
59
60 InputInterface(QString name, C* component, process_const_data_function_type processingMethod)
61 : InputInterfaceBase(name, component, component)
62 {
63 mProcessingMethod.cf = processingMethod;
64 initialize();
65 }
66
67 ~InputInterface()
68 {
69 }
70
71 std::size_t getDataSize() const
72 {
73 return sizeof(T);
74 }
75
76 std::type_info const& getDataType() const
77 {
78 return typeid(T);
79 }
80
81 // FIXME: what's the purpose of this function?
82 //PacpusEvent * getEventTemplate()
83 //{
84 // return new PacpusTypedEvent<T>(TYPED_EVENT);
85 //}
86
87 // FIXME: what's the purpose of this function?
88 void customEvent(QEvent* event)
89 {
90 static road_timerange_t const kMaximumBoundedTimeRange = 500;
91
92 if (!event) {
93 LOG_WARN("null event");
94 return;
95 }
96
97 // check that component has been started
98 if ((NULL == getComponent()) || (!getComponent()->isActive())) {
99 LOG_DEBUG("component is not active");
100 return;
101 }
102
103 LOG_TRACE("Receiver: " << getSignature());
104
105 //PacpusTypedEvent<T>* typedEvent = dynamic_cast<PacpusTypedEvent<T> *>(event);
106 PacpusEvent* pacpusEvent = dynamic_cast<PacpusEvent*>(event);
107 if (!pacpusEvent) {
108 LOG_WARN("dynamic_cast failed: not a PacpusEvent");
109 return;
110 }
111 PacpusTypedEvent<T>* typedEvent = dynamic_cast<PacpusTypedEvent<T>*>(pacpusEvent);
112 if (!typedEvent) {
113 LOG_WARN("dynamic_cast failed: incompatible event types");
114 return;
115 }
116
117 switch (event->type()) {
118 case TYPED_EVENT:
119 if (TimeBounded == readingMode() && typedEvent->timerange() < kMaximumBoundedTimeRange) {
120 LOG_WARN("Incorrect TimeRange (< " << kMaximumBoundedTimeRange << "), switching to NeverSkip mode");
121 readingMode() = NeverSkip;
122 }
123
124 switch (readingMode()) {
125 case TimeBounded:
126 if (road_time() - typedEvent->time() > typedEvent->timerange()) {
127 LOG_TRACE("Data skipped, receiver: " << this->getSignature());
128 break;
129 }
130
131 mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent);
132 break;
133
134 case GetLast:
135 mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent);
136
137 // delete all remaining events
138 QCoreApplication::removePostedEvents(this, TYPED_EVENT);
139 break;
140
141 case NeverSkip:
142 mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent);
143 break;
144
145 default:
146 LOG_WARN("Unknown reading mode " << readingMode());
147 break;
148 }
149 break;
150
151 // Add here new event type if needed
152
153 default:
154 LOG_WARN("Unknown event ID " << event->type());
155 break;
156 }
157 event->accept();
158 }
159
160 // TODO: pull mode (NOT YET IMPLEMENTED!!!)
161 T& getData()
162 {
163 T data;
164 // TODO: ask the output data;
165 return data;
166 }
167
168protected:
169 struct ProcessingMethod
170 {
171 ProcessingMethod()
172 {
173 fe = NULL;
174 f = NULL;
175 cfe = NULL;
176 cf = NULL;
177 }
178
179 void invoke(ComponentBase* component, T& data, PacpusEvent event)
180 {
181 C* comp = dynamic_cast<C*>(component);
182 if (NULL == comp) {
183 LOG_WARN("NULL component");
184 return;
185 }
186 //if (fe) {
187 // fe(comp, data, event);
188 //} else if (f) {
189 // f(comp, data);
190 //} else if (cfe) {
191 // cfe(comp, data, event);
192 //} else if (cf) {
193 // cf(comp, data);
194 //} else {
195 // LOG_WARN("no method to invoke");
196 //}
197 if (fe) {
198 (comp->*fe)(data, event);
199 } else if (f) {
200 (comp->*f)(data);
201 } else if (cfe) {
202 (comp->*cfe)(data, event);
203 } else if (cf) {
204 (comp->*cf)(data);
205 } else {
206 LOG_WARN("no method to invoke");
207 }
208 }
209
210 process_data_function_type_with_event fe;
211 process_data_function_type f;
212 process_const_data_function_type_with_event cfe;
213 process_const_data_function_type cf;
214 };
215
216 ProcessingMethod mProcessingMethod;
217 QThread mProcessingThread;
218};
219
220template <typename T, class C>
221class OutputInterface
222 : public OutputInterfaceBase
223{
224public:
225 typedef T data_type;
226 typedef C component_type;
227
228 OutputInterface(QString name, C* component)
229 : OutputInterfaceBase(name, component, component)
230 {}
231
232 ~OutputInterface()
233 {}
234
235 /// Send data through a typed output
236 void send(T const& data, road_time_t t = road_time(), road_timerange_t tr = 0);
237
238 std::size_t getDataSize() const
239 {
240 return sizeof(T);
241 }
242
243 std::type_info const& getDataType() const
244 {
245 return typeid(T);
246 }
247};
248
249template <typename T, class C>
250void OutputInterface<T, C>::send(T const& data, road_time_t t, road_timerange_t tr)
251{
252 // FIXME: use shared data
253 //QSharedPointer<T> sharedPointer = new T(data);
254
255 for (QList<ConnectionBase>::iterator it = connections().begin(), itend = connections().end(); it != itend; ++it) {
256 // Qt documentation:
257 // The event must be allocated on the heap since the post event queue will take ownership of the event and delete it once it has been posted.
258 // It is not safe to access the event after it has been posted.
259 QEvent* pacpusEvent = new PacpusTypedEvent<T>(TYPED_EVENT, data, t, tr);
260 QCoreApplication::postEvent(
261 it->getInterface(),
262 pacpusEvent,
263 it->getPriority()
264 );
265 LOG_TRACE("Sender: " << getSignature());
266 }
267}
268
269template <typename T1, typename T2, class C>
270bool checkedSend(OutputInterface<T1, C>* sender, T2 const& data, road_time_t t = road_time(), road_timerange_t tr = 0);
271
272template <typename T1, typename T2, class C>
273bool checkedSend(OutputInterface<T1, C>* sender, T2 const& data, road_time_t t, road_timerange_t tr)
274{
275 if (sender && sender->hasConnection()) {
276 sender->send(data, t, tr);
277 return true;
278 }
279 return false;
280}
281
282} // namespace pacpus
283
284#endif // IN_OUT_INTERFACE_H
Note: See TracBrowser for help on using the repository browser.