#ifndef IN_OUT_INTERFACE_H #define IN_OUT_INTERFACE_H #include #include #include #include #include #include #include namespace pacpus { template class InputInterface : public InputInterfaceBase { public: typedef T data_type; typedef C component_type; typedef void (C::*process_data_function_type_with_event)(T&, PacpusEvent); typedef void (C::*process_data_function_type)(T&); typedef void (C::*process_const_data_function_type_with_event)(T const&, PacpusEvent); typedef void (C::*process_const_data_function_type)(T const&); //typedef boost::function process_data_function_type_with_event; //typedef boost::function process_data_function_type; //typedef boost::function process_const_data_function_type_with_event; //typedef boost::function process_const_data_function_type; void initialize() { // each input slot will be processed by a separate thread moveToThread(&mProcessingThread); mProcessingThread.start(); } InputInterface(QString name, C* component, process_data_function_type_with_event processingMethod) : InputInterfaceBase(name, component, component) { mProcessingMethod.fe = processingMethod; initialize(); } InputInterface(QString name, C* component, process_data_function_type processingMethod) : InputInterfaceBase(name, component, component) { mProcessingMethod.f = processingMethod; initialize(); } InputInterface(QString name, C* component, process_const_data_function_type_with_event processingMethod) : InputInterfaceBase(name, component, component) { mProcessingMethod.cfe = processingMethod; initialize(); } InputInterface(QString name, C* component, process_const_data_function_type processingMethod) : InputInterfaceBase(name, component, component) { mProcessingMethod.cf = processingMethod; initialize(); } ~InputInterface() { } std::size_t getDataSize() const { return sizeof(T); } std::type_info const& getDataType() const { return typeid(T); } // FIXME: what's the purpose of this function? //PacpusEvent * getEventTemplate() //{ // return new PacpusTypedEvent(TYPED_EVENT); //} // FIXME: what's the purpose of this function? void customEvent(QEvent* event) { static road_timerange_t const kMaximumBoundedTimeRange = 500; if (!event) { LOG_WARN("null event"); return; } // check that component has been started if ((NULL == getComponent()) || (!getComponent()->isActive())) { LOG_DEBUG("component is not active"); return; } LOG_TRACE("Receiver: " << getSignature()); //PacpusTypedEvent* typedEvent = dynamic_cast *>(event); PacpusEvent* pacpusEvent = dynamic_cast(event); if (!pacpusEvent) { LOG_WARN("dynamic_cast failed: not a PacpusEvent"); return; } PacpusTypedEvent* typedEvent = dynamic_cast*>(pacpusEvent); if (!typedEvent) { LOG_WARN("dynamic_cast failed: incompatible event types"); return; } switch (event->type()) { case TYPED_EVENT: if (TimeBounded == readingMode() && typedEvent->timerange() < kMaximumBoundedTimeRange) { LOG_WARN("Incorrect TimeRange (< " << kMaximumBoundedTimeRange << "), switching to NeverSkip mode"); readingMode() = NeverSkip; } switch (readingMode()) { case TimeBounded: if (road_time() - typedEvent->time() > typedEvent->timerange()) { LOG_TRACE("Data skipped, receiver: " << this->getSignature()); break; } mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent); break; case GetLast: mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent); // delete all remaining events QCoreApplication::removePostedEvents(this, TYPED_EVENT); break; case NeverSkip: mProcessingMethod.invoke(component(), typedEvent->data(), *typedEvent); break; default: LOG_WARN("Unknown reading mode " << readingMode()); break; } break; // Add here new event type if needed default: LOG_WARN("Unknown event ID " << event->type()); break; } event->accept(); } // TODO: pull mode (NOT YET IMPLEMENTED!!!) T& getData() { T data; // TODO: ask the output data; return data; } protected: struct ProcessingMethod { ProcessingMethod() { fe = NULL; f = NULL; cfe = NULL; cf = NULL; } void invoke(ComponentBase* component, T& data, PacpusEvent event) { C* comp = dynamic_cast(component); if (NULL == comp) { LOG_WARN("NULL component"); return; } //if (fe) { // fe(comp, data, event); //} else if (f) { // f(comp, data); //} else if (cfe) { // cfe(comp, data, event); //} else if (cf) { // cf(comp, data); //} else { // LOG_WARN("no method to invoke"); //} if (fe) { (comp->*fe)(data, event); } else if (f) { (comp->*f)(data); } else if (cfe) { (comp->*cfe)(data, event); } else if (cf) { (comp->*cf)(data); } else { LOG_WARN("no method to invoke"); } } process_data_function_type_with_event fe; process_data_function_type f; process_const_data_function_type_with_event cfe; process_const_data_function_type cf; }; ProcessingMethod mProcessingMethod; QThread mProcessingThread; }; template class OutputInterface : public OutputInterfaceBase { public: typedef T data_type; typedef C component_type; OutputInterface(QString name, C* component) : OutputInterfaceBase(name, component, component) {} ~OutputInterface() {} /// Send data through a typed output void send(T const& data, road_time_t t = road_time(), road_timerange_t tr = 0); std::size_t getDataSize() const { return sizeof(T); } std::type_info const& getDataType() const { return typeid(T); } }; template void OutputInterface::send(T const& data, road_time_t t, road_timerange_t tr) { // FIXME: use shared data //QSharedPointer sharedPointer = new T(data); for (QList::iterator it = connections().begin(), itend = connections().end(); it != itend; ++it) { // Qt documentation: // The event must be allocated on the heap since the post event queue will take ownership of the event and delete it once it has been posted. // It is not safe to access the event after it has been posted. QEvent* pacpusEvent = new PacpusTypedEvent(TYPED_EVENT, data, t, tr); QCoreApplication::postEvent( it->getInterface(), pacpusEvent, it->getPriority() ); LOG_TRACE("Sender: " << getSignature()); } } template bool checkedSend(OutputInterface* sender, T2 const& data, road_time_t t = road_time(), road_timerange_t tr = 0); template bool checkedSend(OutputInterface* sender, T2 const& data, road_time_t t, road_timerange_t tr) { if (sender && sender->hasConnection()) { sender->send(data, t, tr); return true; } return false; } } // namespace pacpus #endif // IN_OUT_INTERFACE_H