2 * SPDX-License-Identifier: MIT
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
11 #include <babeltrace2/babeltrace.h>
13 #include "common/common.h"
14 #include "cpp-common/bt2c/call.hpp"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/bt2s/make-unique.hpp"
17 #include "cpp-common/vendor/fmt/format.h"
19 #include "plugins/common/muxing/muxing.h"
22 #include "msg-iter.hpp"
26 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter
,
27 const bt2::SelfMessageIteratorConfiguration cfg
, bt2::SelfComponentOutputPort
) :
28 bt2::UserMessageIterator
<MsgIter
, Comp
> {selfMsgIter
, "MSG-ITER"},
29 _mHeap
{_HeapComparator
{_mLogger
}}
32 * Create one upstream message iterator for each connected
35 auto canSeekForward
= true;
37 for (const auto inputPort
: this->_component()._inputPorts()) {
38 if (!inputPort
.isConnected()) {
39 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort
.name());
44 * Create new upstream message iterator and immediately make it
45 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
46 * deal with it when downstream calls next()).
48 auto upstreamMsgIter
= bt2s::make_unique
<UpstreamMsgIter
>(
49 this->_createMessageIterator(inputPort
), inputPort
.name(), _mLogger
);
51 canSeekForward
= canSeekForward
&& upstreamMsgIter
->canSeekForward();
52 _mUpstreamMsgItersToReload
.emplace_back(upstreamMsgIter
.get());
53 _mUpstreamMsgIters
.push_back(std::move(upstreamMsgIter
));
56 /* Set the "can seek forward" configuration */
57 cfg
.canSeekForward(canSeekForward
);
62 std::string
optMsgTsStr(const bt2s::optional
<std::int64_t>& ts
)
65 return fmt::to_string(*ts
);
73 void MsgIter::_next(bt2::ConstMessageArray
& msgs
)
75 /* Make sure all upstream message iterators are part of the heap */
76 this->_ensureFullHeap();
78 while (msgs
.length() < msgs
.capacity()) {
80 if (G_UNLIKELY(_mHeap
.isEmpty())) {
81 /* No more upstream messages! */
86 * Retrieve the upstream message iterator having the oldest message.
88 auto& oldestUpstreamMsgIter
= *_mHeap
.top();
90 /* Validate the clock class of the oldest message */
91 this->_validateMsgClkCls(oldestUpstreamMsgIter
.msg());
93 /* Append the oldest message and discard it */
94 msgs
.append(oldestUpstreamMsgIter
.msg().shared());
96 if (_mLogger
.wouldLogD()) {
97 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
98 oldestUpstreamMsgIter
.portName(),
99 optMsgTsStr(oldestUpstreamMsgIter
.msgTs()));
102 oldestUpstreamMsgIter
.discard();
105 * Immediately try to reload `oldestUpstreamMsgIter`.
107 * The possible outcomes are:
109 * There's an available message:
110 * Call `_mHeap.replaceTop()` to bring
111 * `oldestUpstreamMsgIter` back to the heap, performing a
112 * single heap rebalance.
114 * There isn't an available message (ended):
115 * Remove `oldestUpstreamMsgIter` from the heap.
117 * `bt2::TryAgain` is thrown:
118 * Remove `oldestUpstreamMsgIter` from the heap.
120 * Add `oldestUpstreamMsgIter` to the set of upstream
121 * message iterators to reload. The next call to _next()
122 * will move it to the heap again (if not ended) after
123 * having successfully called reload().
126 "Trying to reload upstream message iterator having the oldest message: port-name={}",
127 oldestUpstreamMsgIter
.portName());
130 if (G_LIKELY(oldestUpstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
131 /* New current message: update heap */
132 _mHeap
.replaceTop(&oldestUpstreamMsgIter
);
133 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
134 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
137 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
138 "port-name{}, heap-len={}",
139 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
141 } catch (const bt2::TryAgain
&) {
143 _mUpstreamMsgItersToReload
.push_back(&oldestUpstreamMsgIter
);
144 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
145 "port-name={}, heap-len={}, to-reload-len={}",
146 oldestUpstreamMsgIter
.portName(), _mHeap
.len(),
147 _mUpstreamMsgItersToReload
.size());
153 void MsgIter::_ensureFullHeap()
156 * Always remove from `_mUpstreamMsgItersToReload` when reload()
159 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
160 * then we don't need it anymore (remains alive in
161 * `_mUpstreamMsgIters`).
163 for (auto it
= _mUpstreamMsgItersToReload
.begin(); it
!= _mUpstreamMsgItersToReload
.end();
164 it
= _mUpstreamMsgItersToReload
.erase(it
)) {
165 auto& upstreamMsgIter
= **it
;
167 BT_CPPLOGD("Handling upstream message iterator to reload: "
168 "port-name={}, heap-len={}, to-reload-len={}",
169 upstreamMsgIter
.portName(), _mHeap
.len(), _mUpstreamMsgItersToReload
.size());
171 if (G_LIKELY(upstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
172 /* New current message: move to heap */
173 _mHeap
.insert(&upstreamMsgIter
);
174 BT_CPPLOGD("More messages available; "
175 "inserted upstream message iterator into heap from \"to reload\" set: "
176 "port-name={}, heap-len={}",
177 upstreamMsgIter
.portName(), _mHeap
.len());
179 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
181 upstreamMsgIter
.portName());
186 bool MsgIter::_canSeekBeginning()
189 * We can only seek our beginning if all our upstream message
190 * iterators also can.
192 return std::all_of(_mUpstreamMsgIters
.begin(), _mUpstreamMsgIters
.end(),
193 [](UpstreamMsgIter::UP
& upstreamMsgIter
) {
194 return upstreamMsgIter
->canSeekBeginning();
198 void MsgIter::_seekBeginning()
201 * The current approach is that this operation is either successful
202 * (all upstream message iterators seek) or not. If it's not, then
203 * we don't keep any state that some sought and some didn't: we'll
204 * restart the whole process when the user tries to seek again.
206 * The first step is to clear all the containers of upstream message
207 * iterator pointers so that we can process what's in
208 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
209 * if any seeking fails below, the downstream user is required to
210 * try the "seek beginning" operation again and only call
211 * bt_message_iterator_next() if it was successful.
213 * This means if the first four upstream message iterators seek, and
214 * then the fifth one throws `bt2::TryAgain`, then the next time
215 * this method executes, the first four upstream message iterators
216 * will seek again. That being said, it's such an unlikely scenario
217 * that the simplicity outweighs performance concerns here.
220 _mUpstreamMsgItersToReload
.clear();
222 /* Also reset clock class expectation */
223 _mClkClsExpectation
= _ClkClsExpectation::ANY
;
224 _mExpectedClkClsUuid
.reset();
226 /* Make each upstream message iterator seek */
227 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
228 /* This may throw! */
229 upstreamMsgIter
->seekBeginning();
233 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
234 * next call to _next() will deal with those.
236 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
237 _mUpstreamMsgItersToReload
.push_back(upstreamMsgIter
.get());
243 std::string
optLogStr(const char * const str
) noexcept
245 return str
? fmt::format("\"{}\"", str
) : "(none)";
250 void MsgIter::_setClkClsExpectation(
251 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) noexcept
253 BT_ASSERT_DBG(_mClkClsExpectation
== _ClkClsExpectation::ANY
);
255 /* No initial clock class: also expect none afterwards */
257 _mClkClsExpectation
= _ClkClsExpectation::NONE
;
262 * This is the first clock class that this message iterator
263 * encounters. Its properties determine what to expect for the whole
264 * lifetime of the iterator.
266 if (clkCls
->originIsUnixEpoch()) {
267 /* Expect clock classes having a Unix epoch origin*/
268 _mClkClsExpectation
= _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
;
270 if (clkCls
->uuid()) {
272 * Expect clock classes not having a Unix epoch origin and
273 * with a specific UUID.
275 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
;
276 _mExpectedClkClsUuid
= *clkCls
->uuid();
279 * Expect clock classes not having a Unix epoch origin and
282 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
;
287 void MsgIter::_makeSureClkClsIsExpected(
288 const bt2::ConstMessage msg
,
289 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) const
291 BT_ASSERT_DBG(_mClkClsExpectation
!= _ClkClsExpectation::ANY
);
294 if (_mClkClsExpectation
!= _ClkClsExpectation::NONE
) {
296 * `msg` is a stream beginning message because a message
297 * iterator inactivity message always has a clock class.
299 const auto streamCls
= msg
.asStreamBeginning().stream().cls();
301 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
302 "Expecting a clock class, but got none: "
303 "stream-class-addr={}, stream-class-name=\"{}\", "
304 "stream-class-id={}",
305 static_cast<const void *>(streamCls
.libObjPtr()),
306 optLogStr(streamCls
.name()), streamCls
.id());
312 const auto clkClsAddr
= static_cast<const void *>(clkCls
->libObjPtr());
314 switch (_mClkClsExpectation
) {
315 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
:
316 if (!clkCls
->originIsUnixEpoch()) {
317 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
318 "Expecting a clock class having a Unix epoch origin, "
319 "but got one not having a Unix epoch origin: "
320 "clock-class-addr={}, clock-class-name={}",
321 clkClsAddr
, optLogStr(clkCls
->name()));
325 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
:
326 BT_ASSERT_DBG(!_mExpectedClkClsUuid
);
328 if (clkCls
->originIsUnixEpoch()) {
329 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
331 "Expecting a clock class not having a Unix epoch origin, "
332 "but got one having a Unix epoch origin: "
333 "clock-class-addr={}, clock-class-name={}",
334 clkClsAddr
, optLogStr(clkCls
->name()));
337 if (clkCls
->uuid()) {
338 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
340 "Expecting a clock class without a UUID, but got one with a UUID: "
341 "clock-class-addr={}, clock-class-name={}, uuid={}",
342 clkClsAddr
, optLogStr(clkCls
->name()), clkCls
->uuid()->str());
346 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
:
347 BT_ASSERT_DBG(_mExpectedClkClsUuid
);
349 if (clkCls
->originIsUnixEpoch()) {
350 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
352 "Expecting a clock class not having a Unix epoch origin, "
353 "but got one having a Unix epoch origin: "
354 "clock-class-addr={}, clock-class-name={}",
355 clkClsAddr
, optLogStr(clkCls
->name()));
358 if (!clkCls
->uuid()) {
359 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
361 "Expecting a clock class with a UUID, but got one without a UUID: "
362 "clock-class-addr={}, clock-class-name={}",
363 clkClsAddr
, optLogStr(clkCls
->name()));
366 if (*clkCls
->uuid() != bt2c::UuidView
{*_mExpectedClkClsUuid
}) {
367 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
368 "Expecting a clock class with a specific UUID, "
369 "but got one with a different UUID: "
370 "clock-class-addr={}, clock-class-name={}, "
371 "expected-uuid=\"{}\", uuid=\"{}\"",
372 clkClsAddr
, optLogStr(clkCls
->name()),
373 _mExpectedClkClsUuid
->str(), clkCls
->uuid()->str());
377 case _ClkClsExpectation::NONE
:
378 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
379 "Expecting no clock class, but got one: "
380 "clock-class-addr={}, clock-class-name={}",
381 clkClsAddr
, optLogStr(clkCls
->name()));
388 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg
)
390 if (G_LIKELY(!msg
.isStreamBeginning() && !msg
.isMessageIteratorInactivity())) {
392 * We don't care about the other types: all the messages related
393 * to a given stream shared the same default clock class, if
399 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg
.type());
401 /* Get the clock class, if any, of `msg` */
402 const auto clkCls
= bt2c::call([msg
]() -> bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> {
403 if (msg
.isStreamBeginning()) {
404 return msg
.asStreamBeginning().streamClassDefaultClockClass();
406 BT_ASSERT(msg
.isMessageIteratorInactivity());
407 return msg
.asMessageIteratorInactivity().clockSnapshot().clockClass();
411 /* Set the expectation or check it */
412 if (_mClkClsExpectation
== _ClkClsExpectation::ANY
) {
413 /* First message: set the expectation */
414 this->_setClkClsExpectation(clkCls
);
416 /* Make sure clock class is expected */
417 this->_makeSureClkClsIsExpected(msg
, clkCls
);
421 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger
& logger
) : _mLogger
{logger
}
425 bool MsgIter::_HeapComparator::operator()(
426 const UpstreamMsgIter
* const upstreamMsgIterA
,
427 const UpstreamMsgIter
* const upstreamMsgIterB
) const noexcept
429 /* The two messages to compare */
430 const auto msgA
= upstreamMsgIterA
->msg();
431 const auto msgB
= upstreamMsgIterB
->msg();
432 auto& msgTsA
= upstreamMsgIterA
->msgTs();
433 auto& msgTsB
= upstreamMsgIterB
->msgTs();
435 if (_mLogger
.wouldLogT()) {
436 BT_CPPLOGT("Comparing two messages: "
437 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
438 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
439 upstreamMsgIterA
->portName(), msgA
.type(), optMsgTsStr(msgTsA
),
440 upstreamMsgIterB
->portName(), msgB
.type(), optMsgTsStr(msgTsB
));
444 * Try to compare using timestamps.
446 * If both timestamps are set and their values are different, then
447 * use this to establish the ordering of the two messages.
449 * If one timestamp is set, but not the other, the latter always
450 * wins. This is because, for a given upstream message iterator, we
451 * need to consume all the messages having no timestamp so that we
452 * can reach a message with a timestamp to compare it.
454 * Otherwise, we'll fall back to using
455 * common_muxing_compare_messages().
457 if (G_LIKELY(msgTsA
&& msgTsB
)) {
458 if (*msgTsA
< *msgTsB
) {
460 * Return `true` because `_mHeap.top()` provides the
461 * "greatest" element. For us, the "greatest" message is
462 * the oldest one, that is, the one having the smallest
465 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
467 } else if (*msgTsA
> *msgTsB
) {
469 "Timestamp of message A is greater than timestamp of message B: oldest=B");
472 } else if (msgTsA
&& !msgTsB
) {
473 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
475 } else if (!msgTsA
&& msgTsB
) {
476 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
481 * Comparison failed using timestamps: determine an ordering using
482 * arbitrary properties, but in a deterministic way.
484 * common_muxing_compare_messages() returns less than 0 if the first
485 * message is considered older than the second, which corresponds to
486 * this comparator returning `true`.
488 const auto res
= common_muxing_compare_messages(msgA
.libObjPtr(), msgB
.libObjPtr()) < 0;
490 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
495 } /* namespace bt2mux */