From 212313d92b421f098108a33e0eda7ada08e8a6b8 Mon Sep 17 00:00:00 2001 From: Simon Marchi Date: Thu, 15 Feb 2024 22:53:01 -0500 Subject: [PATCH] cpp-common/bt2: move user component class bases to `component-class-dev.hpp` It will become possible to use these classes to create component classes at runtime, not only in the context of plugins. Change-Id: I3507a8025fa4e8af6bdb42f2be053dd43f3c2eaa Signed-off-by: Simon Marchi Reviewed-on: https://review.lttng.org/c/babeltrace/+/11814 Reviewed-by: Philippe Proulx Tested-by: jenkins --- src/Makefile.am | 1 + src/cpp-common/bt2/component-class-dev.hpp | 632 +++++++++++++++++++++ src/cpp-common/bt2/plugin-dev.hpp | 619 -------------------- src/plugins/utils/muxer/msg-iter.hpp | 2 +- 4 files changed, 634 insertions(+), 620 deletions(-) create mode 100644 src/cpp-common/bt2/component-class-dev.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 0d990faa..1a9ac61d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -117,6 +117,7 @@ cpp_common_libcpp_common_la_SOURCES = \ cpp-common/bt2/borrowed-object.hpp \ cpp-common/bt2/clock-class.hpp \ cpp-common/bt2/clock-snapshot.hpp \ + cpp-common/bt2/component-class-dev.hpp \ cpp-common/bt2/component-port.hpp \ cpp-common/bt2/exc.hpp \ cpp-common/bt2/field-class.hpp \ diff --git a/src/cpp-common/bt2/component-class-dev.hpp b/src/cpp-common/bt2/component-class-dev.hpp new file mode 100644 index 00000000..1174140f --- /dev/null +++ b/src/cpp-common/bt2/component-class-dev.hpp @@ -0,0 +1,632 @@ +/* + * Copyright (c) 2024 EfficiOS, Inc. + * + * SPDX-License-Identifier: MIT + */ + +#ifndef BABELTRACE_CPP_COMMON_BT2_COMPONENT_CLASS_DEV_HPP +#define BABELTRACE_CPP_COMMON_BT2_COMPONENT_CLASS_DEV_HPP + +#include + +#include + +#include "cpp-common/bt2c/c-string-view.hpp" +#include "cpp-common/bt2c/logging.hpp" +#include "cpp-common/vendor/fmt/core.h" + +#include "exc.hpp" +#include "private-query-executor.hpp" +#include "self-component-port.hpp" + +namespace bt2 { + +template +class UserMessageIterator; + +/* + * Base class of any user component. + * + * See the specific `bt2::UserSourceComponent`, + * `bt2::UserFilterComponent`, and `bt2::UserSinkComponent`. + */ +template +class UserComponent +{ + /* Give a related message iterator access to this logger */ + template + friend class UserMessageIterator; + +public: + using InitData = InitDataT; + using QueryData = QueryDataT; + +protected: + explicit UserComponent(const SelfCompT selfComp, const std::string& logTag) : + _mLogger {selfComp, fmt::format("{}/[{}]", logTag, selfComp.name())}, _mSelfComp {selfComp} + { + } + +protected: + bt2c::CStringView _name() const noexcept + { + return _mSelfComp.name(); + } + + LoggingLevel _loggingLevel() const noexcept + { + return _mSelfComp.loggingLevel(); + } + + std::uint64_t _graphMipVersion() const noexcept + { + return _mSelfComp.graphMipVersion(); + } + + SelfCompT _selfComp() noexcept + { + return _mSelfComp; + } + + bt2c::Logger _mLogger; + +private: + SelfCompT _mSelfComp; +}; + +/* + * Base class of a user source component `UserComponentT` (CRTP). + * + * UserComponentT::UserComponentT() must accept, in this order: + * + * 1. A `bt2::SelfSourceComponent` parameter, which it needs to forward + * to bt2::UserSourceComponent::UserSourceComponent(). + * + * 2. A `bt2::ConstValue` parameter (the initialization parameters). + * + * 3. An `InitDataT *` parameter (the initialization method data). + * + * `UserMessageIteratorT`, the message iterator class to use, must inherit + * `UserMessageIterator`. + * + * UserComponentT::_query() receives a query method data pointer of type + * `QueryDataT *` as its last parameter. + */ +template +class UserSourceComponent : public UserComponent +{ + static_assert(std::is_base_of, + UserMessageIteratorT>::value, + "`UserMessageIteratorT` inherits `UserMessageIterator`"); + +public: + using MessageIterator = UserMessageIteratorT; + +protected: + using _OutputPorts = SelfSourceComponent::OutputPorts; + + explicit UserSourceComponent(const SelfSourceComponent selfComp, const std::string& logTag) : + UserComponent {selfComp, logTag} + { + } + +public: + static Value::Shared query(const SelfComponentClass selfCompCls, + const PrivateQueryExecutor privQueryExec, + const bt2c::CStringView obj, const ConstValue params, + QueryDataT * const data) + { + return UserComponentT::_query(selfCompCls, privQueryExec, obj, params, data); + } + + static void getSupportedMipVersions(const SelfComponentClass selfCompCls, + const ConstValue params, const LoggingLevel loggingLevel, + const UnsignedIntegerRangeSet ranges) + { + UserComponentT::_getSupportedMipVersions(selfCompCls, params, loggingLevel, ranges); + } + + void outputPortConnected(const SelfComponentOutputPort outputPort, + const ConstInputPort inputPort) + { + static_cast(*this)._outputPortConnected(outputPort, inputPort); + } + +protected: + /* Overloadable */ + static Value::Shared _query(SelfComponentClass, PrivateQueryExecutor, bt2c::CStringView, + ConstValue, QueryDataT *) + { + throw UnknownObject {}; + } + + /* Overloadable */ + static void _getSupportedMipVersions(SelfComponentClass, ConstValue, LoggingLevel, + const UnsignedIntegerRangeSet ranges) + { + ranges.addRange(0, 0); + } + + /* Overloadable */ + void _outputPortConnected(SelfComponentOutputPort, ConstInputPort) + { + } + + template + _OutputPorts::Port _addOutputPort(const bt2c::CStringView name, DataT& data) + { + return this->_selfComp().addOutputPort(name, data); + } + + _OutputPorts::Port _addOutputPort(const bt2c::CStringView name) + { + return this->_selfComp().addOutputPort(name); + } + + _OutputPorts _outputPorts() noexcept + { + return this->_selfComp().outputPorts(); + } +}; + +/* + * Base class of a user filter component `UserComponentT` (CRTP). + * + * UserComponentT::UserComponentT() must accept, in this order: + * + * 1. A `bt2::SelfFilterComponent` parameter, which it needs to forward + * to bt2::UserFilterComponent::UserFilterComponent(). + * + * 2. A `bt2::ConstValue` parameter (the initialization parameters). + * + * 3. An `InitDataT *` parameter (the initialization method data). + * + * `UserMessageIteratorT`, the message iterator class to use, must inherit + * `UserMessageIterator`. + * + * UserComponentT::_query() receives a query method data pointer of type + * `QueryDataT *` as its last parameter. + */ +template +class UserFilterComponent : public UserComponent +{ + static_assert(std::is_base_of, + UserMessageIteratorT>::value, + "`UserMessageIteratorT` inherits `UserMessageIterator`"); + +public: + using MessageIterator = UserMessageIteratorT; + +protected: + using _InputPorts = SelfFilterComponent::InputPorts; + using _OutputPorts = SelfFilterComponent::OutputPorts; + + explicit UserFilterComponent(const SelfFilterComponent selfComp, const std::string& logTag) : + UserComponent {selfComp, logTag} + { + } + +public: + static Value::Shared query(const SelfComponentClass selfCompCls, + const PrivateQueryExecutor privQueryExec, + const bt2c::CStringView obj, const ConstValue params, + QueryDataT * const data) + { + return UserComponentT::_query(selfCompCls, privQueryExec, obj, params, data); + } + + static void getSupportedMipVersions(const SelfComponentClass selfCompCls, + const ConstValue params, const LoggingLevel loggingLevel, + const UnsignedIntegerRangeSet ranges) + { + UserComponentT::_getSupportedMipVersions(selfCompCls, params, loggingLevel, ranges); + } + + void inputPortConnected(const SelfComponentInputPort inputPort, + const ConstOutputPort outputPort) + { + static_cast(*this)._inputPortConnected(inputPort, outputPort); + } + + void outputPortConnected(const SelfComponentOutputPort outputPort, + const ConstInputPort inputPort) + { + static_cast(*this)._outputPortConnected(outputPort, inputPort); + } + +protected: + /* Overloadable */ + static Value::Shared _query(SelfComponentClass, PrivateQueryExecutor, bt2c::CStringView, + ConstValue, QueryDataT *) + { + throw UnknownObject {}; + } + + /* Overloadable */ + static void _getSupportedMipVersions(SelfComponentClass, ConstValue, LoggingLevel, + const UnsignedIntegerRangeSet ranges) + { + ranges.addRange(0, 0); + } + + /* Overloadable */ + void _inputPortConnected(SelfComponentInputPort, ConstOutputPort) + { + } + + /* Overloadable */ + void _outputPortConnected(SelfComponentOutputPort, ConstInputPort) + { + } + + template + _OutputPorts::Port _addInputPort(const bt2c::CStringView name, DataT& data) + { + return this->_selfComp().addInputPort(name, data); + } + + _InputPorts::Port _addInputPort(const bt2c::CStringView name) + { + return this->_selfComp().addInputPort(name); + } + + _InputPorts _inputPorts() noexcept + { + return this->_selfComp().inputPorts(); + } + + template + _OutputPorts::Port _addOutputPort(const bt2c::CStringView name, DataT& data) + { + return this->_selfComp().addOutputPort(name, data); + } + + _OutputPorts::Port _addOutputPort(const bt2c::CStringView name) + { + return this->_selfComp().addOutputPort(name); + } + + _OutputPorts _outputPorts() noexcept + { + return this->_selfComp().outputPorts(); + } +}; + +/* + * Base class of a user sink component `UserComponentT` (CRTP). + * + * UserComponentT::UserComponentT() must accept, in this order: + * + * 1. A `bt2::SelfSinkComponent` parameter, which it needs to forward + * to bt2::UserSinkComponent::UserSinkComponent(). + * + * 2. A `bt2::ConstValue` parameter (the initialization parameters). + * + * 3. An `InitDataT *` parameter (the initialization method data). + * + * `UserComponentT` must implement: + * + * bool _consume(); + * + * This method returns `true` if the sink component still needs to + * consume, or `false` if it's finished. + * + * UserComponentT::_query() receives a query method data pointer of type + * `QueryDataT *` as its last parameter. + + */ +template +class UserSinkComponent : public UserComponent +{ +protected: + using _InputPorts = SelfSinkComponent::InputPorts; + + explicit UserSinkComponent(const SelfSinkComponent selfComp, const std::string& logTag) : + UserComponent {selfComp, logTag} + { + } + +public: + static Value::Shared query(const SelfComponentClass selfCompCls, + const PrivateQueryExecutor privQueryExec, + const bt2c::CStringView obj, const ConstValue params, + QueryDataT * const data) + { + return UserComponentT::_query(selfCompCls, privQueryExec, obj, params, data); + } + + static void getSupportedMipVersions(const SelfComponentClass selfCompCls, + const ConstValue params, const LoggingLevel loggingLevel, + const UnsignedIntegerRangeSet ranges) + { + UserComponentT::_getSupportedMipVersions(selfCompCls, params, loggingLevel, ranges); + } + + void graphIsConfigured() + { + static_cast(*this)._graphIsConfigured(); + } + + void inputPortConnected(const SelfComponentInputPort inputPort, + const ConstOutputPort outputPort) + { + static_cast(*this)._inputPortConnected(inputPort, outputPort); + } + + bool consume() + { + return static_cast(*this)._consume(); + } + +protected: + /* Overloadable */ + static Value::Shared _query(SelfComponentClass, PrivateQueryExecutor, bt2c::CStringView, + ConstValue, QueryDataT *) + { + throw UnknownObject {}; + } + + /* Overloadable */ + static void _getSupportedMipVersions(SelfComponentClass, ConstValue, LoggingLevel, + const UnsignedIntegerRangeSet ranges) + { + ranges.addRange(0, 0); + } + + /* Overloadable */ + void _graphIsConfigured() + { + } + + /* Overloadable */ + void _inputPortConnected(SelfComponentInputPort, ConstOutputPort) + { + } + + MessageIterator::Shared _createMessageIterator(const _InputPorts::Port port) + { + return this->_selfComp().createMessageIterator(port); + } + + template + _InputPorts::Port _addInputPort(const bt2c::CStringView name, DataT& data) + { + return this->_selfComp().addInputPort(name, data); + } + + _InputPorts::Port _addInputPort(const bt2c::CStringView name) + { + return this->_selfComp().addInputPort(name); + } + + _InputPorts _inputPorts() noexcept + { + return this->_selfComp().inputPorts(); + } +}; + +/* + * Base class of a user message iterator `UserMessageIteratorT` (CRTP) + * of which the parent user component class is `UserComponentT`. + * + * `UserMessageIteratorT::UserMessageIteratorT()` must accept a + * `bt2::SelfMessageIterator` parameter, which it needs to forward to + * bt2::UserMessageIterator::UserMessageIterator(). + * + * The public next() method below (called by the bridge) implements the + * very common pattern of appending messages into the output array, and, + * meanwhile: + * + * If it catches a `bt2::TryAgain` exception: + * If the message array isn't empty, transform this into a success + * (don't throw). + * + * Otherwise rethrow. + * + * If it catches an error: + * If the message array isn't empty, transform this into a success + * (don't throw), but save the error of the current thread and the + * type of error to throw the next time the user calls next(). + * + * Otherwise rethrow. + * + * `UserMessageIteratorT` must implement: + * + * void _next(bt2::ConstMessageArray& messages); + * + * This method fills `messages` with at most `messages.capacity()` + * messages and may throw `bt2::TryAgain` or a valid error whenever. + * Leaving an empty `messages` means the end of iteration. + */ +template +class UserMessageIterator +{ +private: + /* Type of `_mExcToThrowType` */ + enum class _ExcToThrowType + { + NONE, + ERROR, + MEM_ERROR, + }; + +protected: + explicit UserMessageIterator(const SelfMessageIterator selfMsgIter, + const std::string& logTagSuffix) : + _mSelfMsgIter {selfMsgIter}, + _mLogger {selfMsgIter, + fmt::format("{}/{}", this->_component()._mLogger.tag(), logTagSuffix)} + { + } + +public: + ~UserMessageIterator() + { + this->_resetError(); + } + + void next(bt2::ConstMessageArray& messages) + { + /* Any saved error? Now is the time to throw */ + if (G_UNLIKELY(_mExcToThrowType != _ExcToThrowType::NONE)) { + /* Move `_mSavedLibError`, if any, as current thread error */ + if (_mSavedLibError) { + BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(_mSavedLibError); + } + + /* Throw the corresponding exception */ + if (_mExcToThrowType == _ExcToThrowType::ERROR) { + throw bt2::Error {}; + } else { + BT_ASSERT(_mExcToThrowType == _ExcToThrowType::MEM_ERROR); + throw bt2::MemoryError {}; + } + } + + /* + * When catching some exception below, if our message array + * isn't empty, then return immediately before throwing to + * provide those messages to downstream. + * + * When catching an error, also save the current thread error, + * if any, so that we can restore it later (see the beginning of + * this method). + */ + BT_ASSERT_DBG(_mExcToThrowType == _ExcToThrowType::NONE); + + try { + this->_userObj()._next(messages); + + /* We're done: everything below is exception handling */ + return; + } catch (const bt2::TryAgain&) { + if (messages.isEmpty()) { + throw; + } + } catch (const std::bad_alloc&) { + if (messages.isEmpty()) { + throw; + } + + _mExcToThrowType = _ExcToThrowType::MEM_ERROR; + } catch (const bt2::Error&) { + if (messages.isEmpty()) { + throw; + } + + _mExcToThrowType = _ExcToThrowType::ERROR; + } + + if (_mExcToThrowType != _ExcToThrowType::NONE) { + BT_CPPLOGE( + "An error occurred, but there are {} messages to return: delaying the error reporting.", + messages.length()); + BT_ASSERT(!_mSavedLibError); + _mSavedLibError = bt_current_thread_take_error(); + } + } + + bool canSeekBeginning() + { + this->_resetError(); + return this->_userObj()._canSeekBeginning(); + } + + void seekBeginning() + { + this->_resetError(); + return this->_userObj()._seekBeginning(); + } + + bool canSeekNsFromOrigin(const std::int64_t nsFromOrigin) + { + this->_resetError(); + return this->_userObj()._canSeekNsFromOrigin(nsFromOrigin); + } + + void seekNsFromOrigin(const std::int64_t nsFromOrigin) + { + this->_resetError(); + this->_userObj()._seekNsFromOrigin(nsFromOrigin); + } + +protected: + /* Overloadable */ + bool _canSeekBeginning() noexcept + { + return false; + } + + /* Overloadable */ + void _seekBeginning() noexcept + { + } + + /* Overloadable */ + bool _canSeekNsFromOrigin(std::int64_t) noexcept + { + return false; + } + + /* Overloadable */ + void _seekNsFromOrigin(std::int64_t) noexcept + { + } + + MessageIterator::Shared _createMessageIterator(const SelfComponentInputPort port) + { + return _mSelfMsgIter.createMessageIterator(port); + } + + UserComponentT& _component() noexcept + { + return _mSelfMsgIter.component().template data(); + } + + SelfComponentOutputPort _port() noexcept + { + return _mSelfMsgIter.port(); + } + + bool _isInterrupted() const noexcept + { + return _mSelfMsgIter.isInterrupted(); + } + +private: + UserMessageIteratorT& _userObj() noexcept + { + return static_cast(*this); + } + + void _resetError() noexcept + { + _mExcToThrowType = _ExcToThrowType::NONE; + + if (_mSavedLibError) { + bt_error_release(_mSavedLibError); + } + } + + SelfMessageIterator _mSelfMsgIter; + + /* + * next() may accumulate messages, and then catch an error before + * returning. In that case, it saves the error of the current thread + * here so that it can return its accumulated messages and throw the + * next time. + * + * It also saves the type of the exception to throw the next time. + */ + _ExcToThrowType _mExcToThrowType = _ExcToThrowType::NONE; + const bt_error *_mSavedLibError = nullptr; + +protected: + bt2c::Logger _mLogger; +}; + +} /* namespace bt2 */ + +#endif /* BABELTRACE_CPP_COMMON_BT2_COMPONENT_CLASS_DEV_HPP */ diff --git a/src/cpp-common/bt2/plugin-dev.hpp b/src/cpp-common/bt2/plugin-dev.hpp index 526266d1..5b3e2edd 100644 --- a/src/cpp-common/bt2/plugin-dev.hpp +++ b/src/cpp-common/bt2/plugin-dev.hpp @@ -8,629 +8,10 @@ #ifndef BABELTRACE_CPP_COMMON_BT2_PLUGIN_DEV_HPP #define BABELTRACE_CPP_COMMON_BT2_PLUGIN_DEV_HPP -#include - -#include - #include -#include "cpp-common/bt2c/c-string-view.hpp" -#include "cpp-common/bt2c/logging.hpp" -#include "cpp-common/vendor/fmt/core.h" - -#include "exc.hpp" #include "internal/comp-cls-bridge.hpp" /* IWYU pragma: keep */ -namespace bt2 { - -template -class UserMessageIterator; - -/* - * Base class of any user component. - * - * See the specific `bt2::UserSourceComponent`, - * `bt2::UserFilterComponent`, and `bt2::UserSinkComponent`. - */ -template -class UserComponent -{ - /* Give a related message iterator access to this logger */ - template - friend class UserMessageIterator; - -public: - using InitData = InitDataT; - using QueryData = QueryDataT; - -protected: - explicit UserComponent(const SelfCompT selfComp, const std::string& logTag) : - _mLogger {selfComp, fmt::format("{}/[{}]", logTag, selfComp.name())}, _mSelfComp {selfComp} - { - } - -protected: - bt2c::CStringView _name() const noexcept - { - return _mSelfComp.name(); - } - - LoggingLevel _loggingLevel() const noexcept - { - return _mSelfComp.loggingLevel(); - } - - std::uint64_t _graphMipVersion() const noexcept - { - return _mSelfComp.graphMipVersion(); - } - - SelfCompT _selfComp() noexcept - { - return _mSelfComp; - } - - bt2c::Logger _mLogger; - -private: - SelfCompT _mSelfComp; -}; - -/* - * Base class of a user source component `UserComponentT` (CRTP). - * - * UserComponentT::UserComponentT() must accept, in this order: - * - * 1. A `bt2::SelfSourceComponent` parameter, which it needs to forward - * to bt2::UserSourceComponent::UserSourceComponent(). - * - * 2. A `bt2::ConstValue` parameter (the initialization parameters). - * - * 3. An `InitDataT *` parameter (the initialization method data). - * - * `UserMessageIteratorT`, the message iterator class to use, must inherit - * `UserMessageIterator`. - * - * UserComponentT::_query() receives a query method data pointer of type - * `QueryDataT *` as its last parameter. - */ -template -class UserSourceComponent : public UserComponent -{ - static_assert(std::is_base_of, - UserMessageIteratorT>::value, - "`UserMessageIteratorT` inherits `UserMessageIterator`"); - -public: - using MessageIterator = UserMessageIteratorT; - -protected: - using _OutputPorts = SelfSourceComponent::OutputPorts; - - explicit UserSourceComponent(const SelfSourceComponent selfComp, const std::string& logTag) : - UserComponent {selfComp, logTag} - { - } - -public: - static Value::Shared query(const SelfComponentClass selfCompCls, - const PrivateQueryExecutor privQueryExec, - const bt2c::CStringView obj, const ConstValue params, - QueryDataT * const data) - { - return UserComponentT::_query(selfCompCls, privQueryExec, obj, params, data); - } - - static void getSupportedMipVersions(const SelfComponentClass selfCompCls, - const ConstValue params, const LoggingLevel loggingLevel, - const UnsignedIntegerRangeSet ranges) - { - UserComponentT::_getSupportedMipVersions(selfCompCls, params, loggingLevel, ranges); - } - - void outputPortConnected(const SelfComponentOutputPort outputPort, - const ConstInputPort inputPort) - { - static_cast(*this)._outputPortConnected(outputPort, inputPort); - } - -protected: - /* Overloadable */ - static Value::Shared _query(SelfComponentClass, PrivateQueryExecutor, bt2c::CStringView, - ConstValue, QueryDataT *) - { - throw UnknownObject {}; - } - - /* Overloadable */ - static void _getSupportedMipVersions(SelfComponentClass, ConstValue, LoggingLevel, - const UnsignedIntegerRangeSet ranges) - { - ranges.addRange(0, 0); - } - - /* Overloadable */ - void _outputPortConnected(SelfComponentOutputPort, ConstInputPort) - { - } - - template - _OutputPorts::Port _addOutputPort(const bt2c::CStringView name, DataT& data) - { - return this->_selfComp().addOutputPort(name, data); - } - - _OutputPorts::Port _addOutputPort(const bt2c::CStringView name) - { - return this->_selfComp().addOutputPort(name); - } - - _OutputPorts _outputPorts() noexcept - { - return this->_selfComp().outputPorts(); - } -}; - -/* - * Base class of a user filter component `UserComponentT` (CRTP). - * - * UserComponentT::UserComponentT() must accept, in this order: - * - * 1. A `bt2::SelfFilterComponent` parameter, which it needs to forward - * to bt2::UserFilterComponent::UserFilterComponent(). - * - * 2. A `bt2::ConstValue` parameter (the initialization parameters). - * - * 3. An `InitDataT *` parameter (the initialization method data). - * - * `UserMessageIteratorT`, the message iterator class to use, must inherit - * `UserMessageIterator`. - * - * UserComponentT::_query() receives a query method data pointer of type - * `QueryDataT *` as its last parameter. - */ -template -class UserFilterComponent : public UserComponent -{ - static_assert(std::is_base_of, - UserMessageIteratorT>::value, - "`UserMessageIteratorT` inherits `UserMessageIterator`"); - -public: - using MessageIterator = UserMessageIteratorT; - -protected: - using _InputPorts = SelfFilterComponent::InputPorts; - using _OutputPorts = SelfFilterComponent::OutputPorts; - - explicit UserFilterComponent(const SelfFilterComponent selfComp, const std::string& logTag) : - UserComponent {selfComp, logTag} - { - } - -public: - static Value::Shared query(const SelfComponentClass selfCompCls, - const PrivateQueryExecutor privQueryExec, - const bt2c::CStringView obj, const ConstValue params, - QueryDataT * const data) - { - return UserComponentT::_query(selfCompCls, privQueryExec, obj, params, data); - } - - static void getSupportedMipVersions(const SelfComponentClass selfCompCls, - const ConstValue params, const LoggingLevel loggingLevel, - const UnsignedIntegerRangeSet ranges) - { - UserComponentT::_getSupportedMipVersions(selfCompCls, params, loggingLevel, ranges); - } - - void inputPortConnected(const SelfComponentInputPort inputPort, - const ConstOutputPort outputPort) - { - static_cast(*this)._inputPortConnected(inputPort, outputPort); - } - - void outputPortConnected(const SelfComponentOutputPort outputPort, - const ConstInputPort inputPort) - { - static_cast(*this)._outputPortConnected(outputPort, inputPort); - } - -protected: - /* Overloadable */ - static Value::Shared _query(SelfComponentClass, PrivateQueryExecutor, bt2c::CStringView, - ConstValue, QueryDataT *) - { - throw UnknownObject {}; - } - - /* Overloadable */ - static void _getSupportedMipVersions(SelfComponentClass, ConstValue, LoggingLevel, - const UnsignedIntegerRangeSet ranges) - { - ranges.addRange(0, 0); - } - - /* Overloadable */ - void _inputPortConnected(SelfComponentInputPort, ConstOutputPort) - { - } - - /* Overloadable */ - void _outputPortConnected(SelfComponentOutputPort, ConstInputPort) - { - } - - template - _OutputPorts::Port _addInputPort(const bt2c::CStringView name, DataT& data) - { - return this->_selfComp().addInputPort(name, data); - } - - _InputPorts::Port _addInputPort(const bt2c::CStringView name) - { - return this->_selfComp().addInputPort(name); - } - - _InputPorts _inputPorts() noexcept - { - return this->_selfComp().inputPorts(); - } - - template - _OutputPorts::Port _addOutputPort(const bt2c::CStringView name, DataT& data) - { - return this->_selfComp().addOutputPort(name, data); - } - - _OutputPorts::Port _addOutputPort(const bt2c::CStringView name) - { - return this->_selfComp().addOutputPort(name); - } - - _OutputPorts _outputPorts() noexcept - { - return this->_selfComp().outputPorts(); - } -}; - -/* - * Base class of a user sink component `UserComponentT` (CRTP). - * - * UserComponentT::UserComponentT() must accept, in this order: - * - * 1. A `bt2::SelfSinkComponent` parameter, which it needs to forward - * to bt2::UserSinkComponent::UserSinkComponent(). - * - * 2. A `bt2::ConstValue` parameter (the initialization parameters). - * - * 3. An `InitDataT *` parameter (the initialization method data). - * - * `UserComponentT` must implement: - * - * bool _consume(); - * - * This method returns `true` if the sink component still needs to - * consume, or `false` if it's finished. - * - * UserComponentT::_query() receives a query method data pointer of type - * `QueryDataT *` as its last parameter. - - */ -template -class UserSinkComponent : public UserComponent -{ -protected: - using _InputPorts = SelfSinkComponent::InputPorts; - - explicit UserSinkComponent(const SelfSinkComponent selfComp, const std::string& logTag) : - UserComponent {selfComp, logTag} - { - } - -public: - static Value::Shared query(const SelfComponentClass selfCompCls, - const PrivateQueryExecutor privQueryExec, - const bt2c::CStringView obj, const ConstValue params, - QueryDataT * const data) - { - return UserComponentT::_query(selfCompCls, privQueryExec, obj, params, data); - } - - static void getSupportedMipVersions(const SelfComponentClass selfCompCls, - const ConstValue params, const LoggingLevel loggingLevel, - const UnsignedIntegerRangeSet ranges) - { - UserComponentT::_getSupportedMipVersions(selfCompCls, params, loggingLevel, ranges); - } - - void graphIsConfigured() - { - static_cast(*this)._graphIsConfigured(); - } - - void inputPortConnected(const SelfComponentInputPort inputPort, - const ConstOutputPort outputPort) - { - static_cast(*this)._inputPortConnected(inputPort, outputPort); - } - - bool consume() - { - return static_cast(*this)._consume(); - } - -protected: - /* Overloadable */ - static Value::Shared _query(SelfComponentClass, PrivateQueryExecutor, bt2c::CStringView, - ConstValue, QueryDataT *) - { - throw UnknownObject {}; - } - - /* Overloadable */ - static void _getSupportedMipVersions(SelfComponentClass, ConstValue, LoggingLevel, - const UnsignedIntegerRangeSet ranges) - { - ranges.addRange(0, 0); - } - - /* Overloadable */ - void _graphIsConfigured() - { - } - - /* Overloadable */ - void _inputPortConnected(SelfComponentInputPort, ConstOutputPort) - { - } - - MessageIterator::Shared _createMessageIterator(const _InputPorts::Port port) - { - return this->_selfComp().createMessageIterator(port); - } - - template - _InputPorts::Port _addInputPort(const bt2c::CStringView name, DataT& data) - { - return this->_selfComp().addInputPort(name, data); - } - - _InputPorts::Port _addInputPort(const bt2c::CStringView name) - { - return this->_selfComp().addInputPort(name); - } - - _InputPorts _inputPorts() noexcept - { - return this->_selfComp().inputPorts(); - } -}; - -/* - * Base class of a user message iterator `UserMessageIteratorT` (CRTP) - * of which the parent user component class is `UserComponentT`. - * - * `UserMessageIteratorT::UserMessageIteratorT()` must accept a - * `bt2::SelfMessageIterator` parameter, which it needs to forward to - * bt2::UserMessageIterator::UserMessageIterator(). - * - * The public next() method below (called by the bridge) implements the - * very common pattern of appending messages into the output array, and, - * meanwhile: - * - * If it catches a `bt2::TryAgain` exception: - * If the message array isn't empty, transform this into a success - * (don't throw). - * - * Otherwise rethrow. - * - * If it catches an error: - * If the message array isn't empty, transform this into a success - * (don't throw), but save the error of the current thread and the - * type of error to throw the next time the user calls next(). - * - * Otherwise rethrow. - * - * `UserMessageIteratorT` must implement: - * - * void _next(bt2::ConstMessageArray& messages); - * - * This method fills `messages` with at most `messages.capacity()` - * messages and may throw `bt2::TryAgain` or a valid error whenever. - * Leaving an empty `messages` means the end of iteration. - */ -template -class UserMessageIterator -{ -private: - /* Type of `_mExcToThrowType` */ - enum class _ExcToThrowType - { - NONE, - ERROR, - MEM_ERROR, - }; - -protected: - explicit UserMessageIterator(const SelfMessageIterator selfMsgIter, - const std::string& logTagSuffix) : - _mSelfMsgIter {selfMsgIter}, - _mLogger {selfMsgIter, - fmt::format("{}/{}", this->_component()._mLogger.tag(), logTagSuffix)} - { - } - -public: - ~UserMessageIterator() - { - this->_resetError(); - } - - void next(bt2::ConstMessageArray& messages) - { - /* Any saved error? Now is the time to throw */ - if (G_UNLIKELY(_mExcToThrowType != _ExcToThrowType::NONE)) { - /* Move `_mSavedLibError`, if any, as current thread error */ - if (_mSavedLibError) { - BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(_mSavedLibError); - } - - /* Throw the corresponding exception */ - if (_mExcToThrowType == _ExcToThrowType::ERROR) { - throw bt2::Error {}; - } else { - BT_ASSERT(_mExcToThrowType == _ExcToThrowType::MEM_ERROR); - throw bt2::MemoryError {}; - } - } - - /* - * When catching some exception below, if our message array - * isn't empty, then return immediately before throwing to - * provide those messages to downstream. - * - * When catching an error, also save the current thread error, - * if any, so that we can restore it later (see the beginning of - * this method). - */ - BT_ASSERT_DBG(_mExcToThrowType == _ExcToThrowType::NONE); - - try { - this->_userObj()._next(messages); - - /* We're done: everything below is exception handling */ - return; - } catch (const bt2::TryAgain&) { - if (messages.isEmpty()) { - throw; - } - } catch (const std::bad_alloc&) { - if (messages.isEmpty()) { - throw; - } - - _mExcToThrowType = _ExcToThrowType::MEM_ERROR; - } catch (const bt2::Error&) { - if (messages.isEmpty()) { - throw; - } - - _mExcToThrowType = _ExcToThrowType::ERROR; - } - - if (_mExcToThrowType != _ExcToThrowType::NONE) { - BT_CPPLOGE( - "An error occurred, but there are {} messages to return: delaying the error reporting.", - messages.length()); - BT_ASSERT(!_mSavedLibError); - _mSavedLibError = bt_current_thread_take_error(); - } - } - - bool canSeekBeginning() - { - this->_resetError(); - return this->_userObj()._canSeekBeginning(); - } - - void seekBeginning() - { - this->_resetError(); - return this->_userObj()._seekBeginning(); - } - - bool canSeekNsFromOrigin(const std::int64_t nsFromOrigin) - { - this->_resetError(); - return this->_userObj()._canSeekNsFromOrigin(nsFromOrigin); - } - - void seekNsFromOrigin(const std::int64_t nsFromOrigin) - { - this->_resetError(); - this->_userObj()._seekNsFromOrigin(nsFromOrigin); - } - -protected: - /* Overloadable */ - bool _canSeekBeginning() noexcept - { - return false; - } - - /* Overloadable */ - void _seekBeginning() noexcept - { - } - - /* Overloadable */ - bool _canSeekNsFromOrigin(std::int64_t) noexcept - { - return false; - } - - /* Overloadable */ - void _seekNsFromOrigin(std::int64_t) noexcept - { - } - - MessageIterator::Shared _createMessageIterator(const SelfComponentInputPort port) - { - return _mSelfMsgIter.createMessageIterator(port); - } - - UserComponentT& _component() noexcept - { - return _mSelfMsgIter.component().template data(); - } - - SelfComponentOutputPort _port() noexcept - { - return _mSelfMsgIter.port(); - } - - bool _isInterrupted() const noexcept - { - return _mSelfMsgIter.isInterrupted(); - } - -private: - UserMessageIteratorT& _userObj() noexcept - { - return static_cast(*this); - } - - void _resetError() noexcept - { - _mExcToThrowType = _ExcToThrowType::NONE; - - if (_mSavedLibError) { - bt_error_release(_mSavedLibError); - } - } - - SelfMessageIterator _mSelfMsgIter; - - /* - * next() may accumulate messages, and then catch an error before - * returning. In that case, it saves the error of the current thread - * here so that it can return its accumulated messages and throw the - * next time. - * - * It also saves the type of the exception to throw the next time. - */ - _ExcToThrowType _mExcToThrowType = _ExcToThrowType::NONE; - const bt_error *_mSavedLibError = nullptr; - -protected: - bt2c::Logger _mLogger; -}; - -} /* namespace bt2 */ - #define BT_CPP_PLUGIN_SOURCE_COMPONENT_CLASS_WITH_ID(_pluginId, _componentClassId, _name, \ _userComponentClass) \ BT_PLUGIN_SOURCE_COMPONENT_CLASS_WITH_ID( \ diff --git a/src/plugins/utils/muxer/msg-iter.hpp b/src/plugins/utils/muxer/msg-iter.hpp index ff1e8240..d6daefb4 100644 --- a/src/plugins/utils/muxer/msg-iter.hpp +++ b/src/plugins/utils/muxer/msg-iter.hpp @@ -10,8 +10,8 @@ #include +#include "cpp-common/bt2/component-class-dev.hpp" #include "cpp-common/bt2/optional-borrowed-object.hpp" -#include "cpp-common/bt2/plugin-dev.hpp" #include "cpp-common/bt2/self-message-iterator-configuration.hpp" #include "cpp-common/bt2c/prio-heap.hpp" #include "cpp-common/bt2c/uuid.hpp" -- 2.34.1