flt.utils.muxer: use a heap to sort iterators (also: rewrite in C++)
[babeltrace.git] / src / plugins / utils / muxer / msg-iter.cpp
diff --git a/src/plugins/utils/muxer/msg-iter.cpp b/src/plugins/utils/muxer/msg-iter.cpp
new file mode 100644 (file)
index 0000000..526be00
--- /dev/null
@@ -0,0 +1,499 @@
+/*
+ * SPDX-License-Identifier: MIT
+ *
+ * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
+ */
+
+#include <algorithm>
+
+#include <glib.h>
+
+#include <babeltrace2/babeltrace.h>
+
+#include "common/common.h"
+#include "cpp-common/bt2c/call.hpp"
+#include "cpp-common/bt2s/make-unique.hpp"
+#include "cpp-common/vendor/fmt/format.h"
+
+#include "plugins/common/muxing/muxing.h"
+
+#include "comp.hpp"
+#include "msg-iter.hpp"
+
+namespace bt2mux {
+
+MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
+                 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
+    bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
+    _mHeap {_HeapComparator {_mLogger}}
+{
+    /*
+     * Create one upstream message iterator for each connected
+     * input port.
+     */
+    auto canSeekForward = true;
+
+    for (const auto inputPort : this->_component()._inputPorts()) {
+        if (!inputPort.isConnected()) {
+            BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
+            continue;
+        }
+
+        /*
+         * Create new upstream message iterator and immediately make it
+         * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
+         * deal with it when downstream calls next()).
+         */
+        auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
+            this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
+
+        canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
+        _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
+        _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
+    }
+
+    /* Set the "can seek forward" configuration */
+    cfg.canSeekForward(canSeekForward);
+}
+
+namespace {
+
+std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
+{
+    if (ts) {
+        return fmt::to_string(*ts);
+    }
+
+    return "none";
+}
+
+} /* namespace */
+
+void MsgIter::_next(bt2::ConstMessageArray& msgs)
+{
+    /* Make sure all upstream message iterators are part of the heap */
+    this->_ensureFullHeap();
+
+    while (msgs.length() < msgs.capacity()) {
+        /* Empty heap? */
+        if (G_UNLIKELY(_mHeap.isEmpty())) {
+            /* No more upstream messages! */
+            return;
+        }
+
+        /*
+         * Retrieve the upstream message iterator having the oldest message.
+         */
+        auto& oldestUpstreamMsgIter = *_mHeap.top();
+
+        /* Validate the clock class of the oldest message */
+        this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
+
+        /* Append the oldest message and discard it */
+        msgs.append(oldestUpstreamMsgIter.msg().shared());
+
+        if (_mLogger.wouldLogD()) {
+            BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
+                       oldestUpstreamMsgIter.portName(),
+                       optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
+        }
+
+        oldestUpstreamMsgIter.discard();
+
+        /*
+         * Immediately try to reload `oldestUpstreamMsgIter`.
+         *
+         * The possible outcomes are:
+         *
+         * There's an available message:
+         *     Call `_mHeap.replaceTop()` to bring
+         *     `oldestUpstreamMsgIter` back to the heap, performing a
+         *     single heap rebalance.
+         *
+         * There isn't an available message (ended):
+         *     Remove `oldestUpstreamMsgIter` from the heap.
+         *
+         * `bt2::TryAgain` is thrown:
+         *     Remove `oldestUpstreamMsgIter` from the heap.
+         *
+         *     Add `oldestUpstreamMsgIter` to the set of upstream
+         *     message iterators to reload. The next call to _next()
+         *     will move it to the heap again (if not ended) after
+         *     having successfully called reload().
+         */
+        BT_CPPLOGD(
+            "Trying to reload upstream message iterator having the oldest message: port-name={}",
+            oldestUpstreamMsgIter.portName());
+
+        try {
+            if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
+                /* New current message: update heap */
+                _mHeap.replaceTop(&oldestUpstreamMsgIter);
+                BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
+                           oldestUpstreamMsgIter.portName(), _mHeap.len());
+            } else {
+                _mHeap.removeTop();
+                BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
+                           "port-name{}, heap-len={}",
+                           oldestUpstreamMsgIter.portName(), _mHeap.len());
+            }
+        } catch (const bt2::TryAgain&) {
+            _mHeap.removeTop();
+            _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
+            BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
+                       "port-name={}, heap-len={}, to-reload-len={}",
+                       oldestUpstreamMsgIter.portName(), _mHeap.len(),
+                       _mUpstreamMsgItersToReload.size());
+            throw;
+        }
+    }
+}
+
+void MsgIter::_ensureFullHeap()
+{
+    /*
+     * Always remove from `_mUpstreamMsgItersToReload` when reload()
+     * doesn't throw.
+     *
+     * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
+     * then we don't need it anymore (remains alive in
+     * `_mUpstreamMsgIters`).
+     */
+    for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
+         it = _mUpstreamMsgItersToReload.erase(it)) {
+        auto& upstreamMsgIter = **it;
+
+        BT_CPPLOGD("Handling upstream message iterator to reload: "
+                   "port-name={}, heap-len={}, to-reload-len={}",
+                   upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
+
+        if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
+            /* New current message: move to heap */
+            _mHeap.insert(&upstreamMsgIter);
+            BT_CPPLOGD("More messages available; "
+                       "inserted upstream message iterator into heap from \"to reload\" set: "
+                       "port-name={}, heap-len={}",
+                       upstreamMsgIter.portName(), _mHeap.len());
+        } else {
+            BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
+                       "port-name={}",
+                       upstreamMsgIter.portName());
+        }
+    }
+}
+
+bool MsgIter::_canSeekBeginning()
+{
+    /*
+     * We can only seek our beginning if all our upstream message
+     * iterators also can.
+     */
+    return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
+                       [](UpstreamMsgIter::UP& upstreamMsgIter) {
+                           return upstreamMsgIter->canSeekBeginning();
+                       });
+}
+
+void MsgIter::_seekBeginning()
+{
+    /*
+     * The current approach is that this operation is either successful
+     * (all upstream message iterators seek) or not. If it's not, then
+     * we don't keep any state that some sought and some didn't: we'll
+     * restart the whole process when the user tries to seek again.
+     *
+     * The first step is to clear all the containers of upstream message
+     * iterator pointers so that we can process what's in
+     * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
+     * if any seeking fails below, the downstream user is required to
+     * try the "seek beginning" operation again and only call
+     * bt_message_iterator_next() if it was successful.
+     *
+     * This means if the first four upstream message iterators seek, and
+     * then the fifth one throws `bt2::TryAgain`, then the next time
+     * this method executes, the first four upstream message iterators
+     * will seek again. That being said, it's such an unlikely scenario
+     * that the simplicity outweighs performance concerns here.
+     */
+    _mHeap.clear();
+    _mUpstreamMsgItersToReload.clear();
+
+    /* Also reset clock class expectation */
+    _mClkClsExpectation = _ClkClsExpectation::ANY;
+    _mExpectedClkClsUuid.reset();
+
+    /* Make each upstream message iterator seek */
+    for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
+        /* This may throw! */
+        upstreamMsgIter->seekBeginning();
+    }
+
+    /*
+     * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
+     * next call to _next() will deal with those.
+     */
+    for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
+        _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
+    }
+}
+
+namespace {
+
+const char *msgTypeStr(const bt2::ConstMessage msg) noexcept
+{
+    return bt_common_message_type_string(static_cast<bt_message_type>(msg.type()));
+}
+
+std::string optLogStr(const char * const str) noexcept
+{
+    return str ? fmt::format("\"{}\"", str) : "(none)";
+}
+
+} /* namespace */
+
+void MsgIter::_setClkClsExpectation(
+    const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) noexcept
+{
+    BT_ASSERT_DBG(_mClkClsExpectation == _ClkClsExpectation::ANY);
+
+    /* No initial clock class: also expect none afterwards */
+    if (!clkCls) {
+        _mClkClsExpectation = _ClkClsExpectation::NONE;
+        return;
+    }
+
+    /*
+     * This is the first clock class that this message iterator
+     * encounters. Its properties determine what to expect for the whole
+     * lifetime of the iterator.
+     */
+    if (clkCls->originIsUnixEpoch()) {
+        /* Expect clock classes having a Unix epoch origin*/
+        _mClkClsExpectation = _ClkClsExpectation::ORIG_IS_UNIX_EPOCH;
+    } else {
+        if (clkCls->uuid()) {
+            /*
+             * Expect clock classes not having a Unix epoch origin and
+             * with a specific UUID.
+             */
+            _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID;
+            _mExpectedClkClsUuid = *clkCls->uuid();
+        } else {
+            /*
+             * Expect clock classes not having a Unix epoch origin and
+             * without a UUID.
+             */
+            _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID;
+        }
+    }
+}
+
+void MsgIter::_makeSureClkClsIsExpected(
+    const bt2::ConstMessage msg,
+    const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) const
+{
+    BT_ASSERT_DBG(_mClkClsExpectation != _ClkClsExpectation::ANY);
+
+    if (!clkCls) {
+        if (_mClkClsExpectation != _ClkClsExpectation::NONE) {
+            /*
+             * `msg` is a stream beginning message because a message
+             * iterator inactivity message always has a clock class.
+             */
+            const auto streamCls = msg.asStreamBeginning().stream().cls();
+
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                              "Expecting a clock class, but got none: "
+                                              "stream-class-addr={}, stream-class-name=\"{}\", "
+                                              "stream-class-id={}",
+                                              static_cast<const void *>(streamCls.libObjPtr()),
+                                              optLogStr(streamCls.name()), streamCls.id());
+        }
+
+        return;
+    }
+
+    const auto clkClsAddr = static_cast<const void *>(clkCls->libObjPtr());
+
+    switch (_mClkClsExpectation) {
+    case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH:
+        if (!clkCls->originIsUnixEpoch()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                              "Expecting a clock class having a Unix epoch origin, "
+                                              "but got one not having a Unix epoch origin: "
+                                              "clock-class-addr={}, clock-class-name={}",
+                                              clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        break;
+    case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID:
+        BT_ASSERT_DBG(!_mExpectedClkClsUuid);
+
+        if (clkCls->originIsUnixEpoch()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class not having a Unix epoch origin, "
+                "but got one having a Unix epoch origin: "
+                "clock-class-addr={}, clock-class-name={}",
+                clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        if (clkCls->uuid()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class without a UUID, but got one with a UUID: "
+                "clock-class-addr={}, clock-class-name={}, uuid={}",
+                clkClsAddr, optLogStr(clkCls->name()), clkCls->uuid()->str());
+        }
+
+        break;
+    case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID:
+        BT_ASSERT_DBG(_mExpectedClkClsUuid);
+
+        if (clkCls->originIsUnixEpoch()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class not having a Unix epoch origin, "
+                "but got one having a Unix epoch origin: "
+                "clock-class-addr={}, clock-class-name={}",
+                clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        if (!clkCls->uuid()) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+                bt2::Error,
+                "Expecting a clock class with a UUID, but got one without a UUID: "
+                "clock-class-addr={}, clock-class-name={}",
+                clkClsAddr, optLogStr(clkCls->name()));
+        }
+
+        if (*clkCls->uuid() != bt2c::UuidView {*_mExpectedClkClsUuid}) {
+            BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                              "Expecting a clock class with a specific UUID, "
+                                              "but got one with a different UUID: "
+                                              "clock-class-addr={}, clock-class-name={}, "
+                                              "expected-uuid=\"{}\", uuid=\"{}\"",
+                                              clkClsAddr, optLogStr(clkCls->name()),
+                                              _mExpectedClkClsUuid->str(), clkCls->uuid()->str());
+        }
+
+        break;
+    case _ClkClsExpectation::NONE:
+        BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
+                                          "Expecting no clock class, but got one: "
+                                          "clock-class-addr={}, clock-class-name={}",
+                                          clkClsAddr, optLogStr(clkCls->name()));
+        break;
+    default:
+        bt_common_abort();
+    }
+}
+
+void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
+{
+    if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
+        /*
+         * We don't care about the other types: all the messages related
+         * to a given stream shared the same default clock class, if
+         * any.
+         */
+        return;
+    }
+
+    BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msgTypeStr(msg));
+
+    /* Get the clock class, if any, of `msg` */
+    const auto clkCls = bt2c::call([msg]() -> bt2::OptionalBorrowedObject<bt2::ConstClockClass> {
+        if (msg.isStreamBeginning()) {
+            return msg.asStreamBeginning().stream().cls().defaultClockClass();
+        } else {
+            BT_ASSERT(msg.isMessageIteratorInactivity());
+            return msg.asMessageIteratorInactivity().clockSnapshot().clockClass();
+        }
+    });
+
+    /* Set the expectation or check it */
+    if (_mClkClsExpectation == _ClkClsExpectation::ANY) {
+        /* First message: set the expectation */
+        this->_setClkClsExpectation(clkCls);
+    } else {
+        /* Make sure clock class is expected */
+        this->_makeSureClkClsIsExpected(msg, clkCls);
+    }
+}
+
+MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger) : _mLogger {logger}
+{
+}
+
+bool MsgIter::_HeapComparator::operator()(
+    const UpstreamMsgIter * const upstreamMsgIterA,
+    const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
+{
+    /* The two messages to compare */
+    const auto msgA = upstreamMsgIterA->msg();
+    const auto msgB = upstreamMsgIterB->msg();
+    auto& msgTsA = upstreamMsgIterA->msgTs();
+    auto& msgTsB = upstreamMsgIterB->msgTs();
+
+    if (_mLogger.wouldLogT()) {
+        BT_CPPLOGT("Comparing two messages: "
+                   "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
+                   "port-name-b={}, msg-b-type={}, msg-b-ts={}",
+                   upstreamMsgIterA->portName(), msgTypeStr(msgA), optMsgTsStr(msgTsA),
+                   upstreamMsgIterB->portName(), msgTypeStr(msgB), optMsgTsStr(msgTsB));
+    }
+
+    /*
+     * Try to compare using timestamps.
+     *
+     * If both timestamps are set and their values are different, then
+     * use this to establish the ordering of the two messages.
+     *
+     * If one timestamp is set, but not the other, the latter always
+     * wins. This is because, for a given upstream message iterator, we
+     * need to consume all the messages having no timestamp so that we
+     * can reach a message with a timestamp to compare it.
+     *
+     * Otherwise, we'll fall back to using
+     * common_muxing_compare_messages().
+     */
+    if (G_LIKELY(msgTsA && msgTsB)) {
+        if (*msgTsA < *msgTsB) {
+            /*
+             * Return `true` because `_mHeap.top()` provides the
+             * "greatest" element. For us, the "greatest" message is
+             * the oldest one, that is, the one having the smallest
+             * timestamp.
+             */
+            BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
+            return true;
+        } else if (*msgTsA > *msgTsB) {
+            BT_CPPLOGT_STR(
+                "Timestamp of message A is greater than timestamp of message B: oldest=B");
+            return false;
+        }
+    } else if (msgTsA && !msgTsB) {
+        BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
+        return false;
+    } else if (!msgTsA && msgTsB) {
+        BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
+        return true;
+    }
+
+    /*
+     * Comparison failed using timestamps: determine an ordering using
+     * arbitrary properties, but in a deterministic way.
+     *
+     * common_muxing_compare_messages() returns less than 0 if the first
+     * message is considered older than the second, which corresponds to
+     * this comparator returning `true`.
+     */
+    const auto res = common_muxing_compare_messages(msgA.libObjPtr(), msgB.libObjPtr()) < 0;
+
+    BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
+               res ? "A" : "B");
+    return res;
+}
+
+} /* namespace bt2mux */
This page took 0.028564 seconds and 4 git commands to generate.