2 * SPDX-License-Identifier: MIT
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
11 #include <babeltrace2/babeltrace.h>
13 #include "common/common.h"
14 #include "cpp-common/bt2c/call.hpp"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/bt2s/make-unique.hpp"
17 #include "cpp-common/vendor/fmt/format.h"
19 #include "plugins/common/muxing/muxing.h"
22 #include "msg-iter.hpp"
26 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter
,
27 const bt2::SelfMessageIteratorConfiguration cfg
, bt2::SelfComponentOutputPort
) :
28 bt2::UserMessageIterator
<MsgIter
, Comp
> {selfMsgIter
, "MSG-ITER"},
29 _mHeap
{_HeapComparator
{_mLogger
}}
32 * Create one upstream message iterator for each connected
35 auto canSeekForward
= true;
37 for (const auto inputPort
: this->_component()._inputPorts()) {
38 if (!inputPort
.isConnected()) {
39 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort
.name());
44 * Create new upstream message iterator and immediately make it
45 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
46 * deal with it when downstream calls next()).
48 auto upstreamMsgIter
= bt2s::make_unique
<UpstreamMsgIter
>(
49 this->_createMessageIterator(inputPort
), inputPort
.name(), _mLogger
);
51 canSeekForward
= canSeekForward
&& upstreamMsgIter
->canSeekForward();
52 _mUpstreamMsgItersToReload
.emplace_back(upstreamMsgIter
.get());
53 _mUpstreamMsgIters
.push_back(std::move(upstreamMsgIter
));
56 /* Set the "can seek forward" configuration */
57 cfg
.canSeekForward(canSeekForward
);
62 std::string
optMsgTsStr(const bt2s::optional
<std::int64_t>& ts
)
65 return fmt::to_string(*ts
);
73 void MsgIter::_next(bt2::ConstMessageArray
& msgs
)
75 /* Make sure all upstream message iterators are part of the heap */
76 this->_ensureFullHeap();
78 while (msgs
.length() < msgs
.capacity()) {
80 if (G_UNLIKELY(_mHeap
.isEmpty())) {
81 /* No more upstream messages! */
86 * Retrieve the upstream message iterator having the oldest message.
88 auto& oldestUpstreamMsgIter
= *_mHeap
.top();
90 /* Validate the clock class of the oldest message */
91 this->_validateMsgClkCls(oldestUpstreamMsgIter
.msg());
93 /* Append the oldest message and discard it */
94 msgs
.append(oldestUpstreamMsgIter
.msg().shared());
96 if (_mLogger
.wouldLogD()) {
97 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
98 oldestUpstreamMsgIter
.portName(),
99 optMsgTsStr(oldestUpstreamMsgIter
.msgTs()));
102 oldestUpstreamMsgIter
.discard();
105 * Immediately try to reload `oldestUpstreamMsgIter`.
107 * The possible outcomes are:
109 * There's an available message:
110 * Call `_mHeap.replaceTop()` to bring
111 * `oldestUpstreamMsgIter` back to the heap, performing a
112 * single heap rebalance.
114 * There isn't an available message (ended):
115 * Remove `oldestUpstreamMsgIter` from the heap.
117 * `bt2::TryAgain` is thrown:
118 * Remove `oldestUpstreamMsgIter` from the heap.
120 * Add `oldestUpstreamMsgIter` to the set of upstream
121 * message iterators to reload. The next call to _next()
122 * will move it to the heap again (if not ended) after
123 * having successfully called reload().
126 "Trying to reload upstream message iterator having the oldest message: port-name={}",
127 oldestUpstreamMsgIter
.portName());
130 if (G_LIKELY(oldestUpstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
131 /* New current message: update heap */
132 _mHeap
.replaceTop(&oldestUpstreamMsgIter
);
133 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
134 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
137 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
138 "port-name{}, heap-len={}",
139 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
141 } catch (const bt2::TryAgain
&) {
143 _mUpstreamMsgItersToReload
.push_back(&oldestUpstreamMsgIter
);
144 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
145 "port-name={}, heap-len={}, to-reload-len={}",
146 oldestUpstreamMsgIter
.portName(), _mHeap
.len(),
147 _mUpstreamMsgItersToReload
.size());
153 void MsgIter::_ensureFullHeap()
156 * Always remove from `_mUpstreamMsgItersToReload` when reload()
159 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
160 * then we don't need it anymore (remains alive in
161 * `_mUpstreamMsgIters`).
163 for (auto it
= _mUpstreamMsgItersToReload
.begin(); it
!= _mUpstreamMsgItersToReload
.end();
164 it
= _mUpstreamMsgItersToReload
.erase(it
)) {
165 auto& upstreamMsgIter
= **it
;
167 BT_CPPLOGD("Handling upstream message iterator to reload: "
168 "port-name={}, heap-len={}, to-reload-len={}",
169 upstreamMsgIter
.portName(), _mHeap
.len(), _mUpstreamMsgItersToReload
.size());
171 if (G_LIKELY(upstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
172 /* New current message: move to heap */
173 _mHeap
.insert(&upstreamMsgIter
);
174 BT_CPPLOGD("More messages available; "
175 "inserted upstream message iterator into heap from \"to reload\" set: "
176 "port-name={}, heap-len={}",
177 upstreamMsgIter
.portName(), _mHeap
.len());
179 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
181 upstreamMsgIter
.portName());
186 bool MsgIter::_canSeekBeginning()
189 * We can only seek our beginning if all our upstream message
190 * iterators also can.
192 return std::all_of(_mUpstreamMsgIters
.begin(), _mUpstreamMsgIters
.end(),
193 [](UpstreamMsgIter::UP
& upstreamMsgIter
) {
194 return upstreamMsgIter
->canSeekBeginning();
198 void MsgIter::_seekBeginning()
201 * The current approach is that this operation is either successful
202 * (all upstream message iterators seek) or not. If it's not, then
203 * we don't keep any state that some sought and some didn't: we'll
204 * restart the whole process when the user tries to seek again.
206 * The first step is to clear all the containers of upstream message
207 * iterator pointers so that we can process what's in
208 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
209 * if any seeking fails below, the downstream user is required to
210 * try the "seek beginning" operation again and only call
211 * bt_message_iterator_next() if it was successful.
213 * This means if the first four upstream message iterators seek, and
214 * then the fifth one throws `bt2::TryAgain`, then the next time
215 * this method executes, the first four upstream message iterators
216 * will seek again. That being said, it's such an unlikely scenario
217 * that the simplicity outweighs performance concerns here.
220 _mUpstreamMsgItersToReload
.clear();
222 /* Also reset clock class expectation */
223 _mClkClsExpectation
= _ClkClsExpectation::ANY
;
224 _mExpectedClkClsUuid
.reset();
226 /* Make each upstream message iterator seek */
227 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
228 /* This may throw! */
229 upstreamMsgIter
->seekBeginning();
233 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
234 * next call to _next() will deal with those.
236 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
237 _mUpstreamMsgItersToReload
.push_back(upstreamMsgIter
.get());
243 std::string
optLogStr(const char * const str
) noexcept
245 return str
? fmt::format("\"{}\"", str
) : "(none)";
250 void MsgIter::_setClkClsExpectation(
251 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) noexcept
253 BT_ASSERT_DBG(_mClkClsExpectation
== _ClkClsExpectation::ANY
);
255 /* No initial clock class: also expect none afterwards */
257 _mClkClsExpectation
= _ClkClsExpectation::NONE
;
262 * This is the first clock class that this message iterator
263 * encounters. Its properties determine what to expect for the whole
264 * lifetime of the iterator.
266 if (clkCls
->originIsUnixEpoch()) {
267 /* Expect clock classes having a Unix epoch origin*/
268 _mClkClsExpectation
= _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
;
270 if (clkCls
->uuid()) {
272 * Expect clock classes not having a Unix epoch origin and
273 * with a specific UUID.
275 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
;
276 _mExpectedClkClsUuid
= *clkCls
->uuid();
279 * Expect clock classes not having a Unix epoch origin and
282 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
;
283 _mExpectedClkCls
= clkCls
->shared();
288 void MsgIter::_makeSureClkClsIsExpected(
289 const bt2::ConstMessage msg
,
290 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) const
292 BT_ASSERT_DBG(_mClkClsExpectation
!= _ClkClsExpectation::ANY
);
295 if (_mClkClsExpectation
!= _ClkClsExpectation::NONE
) {
297 * `msg` is a stream beginning message because a message
298 * iterator inactivity message always has a clock class.
300 const auto streamCls
= msg
.asStreamBeginning().stream().cls();
302 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
303 "Expecting a clock class, but got none: "
304 "stream-class-addr={}, stream-class-name=\"{}\", "
305 "stream-class-id={}",
306 static_cast<const void *>(streamCls
.libObjPtr()),
307 optLogStr(streamCls
.name()), streamCls
.id());
313 const auto clkClsAddr
= static_cast<const void *>(clkCls
->libObjPtr());
315 switch (_mClkClsExpectation
) {
316 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
:
317 if (!clkCls
->originIsUnixEpoch()) {
318 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
319 "Expecting a clock class having a Unix epoch origin, "
320 "but got one not having a Unix epoch origin: "
321 "clock-class-addr={}, clock-class-name={}",
322 clkClsAddr
, optLogStr(clkCls
->name()));
326 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
:
327 BT_ASSERT_DBG(!_mExpectedClkClsUuid
);
328 BT_ASSERT_DBG(_mExpectedClkCls
);
330 if (clkCls
->libObjPtr() != _mExpectedClkCls
->libObjPtr()) {
331 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
333 "Unexpected clock class: "
334 "expected-clock-class-addr={}, expected-clock-class-name={}, "
335 "actual-clock-class-addr={}, actual-clock-class-name={}",
336 fmt::ptr(_mExpectedClkCls
->libObjPtr()), optLogStr(_mExpectedClkCls
->name()),
337 clkClsAddr
, optLogStr(clkCls
->name()));
341 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
:
342 BT_ASSERT_DBG(_mExpectedClkClsUuid
);
343 BT_ASSERT_DBG(!_mExpectedClkCls
);
345 if (clkCls
->originIsUnixEpoch()) {
346 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
348 "Expecting a clock class not having a Unix epoch origin, "
349 "but got one having a Unix epoch origin: "
350 "clock-class-addr={}, clock-class-name={}",
351 clkClsAddr
, optLogStr(clkCls
->name()));
354 if (!clkCls
->uuid()) {
355 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
357 "Expecting a clock class with a UUID, but got one without a UUID: "
358 "clock-class-addr={}, clock-class-name={}",
359 clkClsAddr
, optLogStr(clkCls
->name()));
362 if (*clkCls
->uuid() != bt2c::UuidView
{*_mExpectedClkClsUuid
}) {
363 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
364 "Expecting a clock class with a specific UUID, "
365 "but got one with a different UUID: "
366 "clock-class-addr={}, clock-class-name={}, "
367 "expected-uuid=\"{}\", uuid=\"{}\"",
368 clkClsAddr
, optLogStr(clkCls
->name()),
369 _mExpectedClkClsUuid
->str(), clkCls
->uuid()->str());
373 case _ClkClsExpectation::NONE
:
374 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
375 "Expecting no clock class, but got one: "
376 "clock-class-addr={}, clock-class-name={}",
377 clkClsAddr
, optLogStr(clkCls
->name()));
384 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg
)
386 if (G_LIKELY(!msg
.isStreamBeginning() && !msg
.isMessageIteratorInactivity())) {
388 * We don't care about the other types: all the messages related
389 * to a given stream shared the same default clock class, if
395 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg
.type());
397 /* Get the clock class, if any, of `msg` */
398 const auto clkCls
= bt2c::call([msg
]() -> bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> {
399 if (msg
.isStreamBeginning()) {
400 return msg
.asStreamBeginning().streamClassDefaultClockClass();
402 BT_ASSERT(msg
.isMessageIteratorInactivity());
403 return msg
.asMessageIteratorInactivity().clockSnapshot().clockClass();
407 /* Set the expectation or check it */
408 if (_mClkClsExpectation
== _ClkClsExpectation::ANY
) {
409 /* First message: set the expectation */
410 this->_setClkClsExpectation(clkCls
);
412 /* Make sure clock class is expected */
413 this->_makeSureClkClsIsExpected(msg
, clkCls
);
417 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger
& logger
) : _mLogger
{logger
}
421 bool MsgIter::_HeapComparator::operator()(
422 const UpstreamMsgIter
* const upstreamMsgIterA
,
423 const UpstreamMsgIter
* const upstreamMsgIterB
) const noexcept
425 /* The two messages to compare */
426 const auto msgA
= upstreamMsgIterA
->msg();
427 const auto msgB
= upstreamMsgIterB
->msg();
428 auto& msgTsA
= upstreamMsgIterA
->msgTs();
429 auto& msgTsB
= upstreamMsgIterB
->msgTs();
431 if (_mLogger
.wouldLogT()) {
432 BT_CPPLOGT("Comparing two messages: "
433 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
434 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
435 upstreamMsgIterA
->portName(), msgA
.type(), optMsgTsStr(msgTsA
),
436 upstreamMsgIterB
->portName(), msgB
.type(), optMsgTsStr(msgTsB
));
440 * Try to compare using timestamps.
442 * If both timestamps are set and their values are different, then
443 * use this to establish the ordering of the two messages.
445 * If one timestamp is set, but not the other, the latter always
446 * wins. This is because, for a given upstream message iterator, we
447 * need to consume all the messages having no timestamp so that we
448 * can reach a message with a timestamp to compare it.
450 * Otherwise, we'll fall back to using
451 * common_muxing_compare_messages().
453 if (G_LIKELY(msgTsA
&& msgTsB
)) {
454 if (*msgTsA
< *msgTsB
) {
456 * Return `true` because `_mHeap.top()` provides the
457 * "greatest" element. For us, the "greatest" message is
458 * the oldest one, that is, the one having the smallest
461 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
463 } else if (*msgTsA
> *msgTsB
) {
465 "Timestamp of message A is greater than timestamp of message B: oldest=B");
468 } else if (msgTsA
&& !msgTsB
) {
469 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
471 } else if (!msgTsA
&& msgTsB
) {
472 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
477 * Comparison failed using timestamps: determine an ordering using
478 * arbitrary properties, but in a deterministic way.
480 * common_muxing_compare_messages() returns less than 0 if the first
481 * message is considered older than the second, which corresponds to
482 * this comparator returning `true`.
484 const auto res
= common_muxing_compare_messages(msgA
.libObjPtr(), msgB
.libObjPtr()) < 0;
486 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
491 } /* namespace bt2mux */