2 * SPDX-License-Identifier: MIT
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
11 #include <babeltrace2/babeltrace.h>
13 #include "common/common.h"
14 #include "cpp-common/bt2c/call.hpp"
15 #include "cpp-common/bt2s/make-unique.hpp"
16 #include "cpp-common/vendor/fmt/format.h"
18 #include "plugins/common/muxing/muxing.h"
21 #include "msg-iter.hpp"
25 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter
,
26 const bt2::SelfMessageIteratorConfiguration cfg
, bt2::SelfComponentOutputPort
) :
27 bt2::UserMessageIterator
<MsgIter
, Comp
> {selfMsgIter
, "MSG-ITER"},
28 _mHeap
{_HeapComparator
{_mLogger
}}
31 * Create one upstream message iterator for each connected
34 auto canSeekForward
= true;
36 for (const auto inputPort
: this->_component()._inputPorts()) {
37 if (!inputPort
.isConnected()) {
38 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort
.name());
43 * Create new upstream message iterator and immediately make it
44 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
45 * deal with it when downstream calls next()).
47 auto upstreamMsgIter
= bt2s::make_unique
<UpstreamMsgIter
>(
48 this->_createMessageIterator(inputPort
), inputPort
.name(), _mLogger
);
50 canSeekForward
= canSeekForward
&& upstreamMsgIter
->canSeekForward();
51 _mUpstreamMsgItersToReload
.emplace_back(upstreamMsgIter
.get());
52 _mUpstreamMsgIters
.push_back(std::move(upstreamMsgIter
));
55 /* Set the "can seek forward" configuration */
56 cfg
.canSeekForward(canSeekForward
);
61 std::string
optMsgTsStr(const bt2s::optional
<std::int64_t>& ts
)
64 return fmt::to_string(*ts
);
72 void MsgIter::_next(bt2::ConstMessageArray
& msgs
)
74 /* Make sure all upstream message iterators are part of the heap */
75 this->_ensureFullHeap();
77 while (msgs
.length() < msgs
.capacity()) {
79 if (G_UNLIKELY(_mHeap
.isEmpty())) {
80 /* No more upstream messages! */
85 * Retrieve the upstream message iterator having the oldest message.
87 auto& oldestUpstreamMsgIter
= *_mHeap
.top();
89 /* Validate the clock class of the oldest message */
90 this->_validateMsgClkCls(oldestUpstreamMsgIter
.msg());
92 /* Append the oldest message and discard it */
93 msgs
.append(oldestUpstreamMsgIter
.msg().shared());
95 if (_mLogger
.wouldLogD()) {
96 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
97 oldestUpstreamMsgIter
.portName(),
98 optMsgTsStr(oldestUpstreamMsgIter
.msgTs()));
101 oldestUpstreamMsgIter
.discard();
104 * Immediately try to reload `oldestUpstreamMsgIter`.
106 * The possible outcomes are:
108 * There's an available message:
109 * Call `_mHeap.replaceTop()` to bring
110 * `oldestUpstreamMsgIter` back to the heap, performing a
111 * single heap rebalance.
113 * There isn't an available message (ended):
114 * Remove `oldestUpstreamMsgIter` from the heap.
116 * `bt2::TryAgain` is thrown:
117 * Remove `oldestUpstreamMsgIter` from the heap.
119 * Add `oldestUpstreamMsgIter` to the set of upstream
120 * message iterators to reload. The next call to _next()
121 * will move it to the heap again (if not ended) after
122 * having successfully called reload().
125 "Trying to reload upstream message iterator having the oldest message: port-name={}",
126 oldestUpstreamMsgIter
.portName());
129 if (G_LIKELY(oldestUpstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
130 /* New current message: update heap */
131 _mHeap
.replaceTop(&oldestUpstreamMsgIter
);
132 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
133 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
136 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
137 "port-name{}, heap-len={}",
138 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
140 } catch (const bt2::TryAgain
&) {
142 _mUpstreamMsgItersToReload
.push_back(&oldestUpstreamMsgIter
);
143 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
144 "port-name={}, heap-len={}, to-reload-len={}",
145 oldestUpstreamMsgIter
.portName(), _mHeap
.len(),
146 _mUpstreamMsgItersToReload
.size());
152 void MsgIter::_ensureFullHeap()
155 * Always remove from `_mUpstreamMsgItersToReload` when reload()
158 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
159 * then we don't need it anymore (remains alive in
160 * `_mUpstreamMsgIters`).
162 for (auto it
= _mUpstreamMsgItersToReload
.begin(); it
!= _mUpstreamMsgItersToReload
.end();
163 it
= _mUpstreamMsgItersToReload
.erase(it
)) {
164 auto& upstreamMsgIter
= **it
;
166 BT_CPPLOGD("Handling upstream message iterator to reload: "
167 "port-name={}, heap-len={}, to-reload-len={}",
168 upstreamMsgIter
.portName(), _mHeap
.len(), _mUpstreamMsgItersToReload
.size());
170 if (G_LIKELY(upstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
171 /* New current message: move to heap */
172 _mHeap
.insert(&upstreamMsgIter
);
173 BT_CPPLOGD("More messages available; "
174 "inserted upstream message iterator into heap from \"to reload\" set: "
175 "port-name={}, heap-len={}",
176 upstreamMsgIter
.portName(), _mHeap
.len());
178 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
180 upstreamMsgIter
.portName());
185 bool MsgIter::_canSeekBeginning()
188 * We can only seek our beginning if all our upstream message
189 * iterators also can.
191 return std::all_of(_mUpstreamMsgIters
.begin(), _mUpstreamMsgIters
.end(),
192 [](UpstreamMsgIter::UP
& upstreamMsgIter
) {
193 return upstreamMsgIter
->canSeekBeginning();
197 void MsgIter::_seekBeginning()
200 * The current approach is that this operation is either successful
201 * (all upstream message iterators seek) or not. If it's not, then
202 * we don't keep any state that some sought and some didn't: we'll
203 * restart the whole process when the user tries to seek again.
205 * The first step is to clear all the containers of upstream message
206 * iterator pointers so that we can process what's in
207 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
208 * if any seeking fails below, the downstream user is required to
209 * try the "seek beginning" operation again and only call
210 * bt_message_iterator_next() if it was successful.
212 * This means if the first four upstream message iterators seek, and
213 * then the fifth one throws `bt2::TryAgain`, then the next time
214 * this method executes, the first four upstream message iterators
215 * will seek again. That being said, it's such an unlikely scenario
216 * that the simplicity outweighs performance concerns here.
219 _mUpstreamMsgItersToReload
.clear();
221 /* Also reset clock class expectation */
222 _mClkClsExpectation
= _ClkClsExpectation::ANY
;
223 _mExpectedClkClsUuid
.reset();
225 /* Make each upstream message iterator seek */
226 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
227 /* This may throw! */
228 upstreamMsgIter
->seekBeginning();
232 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
233 * next call to _next() will deal with those.
235 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
236 _mUpstreamMsgItersToReload
.push_back(upstreamMsgIter
.get());
242 const char *msgTypeStr(const bt2::ConstMessage msg
) noexcept
244 return bt_common_message_type_string(static_cast<bt_message_type
>(msg
.type()));
247 std::string
optLogStr(const char * const str
) noexcept
249 return str
? fmt::format("\"{}\"", str
) : "(none)";
254 void MsgIter::_setClkClsExpectation(
255 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) noexcept
257 BT_ASSERT_DBG(_mClkClsExpectation
== _ClkClsExpectation::ANY
);
259 /* No initial clock class: also expect none afterwards */
261 _mClkClsExpectation
= _ClkClsExpectation::NONE
;
266 * This is the first clock class that this message iterator
267 * encounters. Its properties determine what to expect for the whole
268 * lifetime of the iterator.
270 if (clkCls
->originIsUnixEpoch()) {
271 /* Expect clock classes having a Unix epoch origin*/
272 _mClkClsExpectation
= _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
;
274 if (clkCls
->uuid()) {
276 * Expect clock classes not having a Unix epoch origin and
277 * with a specific UUID.
279 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
;
280 _mExpectedClkClsUuid
= *clkCls
->uuid();
283 * Expect clock classes not having a Unix epoch origin and
286 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
;
291 void MsgIter::_makeSureClkClsIsExpected(
292 const bt2::ConstMessage msg
,
293 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) const
295 BT_ASSERT_DBG(_mClkClsExpectation
!= _ClkClsExpectation::ANY
);
298 if (_mClkClsExpectation
!= _ClkClsExpectation::NONE
) {
300 * `msg` is a stream beginning message because a message
301 * iterator inactivity message always has a clock class.
303 const auto streamCls
= msg
.asStreamBeginning().stream().cls();
305 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
306 "Expecting a clock class, but got none: "
307 "stream-class-addr={}, stream-class-name=\"{}\", "
308 "stream-class-id={}",
309 static_cast<const void *>(streamCls
.libObjPtr()),
310 optLogStr(streamCls
.name()), streamCls
.id());
316 const auto clkClsAddr
= static_cast<const void *>(clkCls
->libObjPtr());
318 switch (_mClkClsExpectation
) {
319 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
:
320 if (!clkCls
->originIsUnixEpoch()) {
321 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
322 "Expecting a clock class having a Unix epoch origin, "
323 "but got one not having a Unix epoch origin: "
324 "clock-class-addr={}, clock-class-name={}",
325 clkClsAddr
, optLogStr(clkCls
->name()));
329 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
:
330 BT_ASSERT_DBG(!_mExpectedClkClsUuid
);
332 if (clkCls
->originIsUnixEpoch()) {
333 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
335 "Expecting a clock class not having a Unix epoch origin, "
336 "but got one having a Unix epoch origin: "
337 "clock-class-addr={}, clock-class-name={}",
338 clkClsAddr
, optLogStr(clkCls
->name()));
341 if (clkCls
->uuid()) {
342 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
344 "Expecting a clock class without a UUID, but got one with a UUID: "
345 "clock-class-addr={}, clock-class-name={}, uuid={}",
346 clkClsAddr
, optLogStr(clkCls
->name()), clkCls
->uuid()->str());
350 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
:
351 BT_ASSERT_DBG(_mExpectedClkClsUuid
);
353 if (clkCls
->originIsUnixEpoch()) {
354 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
356 "Expecting a clock class not having a Unix epoch origin, "
357 "but got one having a Unix epoch origin: "
358 "clock-class-addr={}, clock-class-name={}",
359 clkClsAddr
, optLogStr(clkCls
->name()));
362 if (!clkCls
->uuid()) {
363 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
365 "Expecting a clock class with a UUID, but got one without a UUID: "
366 "clock-class-addr={}, clock-class-name={}",
367 clkClsAddr
, optLogStr(clkCls
->name()));
370 if (*clkCls
->uuid() != bt2c::UuidView
{*_mExpectedClkClsUuid
}) {
371 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
372 "Expecting a clock class with a specific UUID, "
373 "but got one with a different UUID: "
374 "clock-class-addr={}, clock-class-name={}, "
375 "expected-uuid=\"{}\", uuid=\"{}\"",
376 clkClsAddr
, optLogStr(clkCls
->name()),
377 _mExpectedClkClsUuid
->str(), clkCls
->uuid()->str());
381 case _ClkClsExpectation::NONE
:
382 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
383 "Expecting no clock class, but got one: "
384 "clock-class-addr={}, clock-class-name={}",
385 clkClsAddr
, optLogStr(clkCls
->name()));
392 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg
)
394 if (G_LIKELY(!msg
.isStreamBeginning() && !msg
.isMessageIteratorInactivity())) {
396 * We don't care about the other types: all the messages related
397 * to a given stream shared the same default clock class, if
403 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msgTypeStr(msg
));
405 /* Get the clock class, if any, of `msg` */
406 const auto clkCls
= bt2c::call([msg
]() -> bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> {
407 if (msg
.isStreamBeginning()) {
408 return msg
.asStreamBeginning().stream().cls().defaultClockClass();
410 BT_ASSERT(msg
.isMessageIteratorInactivity());
411 return msg
.asMessageIteratorInactivity().clockSnapshot().clockClass();
415 /* Set the expectation or check it */
416 if (_mClkClsExpectation
== _ClkClsExpectation::ANY
) {
417 /* First message: set the expectation */
418 this->_setClkClsExpectation(clkCls
);
420 /* Make sure clock class is expected */
421 this->_makeSureClkClsIsExpected(msg
, clkCls
);
425 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger
& logger
) : _mLogger
{logger
}
429 bool MsgIter::_HeapComparator::operator()(
430 const UpstreamMsgIter
* const upstreamMsgIterA
,
431 const UpstreamMsgIter
* const upstreamMsgIterB
) const noexcept
433 /* The two messages to compare */
434 const auto msgA
= upstreamMsgIterA
->msg();
435 const auto msgB
= upstreamMsgIterB
->msg();
436 auto& msgTsA
= upstreamMsgIterA
->msgTs();
437 auto& msgTsB
= upstreamMsgIterB
->msgTs();
439 if (_mLogger
.wouldLogT()) {
440 BT_CPPLOGT("Comparing two messages: "
441 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
442 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
443 upstreamMsgIterA
->portName(), msgTypeStr(msgA
), optMsgTsStr(msgTsA
),
444 upstreamMsgIterB
->portName(), msgTypeStr(msgB
), optMsgTsStr(msgTsB
));
448 * Try to compare using timestamps.
450 * If both timestamps are set and their values are different, then
451 * use this to establish the ordering of the two messages.
453 * If one timestamp is set, but not the other, the latter always
454 * wins. This is because, for a given upstream message iterator, we
455 * need to consume all the messages having no timestamp so that we
456 * can reach a message with a timestamp to compare it.
458 * Otherwise, we'll fall back to using
459 * common_muxing_compare_messages().
461 if (G_LIKELY(msgTsA
&& msgTsB
)) {
462 if (*msgTsA
< *msgTsB
) {
464 * Return `true` because `_mHeap.top()` provides the
465 * "greatest" element. For us, the "greatest" message is
466 * the oldest one, that is, the one having the smallest
469 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
471 } else if (*msgTsA
> *msgTsB
) {
473 "Timestamp of message A is greater than timestamp of message B: oldest=B");
476 } else if (msgTsA
&& !msgTsB
) {
477 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
479 } else if (!msgTsA
&& msgTsB
) {
480 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
485 * Comparison failed using timestamps: determine an ordering using
486 * arbitrary properties, but in a deterministic way.
488 * common_muxing_compare_messages() returns less than 0 if the first
489 * message is considered older than the second, which corresponds to
490 * this comparator returning `true`.
492 const auto res
= common_muxing_compare_messages(msgA
.libObjPtr(), msgB
.libObjPtr()) < 0;
494 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
499 } /* namespace bt2mux */