flt.utils.muxer: use a heap to sort iterators (also: rewrite in C++)
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Mon, 6 Nov 2023 21:16:54 +0000 (16:16 -0500)
committerSimon Marchi <simon.marchi@efficios.com>
Thu, 14 Dec 2023 15:57:04 +0000 (10:57 -0500)
The main goal of this patch was to use a heap instead of an array to
sort upstream message iterators so as to improve the performance of a
`flt.utils.muxer` message iterator managing a lot of usptream message
iterators, especially with a lot of connected input ports.

To do this, I first decided to use `std::priority_queue`, and since I'm
not a fan of mixed C and C++, I also decided to take the opportunity to
convert the whole component class C source to C++. This looked like a
good excuse to prove that it's possible, using only common C++ stuff,
including libbabeltrace2 bindings (`bt2` namespace).

Therefore, I'm glad to announce that `flt.utils.muxer` is the first
upstream C++-only component class! ðŸ¥³

In the end, because it's more efficient, I decided to write a C++
version of what used to be the `bt_heap_` C API, `bt2c::PrioHeap`, and
to use this instead of `std::priority_queue`. ðŸ¤·

This new version has more or less the same logic as the previous one. It
has three classes under the `bt2mux` namespace:

`Comp`:
    The filter component class which doesn't do much.

    Its main purpose is to add an available input port when one is
    connected.

`UpstreamMsgIter`:
    Wraps a `bt2::MessageIterator`, keeping an internal array of
    messages (what bt2::MessageIterator::next() returns), and offering
    the msg() (current message), msgTs() (cached timestamp, if any, of
    current message), discard(), and reload() methods to deal with a
    single message at a time.

    You must call always reload() before calling msg()/msgTs(), and if
    there's current message, you must call discard() before you call
    reload(). Making the caller responsible of this logic removes
    redundant conditions in this class. You may view the
    msg() + discard() pair as a form of message popping. The sequence is
    usually:

    1. We know there's a current message because the last call to
       reload() was successful.

    2. Get the current message with msg(), and then call discard() to
       discard/remove it.

    3. Try to call reload().

       This may return `UpstreamMsgIter::ReloadStatus::NO_MORE`; if it
       does, stop using this upstream message iterator.

       This may also throw `bt2::TryAgain`; if it does, call reload()
       again later.

    Caching the timestamp of the current message is an improvement
    because the heap rebalancing operation could need to use them
    several times when comparing two upstream message iterators.

    This class also offers canSeekBeginning(), seekBeginning(), and
    canSeekForward() which more or less wrap the corresponding
    `bt2::MessageIterator` methods.

`MsgIter`:
    The message iterator class.

    This is where most of the changes are.

    It keeps a `bt2c::PrioHeap` of `UpstreamMsgIter` pointers to sort
    them. The comparator works like before, so that the output message
    order of this version for a given set of upstream message iterators
    should be the same as the previous version.

    Like before, it has an array of pointers to upstream message
    iterators to reload. An upstream message iterator is either part of
    the heap, or part of this array. We need to call reload() on each of
    the upstream message iterators in this array. This is either
    because:

    * We put them here on `MsgIter` construction to avoid getting any
      exception from bt2::MessageIterator::next().

    * The last attempt to call reload() on them threw `bt2::TryAgain`.

    Before MsgIter::_next() selects a message to add to the message
    array, all non-ended upstream message iterators need to be part of
    the heap.

    The logic of clock class expectation is unchanged.

In general, there are more detailed comments, and logging is improved,
in particular the trace level statements of the heap comparator:

    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:501 Comparing two messages: port-name-a=in0, msg-a-type=PACKET_BEGINNING, msg-a-ts=1441852773113941931, port-name-b=in1, msg-b-type=STREAM_BEGINNING, msg-b-ts=none
    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:538 Message A has a timestamp, but message B has none: oldest=B
    ...
    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:501 Comparing two messages: port-name-a=in2, msg-a-type=PACKET_END, msg-a-ts=1441852847496965372, port-name-b=in1, msg-b-type=PACKET_END, msg-b-ts=1441852847496953796
    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:533 Timestamp of message A is greater than timestamp of message B: oldest=B

My preliminary experiments show that this version is slightly (a few
percentage points) more efficient than the previous one for a typical
four-stream scenario, and then around 12-13 times more efficient for a
208-stream scenario.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Change-Id: I2e393a2ef1c33c6b134284028807c687f8adf0e8
Reviewed-on: https://review.lttng.org/c/babeltrace/+/11273
Reviewed-by: Simon Marchi <simon.marchi@efficios.com>
CI-Build: Simon Marchi <simon.marchi@efficios.com>
Tested-by: jenkins <jenkins@lttng.org>
15 files changed:
src/Makefile.am
src/cli/Makefile.am
src/plugins/utils/counter/counter.h
src/plugins/utils/dummy/dummy.h
src/plugins/utils/muxer/comp.cpp [new file with mode: 0644]
src/plugins/utils/muxer/comp.hpp [new file with mode: 0644]
src/plugins/utils/muxer/msg-iter.cpp [new file with mode: 0644]
src/plugins/utils/muxer/msg-iter.hpp [new file with mode: 0644]
src/plugins/utils/muxer/muxer.c [deleted file]
src/plugins/utils/muxer/muxer.h [deleted file]
src/plugins/utils/muxer/upstream-msg-iter.cpp [new file with mode: 0644]
src/plugins/utils/muxer/upstream-msg-iter.hpp [new file with mode: 0644]
src/plugins/utils/plugin.c [deleted file]
src/plugins/utils/plugin.cpp [new file with mode: 0644]
src/plugins/utils/trimmer/trimmer.h

index 6bb2269bbd97dbfc771684d0f734b157a0c5bb6d..bd79324d0caeff6bd0c222e06e1443452746bc60 100644 (file)
@@ -594,11 +594,15 @@ plugins_utils_babeltrace_plugin_utils_la_SOURCES = \
        plugins/utils/counter/counter.h \
        plugins/utils/dummy/dummy.c \
        plugins/utils/dummy/dummy.h \
-       plugins/utils/muxer/muxer.c \
-       plugins/utils/muxer/muxer.h \
+       plugins/utils/muxer/comp.cpp \
+       plugins/utils/muxer/comp.hpp \
+       plugins/utils/muxer/msg-iter.cpp \
+       plugins/utils/muxer/msg-iter.hpp \
+       plugins/utils/muxer/upstream-msg-iter.cpp \
+       plugins/utils/muxer/upstream-msg-iter.hpp \
        plugins/utils/trimmer/trimmer.c \
        plugins/utils/trimmer/trimmer.h \
-       plugins/utils/plugin.c
+       plugins/utils/plugin.cpp
 
 plugins_utils_babeltrace_plugin_utils_la_LDFLAGS = \
        $(AM_LDFLAGS) \
@@ -612,6 +616,7 @@ if !ENABLE_BUILT_IN_PLUGINS
 plugins_utils_babeltrace_plugin_utils_la_LIBADD += \
        lib/libbabeltrace2.la \
        common/libcommon.la \
+       cpp-common/vendor/fmt/libfmt.la \
        logging/liblogging.la \
        plugins/common/param-validation/libparam-validation.la
 endif
index 80ffb85f6271058ded663dc5d4843aee74a1dc99..4731375b9295f7e45d308042c2b30f433d407b06 100644 (file)
@@ -64,6 +64,7 @@ babeltrace2_bin_LDADD = \
        $(top_builddir)/src/lib/libbabeltrace2.la \
        $(top_builddir)/src/compat/libcompat.la \
        $(top_builddir)/src/common/libcommon.la \
+       $(top_builddir)/src/cpp-common/vendor/fmt/libfmt.la \
        $(top_builddir)/src/logging/liblogging.la \
        $(top_builddir)/src/ctfser/libctfser.la
 
