flt.utils.muxer: use a heap to sort iterators (also: rewrite in C++)
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Mon, 6 Nov 2023 21:16:54 +0000 (16:16 -0500)
committerSimon Marchi <simon.marchi@efficios.com>
Thu, 14 Dec 2023 15:57:04 +0000 (10:57 -0500)
commitfca1d0f55bf24741f07d2a86d70ad7274d3ec851
tree8722efcff93ceb738b2362aa6ad543248ccfcf31
parente7f0f07bea73c3fdf14bbc919bfd44bff3eb85e1
flt.utils.muxer: use a heap to sort iterators (also: rewrite in C++)

The main goal of this patch was to use a heap instead of an array to
sort upstream message iterators so as to improve the performance of a
`flt.utils.muxer` message iterator managing a lot of usptream message
iterators, especially with a lot of connected input ports.

To do this, I first decided to use `std::priority_queue`, and since I'm
not a fan of mixed C and C++, I also decided to take the opportunity to
convert the whole component class C source to C++. This looked like a
good excuse to prove that it's possible, using only common C++ stuff,
including libbabeltrace2 bindings (`bt2` namespace).

Therefore, I'm glad to announce that `flt.utils.muxer` is the first
upstream C++-only component class! ðŸ¥³

In the end, because it's more efficient, I decided to write a C++
version of what used to be the `bt_heap_` C API, `bt2c::PrioHeap`, and
to use this instead of `std::priority_queue`. ðŸ¤·

This new version has more or less the same logic as the previous one. It
has three classes under the `bt2mux` namespace:

`Comp`:
    The filter component class which doesn't do much.

    Its main purpose is to add an available input port when one is
    connected.

`UpstreamMsgIter`:
    Wraps a `bt2::MessageIterator`, keeping an internal array of
    messages (what bt2::MessageIterator::next() returns), and offering
    the msg() (current message), msgTs() (cached timestamp, if any, of
    current message), discard(), and reload() methods to deal with a
    single message at a time.

    You must call always reload() before calling msg()/msgTs(), and if
    there's current message, you must call discard() before you call
    reload(). Making the caller responsible of this logic removes
    redundant conditions in this class. You may view the
    msg() + discard() pair as a form of message popping. The sequence is
    usually:

    1. We know there's a current message because the last call to
       reload() was successful.

    2. Get the current message with msg(), and then call discard() to
       discard/remove it.

    3. Try to call reload().

       This may return `UpstreamMsgIter::ReloadStatus::NO_MORE`; if it
       does, stop using this upstream message iterator.

       This may also throw `bt2::TryAgain`; if it does, call reload()
       again later.

    Caching the timestamp of the current message is an improvement
    because the heap rebalancing operation could need to use them
    several times when comparing two upstream message iterators.

    This class also offers canSeekBeginning(), seekBeginning(), and
    canSeekForward() which more or less wrap the corresponding
    `bt2::MessageIterator` methods.

`MsgIter`:
    The message iterator class.

    This is where most of the changes are.

    It keeps a `bt2c::PrioHeap` of `UpstreamMsgIter` pointers to sort
    them. The comparator works like before, so that the output message
    order of this version for a given set of upstream message iterators
    should be the same as the previous version.

    Like before, it has an array of pointers to upstream message
    iterators to reload. An upstream message iterator is either part of
    the heap, or part of this array. We need to call reload() on each of
    the upstream message iterators in this array. This is either
    because:

    * We put them here on `MsgIter` construction to avoid getting any
      exception from bt2::MessageIterator::next().

    * The last attempt to call reload() on them threw `bt2::TryAgain`.

    Before MsgIter::_next() selects a message to add to the message
    array, all non-ended upstream message iterators need to be part of
    the heap.

    The logic of clock class expectation is unchanged.

In general, there are more detailed comments, and logging is improved,
in particular the trace level statements of the heap comparator:

    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:501 Comparing two messages: port-name-a=in0, msg-a-type=PACKET_BEGINNING, msg-a-ts=1441852773113941931, port-name-b=in1, msg-b-type=STREAM_BEGINNING, msg-b-ts=none
    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:538 Message A has a timestamp, but message B has none: oldest=B
    ...
    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:501 Comparing two messages: port-name-a=in2, msg-a-type=PACKET_END, msg-a-ts=1441852847496965372, port-name-b=in1, msg-b-type=PACKET_END, msg-b-ts=1441852847496953796
    [...] T PLUGIN/FLT.UTILS.MUXER/[muxer]/MSG-ITER operator()@plugins/utils/muxer/msg-iter.cpp:533 Timestamp of message A is greater than timestamp of message B: oldest=B

My preliminary experiments show that this version is slightly (a few
percentage points) more efficient than the previous one for a typical
four-stream scenario, and then around 12-13 times more efficient for a
208-stream scenario.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Change-Id: I2e393a2ef1c33c6b134284028807c687f8adf0e8
Reviewed-on: https://review.lttng.org/c/babeltrace/+/11273
Reviewed-by: Simon Marchi <simon.marchi@efficios.com>
CI-Build: Simon Marchi <simon.marchi@efficios.com>
Tested-by: jenkins <jenkins@lttng.org>
15 files changed:
src/Makefile.am
src/cli/Makefile.am
src/plugins/utils/counter/counter.h
src/plugins/utils/dummy/dummy.h
src/plugins/utils/muxer/comp.cpp [new file with mode: 0644]
src/plugins/utils/muxer/comp.hpp [new file with mode: 0644]
src/plugins/utils/muxer/msg-iter.cpp [new file with mode: 0644]
src/plugins/utils/muxer/msg-iter.hpp [new file with mode: 0644]
src/plugins/utils/muxer/muxer.c [deleted file]
src/plugins/utils/muxer/muxer.h [deleted file]
src/plugins/utils/muxer/upstream-msg-iter.cpp [new file with mode: 0644]
src/plugins/utils/muxer/upstream-msg-iter.hpp [new file with mode: 0644]
src/plugins/utils/plugin.c [deleted file]
src/plugins/utils/plugin.cpp [new file with mode: 0644]
src/plugins/utils/trimmer/trimmer.h
This page took 0.02598 seconds and 4 git commands to generate.