2 * SPDX-License-Identifier: MIT
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
11 #include <babeltrace2/babeltrace.h>
13 #include "cpp-common/bt2c/fmt.hpp" /* IWYU pragma: keep */
14 #include "cpp-common/bt2s/make-unique.hpp"
15 #include "cpp-common/vendor/fmt/format.h"
17 #include "plugins/common/muxing/muxing.hpp"
20 #include "msg-iter.hpp"
24 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter
,
25 const bt2::SelfMessageIteratorConfiguration cfg
, bt2::SelfComponentOutputPort
) :
26 bt2::UserMessageIterator
<MsgIter
, Comp
> {selfMsgIter
, "MSG-ITER"},
27 _mHeap
{_HeapComparator
{_mLogger
, selfMsgIter
.component().graphMipVersion()}}
30 * Create one upstream message iterator for each connected
33 auto canSeekForward
= true;
35 for (const auto inputPort
: this->_component()._inputPorts()) {
36 if (!inputPort
.isConnected()) {
37 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort
.name());
42 * Create new upstream message iterator and immediately make it
43 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
44 * deal with it when downstream calls next()).
46 auto upstreamMsgIter
= bt2s::make_unique
<UpstreamMsgIter
>(
47 this->_createMessageIterator(inputPort
), inputPort
.name(), _mLogger
);
49 canSeekForward
= canSeekForward
&& upstreamMsgIter
->canSeekForward();
50 _mUpstreamMsgItersToReload
.emplace_back(upstreamMsgIter
.get());
51 _mUpstreamMsgIters
.push_back(std::move(upstreamMsgIter
));
54 /* Set the "can seek forward" configuration */
55 cfg
.canSeekForward(canSeekForward
);
60 std::string
optMsgTsStr(const bt2s::optional
<std::int64_t>& ts
)
63 return fmt::to_string(*ts
);
71 void MsgIter::_next(bt2::ConstMessageArray
& msgs
)
73 /* Make sure all upstream message iterators are part of the heap */
74 this->_ensureFullHeap();
76 while (msgs
.length() < msgs
.capacity()) {
78 if (G_UNLIKELY(_mHeap
.isEmpty())) {
79 /* No more upstream messages! */
84 * Retrieve the upstream message iterator having the oldest message.
86 auto& oldestUpstreamMsgIter
= *_mHeap
.top();
88 /* Validate the clock class of the oldest message */
89 this->_validateMsgClkCls(oldestUpstreamMsgIter
.msg());
91 /* Append the oldest message and discard it */
92 msgs
.append(oldestUpstreamMsgIter
.msg().shared());
94 if (_mLogger
.wouldLogD()) {
95 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
96 oldestUpstreamMsgIter
.portName(),
97 optMsgTsStr(oldestUpstreamMsgIter
.msgTs()));
100 oldestUpstreamMsgIter
.discard();
103 * Immediately try to reload `oldestUpstreamMsgIter`.
105 * The possible outcomes are:
107 * There's an available message:
108 * Call `_mHeap.replaceTop()` to bring
109 * `oldestUpstreamMsgIter` back to the heap, performing a
110 * single heap rebalance.
112 * There isn't an available message (ended):
113 * Remove `oldestUpstreamMsgIter` from the heap.
115 * `bt2::TryAgain` is thrown:
116 * Remove `oldestUpstreamMsgIter` from the heap.
118 * Add `oldestUpstreamMsgIter` to the set of upstream
119 * message iterators to reload. The next call to _next()
120 * will move it to the heap again (if not ended) after
121 * having successfully called reload().
124 "Trying to reload upstream message iterator having the oldest message: port-name={}",
125 oldestUpstreamMsgIter
.portName());
128 if (G_LIKELY(oldestUpstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::More
)) {
129 /* New current message: update heap */
130 _mHeap
.replaceTop(&oldestUpstreamMsgIter
);
131 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
132 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
135 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
136 "port-name{}, heap-len={}",
137 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
139 } catch (const bt2::TryAgain
&) {
141 _mUpstreamMsgItersToReload
.push_back(&oldestUpstreamMsgIter
);
142 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
143 "port-name={}, heap-len={}, to-reload-len={}",
144 oldestUpstreamMsgIter
.portName(), _mHeap
.len(),
145 _mUpstreamMsgItersToReload
.size());
151 void MsgIter::_ensureFullHeap()
154 * Always remove from `_mUpstreamMsgItersToReload` when reload()
157 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
158 * then we don't need it anymore (remains alive in
159 * `_mUpstreamMsgIters`).
161 for (auto it
= _mUpstreamMsgItersToReload
.begin(); it
!= _mUpstreamMsgItersToReload
.end();
162 it
= _mUpstreamMsgItersToReload
.erase(it
)) {
163 auto& upstreamMsgIter
= **it
;
165 BT_CPPLOGD("Handling upstream message iterator to reload: "
166 "port-name={}, heap-len={}, to-reload-len={}",
167 upstreamMsgIter
.portName(), _mHeap
.len(), _mUpstreamMsgItersToReload
.size());
169 if (G_LIKELY(upstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::More
)) {
170 /* New current message: move to heap */
171 _mHeap
.insert(&upstreamMsgIter
);
172 BT_CPPLOGD("More messages available; "
173 "inserted upstream message iterator into heap from \"to reload\" set: "
174 "port-name={}, heap-len={}",
175 upstreamMsgIter
.portName(), _mHeap
.len());
177 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
179 upstreamMsgIter
.portName());
184 bool MsgIter::_canSeekBeginning()
187 * We can only seek our beginning if all our upstream message
188 * iterators also can.
190 return std::all_of(_mUpstreamMsgIters
.begin(), _mUpstreamMsgIters
.end(),
191 [](UpstreamMsgIter::UP
& upstreamMsgIter
) {
192 return upstreamMsgIter
->canSeekBeginning();
196 void MsgIter::_seekBeginning()
199 * The current approach is that this operation is either successful
200 * (all upstream message iterators seek) or not. If it's not, then
201 * we don't keep any state that some sought and some didn't: we'll
202 * restart the whole process when the user tries to seek again.
204 * The first step is to clear all the containers of upstream message
205 * iterator pointers so that we can process what's in
206 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
207 * if any seeking fails below, the downstream user is required to
208 * try the "seek beginning" operation again and only call
209 * bt_message_iterator_next() if it was successful.
211 * This means if the first four upstream message iterators seek, and
212 * then the fifth one throws `bt2::TryAgain`, then the next time
213 * this method executes, the first four upstream message iterators
214 * will seek again. That being said, it's such an unlikely scenario
215 * that the simplicity outweighs performance concerns here.
218 _mUpstreamMsgItersToReload
.clear();
220 /* Make each upstream message iterator seek */
221 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
222 /* This may throw! */
223 upstreamMsgIter
->seekBeginning();
227 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
228 * next call to _next() will deal with those.
230 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
231 _mUpstreamMsgItersToReload
.push_back(upstreamMsgIter
.get());
237 std::string
formatClkClsOrigin(const bt2::ClockOriginView clkClsOrigin
, const char * const prefix
,
238 const std::uint64_t graphMipVersion
)
240 if (graphMipVersion
== 0) {
241 return fmt::format("{}clock-class-origin-is-unix-epoch={}", prefix
,
242 clkClsOrigin
.isUnixEpoch());
244 return fmt::format("{0}clock-class-origin-ns={1}, {0}clock-class-origin-name={2}, "
245 "{0}clock-class-origin-uid={3}",
246 prefix
, clkClsOrigin
.nameSpace(), clkClsOrigin
.name(),
251 std::string
formatClkClsId(const bt2::ConstClockClass clkCls
, const char * const prefix
,
252 const std::uint64_t graphMipVersion
)
254 if (graphMipVersion
== 0) {
255 if (const auto uuid
= clkCls
.uuid()) {
256 return fmt::format("{}clock-class-uuid={}", prefix
, *uuid
);
258 return fmt::format("{}clock-class-uuid=(none)", prefix
);
261 return fmt::format("{0}clock-class-ns={1}, {0}clock-class-name={2}, {0}clock-class-uid={3}",
262 prefix
, clkCls
.nameSpace(), clkCls
.name(), clkCls
.uid());
266 std::string
formatClkCls(const bt2::ConstClockClass clkCls
, const char * const prefix
,
267 const std::uint64_t graphMipVersion
)
269 if (graphMipVersion
== 0) {
270 return fmt::format("{}clock-class-addr={}, {}clock-class-name={}, {}, {}", prefix
,
271 fmt::ptr(clkCls
.libObjPtr()), prefix
, clkCls
.name(),
272 formatClkClsId(clkCls
, prefix
, graphMipVersion
),
273 formatClkClsOrigin(clkCls
.origin(), prefix
, graphMipVersion
));
275 return fmt::format("{}clock-class-addr={}, {}, {}", prefix
, fmt::ptr(clkCls
.libObjPtr()),
276 formatClkClsId(clkCls
, prefix
, graphMipVersion
),
277 formatClkClsOrigin(clkCls
.origin(), prefix
, graphMipVersion
));
281 std::string
formatStreamCls(const bt2ccv::ClockCorrelationError
& error
,
282 const bool withTrailingComma
)
284 if (const auto streamCls
= error
.streamCls()) {
285 return fmt::format("stream-class-addr={}, stream-class-name=\"{}\", stream-class-id={}{}",
286 fmt::ptr(streamCls
->libObjPtr()), streamCls
->name(), streamCls
->id(),
287 withTrailingComma
? ", " : "");
289 return std::string
{};
295 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg
)
297 if (G_LIKELY(!msg
.isStreamBeginning() && !msg
.isMessageIteratorInactivity())) {
299 * We don't care about the other types: all the messages related
300 * to a given stream shared the same default clock class, if
306 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg
.type());
309 _mClkCorrValidator
.validate(msg
, this->_component()._graphMipVersion());
310 } catch (const bt2ccv::ClockCorrelationError
& error
) {
311 using Type
= bt2ccv::ClockCorrelationError::Type
;
313 const auto actualClkCls
= error
.actualClockCls();
314 const auto refClkCls
= error
.refClockCls();
315 const auto graphMipVersion
= this->_component()._graphMipVersion();
316 const auto formatExpClkClsOrigin
= [&] {
317 return formatClkClsOrigin(refClkCls
->origin(), "expected-", graphMipVersion
);
319 const auto clkCls
= [&] {
320 return formatClkClsId(*refClkCls
, "expected", graphMipVersion
);
322 const auto formatActClkCls
= [&] {
323 return formatClkCls(*actualClkCls
, "", graphMipVersion
);
325 const auto formatExpClkCls
= [&] {
326 return formatClkCls(*refClkCls
, "expected-", graphMipVersion
);
329 switch (error
.type()) {
330 case Type::ExpectingNoClockClassGotOne
:
331 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
, "Expecting no clock class, got one: {}{}",
332 formatStreamCls(error
, true), formatActClkCls());
334 case Type::ExpectingOriginKnownGotNoClockClass
:
335 if (graphMipVersion
== 0) {
336 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
337 bt2::Error
, "Expecting a clock class with a Unix epoch origin, got none: {}",
338 formatStreamCls(error
, false));
340 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
341 bt2::Error
, "Expecting a clock class with a known origin, got none: {}{}",
342 formatStreamCls(error
, true), formatExpClkClsOrigin());
345 case Type::ExpectingOriginKnownGotUnknownOrigin
:
346 if (graphMipVersion
== 0) {
347 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
349 "Expecting a clock class with a Unix epoch origin, got one with an unknown "
351 formatStreamCls(error
, true), formatActClkCls());
353 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
355 "Expecting a clock class with a known origin, got one with an unknown origin: "
357 formatStreamCls(error
, true), formatActClkCls(), formatExpClkClsOrigin());
360 case Type::ExpectingOriginKnownGotOtherOrigin
:
361 BT_ASSERT(graphMipVersion
> 0);
362 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
364 "Expecting a clock class with a known origin, got one with a wrong origin: {}{}, {}",
365 formatStreamCls(error
, true), formatActClkCls(), formatExpClkClsOrigin());
367 case Type::ExpectingOriginUnknownWithIdGotNoClockClass
:
368 if (graphMipVersion
== 0) {
369 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
371 "Expecting a clock class with an unknown origin and a specific UUID, got none: {}{}",
372 formatStreamCls(error
, true), clkCls());
374 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
376 "Expecting a clock class with an unknown origin and a specific identity, got none: {}{}",
377 formatStreamCls(error
, true), clkCls());
380 case Type::ExpectingOriginUnknownWithIdGotKnownOrigin
:
381 if (graphMipVersion
== 0) {
382 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
384 "Expecting a clock class with an unknown origin and a specific UUID, got one "
385 "with a Unix epoch origin: {}{}, {}",
386 formatStreamCls(error
, true), formatActClkCls(), clkCls());
388 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
390 "Expecting a clock class with an unknown origin and a specific identity, got one "
391 "with a known origin: {}{}, {}",
392 formatStreamCls(error
, true), formatActClkCls(), clkCls());
395 case Type::ExpectingOriginUnknownWithIdGotWithoutId
:
396 if (graphMipVersion
== 0) {
397 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
399 "Expecting a clock class with an unknown origin and a specific UUID, got one "
400 "without a UUID: {}{}, {}",
401 formatStreamCls(error
, true), formatActClkCls(), clkCls());
403 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
405 "Expecting a clock class with an unknown origin and a specific identity, got one "
406 "without identity: {}{}, {}",
407 formatStreamCls(error
, true), formatActClkCls(), clkCls());
410 case Type::ExpectingOriginUnknownWithIdGotOtherId
:
411 if (graphMipVersion
== 0) {
412 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
414 "Expecting a clock class with an unknown origin and a specific UUID, got one with "
415 "a different UUID: {}{}, {}",
416 formatStreamCls(error
, true), formatActClkCls(), clkCls());
418 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
420 "Expecting a clock class with an unknown origin and a specific identity, got one with "
421 "a different identity: {}{}, {}",
422 formatStreamCls(error
, true), formatActClkCls(), clkCls());
425 case Type::ExpectingOriginUnknownWithoutIdGotNoClockClass
:
426 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
, "Expecting a clock class, got none: {}{}",
427 formatStreamCls(error
, true), formatExpClkCls());
429 case Type::ExpectingOriginUnknownWithoutIdGotOtherClockClass
:
430 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
, "Unexpected clock class: {}{}, {}",
431 formatStreamCls(error
, true), formatActClkCls(),
437 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger
& logger
,
438 const std::uint64_t graphMipVersion
) :
440 _mMsgComparator
{graphMipVersion
}
444 bool MsgIter::_HeapComparator::operator()(
445 const UpstreamMsgIter
* const upstreamMsgIterA
,
446 const UpstreamMsgIter
* const upstreamMsgIterB
) const noexcept
448 /* The two messages to compare */
449 const auto msgA
= upstreamMsgIterA
->msg();
450 const auto msgB
= upstreamMsgIterB
->msg();
451 auto& msgTsA
= upstreamMsgIterA
->msgTs();
452 auto& msgTsB
= upstreamMsgIterB
->msgTs();
454 if (_mLogger
.wouldLogT()) {
455 BT_CPPLOGT("Comparing two messages: "
456 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
457 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
458 upstreamMsgIterA
->portName(), msgA
.type(), optMsgTsStr(msgTsA
),
459 upstreamMsgIterB
->portName(), msgB
.type(), optMsgTsStr(msgTsB
));
463 * Try to compare using timestamps.
465 * If both timestamps are set and their values are different, then
466 * use this to establish the ordering of the two messages.
468 * If one timestamp is set, but not the other, the latter always
469 * wins. This is because, for a given upstream message iterator, we
470 * need to consume all the messages having no timestamp so that we
471 * can reach a message with a timestamp to compare it.
473 * Otherwise, we'll fall back to using
474 * common_muxing_compare_messages().
476 if (G_LIKELY(msgTsA
&& msgTsB
)) {
477 if (*msgTsA
< *msgTsB
) {
479 * Return `true` because `_mHeap.top()` provides the
480 * "greatest" element. For us, the "greatest" message is
481 * the oldest one, that is, the one having the smallest
484 BT_CPPLOGT("Timestamp of message A is less than timestamp of message B: oldest=A");
486 } else if (*msgTsA
> *msgTsB
) {
487 BT_CPPLOGT("Timestamp of message A is greater than timestamp of message B: oldest=B");
490 } else if (msgTsA
&& !msgTsB
) {
491 BT_CPPLOGT("Message A has a timestamp, but message B has none: oldest=B");
493 } else if (!msgTsA
&& msgTsB
) {
494 BT_CPPLOGT("Message B has a timestamp, but message A has none: oldest=A");
499 * Comparison failed using timestamps: determine an ordering using
500 * arbitrary properties, but in a deterministic way.
502 * common_muxing_compare_messages() returns less than 0 if the first
503 * message is considered older than the second, which corresponds to
504 * this comparator returning `true`.
506 const auto res
= _mMsgComparator
.compare(msgA
, msgB
) < 0;
508 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
513 } /* namespace bt2mux */