index 39b9a29ab83a9983d86fa5a532b3894a4a6060cb..ad886bb3bfdea3c36af9885f9ab68b3d7585cb17 100644 (file)
 #include <stdint.h>
 #include "common/macros.h"
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 struct counter {
        bt_message_iterator *msg_iter;
        struct {
@@ -46,4 +50,8 @@ bt_component_class_sink_graph_is_configured_method_status counter_graph_is_confi
 
 bt_component_class_sink_consume_method_status counter_consume(bt_self_component_sink *component);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* BABELTRACE_PLUGINS_UTILS_COUNTER_H */
index a5e3eb9fcc5754a7c67297e5debb753fb5f5b830..0a7c6a95f037cede02e5675a25c2ee78cff7c464 100644 (file)
 #include <babeltrace2/babeltrace.h>
 #include "common/macros.h"
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 struct dummy {
        bt_message_iterator *msg_iter;
 };
@@ -28,4 +32,8 @@ bt_component_class_sink_graph_is_configured_method_status dummy_graph_is_configu
 bt_component_class_sink_consume_method_status dummy_consume(
                bt_self_component_sink *component);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* BABELTRACE_PLUGINS_UTILS_DUMMY_H */
diff --git a/src/plugins/utils/muxer/comp.cpp b/src/plugins/utils/muxer/comp.cpp
new file mode 100644 (file)
index 0000000..d81a4c2
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#include "cpp-common/vendor/fmt/core.h"
+
+#include "comp.hpp"
+
+namespace bt2mux {
+
+Comp::Comp(const bt2::SelfFilterComponent selfComp, const bt2::ConstMapValue params) :
+    bt2::UserFilterComponent<Comp> {selfComp, "PLUGIN/FLT.UTILS.MUXER"}
+{
+    BT_CPPLOGI_STR("Initializing component.");
+
+    /* No parameters expected */
+    if (!params.isEmpty()) {
+        BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+            bt2c::Error, "This component expects no parameters: param-count={}", params.length());
+    }
+
+    /* Add initial available input port */
+    this->_addAvailInputPort();
+
+    /* Add single output port */
+    try {
+        this->_addOutputPort("out");
+    } catch (const bt2c::Error&) {
+        BT_CPPLOGE_STR_APPEND_CAUSE_AND_RETHROW("Failed to add a single output port.");
+    }
+
+    BT_CPPLOGI_STR("Initialized component.");
+}
+
+void Comp::_inputPortConnected(const bt2::SelfComponentInputPort, const bt2::ConstOutputPort)
+{
+    this->_addAvailInputPort();
+}
+
+void Comp::_addAvailInputPort()
+{
+    try {
+        this->_addInputPort(fmt::format("in{}", this->_inputPorts().length()));
+    } catch (const bt2c::Error&) {
+        BT_CPPLOGE_STR_APPEND_CAUSE_AND_RETHROW("Failed to add an available input port.");
+    }
+
+    BT_CPPLOGI("Added one available input port: name={}", this->_inputPorts().back().name());
+}
+
+} /* namespace bt2mux */
diff --git a/src/plugins/utils/muxer/comp.hpp b/src/plugins/utils/muxer/comp.hpp
new file mode 100644 (file)
index 0000000..2ce66a4
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#ifndef BABELTRACE_PLUGINS_UTILS_MUXER_COMP_HPP
+#define BABELTRACE_PLUGINS_UTILS_MUXER_COMP_HPP
+
+#include "cpp-common/bt2/plugin-dev.hpp"
+
+namespace bt2mux {
+
+class Comp final : public bt2::UserFilterComponent<Comp>
+{
+    friend class MsgIter;
+    friend bt2::UserFilterComponent<Comp>;
+
+public:
+    explicit Comp(bt2::SelfFilterComponent selfComp, bt2::ConstMapValue params);
+
+private:
+    void _inputPortConnected(bt2::SelfComponentInputPort selfPort, bt2::ConstOutputPort otherPort);
+    void _addAvailInputPort();
+};
+
+} /* namespace bt2mux */
+
+#endif /* BABELTRACE_PLUGINS_UTILS_MUXER_COMP_HPP */
diff --git a/src/plugins/utils/muxer/msg-iter.cpp b/src/plugins/utils/muxer/msg-iter.cpp
new file mode 100644 (file)
index 0000000..526be00
--- /dev/null
@@ -0,0 +1,499 @@
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#include <algorithm>
+
+#include <glib.h>
+
+#include <babeltrace2/babeltrace.h>
+
+#include "common/common.h"
+#include "cpp-common/bt2c/call.hpp"
+#include "cpp-common/bt2s/make-unique.hpp"
+#include "cpp-common/vendor/fmt/format.h"
+
+#include "plugins/common/muxing/muxing.h"
+
+#include "comp.hpp"
+#include "msg-iter.hpp"
+
+namespace bt2mux {
+
+MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
+                 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
+    bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
+    _mHeap {_HeapComparator {_mLogger}}
+{
+    /*
+     * Create one upstream message iterator for each connected
+     * input port.
+     */
+    auto canSeekForward = true;
+
+    for (const auto inputPort : this->_component()._inputPorts()) {
+        if (!inputPort.isConnected()) {
+            BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
+            continue;
+        }
+
+        /*
+         * Create new upstream message iterator and immediately make it
+         * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
+         * deal with it when downstream calls next()).
+         */
+        auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
+            this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
+
+        canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
+        _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
+        _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
+    }
+
+    /* Set the "can seek forward" configuration */
+    cfg.canSeekForward(canSeekForward);
+}
+
+namespace {
+
+std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
+{
+    if (ts) {
+        return fmt::to_string(*ts);
+    }
+
+    return "none";
+}
+
+} /* namespace */
+
+void MsgIter::_next(bt2::ConstMessageArray& msgs)
+{
+    /* Make sure all upstream message iterators are part of the heap */
+    this->_ensureFullHeap();
+
+    while (msgs.length() < msgs.capacity()) {
+        /* Empty heap? */
+        if (G_UNLIKELY(_mHeap.isEmpty())) {
+            /* No more upstream messages! */
+            return;
+        }
+
+        /*
+         * Retrieve the upstream message iterator having the oldest message.
+         */
+        auto& oldestUpstreamMsgIter = *_mHeap.top();
+
+        /* Validate the clock class of the oldest message */
+        this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
+
+        /* Append the oldest message and discard it */
+        msgs.append(oldestUpstreamMsgIter.msg().shared());
+
+        if (_mLogger.wouldLogD()) {
+            BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
+                       oldestUpstreamMsgIter.portName(),
+                       optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
+        }
+
+        oldestUpstreamMsgIter.discard();
+
+        /*
+         * Immediately try to reload `oldestUpstreamMsgIter`.
+         *
+         * The possible outcomes are:
+         *
+         * There's an available message:
+         *     Call `_mHeap.replaceTop()` to bring
+         *     `oldestUpstreamMsgIter` back to the heap, performing a
+         *     single heap rebalance.
+         *
+         * There isn't an available message (ended):
+         *     Remove `oldestUpstreamMsgIter` from the heap.
+         *
+         * `bt2::TryAgain` is thrown:
+         *     Remove `oldestUpstreamMsgIter` from the heap.
+         *
+         *     Add `oldestUpstreamMsgIter` to the set of upstream
+         *     message iterators to reload. The next call to _next()
+         *     will move it to the heap again (if not ended) after
+         *     having successfully called reload().
+         */
+        BT_CPPLOGD(
+            "Trying to reload upstream message iterator having the oldest message: port-name={}",
+            oldestUpstreamMsgIter.portName());
+
+        try {
+            if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
+                /* New current message: update heap */
+                _mHeap.replaceTop(&oldestUpstreamMsgIter);
+                BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
+                           oldestUpstreamMsgIter.portName(), _mHeap.len());
+            } else {
+                _mHeap.removeTop();
+                BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
+                           "port-name{}, heap-len={}",
+                           oldestUpstreamMsgIter.portName(), _mHeap.len());
+            }
+        } catch (const bt2::TryAgain&) {
+            _mHeap.removeTop();
+            _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
+            BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
+                       "port-name={}, heap-len={}, to-reload-len={}",
+                       oldestUpstreamMsgIter.portName(), _mHeap.len(),
+                       _mUpstreamMsgItersToReload.size());
+            throw;
+        }
+    }
+}
+
+void MsgIter::_ensureFullHeap()
+{
+    /*
+     * Always remove from `_mUpstreamMsgItersToReload` when reload()
+     * doesn't throw.
+     *
+     * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
+     * then we don't need it anymore (remains alive in
+     * `_mUpstreamMsgIters`).
+     */
+    for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
+         it = _mUpstreamMsgItersToReload.erase(it)) {
+        auto& upstreamMsgIter = **it;
+
+        BT_CPPLOGD("Handling upstream message iterator to reload: "
+                   "port-name={}, heap-len={}, to-reload-len={}",
+                   upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
+
+        if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
+            /* New current message: move to heap */
+            _mHeap.insert(&upstreamMsgIter);
+            BT_CPPLOGD("More messages available; "
+                       "inserted upstream message iterator into heap from \"to reload\" set: "
+                       "port-name={}, heap-len={}",
+                       upstreamMsgIter.portName(), _mHeap.len());
+        } else {
+            BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
+                       "port-name={}",
+                       upstreamMsgIter.portName());
+        }
+    }
+}
+
+bool MsgIter::_canSeekBeginning()
+{
+    /*
+     * We can only seek our beginning if all our upstream message
+     * iterators also can.
+     */
+    return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
+                       [](UpstreamMsgIter::UP& upstreamMsgIter) {
+                           return upstreamMsgIter->canSeekBeginning();
+                       });
+}
+
+void MsgIter::_seekBeginning()
+{
+    /*
+     * The current approach is that this operation is either successful
+     * (all upstream message iterators seek) or not. If it's not, then
+     * we don't keep any state that some sought and some didn't: we'll
+     * restart the whole process when the user tries to seek again.
+     *
+     * The first step is to clear all the containers of upstream message
+     * iterator pointers so that we can process what's in
+     * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
+     * if any seeking fails below, the downstream user is required to
+     * try the "seek beginning" operation again and only call
+     * bt_message_iterator_next() if it was successful.
+     *
+     * This means if the first four upstream message iterators seek, and
+     * then the fifth one throws `bt2::TryAgain`, then the next time
+     * this method executes, the first four upstream message iterators
+     * will seek again. That being said, it's such an unlikely scenario
+     * that the simplicity outweighs performance concerns here.
+     */
+    _mHeap.clear();
+    _mUpstreamMsgItersToReload.clear();
+
+    /* Also reset clock class expectation */
+    _mClkClsExpectation = _ClkClsExpectation::ANY;
+    _mExpectedClkClsUuid.reset();
+
+    /* Make each upstream message iterator seek */
+    for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
+        /* This may throw! */
+        upstreamMsgIter->seekBeginning();
+    }
+
+    /*
+     * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
+     * next call to _next() will deal with those.
+     */
+    for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
+        _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
+    }
+}
+
+namespace {
+
+const char *msgTypeStr(const bt2::ConstMessage msg) noexcept
+{
+    return bt_common_message_type_string(static_cast<bt_message_type>(msg.type()));
+}
+
+std::string optLogStr(const char * const str) noexcept
+{
+    return str ? fmt::format("\"{}\"", str) : "(none)";
+}
+
+} /* namespace */
+
+void MsgIter::_setClkClsExpectation(
+    const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) noexcept
+{
+    BT_ASSERT_DBG(_mClkClsExpectation == _ClkClsExpectation::ANY);
+
+    /* No initial clock class: also expect none afterwards */
+    if (!clkCls) {
+        _mClkClsExpectation = _ClkClsExpectation::NONE;
+        return;
+    }
+
+    /*
+     * This is the first clock class that this message iterator
+     * encounters. Its properties determine what to expect for the whole
+     * lifetime of the iterator.
+     */
+    if (clkCls->originIsUnixEpoch()) {
+        /* Expect clock classes having a Unix epoch origin*/
+        _mClkClsExpectation = _ClkClsExpectation::ORIG_IS_UNIX_EPOCH;
+    } else {
+        if (clkCls->uuid()) {
+            /*
+             * Expect clock classes not having a Unix epoch origin and
+             * with a specific UUID.
+             */
+            _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID;
+            _mExpectedClkClsUuid = *clkCls->uuid();
+        } else {
+            /*
+             * Expect clock classes not having a Unix epoch origin and
+             * without a UUID.
+             */
+            _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID;
+        }
+    }
+}
+
+void MsgIter::_makeSureClkClsIsExpected(
+    const bt2::ConstMessage msg,
+    const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) const
+{
+    BT_ASSERT_DBG(_mClkClsExpectation != _ClkClsExpectation::ANY);
+
+    if (!clkCls) {
+        if (_mClkClsExpectation != _ClkClsExpectation::NONE) {
+            /*
+             * `msg` is a stream beginning message because a message
+             * iterator inactivity message always has a clock class.
+             */
+            const auto streamCls = msg.asStreamBeginning().stream().cls();
+
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                              "Expecting a clock class, but got none: "
+                                              "stream-class-addr={}, stream-class-name=\"{}\", "
+                                              "stream-class-id={}",
+                                              static_cast<const void *>(streamCls.libObjPtr()),
+                                              optLogStr(streamCls.name()), streamCls.id());
+        }
+
+        return;
+    }
+
+    const auto clkClsAddr = static_cast<const void *>(clkCls->libObjPtr());
+
+    switch (_mClkClsExpectation) {
+    case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH:
+        if (!clkCls->originIsUnixEpoch()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                              "Expecting a clock class having a Unix epoch origin, "
+                                              "but got one not having a Unix epoch origin: "
+                                              "clock-class-addr={}, clock-class-name={}",
+                                              clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        break;
+    case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID:
+        BT_ASSERT_DBG(!_mExpectedClkClsUuid);
+
+        if (clkCls->originIsUnixEpoch()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class not having a Unix epoch origin, "
+                "but got one having a Unix epoch origin: "
+                "clock-class-addr={}, clock-class-name={}",
+                clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        if (clkCls->uuid()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class without a UUID, but got one with a UUID: "
+                "clock-class-addr={}, clock-class-name={}, uuid={}",
+                clkClsAddr, optLogStr(clkCls->name()), clkCls->uuid()->str());
+        }
+
+        break;
+    case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID:
+        BT_ASSERT_DBG(_mExpectedClkClsUuid);
+
+        if (clkCls->originIsUnixEpoch()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class not having a Unix epoch origin, "
+                "but got one having a Unix epoch origin: "
+                "clock-class-addr={}, clock-class-name={}",
+                clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        if (!clkCls->uuid()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class with a UUID, but got one without a UUID: "
+                "clock-class-addr={}, clock-class-name={}",
+                clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        if (*clkCls->uuid() != bt2c::UuidView {*_mExpectedClkClsUuid}) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                              "Expecting a clock class with a specific UUID, "
+                                              "but got one with a different UUID: "
+                                              "clock-class-addr={}, clock-class-name={}, "
+                                              "expected-uuid=\"{}\", uuid=\"{}\"",
+                                              clkClsAddr, optLogStr(clkCls->name()),
+                                              _mExpectedClkClsUuid->str(), clkCls->uuid()->str());
+        }
+
+        break;
+    case _ClkClsExpectation::NONE:
+        BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                          "Expecting no clock class, but got one: "
+                                          "clock-class-addr={}, clock-class-name={}",
+                                          clkClsAddr, optLogStr(clkCls->name()));
+        break;
+    default:
+        bt_common_abort();
+    }
+}
+
+void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
+{
+    if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
+        /*
+         * We don't care about the other types: all the messages related
+         * to a given stream shared the same default clock class, if
+         * any.
+         */
+        return;
+    }
+
+    BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msgTypeStr(msg));
+
+    /* Get the clock class, if any, of `msg` */
+    const auto clkCls = bt2c::call([msg]() -> bt2::OptionalBorrowedObject<bt2::ConstClockClass> {
+        if (msg.isStreamBeginning()) {
+            return msg.asStreamBeginning().stream().cls().defaultClockClass();
+        } else {
+            BT_ASSERT(msg.isMessageIteratorInactivity());
+            return msg.asMessageIteratorInactivity().clockSnapshot().clockClass();
+        }
+    });
+
+    /* Set the expectation or check it */
+    if (_mClkClsExpectation == _ClkClsExpectation::ANY) {
+        /* First message: set the expectation */
+        this->_setClkClsExpectation(clkCls);
+    } else {
+        /* Make sure clock class is expected */
+        this->_makeSureClkClsIsExpected(msg, clkCls);
+    }
+}
+
+MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger) : _mLogger {logger}
+{
+}
+
+bool MsgIter::_HeapComparator::operator()(
+    const UpstreamMsgIter * const upstreamMsgIterA,
+    const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
+{
+    /* The two messages to compare */
+    const auto msgA = upstreamMsgIterA->msg();
+    const auto msgB = upstreamMsgIterB->msg();
+    auto& msgTsA = upstreamMsgIterA->msgTs();
+    auto& msgTsB = upstreamMsgIterB->msgTs();
+
+    if (_mLogger.wouldLogT()) {
+        BT_CPPLOGT("Comparing two messages: "
+                   "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
+                   "port-name-b={}, msg-b-type={}, msg-b-ts={}",
+                   upstreamMsgIterA->portName(), msgTypeStr(msgA), optMsgTsStr(msgTsA),
+                   upstreamMsgIterB->portName(), msgTypeStr(msgB), optMsgTsStr(msgTsB));
+    }
+
+    /*
+     * Try to compare using timestamps.
+     *
+     * If both timestamps are set and their values are different, then
+     * use this to establish the ordering of the two messages.
+     *
+     * If one timestamp is set, but not the other, the latter always
+     * wins. This is because, for a given upstream message iterator, we
+     * need to consume all the messages having no timestamp so that we
+     * can reach a message with a timestamp to compare it.
+     *
+     * Otherwise, we'll fall back to using
+     * common_muxing_compare_messages().
+     */
+    if (G_LIKELY(msgTsA && msgTsB)) {
+        if (*msgTsA < *msgTsB) {
+            /*
+             * Return `true` because `_mHeap.top()` provides the
+             * "greatest" element. For us, the "greatest" message is
+             * the oldest one, that is, the one having the smallest
+             * timestamp.
+             */
+            BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
+            return true;
+        } else if (*msgTsA > *msgTsB) {
+            BT_CPPLOGT_STR(
+                "Timestamp of message A is greater than timestamp of message B: oldest=B");
+            return false;
+        }
+    } else if (msgTsA && !msgTsB) {
+        BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
+        return false;
+    } else if (!msgTsA && msgTsB) {
+        BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
+        return true;
+    }
+
+    /*
+     * Comparison failed using timestamps: determine an ordering using
+     * arbitrary properties, but in a deterministic way.
+     *
+     * common_muxing_compare_messages() returns less than 0 if the first
+     * message is considered older than the second, which corresponds to
+     * this comparator returning `true`.
+     */
+    const auto res = common_muxing_compare_messages(msgA.libObjPtr(), msgB.libObjPtr()) < 0;
+
+    BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
+               res ? "A" : "B");
+    return res;
+}
+
+} /* namespace bt2mux */
diff --git a/src/plugins/utils/muxer/msg-iter.hpp b/src/plugins/utils/muxer/msg-iter.hpp
new file mode 100644 (file)
index 0000000..993ed4a
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#ifndef BABELTRACE_PLUGINS_UTILS_MUXER_MSG_ITER_HPP
+#define BABELTRACE_PLUGINS_UTILS_MUXER_MSG_ITER_HPP
+
+#include <vector>
+
+#include "cpp-common/bt2/optional-borrowed-object.hpp"
+#include "cpp-common/bt2/plugin-dev.hpp"
+#include "cpp-common/bt2c/prio-heap.hpp"
+#include "cpp-common/bt2c/uuid.hpp"
+#include "cpp-common/bt2s/optional.hpp"
+
+#include "upstream-msg-iter.hpp"
+
+namespace bt2mux {
+
+class Comp;
+
+class MsgIter final : public bt2::UserMessageIterator<MsgIter, Comp>
+{
+    friend bt2::UserMessageIterator<MsgIter, Comp>;
+
+private:
+    /* Clock class nature expectation */
+    enum class _ClkClsExpectation
+    {
+        ANY,
+        NONE,
+        ORIG_IS_UNIX_EPOCH,
+        ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID,
+        ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID,
+    };
+
+    /* Comparator for `_mHeap` with its own logger */
+    class _HeapComparator final
+    {
+    public:
+        explicit _HeapComparator(const bt2c::Logger& logger);
+
+        bool operator()(const UpstreamMsgIter *upstreamMsgIterA,
+                        const UpstreamMsgIter *upstreamMsgIterB) const noexcept;
+
+    private:
+        bt2c::Logger _mLogger;
+    };
+
+public:
+    explicit MsgIter(bt2::SelfMessageIterator selfMsgIter,
+                     bt2::SelfMessageIteratorConfiguration config,
+                     bt2::SelfComponentOutputPort selfPort);
+
+private:
+    bool _canSeekBeginning();
+    void _seekBeginning();
+    void _next(bt2::ConstMessageArray& msgs);
+
+    /*
+     * Makes sure `_mUpstreamMsgItersToReload` is empty so that `_mHeap`
+     * is ready for the next message selection.
+     *
+     * This may throw whatever UpstreamMsgIter::reload() may throw.
+     */
+    void _ensureFullHeap();
+
+    /*
+     * Validates the clock class of the received message `msg`, setting
+     * the expectation if this is the first one.
+     *
+     * Throws `bt2::Error` on error.
+     */
+    void _validateMsgClkCls(bt2::ConstMessage msg);
+
+    /*
+     * Sets the clock class expectation (`_mClkClsExpectation` and
+     * `_mExpectedClkClsUuid`) according to `clkCls`.
+     */
+    void _setClkClsExpectation(bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) noexcept;
+
+    /*
+     * Checks that `clkCls` meets the current clock class expectation,
+     * throwing if it doesn't.
+     */
+    void _makeSureClkClsIsExpected(bt2::ConstMessage msg,
+                                   bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) const;
+
+    /*
+     * Container of all the upstream message iterators.
+     *
+     * The only purpose of this is to own them; where they are below
+     * indicates their state.
+     */
+    std::vector<UpstreamMsgIter::UP> _mUpstreamMsgIters;
+
+    /*
+     * Heap of ready-to-use upstream message iterators (pointers to
+     * owned objects in `_mUpstreamMsgIters` above).
+     */
+    bt2c::PrioHeap<UpstreamMsgIter *, _HeapComparator> _mHeap;
+
+    /*
+     * Current upstream message iterators to reload, on which we must
+     * call reload() before moving them to `_mHeap` or to
+     * `_mEndedUpstreamMsgIters`.
+     *
+     * Using `std::vector` instead of some linked list because the
+     * typical scenario is to add a single one and then remove it
+     * shortly after.
+     */
+    std::vector<UpstreamMsgIter *> _mUpstreamMsgItersToReload;
+
+    /*
+     * Which kind of clock class to expect from any incoming message.
+     *
+     * The very first received message determines this for all the
+     * following.
+     *
+     * For `ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID`, `*_mExpectedClkClsUuid`
+     * is the expected specific UUID.
+     */
+    _ClkClsExpectation _mClkClsExpectation = _ClkClsExpectation::ANY;
+    bt2s::optional<bt2c::Uuid> _mExpectedClkClsUuid;
+};
+
+} /* namespace bt2mux */
+
+#endif /* BABELTRACE_PLUGINS_UTILS_MUXER_MSG_ITER_HPP */
diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c
deleted file mode 100644 (file)
index 6fe2f00..0000000
+++ /dev/null
@@ -1,1600 +0,0 @@
-/*
- * SPDX-License-Identifier: MIT
- *
- * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
- */
-
-#define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp)
-#define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level)
-#define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER"
-#include "logging/comp-logging.h"
-
-#include "common/macros.h"
-#include "common/uuid.h"
-#include <babeltrace2/babeltrace.h>
-#include <glib.h>
-#include <stdbool.h>
-#include <inttypes.h>
-#include "common/assert.h"
-#include "common/common.h"
-#include <stdlib.h>
-#include <string.h>
-
-#include "plugins/common/muxing/muxing.h"
-#include "plugins/common/param-validation/param-validation.h"
-
-#include "muxer.h"
-
-struct muxer_comp {
-       /* Weak refs */
-       bt_self_component_filter *self_comp_flt;
-       bt_self_component *self_comp;
-
-       unsigned int next_port_num;
-       size_t available_input_ports;
-       bool initializing_muxer_msg_iter;
-       bt_logging_level log_level;
-};
-
-struct muxer_upstream_msg_iter {
-       struct muxer_comp *muxer_comp;
-
-       /* Owned by this, NULL if ended */
-       bt_message_iterator *msg_iter;
-
-       /* Contains `const bt_message *`, owned by this */
-       GPtrArray *msgs;
-
-       /* Index of the next message in `msgs` to return */
-       guint next_msg;
-};
-
-enum muxer_msg_iter_clock_class_expectation {
-       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
-       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE,
-       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
-       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
-       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
-};
-
-struct muxer_msg_iter {
-       struct muxer_comp *muxer_comp;
-
-       /* Weak */
-       bt_self_message_iterator *self_msg_iter;
-
-       /*
-        * Array of struct muxer_upstream_msg_iter * (owned by this).
-        *
-        * NOTE: This array is searched in linearly to find the youngest
-        * current message. Keep this until benchmarks confirm that
-        * another data structure is faster than this for our typical
-        * use cases.
-        */
-       GPtrArray *active_muxer_upstream_msg_iters;
-
-       /*
-        * Array of struct muxer_upstream_msg_iter * (owned by this).
-        *
-        * We move ended message iterators from
-        * `active_muxer_upstream_msg_iters` to this array so as to be
-        * able to restore them when seeking.
-        */
-       GPtrArray *ended_muxer_upstream_msg_iters;
-
-       /* Last time returned in a message */
-       int64_t last_returned_ts_ns;
-
-       /* Clock class expectation state */
-       enum muxer_msg_iter_clock_class_expectation clock_class_expectation;
-
-       /*
-        * Expected clock class UUID, only valid when
-        * clock_class_expectation is
-        * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID.
-        */
-       bt_uuid_t expected_clock_class_uuid;
-
-       /*
-        * Saved error.  If we hit an error in the _next method, but have some
-        * messages ready to return, we save the error here and return it on
-        * the next _next call.
-        */
-       bt_message_iterator_class_next_method_status next_saved_status;
-       const struct bt_error *next_saved_error;
-};
-
-static
-void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
-{
-       g_ptr_array_set_size(upstream_msg_iter->msgs, 0);
-}
-
-static
-void destroy_muxer_upstream_msg_iter(
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
-{
-       struct muxer_comp *muxer_comp;
-
-       if (!muxer_upstream_msg_iter) {
-               return;
-       }
-
-       muxer_comp = muxer_upstream_msg_iter->muxer_comp;
-       BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
-               "addr=%p, msg-iter-addr=%p, queue-len=%u, next-msg=%u",
-               muxer_upstream_msg_iter,
-               muxer_upstream_msg_iter->msg_iter,
-               muxer_upstream_msg_iter->msgs->len,
-               muxer_upstream_msg_iter->next_msg);
-
-       bt_message_iterator_put_ref(
-               muxer_upstream_msg_iter->msg_iter);
-
-       if (muxer_upstream_msg_iter->msgs) {
-               g_ptr_array_free(muxer_upstream_msg_iter->msgs, TRUE);
-       }
-
-       g_free(muxer_upstream_msg_iter);
-}
-
-static
-int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
-               bt_message_iterator *self_msg_iter)
-{
-       int ret = 0;
-       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
-               g_new0(struct muxer_upstream_msg_iter, 1);
-       struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp;
-
-       if (!muxer_upstream_msg_iter) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Failed to allocate one muxer's upstream message iterator wrapper.");
-               goto error;
-       }
-
-       muxer_upstream_msg_iter->muxer_comp = muxer_comp;
-       muxer_upstream_msg_iter->msg_iter = self_msg_iter;
-       bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
-       muxer_upstream_msg_iter->msgs =
-               g_ptr_array_new_with_free_func((GDestroyNotify) bt_message_put_ref);
-       if (!muxer_upstream_msg_iter->msgs) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Failed to allocate a GPtrArray.");
-               goto error;
-       }
-
-       g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
-               muxer_upstream_msg_iter);
-       BT_COMP_LOGD("Added muxer's upstream message iterator wrapper: "
-               "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
-               muxer_upstream_msg_iter, muxer_msg_iter,
-               self_msg_iter);
-
-       goto end;
-
-error:
-       destroy_muxer_upstream_msg_iter(muxer_upstream_msg_iter);
-       ret = -1;
-
-end:
-       return ret;
-}
-
-static
-bt_self_component_add_port_status add_available_input_port(
-               bt_self_component_filter *self_comp)
-{
-       struct muxer_comp *muxer_comp = bt_self_component_get_data(
-               bt_self_component_filter_as_self_component(self_comp));
-       bt_self_component_add_port_status status =
-               BT_SELF_COMPONENT_ADD_PORT_STATUS_OK;
-       GString *port_name = NULL;
-
-       BT_ASSERT(muxer_comp);
-       port_name = g_string_new("in");
-       if (!port_name) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, "Failed to allocate a GString.");
-               status = BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR;
-               goto end;
-       }
-
-       g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
-       status = bt_self_component_filter_add_input_port(
-               self_comp, port_name->str, NULL, NULL);
-       if (status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Cannot add input port to muxer component: "
-                       "port-name=\"%s\", comp-addr=%p, status=%s",
-                       port_name->str, self_comp,
-                       bt_common_func_status_string(status));
-               goto end;
-       }
-
-       muxer_comp->available_input_ports++;
-       muxer_comp->next_port_num++;
-       BT_COMP_LOGI("Added one input port to muxer component: "
-               "port-name=\"%s\", comp-addr=%p",
-               port_name->str, self_comp);
-
-end:
-       if (port_name) {
-               g_string_free(port_name, TRUE);
-       }
-
-       return status;
-}
-
-static
-bt_self_component_add_port_status create_output_port(
-               bt_self_component_filter *self_comp)
-{
-       return bt_self_component_filter_add_output_port(
-               self_comp, "out", NULL, NULL);
-}
-
-static
-void destroy_muxer_comp(struct muxer_comp *muxer_comp)
-{
-       if (!muxer_comp) {
-               return;
-       }
-
-       g_free(muxer_comp);
-}
-
-static
-struct bt_param_validation_map_value_entry_descr muxer_params[] = {
-       BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
-};
-
-bt_component_class_initialize_method_status muxer_init(
-               bt_self_component_filter *self_comp_flt,
-               bt_self_component_filter_configuration *config __attribute__((unused)),
-               const bt_value *params,
-               void *init_data __attribute__((unused)))
-{
-       bt_component_class_initialize_method_status status;
-       bt_self_component_add_port_status add_port_status;
-       bt_self_component *self_comp =
-               bt_self_component_filter_as_self_component(self_comp_flt);
-       struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
-       bt_logging_level log_level = bt_component_get_logging_level(
-               bt_self_component_as_component(self_comp));
-       enum bt_param_validation_status validation_status;
-       gchar *validate_error = NULL;
-
-       BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp,
-               "Initializing muxer component: "
-               "comp-addr=%p, params-addr=%p", self_comp, params);
-
-       if (!muxer_comp) {
-               /*
-                * Don't use BT_COMP_LOGE_APPEND_CAUSE, as `muxer_comp` is not
-                * initialized.
-                */
-               BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
-                       "Failed to allocate one muxer component.");
-               BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(self_comp,
-                       "Failed to allocate one muxer component.");
-                       status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
-               goto error;
-       }
-
-       muxer_comp->log_level = log_level;
-       muxer_comp->self_comp = self_comp;
-       muxer_comp->self_comp_flt = self_comp_flt;
-
-       validation_status = bt_param_validation_validate(params,
-               muxer_params, &validate_error);
-       if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
-               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
-               goto error;
-       } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
-               status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validate_error);
-               goto error;
-       }
-
-       bt_self_component_set_data(self_comp, muxer_comp);
-       add_port_status = add_available_input_port(self_comp_flt);
-       if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Cannot ensure that at least one muxer component's input port is available: "
-                       "muxer-comp-addr=%p, status=%s",
-                       muxer_comp, bt_common_func_status_string(add_port_status));
-               status = (int) add_port_status;
-               goto error;
-       }
-
-       add_port_status = create_output_port(self_comp_flt);
-       if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Cannot create muxer component's output port: "
-                       "muxer-comp-addr=%p, status=%s",
-                       muxer_comp, bt_common_func_status_string(add_port_status));
-               status = (int) add_port_status;
-               goto error;
-       }
-
-       BT_COMP_LOGI("Initialized muxer component: "
-               "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p",
-               self_comp, params, muxer_comp);
-
-       status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
-       goto end;
-
-error:
-       destroy_muxer_comp(muxer_comp);
-       bt_self_component_set_data(self_comp, NULL);
-
-end:
-       g_free(validate_error);
-       return status;
-}
-
-void muxer_finalize(bt_self_component_filter *self_comp)
-{
-       struct muxer_comp *muxer_comp = bt_self_component_get_data(
-               bt_self_component_filter_as_self_component(self_comp));
-
-       BT_COMP_LOGI("Finalizing muxer component: comp-addr=%p",
-               self_comp);
-       destroy_muxer_comp(muxer_comp);
-}
-
-static
-bt_message_iterator_create_from_message_iterator_status
-create_msg_iter_on_input_port(struct muxer_comp *muxer_comp,
-               struct muxer_msg_iter *muxer_msg_iter,
-               bt_self_component_port_input *self_port,
-               bt_message_iterator **msg_iter)
-{
-       const bt_port *port = bt_self_component_port_as_port(
-               bt_self_component_port_input_as_self_component_port(
-                       self_port));
-       bt_message_iterator_create_from_message_iterator_status
-               status;
-
-       BT_ASSERT(port);
-       BT_ASSERT(bt_port_is_connected(port));
-
-       // TODO: Advance the iterator to >= the time of the latest
-       //       returned message by the muxer message
-       //       iterator which creates it.
-       status = bt_message_iterator_create_from_message_iterator(
-               muxer_msg_iter->self_msg_iter, self_port, msg_iter);
-       if (status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Cannot create upstream message iterator on input port: "
-                       "port-addr=%p, port-name=\"%s\"",
-                       port, bt_port_get_name(port));
-               goto end;
-       }
-
-       BT_COMP_LOGI("Created upstream message iterator on input port: "
-               "port-addr=%p, port-name=\"%s\", msg-iter-addr=%p",
-               port, bt_port_get_name(port), msg_iter);
-
-end:
-       return status;
-}
-
-static
-bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next(
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
-               bool *is_ended)
-{
-       struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp;
-       bt_message_iterator_class_next_method_status status;
-       bt_message_iterator_next_status input_port_iter_status;
-       bt_message_array_const msgs;
-       uint64_t i;
-       uint64_t count;
-
-       BT_COMP_LOGD("Calling upstream message iterator's \"next\" method: "
-               "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p",
-               muxer_upstream_msg_iter,
-               muxer_upstream_msg_iter->msg_iter);
-       input_port_iter_status = bt_message_iterator_next(
-               muxer_upstream_msg_iter->msg_iter, &msgs, &count);
-       BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: "
-               "status=%s",
-               bt_common_func_status_string(input_port_iter_status));
-
-       switch (input_port_iter_status) {
-       case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
-               /*
-                * Message iterator's current message is
-                * valid: it must be considered for muxing operations.
-                */
-               BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
-               BT_ASSERT_DBG(count > 0);
-
-               g_ptr_array_set_size(muxer_upstream_msg_iter->msgs, count);
-               muxer_upstream_msg_iter->next_msg = 0;
-
-               /* Move messages to our queue */
-               for (i = 0; i < count; i++) {
-                       /*
-                        * Push to tail in order; other side
-                        * (muxer_msg_iter_do_next_one()) consumes
-                        * from the head first.
-                        */
-                       g_ptr_array_index(muxer_upstream_msg_iter->msgs, i)
-                               = (gpointer *) msgs[i];
-               }
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
-               break;
-       case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
-               /*
-                * Message iterator's current message is not
-                * valid anymore. Return
-                * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately.
-                */
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
-               break;
-       case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:       /* Fall-through. */
-               /*
-                * Message iterator reached the end: release it. It
-                * won't be considered again to find the youngest
-                * message.
-                */
-               *is_ended = true;
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
-               break;
-       case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
-       case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
-               /* Error status code */
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Upstream iterator's next method returned an error: status=%s",
-                       bt_common_func_status_string(input_port_iter_status));
-               status = (int) input_port_iter_status;
-               break;
-       default:
-               /* Unsupported status code */
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Unsupported status code: status=%s",
-                       bt_common_func_status_string(input_port_iter_status));
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
-               break;
-       }
-
-       return status;
-}
-
-static
-int get_msg_ts_ns(struct muxer_comp *muxer_comp,
-               struct muxer_msg_iter *muxer_msg_iter,
-               const bt_message *msg, int64_t last_returned_ts_ns,
-               int64_t *ts_ns)
-{
-       const bt_clock_snapshot *clock_snapshot = NULL;
-       int ret = 0;
-       const bt_stream_class *stream_class = NULL;
-       bt_message_type msg_type;
-
-       BT_ASSERT_DBG(msg);
-       BT_ASSERT_DBG(ts_ns);
-       BT_COMP_LOGD("Getting message's timestamp: "
-               "muxer-msg-iter-addr=%p, msg-addr=%p, "
-               "last-returned-ts=%" PRId64,
-               muxer_msg_iter, msg, last_returned_ts_ns);
-
-       if (G_UNLIKELY(muxer_msg_iter->clock_class_expectation ==
-                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
-               *ts_ns = last_returned_ts_ns;
-               goto end;
-       }
-
-       msg_type = bt_message_get_type(msg);
-
-       if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) {
-               stream_class = bt_stream_borrow_class_const(
-                       bt_packet_borrow_stream_const(
-                               bt_message_packet_beginning_borrow_packet_const(
-                                       msg)));
-       } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_END)) {
-               stream_class = bt_stream_borrow_class_const(
-                       bt_packet_borrow_stream_const(
-                               bt_message_packet_end_borrow_packet_const(
-                                       msg)));
-       } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) {
-               stream_class = bt_stream_borrow_class_const(
-                       bt_message_discarded_events_borrow_stream_const(msg));
-       } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) {
-               stream_class = bt_stream_borrow_class_const(
-                       bt_message_discarded_packets_borrow_stream_const(msg));
-       }
-
-       switch (msg_type) {
-       case BT_MESSAGE_TYPE_EVENT:
-               BT_ASSERT_DBG(bt_message_event_borrow_stream_class_default_clock_class_const(
-                               msg));
-               clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
-                       msg);
-               break;
-       case BT_MESSAGE_TYPE_PACKET_BEGINNING:
-               if (bt_stream_class_packets_have_beginning_default_clock_snapshot(
-                               stream_class)) {
-                       clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
-                               msg);
-               } else {
-                       goto no_clock_snapshot;
-               }
-
-               break;
-       case BT_MESSAGE_TYPE_PACKET_END:
-               if (bt_stream_class_packets_have_end_default_clock_snapshot(
-                               stream_class)) {
-                       clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
-                               msg);
-               } else {
-                       goto no_clock_snapshot;
-               }
-
-               break;
-       case BT_MESSAGE_TYPE_STREAM_BEGINNING:
-       {
-               enum bt_message_stream_clock_snapshot_state snapshot_state =
-                       bt_message_stream_beginning_borrow_default_clock_snapshot_const(
-                               msg, &clock_snapshot);
-               if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
-                       goto no_clock_snapshot;
-               }
-
-               break;
-       }
-       case BT_MESSAGE_TYPE_STREAM_END:
-       {
-               enum bt_message_stream_clock_snapshot_state snapshot_state =
-                       bt_message_stream_end_borrow_default_clock_snapshot_const(
-                               msg, &clock_snapshot);
-               if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
-                       goto no_clock_snapshot;
-               }
-
-               break;
-       }
-       case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
-               if (bt_stream_class_discarded_events_have_default_clock_snapshots(
-                               stream_class)) {
-                       clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
-                               msg);
-               } else {
-                       goto no_clock_snapshot;
-               }
-
-               break;
-       case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
-               if (bt_stream_class_discarded_packets_have_default_clock_snapshots(
-                               stream_class)) {
-                       clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
-                               msg);
-               } else {
-                       goto no_clock_snapshot;
-               }
-
-               break;
-       case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
-               clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(
-                       msg);
-               break;
-       default:
-               /* All the other messages have a higher priority */
-               BT_COMP_LOGD_STR("Message has no timestamp: using the last returned timestamp.");
-               *ts_ns = last_returned_ts_ns;
-               goto end;
-       }
-
-       ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
-       if (ret) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Cannot get nanoseconds from Epoch of clock snapshot: "
-                       "clock-snapshot-addr=%p", clock_snapshot);
-               goto error;
-       }
-
-       goto end;
-
-no_clock_snapshot:
-       BT_COMP_LOGD_STR("Message's default clock snapshot is missing: "
-               "using the last returned timestamp.");
-       *ts_ns = last_returned_ts_ns;
-       goto end;
-
-error:
-       ret = -1;
-
-end:
-       if (ret == 0) {
-               BT_COMP_LOGD("Found message's timestamp: "
-                       "muxer-msg-iter-addr=%p, msg-addr=%p, "
-                       "last-returned-ts=%" PRId64 ", ts=%" PRId64,
-                       muxer_msg_iter, msg, last_returned_ts_ns,
-                       *ts_ns);
-       }
-
-       return ret;
-}
-
-static inline
-int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
-               struct muxer_comp *muxer_comp,
-               const bt_clock_class *clock_class)
-{
-       int ret = 0;
-       const uint8_t *cc_uuid;
-       const char *cc_name;
-
-       BT_ASSERT_DBG(clock_class);
-       cc_uuid = bt_clock_class_get_uuid(clock_class);
-       cc_name = bt_clock_class_get_name(clock_class);
-
-       if (muxer_msg_iter->clock_class_expectation ==
-                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
-               /*
-                * This is the first clock class that this muxer message
-                * iterator encounters. Its properties determine what to expect
-                * for the whole lifetime of the iterator.
-                */
-               if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
-                       /* Expect absolute clock classes */
-                       muxer_msg_iter->clock_class_expectation =
-                               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
-               } else {
-                       if (cc_uuid) {
-                               /*
-                                * Expect non-absolute clock classes
-                                * with a specific UUID.
-                                */
-                               muxer_msg_iter->clock_class_expectation =
-                                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID;
-                               bt_uuid_copy(muxer_msg_iter->expected_clock_class_uuid, cc_uuid);
-                       } else {
-                               /*
-                                * Expect non-absolute clock classes
-                                * with no UUID.
-                                */
-                               muxer_msg_iter->clock_class_expectation =
-                                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID;
-                       }
-               }
-       }
-
-       switch (muxer_msg_iter->clock_class_expectation) {
-       case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
-               if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Expecting an absolute clock class, "
-                               "but got a non-absolute one: "
-                               "clock-class-addr=%p, clock-class-name=\"%s\"",
-                               clock_class, cc_name);
-                       goto error;
-               }
-               break;
-       case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
-               if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Expecting a non-absolute clock class with no UUID, "
-                               "but got an absolute one: "
-                               "clock-class-addr=%p, clock-class-name=\"%s\"",
-                               clock_class, cc_name);
-                       goto error;
-               }
-
-               if (cc_uuid) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Expecting a non-absolute clock class with no UUID, "
-                               "but got one with a UUID: "
-                               "clock-class-addr=%p, clock-class-name=\"%s\", "
-                               "uuid=\"" BT_UUID_FMT "\"",
-                               clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid));
-                       goto error;
-               }
-               break;
-       case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
-               if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Expecting a non-absolute clock class with a specific UUID, "
-                               "but got an absolute one: "
-                               "clock-class-addr=%p, clock-class-name=\"%s\"",
-                               clock_class, cc_name);
-                       goto error;
-               }
-
-               if (!cc_uuid) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Expecting a non-absolute clock class with a specific UUID, "
-                               "but got one with no UUID: "
-                               "clock-class-addr=%p, clock-class-name=\"%s\"",
-                               clock_class, cc_name);
-                       goto error;
-               }
-
-               if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Expecting a non-absolute clock class with a specific UUID, "
-                               "but got one with different UUID: "
-                               "clock-class-addr=%p, clock-class-name=\"%s\", "
-                               "expected-uuid=\"" BT_UUID_FMT "\", "
-                               "uuid=\"" BT_UUID_FMT "\"",
-                               clock_class, cc_name,
-                               BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid),
-                               BT_UUID_FMT_VALUES(cc_uuid));
-                       goto error;
-               }
-               break;
-       case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Expecting no clock class, but got one: "
-                       "clock-class-addr=%p, clock-class-name=\"%s\"",
-                       clock_class, cc_name);
-               goto error;
-       default:
-               /* Unexpected */
-               BT_COMP_LOGF("Unexpected clock class expectation: "
-                       "expectation-code=%d",
-                       muxer_msg_iter->clock_class_expectation);
-               bt_common_abort();
-       }
-
-       goto end;
-
-error:
-       ret = -1;
-
-end:
-       return ret;
-}
-
-static inline
-int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
-               struct muxer_comp *muxer_comp, const bt_stream *stream)
-{
-       int ret = 0;
-       const bt_stream_class *stream_class =
-               bt_stream_borrow_class_const(stream);
-       const bt_clock_class *clock_class =
-               bt_stream_class_borrow_default_clock_class_const(stream_class);
-
-       if (!clock_class) {
-               if (muxer_msg_iter->clock_class_expectation ==
-                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
-                       /* Expect no clock class */
-                       muxer_msg_iter->clock_class_expectation =
-                               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
-               } else if (muxer_msg_iter->clock_class_expectation !=
-                               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Expecting stream class without a default clock class: "
-                               "stream-class-addr=%p, stream-class-name=\"%s\", "
-                               "stream-class-id=%" PRIu64,
-                               stream_class, bt_stream_class_get_name(stream_class),
-                               bt_stream_class_get_id(stream_class));
-                       ret = -1;
-               }
-
-               goto end;
-       }
-
-       ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
-
-end:
-       return ret;
-}
-
-/*
- * This function finds the youngest available message amongst the
- * non-ended upstream message iterators and returns the upstream
- * message iterator which has it, or
- * BT_MESSAGE_ITERATOR_STATUS_END if there's no available
- * message.
- *
- * This function does NOT:
- *
- * * Update any upstream message iterator.
- * * Check the upstream message iterators to retry.
- *
- * On sucess, this function sets *muxer_upstream_msg_iter to the
- * upstream message iterator of which the current message is
- * the youngest, and sets *ts_ns to its time.
- */
-static
-bt_message_iterator_class_next_method_status
-muxer_msg_iter_youngest_upstream_msg_iter(
-               struct muxer_comp *muxer_comp,
-               struct muxer_msg_iter *muxer_msg_iter,
-               struct muxer_upstream_msg_iter **muxer_upstream_msg_iter,
-               int64_t *ts_ns)
-{
-       size_t i;
-       int ret;
-       int64_t youngest_ts_ns = INT64_MAX;
-       bt_message_iterator_class_next_method_status status =
-               BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
-
-       BT_ASSERT_DBG(muxer_comp);
-       BT_ASSERT_DBG(muxer_msg_iter);
-       BT_ASSERT_DBG(muxer_upstream_msg_iter);
-       *muxer_upstream_msg_iter = NULL;
-
-       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
-                       i++) {
-               const bt_message *msg;
-               struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
-                       g_ptr_array_index(
-                               muxer_msg_iter->active_muxer_upstream_msg_iters,
-                               i);
-               int64_t msg_ts_ns;
-
-               if (!cur_muxer_upstream_msg_iter->msg_iter) {
-                       /* This upstream message iterator is ended */
-                       BT_COMP_LOGT("Skipping ended upstream message iterator: "
-                               "muxer-upstream-msg-iter-wrap-addr=%p",
-                               cur_muxer_upstream_msg_iter);
-                       continue;
-               }
-
-               BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->next_msg <
-                       cur_muxer_upstream_msg_iter->msgs->len);
-               msg = g_ptr_array_index(cur_muxer_upstream_msg_iter->msgs,
-                       cur_muxer_upstream_msg_iter->next_msg);
-               BT_ASSERT_DBG(msg);
-
-               if (G_UNLIKELY(bt_message_get_type(msg) ==
-                               BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
-                       ret = validate_new_stream_clock_class(
-                               muxer_msg_iter, muxer_comp,
-                               bt_message_stream_beginning_borrow_stream_const(
-                                       msg));
-                       if (ret) {
-                               /*
-                                * validate_new_stream_clock_class() logs
-                                * errors.
-                                */
-                               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
-                               goto end;
-                       }
-               } else if (G_UNLIKELY(bt_message_get_type(msg) ==
-                               BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
-                       const bt_clock_snapshot *cs;
-
-                       cs = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(
-                               msg);
-                       ret = validate_clock_class(muxer_msg_iter, muxer_comp,
-                               bt_clock_snapshot_borrow_clock_class_const(cs));
-                       if (ret) {
-                               /* validate_clock_class() logs errors */
-                               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
-                               goto end;
-                       }
-               }
-
-               ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
-                       muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
-               if (ret) {
-                       /* get_msg_ts_ns() logs errors */
-                       *muxer_upstream_msg_iter = NULL;
-                       status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
-                       goto end;
-               }
-
-               /*
-                * Update the current message iterator if it has not been set
-                * yet, or if its current message has a timestamp smaller than
-                * the previously selected youngest message.
-                */
-               if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) ||
-                               msg_ts_ns < youngest_ts_ns) {
-                       *muxer_upstream_msg_iter =
-                               cur_muxer_upstream_msg_iter;
-                       youngest_ts_ns = msg_ts_ns;
-                       *ts_ns = youngest_ts_ns;
-               } else if (msg_ts_ns == youngest_ts_ns) {
-                       /*
-                        * The currently selected message to be sent downstream
-                        * next has the exact same timestamp that of the
-                        * current candidate message. We must break the tie
-                        * in a predictable manner.
-                        */
-                       BT_ASSERT_DBG((*muxer_upstream_msg_iter)->next_msg <
-                               (*muxer_upstream_msg_iter)->msgs->len);
-                       const bt_message *selected_msg =
-                               g_ptr_array_index((*muxer_upstream_msg_iter)->msgs,
-                                       (*muxer_upstream_msg_iter)->next_msg);
-                       BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
-
-                       /*
-                        * Order the messages in an arbitrary but determinitic
-                        * way.
-                        */
-                       ret = common_muxing_compare_messages(msg, selected_msg);
-                       if (ret < 0) {
-                               /*
-                                * The `msg` should go first. Update the next
-                                * iterator and the current timestamp.
-                                */
-                               *muxer_upstream_msg_iter =
-                                       cur_muxer_upstream_msg_iter;
-                               youngest_ts_ns = msg_ts_ns;
-                               *ts_ns = youngest_ts_ns;
-                       } else if (ret == 0) {
-                               /* Unable to pick which one should go first. */
-                               BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: "
-                                       "muxer-upstream-msg-iter-wrap-addr=%p"
-                                       "cur-muxer-upstream-msg-iter-wrap-addr=%p",
-                                       *muxer_upstream_msg_iter,
-                                       cur_muxer_upstream_msg_iter);
-                       }
-               }
-       }
-
-       if (!*muxer_upstream_msg_iter) {
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
-               *ts_ns = INT64_MIN;
-       }
-
-end:
-       return status;
-}
-
-static
-bt_message_iterator_class_next_method_status
-validate_muxer_upstream_msg_iter(
-       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
-       bool *is_ended)
-{
-       struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp;
-       bt_message_iterator_class_next_method_status status;
-
-       BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: "
-               "muxer-upstream-msg-iter-wrap-addr=%p",
-               muxer_upstream_msg_iter);
-
-       if (muxer_upstream_msg_iter->next_msg < muxer_upstream_msg_iter->msgs->len ||
-                       !muxer_upstream_msg_iter->msg_iter) {
-               BT_COMP_LOGD("Already valid or not considered: "
-                       "queue-len=%u, next-msg=%u, upstream-msg-iter-addr=%p",
-                       muxer_upstream_msg_iter->msgs->len,
-                       muxer_upstream_msg_iter->next_msg,
-                       muxer_upstream_msg_iter->msg_iter);
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
-               goto end;
-       }
-
-       /* muxer_upstream_msg_iter_next() logs details/errors */
-       status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
-               is_ended);
-
-end:
-       return status;
-}
-
-static
-bt_message_iterator_class_next_method_status
-validate_muxer_upstream_msg_iters(
-               struct muxer_msg_iter *muxer_msg_iter)
-{
-       struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp;
-       bt_message_iterator_class_next_method_status status;
-       size_t i;
-
-       BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: "
-               "muxer-msg-iter-addr=%p", muxer_msg_iter);
-
-       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
-                       i++) {
-               bool is_ended = false;
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
-                       g_ptr_array_index(
-                               muxer_msg_iter->active_muxer_upstream_msg_iters,
-                               i);
-
-               status = validate_muxer_upstream_msg_iter(
-                       muxer_upstream_msg_iter, &is_ended);
-               if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
-                       if (status < 0) {
-                               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                                       "Cannot validate muxer's upstream message iterator wrapper: "
-                                       "muxer-msg-iter-addr=%p, "
-                                       "muxer-upstream-msg-iter-wrap-addr=%p",
-                                       muxer_msg_iter,
-                                       muxer_upstream_msg_iter);
-                       } else {
-                               BT_COMP_LOGD("Cannot validate muxer's upstream message iterator wrapper: "
-                                       "muxer-msg-iter-addr=%p, "
-                                       "muxer-upstream-msg-iter-wrap-addr=%p",
-                                       muxer_msg_iter,
-                                       muxer_upstream_msg_iter);
-                       }
-
-                       goto end;
-               }
-
-               /*
-                * Move this muxer upstream message iterator to the
-                * array of ended iterators if it's ended.
-                */
-               if (G_UNLIKELY(is_ended)) {
-                       BT_COMP_LOGD("Muxer's upstream message iterator wrapper: ended or canceled: "
-                               "muxer-msg-iter-addr=%p, "
-                               "muxer-upstream-msg-iter-wrap-addr=%p",
-                               muxer_msg_iter, muxer_upstream_msg_iter);
-                       g_ptr_array_add(
-                               muxer_msg_iter->ended_muxer_upstream_msg_iters,
-                               muxer_upstream_msg_iter);
-                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
-
-                       /*
-                        * Use g_ptr_array_remove_fast() because the
-                        * order of those elements is not important.
-                        */
-                       g_ptr_array_remove_index_fast(
-                               muxer_msg_iter->active_muxer_upstream_msg_iters,
-                               i);
-                       i--;
-               }
-       }
-
-       status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
-
-end:
-       return status;
-}
-
-static inline
-bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one(
-               struct muxer_comp *muxer_comp,
-               struct muxer_msg_iter *muxer_msg_iter,
-               const bt_message **msg)
-{
-       bt_message_iterator_class_next_method_status status;
-       struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL;
-       /* Initialize to avoid -Wmaybe-uninitialized warning with gcc 4.8. */
-       int64_t next_return_ts = 0;
-
-       status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
-       if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
-               /* validate_muxer_upstream_msg_iters() logs details */
-               goto end;
-       }
-
-       /*
-        * At this point we know that all the existing upstream
-        * message iterators are valid. We can find the one,
-        * amongst those, of which the current message is the
-        * youngest.
-        */
-       status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp,
-                       muxer_msg_iter, &muxer_upstream_msg_iter,
-                       &next_return_ts);
-       if (status < 0 || status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) {
-               if (status < 0) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Cannot find the youngest upstream message iterator wrapper: "
-                               "status=%s",
-                               bt_common_func_status_string(status));
-               } else {
-                       BT_COMP_LOGD("Cannot find the youngest upstream message iterator wrapper: "
-                               "status=%s",
-                               bt_common_func_status_string(status));
-               }
-
-               goto end;
-       }
-
-       if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: "
-                       "muxer-msg-iter-addr=%p, ts=%" PRId64 ", "
-                       "last-returned-ts=%" PRId64,
-                       muxer_msg_iter, next_return_ts,
-                       muxer_msg_iter->last_returned_ts_ns);
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
-               goto end;
-       }
-
-       BT_COMP_LOGD("Found youngest upstream message iterator wrapper: "
-               "muxer-msg-iter-addr=%p, "
-               "muxer-upstream-msg-iter-wrap-addr=%p, "
-               "ts=%" PRId64,
-               muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts);
-       BT_ASSERT_DBG(status ==
-               BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
-       BT_ASSERT_DBG(muxer_upstream_msg_iter);
-
-       /*
-        * Consume from the queue's head: other side
-        * (muxer_upstream_msg_iter_next()) writes to the tail.
-        */
-       *msg = g_ptr_array_index(muxer_upstream_msg_iter->msgs,
-               muxer_upstream_msg_iter->next_msg);
-       g_ptr_array_index(muxer_upstream_msg_iter->msgs,
-               muxer_upstream_msg_iter->next_msg) = NULL;
-       ++muxer_upstream_msg_iter->next_msg;
-       BT_ASSERT_DBG(*msg);
-       muxer_msg_iter->last_returned_ts_ns = next_return_ts;
-
-end:
-       return status;
-}
-
-static
-bt_message_iterator_class_next_method_status muxer_msg_iter_do_next(
-               struct muxer_comp *muxer_comp,
-               struct muxer_msg_iter *muxer_msg_iter,
-               bt_message_array_const msgs, uint64_t capacity,
-               uint64_t *count)
-{
-       bt_message_iterator_class_next_method_status status;
-       uint64_t i = 0;
-
-       if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) {
-               /*
-                * Last time we were called, we hit an error but had some
-                * messages to deliver, so we stashed the error here.  Return
-                * it now.
-                */
-               BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error);
-               status = muxer_msg_iter->next_saved_status;
-               goto end;
-       }
-
-       do {
-               status = muxer_msg_iter_do_next_one(muxer_comp,
-                       muxer_msg_iter, &msgs[i]);
-               if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
-                       i++;
-               }
-       } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
-
-       if (i > 0) {
-               /*
-                * Even if muxer_msg_iter_do_next_one() returned
-                * something else than
-                * BT_MESSAGE_ITERATOR_STATUS_OK, we accumulated
-                * message objects in the output message
-                * array, so we need to return
-                * BT_MESSAGE_ITERATOR_STATUS_OK so that they are
-                * transfered to downstream. This other status occurs
-                * again the next time muxer_msg_iter_do_next() is
-                * called, possibly without any accumulated
-                * message, in which case we'll return it.
-                */
-               if (status < 0) {
-                       /*
-                        * Save this error for the next _next call.  Assume that
-                        * this component always appends error causes when
-                        * returning an error status code, which will cause the
-                        * current thread error to be non-NULL.
-                        */
-                       muxer_msg_iter->next_saved_error = bt_current_thread_take_error();
-                       BT_ASSERT(muxer_msg_iter->next_saved_error);
-                       muxer_msg_iter->next_saved_status = status;
-               }
-
-               *count = i;
-               status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
-       }
-
-end:
-       return status;
-}
-
-static
-void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
-{
-       struct muxer_comp *muxer_comp;
-
-       if (!muxer_msg_iter) {
-               return;
-       }
-
-       muxer_comp = muxer_msg_iter->muxer_comp;
-       BT_COMP_LOGD("Destroying muxer component's message iterator: "
-               "muxer-msg-iter-addr=%p", muxer_msg_iter);
-
-       if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
-               BT_COMP_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
-               g_ptr_array_free(
-                       muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
-       }
-
-       if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
-               BT_COMP_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
-               g_ptr_array_free(
-                       muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
-       }
-
-       g_free(muxer_msg_iter);
-}
-
-static
-bt_message_iterator_class_initialize_method_status
-muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
-               struct muxer_msg_iter *muxer_msg_iter,
-               struct bt_self_message_iterator_configuration *config)
-{
-       int64_t count;
-       int64_t i;
-       bt_message_iterator_class_initialize_method_status status;
-       bool can_seek_forward = true;
-
-       count = bt_component_filter_get_input_port_count(
-               bt_self_component_filter_as_component_filter(
-                       muxer_comp->self_comp_flt));
-       if (count < 0) {
-               BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: "
-                       "muxer-comp-addr=%p, muxer-msg-iter-addr=%p",
-                       muxer_comp, muxer_msg_iter);
-               status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
-               goto end;
-       }
-
-       for (i = 0; i < count; i++) {
-               bt_message_iterator *upstream_msg_iter;
-               bt_self_component_port_input *self_port =
-                       bt_self_component_filter_borrow_input_port_by_index(
-                               muxer_comp->self_comp_flt, i);
-               const bt_port *port;
-               bt_message_iterator_create_from_message_iterator_status
-                       msg_iter_status;
-               int int_status;
-
-               BT_ASSERT(self_port);
-               port = bt_self_component_port_as_port(
-                       bt_self_component_port_input_as_self_component_port(
-                               self_port));
-               BT_ASSERT(port);
-
-               if (!bt_port_is_connected(port)) {
-                       /* Skip non-connected port */
-                       continue;
-               }
-
-               msg_iter_status = create_msg_iter_on_input_port(muxer_comp,
-                       muxer_msg_iter, self_port, &upstream_msg_iter);
-               if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
-                       /* create_msg_iter_on_input_port() logs errors */
-                       status = (int) msg_iter_status;
-                       goto end;
-               }
-
-               int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter,
-                       upstream_msg_iter);
-               bt_message_iterator_put_ref(
-                       upstream_msg_iter);
-               if (int_status) {
-                       status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
-                       /* muxer_msg_iter_add_upstream_msg_iter() logs errors */
-                       goto end;
-               }
-
-               can_seek_forward = can_seek_forward &&
-                       bt_message_iterator_can_seek_forward(
-                               upstream_msg_iter);
-       }
-
-       /*
-        * This iterator can seek forward if all of its iterators can seek
-        * forward.
-        */
-       bt_self_message_iterator_configuration_set_can_seek_forward(
-               config, can_seek_forward);
-
-       status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
-
-end:
-       return status;
-}
-
-bt_message_iterator_class_initialize_method_status muxer_msg_iter_init(
-               bt_self_message_iterator *self_msg_iter,
-               bt_self_message_iterator_configuration *config,
-               bt_self_component_port_output *port __attribute__((unused)))
-{
-       struct muxer_comp *muxer_comp = NULL;
-       struct muxer_msg_iter *muxer_msg_iter = NULL;
-       bt_message_iterator_class_initialize_method_status status;
-       bt_self_component *self_comp =
-               bt_self_message_iterator_borrow_component(self_msg_iter);
-
-       muxer_comp = bt_self_component_get_data(self_comp);
-       BT_ASSERT(muxer_comp);
-       BT_COMP_LOGD("Initializing muxer component's message iterator: "
-               "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p",
-               self_comp, muxer_comp, self_msg_iter);
-
-       if (muxer_comp->initializing_muxer_msg_iter) {
-               /*
-                * Weird, unhandled situation detected: downstream
-                * creates a muxer message iterator while creating
-                * another muxer message iterator (same component).
-                */
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Recursive initialization of muxer component's message iterator: "
-                       "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p",
-                       self_comp, muxer_comp, self_msg_iter);
-               status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
-               goto error;
-       }
-
-       muxer_comp->initializing_muxer_msg_iter = true;
-       muxer_msg_iter = g_new0(struct muxer_msg_iter, 1);
-       if (!muxer_msg_iter) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Failed to allocate one muxer component's message iterator.");
-               status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
-               goto error;
-       }
-
-       muxer_msg_iter->muxer_comp = muxer_comp;
-       muxer_msg_iter->self_msg_iter = self_msg_iter;
-       muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
-       muxer_msg_iter->active_muxer_upstream_msg_iters =
-               g_ptr_array_new_with_free_func(
-                       (GDestroyNotify) destroy_muxer_upstream_msg_iter);
-       if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, "Failed to allocate a GPtrArray.");
-               status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
-               goto error;
-       }
-
-       muxer_msg_iter->ended_muxer_upstream_msg_iters =
-               g_ptr_array_new_with_free_func(
-                       (GDestroyNotify) destroy_muxer_upstream_msg_iter);
-       if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp, "Failed to allocate a GPtrArray.");
-               status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
-               goto error;
-       }
-
-       status = muxer_msg_iter_init_upstream_iterators(muxer_comp,
-               muxer_msg_iter, config);
-       if (status) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Cannot initialize connected input ports for muxer component's message iterator: "
-                       "comp-addr=%p, muxer-comp-addr=%p, "
-                       "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d",
-                       self_comp, muxer_comp, muxer_msg_iter,
-                       self_msg_iter, status);
-               goto error;
-       }
-
-       bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter);
-       BT_COMP_LOGD("Initialized muxer component's message iterator: "
-               "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
-               "msg-iter-addr=%p",
-               self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
-       goto end;
-
-error:
-       destroy_muxer_msg_iter(muxer_msg_iter);
-       bt_self_message_iterator_set_data(self_msg_iter, NULL);
-
-end:
-       muxer_comp->initializing_muxer_msg_iter = false;
-       return status;
-}
-
-void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
-{
-       struct muxer_msg_iter *muxer_msg_iter =
-               bt_self_message_iterator_get_data(self_msg_iter);
-       bt_self_component *self_comp = NULL;
-       struct muxer_comp *muxer_comp = NULL;
-
-       self_comp = bt_self_message_iterator_borrow_component(
-               self_msg_iter);
-       BT_ASSERT(self_comp);
-       muxer_comp = bt_self_component_get_data(self_comp);
-       BT_COMP_LOGD("Finalizing muxer component's message iterator: "
-               "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
-               "msg-iter-addr=%p",
-               self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
-
-       if (muxer_msg_iter) {
-               destroy_muxer_msg_iter(muxer_msg_iter);
-       }
-}
-
-bt_message_iterator_class_next_method_status muxer_msg_iter_next(
-               bt_self_message_iterator *self_msg_iter,
-               bt_message_array_const msgs, uint64_t capacity,
-               uint64_t *count)
-{
-       bt_message_iterator_class_next_method_status status;
-       struct muxer_msg_iter *muxer_msg_iter =
-               bt_self_message_iterator_get_data(self_msg_iter);
-       bt_self_component *self_comp = NULL;
-       struct muxer_comp *muxer_comp = NULL;
-
-       BT_ASSERT_DBG(muxer_msg_iter);
-       self_comp = bt_self_message_iterator_borrow_component(
-               self_msg_iter);
-       BT_ASSERT_DBG(self_comp);
-       muxer_comp = bt_self_component_get_data(self_comp);
-       BT_ASSERT_DBG(muxer_comp);
-       BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: "
-               "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
-               "msg-iter-addr=%p",
-               self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
-
-       status = muxer_msg_iter_do_next(muxer_comp, muxer_msg_iter,
-               msgs, capacity, count);
-       if (status < 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Cannot get next message: "
-                       "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
-                       "msg-iter-addr=%p, status=%s",
-                       self_comp, muxer_comp, muxer_msg_iter, self_msg_iter,
-                       bt_common_func_status_string(status));
-       } else {
-               BT_COMP_LOGT("Returning from muxer component's message iterator's \"next\" method: "
-                       "status=%s",
-                       bt_common_func_status_string(status));
-       }
-
-       return status;
-}
-
-bt_component_class_port_connected_method_status muxer_input_port_connected(
-               bt_self_component_filter *self_comp,
-               bt_self_component_port_input *self_port __attribute__((unused)),
-               const bt_port_output *other_port __attribute__((unused)))
-{
-       bt_component_class_port_connected_method_status status =
-               BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_OK;
-       bt_self_component_add_port_status add_port_status;
-       struct muxer_comp *muxer_comp = bt_self_component_get_data(
-               bt_self_component_filter_as_self_component(self_comp));
-
-       add_port_status = add_available_input_port(self_comp);
-       if (add_port_status) {
-               BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                       "Cannot add one muxer component's input port: status=%s",
-                       bt_common_func_status_string(add_port_status));
-
-               if (add_port_status ==
-                               BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) {
-                       status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_MEMORY_ERROR;
-               } else {
-                       status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_ERROR;
-               }
-
-               goto end;
-       }
-
-end:
-       return status;
-}
-
-static inline
-bt_message_iterator_class_can_seek_beginning_method_status
-muxer_upstream_msg_iters_can_all_seek_beginning(
-               struct muxer_comp *muxer_comp,
-               GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek)
-{
-       bt_message_iterator_class_can_seek_beginning_method_status status =
-               BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK;
-       uint64_t i;
-
-       for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
-               struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_upstream_msg_iters->pdata[i];
-               status = (int) bt_message_iterator_can_seek_beginning(
-                       upstream_msg_iter->msg_iter, can_seek);
-               if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) {
-                       BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
-                               "Failed to determine whether upstream message iterator can seek beginning: "
-                               "msg-iter-addr=%p", upstream_msg_iter->msg_iter);
-                       goto end;
-               }
-
-               if (!*can_seek) {
-                       goto end;
-               }
-       }
-
-       *can_seek = BT_TRUE;
-
-end:
-       return status;
-}
-
-bt_message_iterator_class_can_seek_beginning_method_status
-muxer_msg_iter_can_seek_beginning(
-               bt_self_message_iterator *self_msg_iter, bt_bool *can_seek)
-{
-       struct muxer_msg_iter *muxer_msg_iter =
-               bt_self_message_iterator_get_data(self_msg_iter);
-       bt_message_iterator_class_can_seek_beginning_method_status status;
-
-       status = muxer_upstream_msg_iters_can_all_seek_beginning(
-               muxer_msg_iter->muxer_comp,
-               muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek);
-       if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) {
-               goto end;
-       }
-
-       if (!*can_seek) {
-               goto end;
-       }
-
-       status = muxer_upstream_msg_iters_can_all_seek_beginning(
-               muxer_msg_iter->muxer_comp,
-               muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek);
-
-end:
-       return status;
-}
-
-bt_message_iterator_class_seek_beginning_method_status muxer_msg_iter_seek_beginning(
-               bt_self_message_iterator *self_msg_iter)
-{
-       struct muxer_msg_iter *muxer_msg_iter =
-               bt_self_message_iterator_get_data(self_msg_iter);
-       bt_message_iterator_class_seek_beginning_method_status status =
-               BT_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHOD_STATUS_OK;
-       bt_message_iterator_seek_beginning_status seek_beg_status;
-       uint64_t i;
-
-       /* Seek all ended upstream iterators first */
-       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
-                       i++) {
-               struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
-
-               seek_beg_status = bt_message_iterator_seek_beginning(
-                       upstream_msg_iter->msg_iter);
-               if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) {
-                       status = (int) seek_beg_status;
-                       goto end;
-               }
-
-               empty_message_queue(upstream_msg_iter);
-       }
-
-       /* Seek all previously active upstream iterators */
-       for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
-                       i++) {
-               struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
-
-               seek_beg_status = bt_message_iterator_seek_beginning(
-                       upstream_msg_iter->msg_iter);
-               if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) {
-                       status = (int) seek_beg_status;
-                       goto end;
-               }
-
-               empty_message_queue(upstream_msg_iter);
-       }
-
-       /* Make them all active */
-       for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
-                       i++) {
-               struct muxer_upstream_msg_iter *upstream_msg_iter =
-                       muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
-
-               g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
-                       upstream_msg_iter);
-               muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
-       }
-
-       /*
-        * GLib < 2.48.0 asserts when g_ptr_array_remove_range() is
-        * called on an empty array.
-        */
-       if (muxer_msg_iter->ended_muxer_upstream_msg_iters->len > 0) {
-               g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
-                       0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
-       }
-       muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
-       muxer_msg_iter->clock_class_expectation =
-               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
-
-end:
-       return status;
-}
diff --git a/src/plugins/utils/muxer/muxer.h b/src/plugins/utils/muxer/muxer.h
deleted file mode 100644 (file)
index 9abb851..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * SPDX-License-Identifier: MIT
- *
- * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
- */
-
-#ifndef BABELTRACE_PLUGINS_UTILS_MUXER_H
-#define BABELTRACE_PLUGINS_UTILS_MUXER_H
-
-#include <stdint.h>
-#include <babeltrace2/babeltrace.h>
-#include "common/macros.h"
-
-bt_component_class_initialize_method_status muxer_init(
-               bt_self_component_filter *self_comp,
-               bt_self_component_filter_configuration *config,
-               const bt_value *params, void *init_data);
-
-void muxer_finalize(bt_self_component_filter *self_comp);
-
-bt_message_iterator_class_initialize_method_status muxer_msg_iter_init(
-               bt_self_message_iterator *self_msg_iter,
-               bt_self_message_iterator_configuration *config,
-               bt_self_component_port_output *self_port);
-
-void muxer_msg_iter_finalize(
-               bt_self_message_iterator *self_msg_iter);
-
-bt_message_iterator_class_next_method_status muxer_msg_iter_next(
-               bt_self_message_iterator *self_msg_iter,
-               bt_message_array_const msgs, uint64_t capacity,
-               uint64_t *count);
-
-bt_component_class_port_connected_method_status muxer_input_port_connected(
-               bt_self_component_filter *comp,
-               bt_self_component_port_input *self_port,
-               const bt_port_output *other_port);
-
-bt_message_iterator_class_can_seek_beginning_method_status
-muxer_msg_iter_can_seek_beginning(
-               bt_self_message_iterator *message_iterator, bt_bool *can_seek);
-
-bt_message_iterator_class_seek_beginning_method_status muxer_msg_iter_seek_beginning(
-               bt_self_message_iterator *message_iterator);
-
-#endif /* BABELTRACE_PLUGINS_UTILS_MUXER_H */
diff --git a/src/plugins/utils/muxer/upstream-msg-iter.cpp b/src/plugins/utils/muxer/upstream-msg-iter.cpp
new file mode 100644 (file)
index 0000000..be49499
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#include <glib.h>
+
+#include "cpp-common/bt2/optional-borrowed-object.hpp"
+#include "cpp-common/bt2c/logging.hpp"
+#include "cpp-common/vendor/fmt/core.h"
+#include "cpp-common/vendor/fmt/format.h"
+
+#include "upstream-msg-iter.hpp"
+
+namespace bt2mux {
+
+UpstreamMsgIter::UpstreamMsgIter(bt2::MessageIterator::Shared msgIter, std::string portName,
+                                 const bt2c::Logger& parentLogger) :
+    _mMsgIter {std::move(msgIter)},
+    _mLogger {parentLogger, fmt::format("{}/[{}]", parentLogger.tag(), portName)},
+    _mPortName {std::move(portName)}
+{
+    BT_CPPLOGI("Created an upstream message iterator: this={}, port-name={}", fmt::ptr(this),
+               _mPortName);
+}
+
+namespace {
+
+/*
+ * Returns the clock snapshot of `msg`, possibly missing.
+ */
+bt2::OptionalBorrowedObject<bt2::ConstClockSnapshot> msgCs(const bt2::ConstMessage msg) noexcept
+{
+    switch (msg.type()) {
+    case bt2::MessageType::EVENT:
+        if (msg.asEvent().streamClassDefaultClockClass()) {
+            return msg.asEvent().defaultClockSnapshot();
+        }
+
+        break;
+    case bt2::MessageType::PACKET_BEGINNING:
+        if (msg.asPacketBeginning().packet().stream().cls().packetsHaveBeginningClockSnapshot()) {
+            return msg.asPacketBeginning().defaultClockSnapshot();
+        }
+
+        break;
+    case bt2::MessageType::PACKET_END:
+        if (msg.asPacketEnd().packet().stream().cls().packetsHaveEndClockSnapshot()) {
+            return msg.asPacketEnd().defaultClockSnapshot();
+        }
+
+        break;
+    case bt2::MessageType::DISCARDED_EVENTS:
+        if (msg.asDiscardedEvents().stream().cls().discardedEventsHaveDefaultClockSnapshots()) {
+            return msg.asDiscardedEvents().beginningDefaultClockSnapshot();
+        }
+
+        break;
+    case bt2::MessageType::DISCARDED_PACKETS:
+        if (msg.asDiscardedPackets().stream().cls().discardedPacketsHaveDefaultClockSnapshots()) {
+            return msg.asDiscardedPackets().beginningDefaultClockSnapshot();
+        }
+
+        break;
+    case bt2::MessageType::MESSAGE_ITERATOR_INACTIVITY:
+        return msg.asMessageIteratorInactivity().clockSnapshot();
+    case bt2::MessageType::STREAM_BEGINNING:
+        if (msg.asStreamBeginning().stream().cls().defaultClockClass()) {
+            return msg.asStreamBeginning().defaultClockSnapshot();
+        }
+
+        break;
+    case bt2::MessageType::STREAM_END:
+        if (msg.asStreamEnd().stream().cls().defaultClockClass()) {
+            return msg.asStreamEnd().defaultClockSnapshot();
+        }
+
+        break;
+    default:
+        bt_common_abort();
+    }
+
+    return {};
+}
+
+} /* namespace */
+
+UpstreamMsgIter::ReloadStatus UpstreamMsgIter::reload()
+{
+    BT_ASSERT_DBG(!_mDiscardRequired);
+
+    if (G_UNLIKELY(!_mMsgs.msgs)) {
+        /*
+         * This will either:
+         *
+         * 1. Set `_mMsgs.msgs` to new messages (we'll return
+         *    `ReloadStatus::MORE`).
+         *
+         * 2. Not set `_mMsgs.msgs` (ended, we'll return
+         *    `ReloadStatus::NO_MORE`).
+         *
+         * 3. Throw.
+         */
+        this->_tryGetNewMsgs();
+    }
+
+    if (G_UNLIKELY(!_mMsgs.msgs)) {
+        /* Still none: no more */
+        _mMsgTs.reset();
+        return ReloadStatus::NO_MORE;
+    } else {
+        if (const auto cs = msgCs(this->msg())) {
+            _mMsgTs = cs->nsFromOrigin();
+            BT_CPPLOGD("Cached the timestamp of the current message: this={}, ts={}",
+                       fmt::ptr(this), *_mMsgTs);
+        } else {
+            _mMsgTs.reset();
+            BT_CPPLOGD("Reset the timestamp of the current message: this={}", fmt::ptr(this));
+        }
+
+        _mDiscardRequired = true;
+        return ReloadStatus::MORE;
+    }
+}
+
+void UpstreamMsgIter::_tryGetNewMsgs()
+{
+    BT_ASSERT_DBG(_mMsgIter);
+    BT_CPPLOGD("Calling the \"next\" method of the upstream message iterator: this={}",
+               fmt::ptr(this));
+
+    /*
+     * Replace with next batch!
+     *
+     * This may throw, in which case we'll keep our current
+     * `_mMsgs.msgs` (set), still requiring to get new messages the next
+     * time the user calls reload().
+     */
+    _mMsgs.msgs = _mMsgIter->next();
+
+    if (!_mMsgs.msgs) {
+        /*
+         * Don't destroy `*_mMsgIter` here because the user may still
+         * call seekBeginning() afterwards.
+         */
+        BT_CPPLOGD("End of upstream message iterator: this={}", fmt::ptr(this));
+        return;
+    }
+
+    _mMsgs.index = 0;
+    BT_CPPLOGD("Got {1} messages from upstream: this={0}, count={1}", fmt::ptr(this),
+               _mMsgs.msgs->length());
+}
+
+bool UpstreamMsgIter::canSeekBeginning()
+{
+    return _mMsgIter->canSeekBeginning();
+}
+
+void UpstreamMsgIter::seekBeginning()
+{
+    _mMsgIter->seekBeginning();
+    _mMsgs.msgs.reset();
+    _mMsgTs.reset();
+    _mDiscardRequired = false;
+}
+
+bool UpstreamMsgIter::canSeekForward() const noexcept
+{
+    return _mMsgIter->canSeekForward();
+}
+
+} /* namespace bt2mux */
diff --git a/src/plugins/utils/muxer/upstream-msg-iter.hpp b/src/plugins/utils/muxer/upstream-msg-iter.hpp
new file mode 100644 (file)
index 0000000..2ae663f
--- /dev/null
@@ -0,0 +1,188 @@
+
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#ifndef BABELTRACE_PLUGINS_UTILS_MUXER_UPSTREAM_MSG_ITER_HPP
+#define BABELTRACE_PLUGINS_UTILS_MUXER_UPSTREAM_MSG_ITER_HPP
+
+#include <memory>
+
+#include "common/assert.h"
+#include "cpp-common/bt2/message-array.hpp"
+#include "cpp-common/bt2/message-iterator.hpp"
+#include "cpp-common/bt2c/logging.hpp"
+#include "cpp-common/bt2s/optional.hpp"
+
+namespace bt2mux {
+
+/*
+ * An instance of this wraps an upstream libbabeltrace2 message
+ * iterator, keeping an internal array of receives messages, and making
+ * the oldest one available (msg() method).
+ */
+class UpstreamMsgIter final
+{
+public:
+    /* Unique pointer to upstream message iterator */
+    using UP = std::unique_ptr<UpstreamMsgIter>;
+
+    /* Return type of reload() */
+    enum class ReloadStatus
+    {
+        MORE,
+        NO_MORE,
+    };
+
+    /*
+     * Builds an upstream message iterator wrapper using the
+     * libbabeltrace2 message iterator `msgIter`.
+     *
+     * This constructor doesn't immediately gets the next messages from
+     * `*msgIter` (you always need to call reload() before you call
+     * msg()), therefore it won't throw `bt2::Error` or `bt2::TryAgain`.
+     */
+    explicit UpstreamMsgIter(bt2::MessageIterator::Shared msgIter, std::string portName,
+                             const bt2c::Logger& parentLogger);
+
+    /* Some protection */
+    UpstreamMsgIter(const UpstreamMsgIter&) = delete;
+    UpstreamMsgIter& operator=(const UpstreamMsgIter&) = delete;
+
+    /*
+     * Current message.
+     *
+     * Before you call this method:
+     *
+     * 1. If needed, you must call discard().
+     *
+     *    This is not the case immediately after construction and
+     *    immediately after seeking.
+     *
+     * 2. You must call reload() successfully (not ended).
+     *
+     *    This is always the case.
+     *
+     *    This makes it possible to build an `UpstreamMsgIter` instance
+     *    without libbabeltrace2 message iterator exceptions.
+     */
+    bt2::ConstMessage msg() const noexcept
+    {
+        BT_ASSERT_DBG(_mMsgs.msgs && _mMsgs.index < _mMsgs.msgs->length());
+        return (*_mMsgs.msgs)[_mMsgs.index];
+    }
+
+    /*
+     * Timestamp, if any, of the current message.
+     *
+     * It must be valid to call msg() when you call this method.
+     */
+    const bt2s::optional<std::int64_t> msgTs() const noexcept
+    {
+        return _mMsgTs;
+    }
+
+    /*
+     * Discards the current message, making this upstream message
+     * iterator ready for a reload (reload()).
+     *
+     * You may only call reload() or seekBeginning() after having called
+     * this.
+     */
+    void discard() noexcept
+    {
+        BT_ASSERT_DBG(_mMsgs.msgs && _mMsgs.index < _mMsgs.msgs->length());
+        BT_ASSERT_DBG(_mDiscardRequired);
+        _mDiscardRequired = false;
+        ++_mMsgs.index;
+
+        if (_mMsgs.index == _mMsgs.msgs->length()) {
+            _mMsgs.msgs.reset();
+        }
+    }
+
+    /*
+     * Retrieves the next message, making it available afterwards
+     * through the msg() method.
+     *
+     * You must have called discard() to discard the current message, if
+     * any, before you call this method.
+     *
+     * This method may throw anything bt2::MessageIterator::next() may
+     * throw.
+     *
+     * If this method returns `ReloadStatus::NO_MORE`, then the
+     * underlying libbabeltrace2 message iterator is ended, meaning you
+     * may not call msg(), msgTs(), or reload() again for this message
+     * iterator until you successfully call seekBeginning().
+     */
+    ReloadStatus reload();
+
+    /*
+     * Forwards to bt2::MessageIterator::canSeekBeginning().
+     */
+    bool canSeekBeginning();
+
+    /*
+     * Forwards to bt2::MessageIterator::seekBeginning().
+     *
+     * On success, you may call reload() afterwards. With any exception,
+     * you must call this method again, successfully, before you may
+     * call reload().
+     */
+    void seekBeginning();
+
+    /*
+     * Forwards to bt2::MessageIterator::canSeekForward().
+     */
+    bool canSeekForward() const noexcept;
+
+    /*
+     * Name of the input port on which the libbabeltrace2 message
+     * iterator was created.
+     */
+    const std::string& portName() const noexcept
+    {
+        return _mPortName;
+    }
+
+private:
+    /*
+     * Tries to get new messages into `_mMsgs.msgs`.
+     */
+    void _tryGetNewMsgs();
+
+    /* Actual upstream message iterator */
+    bt2::MessageIterator::Shared _mMsgIter;
+
+    /*
+     * Currently contained messages.
+     *
+     * `index` is the index of the current message (msg()/msgTs())
+     * within `msgs`.
+     */
+    struct
+    {
+        bt2s::optional<bt2::ConstMessageArray> msgs;
+        std::size_t index;
+    } _mMsgs;
+
+    /* Timestamp of the current message, if any */
+    bt2s::optional<std::int64_t> _mMsgTs;
+
+    /*
+     * Only relevant in debug mode: true if a call to discard() is
+     * required before calling reload().
+     */
+    bool _mDiscardRequired = false;
+
+    bt2c::Logger _mLogger;
+    std::string _mPortName;
+};
+
+} /* namespace bt2mux */
+
+#endif /* BABELTRACE_PLUGINS_UTILS_MUXER_UPSTREAM_MSG_ITER_HPP */
diff --git a/src/plugins/utils/plugin.c b/src/plugins/utils/plugin.c
deleted file mode 100644 (file)
index 536224d..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * SPDX-License-Identifier: MIT
- *
- * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
- */
-
-#include <babeltrace2/babeltrace.h>
-#include "dummy/dummy.h"
-#include "counter/counter.h"
-#include "muxer/muxer.h"
-#include "trimmer/trimmer.h"
-
-#ifndef BT_BUILT_IN_PLUGINS
-BT_PLUGIN_MODULE();
-#endif
-
-BT_PLUGIN(utils);
-BT_PLUGIN_DESCRIPTION("Common graph utilities");
-BT_PLUGIN_AUTHOR("EfficiOS <https://www.efficios.com/>");
-BT_PLUGIN_LICENSE("MIT");
-
-/* sink.utils.dummy */
-BT_PLUGIN_SINK_COMPONENT_CLASS(dummy, dummy_consume);
-BT_PLUGIN_SINK_COMPONENT_CLASS_INITIALIZE_METHOD(dummy, dummy_init);
-BT_PLUGIN_SINK_COMPONENT_CLASS_FINALIZE_METHOD(dummy, dummy_finalize);
-BT_PLUGIN_SINK_COMPONENT_CLASS_GRAPH_IS_CONFIGURED_METHOD(dummy,
-       dummy_graph_is_configured);
-BT_PLUGIN_SINK_COMPONENT_CLASS_DESCRIPTION(dummy,
-       "Consume messages and discard them.");
-BT_PLUGIN_SINK_COMPONENT_CLASS_HELP(dummy,
-       "See the babeltrace2-sink.utils.dummy(7) manual page.");
-
-/* sink.utils.counter */
-BT_PLUGIN_SINK_COMPONENT_CLASS(counter, counter_consume);
-BT_PLUGIN_SINK_COMPONENT_CLASS_INITIALIZE_METHOD(counter, counter_init);
-BT_PLUGIN_SINK_COMPONENT_CLASS_FINALIZE_METHOD(counter, counter_finalize);
-BT_PLUGIN_SINK_COMPONENT_CLASS_GRAPH_IS_CONFIGURED_METHOD(counter,
-       counter_graph_is_configured);
-BT_PLUGIN_SINK_COMPONENT_CLASS_DESCRIPTION(counter,
-       "Count messages and print the statistics.");
-BT_PLUGIN_SINK_COMPONENT_CLASS_HELP(counter,
-       "See the babeltrace2-sink.utils.counter(7) manual page.");
-
-/* flt.utils.trimmer */
-BT_PLUGIN_FILTER_COMPONENT_CLASS(trimmer, trimmer_msg_iter_next);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_DESCRIPTION(trimmer,
-       "Discard messages that occur outside a specific time range.");
-BT_PLUGIN_FILTER_COMPONENT_CLASS_HELP(trimmer,
-       "See the babeltrace2-filter.utils.trimmer(7) manual page.");
-BT_PLUGIN_FILTER_COMPONENT_CLASS_INITIALIZE_METHOD(trimmer, trimmer_init);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_FINALIZE_METHOD(trimmer, trimmer_finalize);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD(trimmer,
-       trimmer_msg_iter_init);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_FINALIZE_METHOD(trimmer,
-       trimmer_msg_iter_finalize);
-
-/* flt.utils.muxer */
-BT_PLUGIN_FILTER_COMPONENT_CLASS(muxer, muxer_msg_iter_next);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_DESCRIPTION(muxer,
-       "Sort messages from multiple input ports to a single output port by time.");
-BT_PLUGIN_FILTER_COMPONENT_CLASS_HELP(muxer,
-       "See the babeltrace2-filter.utils.muxer(7) manual page.");
-BT_PLUGIN_FILTER_COMPONENT_CLASS_INITIALIZE_METHOD(muxer, muxer_init);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_FINALIZE_METHOD(muxer, muxer_finalize);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_INPUT_PORT_CONNECTED_METHOD(muxer,
-       muxer_input_port_connected);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD(muxer,
-       muxer_msg_iter_init);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_FINALIZE_METHOD(muxer,
-       muxer_msg_iter_finalize);
-BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHODS(muxer,
-       muxer_msg_iter_seek_beginning, muxer_msg_iter_can_seek_beginning);
diff --git a/src/plugins/utils/plugin.cpp b/src/plugins/utils/plugin.cpp
new file mode 100644 (file)
index 0000000..acb7801
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#include <babeltrace2/babeltrace.h>
+
+#include "cpp-common/bt2/plugin-dev.hpp"
+
+#include "counter/counter.h"
+#include "dummy/dummy.h"
+#include "muxer/comp.hpp"
+#include "muxer/msg-iter.hpp"
+#include "trimmer/trimmer.h"
+
+#ifndef BT_BUILT_IN_PLUGINS
+BT_PLUGIN_MODULE();
+#endif
+
+BT_PLUGIN(utils);
+BT_PLUGIN_DESCRIPTION("Common graph utilities");
+BT_PLUGIN_AUTHOR("EfficiOS <https://www.efficios.com/>");
+BT_PLUGIN_LICENSE("MIT");
+
+/* sink.utils.dummy */
+BT_PLUGIN_SINK_COMPONENT_CLASS(dummy, dummy_consume);
+BT_PLUGIN_SINK_COMPONENT_CLASS_INITIALIZE_METHOD(dummy, dummy_init);
+BT_PLUGIN_SINK_COMPONENT_CLASS_FINALIZE_METHOD(dummy, dummy_finalize);
+BT_PLUGIN_SINK_COMPONENT_CLASS_GRAPH_IS_CONFIGURED_METHOD(dummy, dummy_graph_is_configured);
+BT_PLUGIN_SINK_COMPONENT_CLASS_DESCRIPTION(dummy, "Consume messages and discard them.");
+BT_PLUGIN_SINK_COMPONENT_CLASS_HELP(dummy, "See the babeltrace2-sink.utils.dummy(7) manual page.");
+
+/* sink.utils.counter */
+BT_PLUGIN_SINK_COMPONENT_CLASS(counter, counter_consume);
+BT_PLUGIN_SINK_COMPONENT_CLASS_INITIALIZE_METHOD(counter, counter_init);
+BT_PLUGIN_SINK_COMPONENT_CLASS_FINALIZE_METHOD(counter, counter_finalize);
+BT_PLUGIN_SINK_COMPONENT_CLASS_GRAPH_IS_CONFIGURED_METHOD(counter, counter_graph_is_configured);
+BT_PLUGIN_SINK_COMPONENT_CLASS_DESCRIPTION(counter, "Count messages and print the statistics.");
+BT_PLUGIN_SINK_COMPONENT_CLASS_HELP(counter,
+                                    "See the babeltrace2-sink.utils.counter(7) manual page.");
+
+/* flt.utils.trimmer */
+BT_PLUGIN_FILTER_COMPONENT_CLASS(trimmer, trimmer_msg_iter_next);
+BT_PLUGIN_FILTER_COMPONENT_CLASS_DESCRIPTION(
+    trimmer, "Discard messages that occur outside a specific time range.");
+BT_PLUGIN_FILTER_COMPONENT_CLASS_HELP(trimmer,
+                                      "See the babeltrace2-filter.utils.trimmer(7) manual page.");
+BT_PLUGIN_FILTER_COMPONENT_CLASS_INITIALIZE_METHOD(trimmer, trimmer_init);
+BT_PLUGIN_FILTER_COMPONENT_CLASS_FINALIZE_METHOD(trimmer, trimmer_finalize);
+BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD(trimmer,
+                                                                          trimmer_msg_iter_init);
+BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_FINALIZE_METHOD(trimmer,
+                                                                        trimmer_msg_iter_finalize);
+
+/* flt.utils.muxer */
+BT_CPP_PLUGIN_FILTER_COMPONENT_CLASS(muxer, bt2mux::Comp, bt2mux::MsgIter);
+BT_PLUGIN_FILTER_COMPONENT_CLASS_DESCRIPTION(
+    muxer, "Sort messages from multiple input ports to a single output port by time.");
+BT_PLUGIN_FILTER_COMPONENT_CLASS_HELP(muxer,
+                                      "See the babeltrace2-filter.utils.muxer(7) manual page.");
index b6146b3ac9aca3b37350eb0cd053d37e2fb89bdd..0cb997a430e047d06facee3102dd4a927c7b414d 100644 (file)
 #include "common/macros.h"
 #include <babeltrace2/babeltrace.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 void trimmer_finalize(bt_self_component_filter *self_comp);
 
 bt_component_class_initialize_method_status trimmer_init(
@@ -31,4 +35,8 @@ bt_message_iterator_class_next_method_status trimmer_msg_iter_next(
 
 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* BABELTRACE_PLUGINS_UTILS_TRIMMER_H */
This page took 0.059478 seconds and 4 git commands to generate.