2 * SPDX-License-Identifier: MIT
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
11 #include <babeltrace2/babeltrace.h>
13 #include "common/common.h"
14 #include "cpp-common/bt2c/call.hpp"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/bt2s/make-unique.hpp"
17 #include "cpp-common/vendor/fmt/format.h"
19 #include "plugins/common/muxing/muxing.h"
22 #include "msg-iter.hpp"
26 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter
,
27 const bt2::SelfMessageIteratorConfiguration cfg
, bt2::SelfComponentOutputPort
) :
28 bt2::UserMessageIterator
<MsgIter
, Comp
> {selfMsgIter
, "MSG-ITER"},
29 _mHeap
{_HeapComparator
{_mLogger
}}
32 * Create one upstream message iterator for each connected
35 auto canSeekForward
= true;
37 for (const auto inputPort
: this->_component()._inputPorts()) {
38 if (!inputPort
.isConnected()) {
39 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort
.name());
44 * Create new upstream message iterator and immediately make it
45 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
46 * deal with it when downstream calls next()).
48 auto upstreamMsgIter
= bt2s::make_unique
<UpstreamMsgIter
>(
49 this->_createMessageIterator(inputPort
), inputPort
.name(), _mLogger
);
51 canSeekForward
= canSeekForward
&& upstreamMsgIter
->canSeekForward();
52 _mUpstreamMsgItersToReload
.emplace_back(upstreamMsgIter
.get());
53 _mUpstreamMsgIters
.push_back(std::move(upstreamMsgIter
));
56 /* Set the "can seek forward" configuration */
57 cfg
.canSeekForward(canSeekForward
);
62 std::string
optMsgTsStr(const bt2s::optional
<std::int64_t>& ts
)
65 return fmt::to_string(*ts
);
73 void MsgIter::_next(bt2::ConstMessageArray
& msgs
)
75 /* Make sure all upstream message iterators are part of the heap */
76 this->_ensureFullHeap();
78 while (msgs
.length() < msgs
.capacity()) {
80 if (G_UNLIKELY(_mHeap
.isEmpty())) {
81 /* No more upstream messages! */
86 * Retrieve the upstream message iterator having the oldest message.
88 auto& oldestUpstreamMsgIter
= *_mHeap
.top();
90 /* Validate the clock class of the oldest message */
91 this->_validateMsgClkCls(oldestUpstreamMsgIter
.msg());
93 /* Append the oldest message and discard it */
94 msgs
.append(oldestUpstreamMsgIter
.msg().shared());
96 if (_mLogger
.wouldLogD()) {
97 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
98 oldestUpstreamMsgIter
.portName(),
99 optMsgTsStr(oldestUpstreamMsgIter
.msgTs()));
102 oldestUpstreamMsgIter
.discard();
105 * Immediately try to reload `oldestUpstreamMsgIter`.
107 * The possible outcomes are:
109 * There's an available message:
110 * Call `_mHeap.replaceTop()` to bring
111 * `oldestUpstreamMsgIter` back to the heap, performing a
112 * single heap rebalance.
114 * There isn't an available message (ended):
115 * Remove `oldestUpstreamMsgIter` from the heap.
117 * `bt2::TryAgain` is thrown:
118 * Remove `oldestUpstreamMsgIter` from the heap.
120 * Add `oldestUpstreamMsgIter` to the set of upstream
121 * message iterators to reload. The next call to _next()
122 * will move it to the heap again (if not ended) after
123 * having successfully called reload().
126 "Trying to reload upstream message iterator having the oldest message: port-name={}",
127 oldestUpstreamMsgIter
.portName());
130 if (G_LIKELY(oldestUpstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
131 /* New current message: update heap */
132 _mHeap
.replaceTop(&oldestUpstreamMsgIter
);
133 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
134 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
137 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
138 "port-name{}, heap-len={}",
139 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
141 } catch (const bt2::TryAgain
&) {
143 _mUpstreamMsgItersToReload
.push_back(&oldestUpstreamMsgIter
);
144 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
145 "port-name={}, heap-len={}, to-reload-len={}",
146 oldestUpstreamMsgIter
.portName(), _mHeap
.len(),
147 _mUpstreamMsgItersToReload
.size());
153 void MsgIter::_ensureFullHeap()
156 * Always remove from `_mUpstreamMsgItersToReload` when reload()
159 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
160 * then we don't need it anymore (remains alive in
161 * `_mUpstreamMsgIters`).
163 for (auto it
= _mUpstreamMsgItersToReload
.begin(); it
!= _mUpstreamMsgItersToReload
.end();
164 it
= _mUpstreamMsgItersToReload
.erase(it
)) {
165 auto& upstreamMsgIter
= **it
;
167 BT_CPPLOGD("Handling upstream message iterator to reload: "
168 "port-name={}, heap-len={}, to-reload-len={}",
169 upstreamMsgIter
.portName(), _mHeap
.len(), _mUpstreamMsgItersToReload
.size());
171 if (G_LIKELY(upstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::MORE
)) {
172 /* New current message: move to heap */
173 _mHeap
.insert(&upstreamMsgIter
);
174 BT_CPPLOGD("More messages available; "
175 "inserted upstream message iterator into heap from \"to reload\" set: "
176 "port-name={}, heap-len={}",
177 upstreamMsgIter
.portName(), _mHeap
.len());
179 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
181 upstreamMsgIter
.portName());
186 bool MsgIter::_canSeekBeginning()
189 * We can only seek our beginning if all our upstream message
190 * iterators also can.
192 return std::all_of(_mUpstreamMsgIters
.begin(), _mUpstreamMsgIters
.end(),
193 [](UpstreamMsgIter::UP
& upstreamMsgIter
) {
194 return upstreamMsgIter
->canSeekBeginning();
198 void MsgIter::_seekBeginning()
201 * The current approach is that this operation is either successful
202 * (all upstream message iterators seek) or not. If it's not, then
203 * we don't keep any state that some sought and some didn't: we'll
204 * restart the whole process when the user tries to seek again.
206 * The first step is to clear all the containers of upstream message
207 * iterator pointers so that we can process what's in
208 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
209 * if any seeking fails below, the downstream user is required to
210 * try the "seek beginning" operation again and only call
211 * bt_message_iterator_next() if it was successful.
213 * This means if the first four upstream message iterators seek, and
214 * then the fifth one throws `bt2::TryAgain`, then the next time
215 * this method executes, the first four upstream message iterators
216 * will seek again. That being said, it's such an unlikely scenario
217 * that the simplicity outweighs performance concerns here.
220 _mUpstreamMsgItersToReload
.clear();
222 /* Make each upstream message iterator seek */
223 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
224 /* This may throw! */
225 upstreamMsgIter
->seekBeginning();
229 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
230 * next call to _next() will deal with those.
232 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
233 _mUpstreamMsgItersToReload
.push_back(upstreamMsgIter
.get());
239 std::string
optLogStr(const char * const str
) noexcept
241 return str
? fmt::format("\"{}\"", str
) : "(none)";
246 void MsgIter::_setClkClsExpectation(
247 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) noexcept
249 BT_ASSERT_DBG(_mClkClsExpectation
== _ClkClsExpectation::ANY
);
251 /* No initial clock class: also expect none afterwards */
253 _mClkClsExpectation
= _ClkClsExpectation::NONE
;
258 * This is the first clock class that this message iterator
259 * encounters. Its properties determine what to expect for the whole
260 * lifetime of the iterator.
262 if (clkCls
->originIsUnixEpoch()) {
263 /* Expect clock classes having a Unix epoch origin*/
264 _mClkClsExpectation
= _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
;
266 if (clkCls
->uuid()) {
268 * Expect clock classes not having a Unix epoch origin and
269 * with a specific UUID.
271 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
;
272 _mExpectedClkClsUuid
= *clkCls
->uuid();
275 * Expect clock classes not having a Unix epoch origin and
278 _mClkClsExpectation
= _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
;
279 _mExpectedClkCls
= clkCls
->shared();
284 void MsgIter::_makeSureClkClsIsExpected(
285 const bt2::ConstMessage msg
,
286 const bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> clkCls
) const
288 BT_ASSERT_DBG(_mClkClsExpectation
!= _ClkClsExpectation::ANY
);
291 if (_mClkClsExpectation
!= _ClkClsExpectation::NONE
) {
293 * `msg` is a stream beginning message because a message
294 * iterator inactivity message always has a clock class.
296 const auto streamCls
= msg
.asStreamBeginning().stream().cls();
298 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
299 "Expecting a clock class, but got none: "
300 "stream-class-addr={}, stream-class-name=\"{}\", "
301 "stream-class-id={}",
302 static_cast<const void *>(streamCls
.libObjPtr()),
303 optLogStr(streamCls
.name()), streamCls
.id());
309 const auto clkClsAddr
= static_cast<const void *>(clkCls
->libObjPtr());
311 switch (_mClkClsExpectation
) {
312 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH
:
313 if (!clkCls
->originIsUnixEpoch()) {
314 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
315 "Expecting a clock class having a Unix epoch origin, "
316 "but got one not having a Unix epoch origin: "
317 "clock-class-addr={}, clock-class-name={}",
318 clkClsAddr
, optLogStr(clkCls
->name()));
322 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID
:
323 BT_ASSERT_DBG(!_mExpectedClkClsUuid
);
324 BT_ASSERT_DBG(_mExpectedClkCls
);
326 if (clkCls
->libObjPtr() != _mExpectedClkCls
->libObjPtr()) {
327 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
329 "Unexpected clock class: "
330 "expected-clock-class-addr={}, expected-clock-class-name={}, "
331 "actual-clock-class-addr={}, actual-clock-class-name={}",
332 fmt::ptr(_mExpectedClkCls
->libObjPtr()), optLogStr(_mExpectedClkCls
->name()),
333 clkClsAddr
, optLogStr(clkCls
->name()));
337 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID
:
338 BT_ASSERT_DBG(_mExpectedClkClsUuid
);
339 BT_ASSERT_DBG(!_mExpectedClkCls
);
341 if (clkCls
->originIsUnixEpoch()) {
342 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
344 "Expecting a clock class not having a Unix epoch origin, "
345 "but got one having a Unix epoch origin: "
346 "clock-class-addr={}, clock-class-name={}",
347 clkClsAddr
, optLogStr(clkCls
->name()));
350 if (!clkCls
->uuid()) {
351 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
353 "Expecting a clock class with a UUID, but got one without a UUID: "
354 "clock-class-addr={}, clock-class-name={}",
355 clkClsAddr
, optLogStr(clkCls
->name()));
358 if (*clkCls
->uuid() != bt2c::UuidView
{*_mExpectedClkClsUuid
}) {
359 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
360 "Expecting a clock class with a specific UUID, "
361 "but got one with a different UUID: "
362 "clock-class-addr={}, clock-class-name={}, "
363 "expected-uuid=\"{}\", uuid=\"{}\"",
364 clkClsAddr
, optLogStr(clkCls
->name()),
365 _mExpectedClkClsUuid
->str(), clkCls
->uuid()->str());
369 case _ClkClsExpectation::NONE
:
370 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
371 "Expecting no clock class, but got one: "
372 "clock-class-addr={}, clock-class-name={}",
373 clkClsAddr
, optLogStr(clkCls
->name()));
380 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg
)
382 if (G_LIKELY(!msg
.isStreamBeginning() && !msg
.isMessageIteratorInactivity())) {
384 * We don't care about the other types: all the messages related
385 * to a given stream shared the same default clock class, if
391 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg
.type());
393 /* Get the clock class, if any, of `msg` */
394 const auto clkCls
= bt2c::call([msg
]() -> bt2::OptionalBorrowedObject
<bt2::ConstClockClass
> {
395 if (msg
.isStreamBeginning()) {
396 return msg
.asStreamBeginning().streamClassDefaultClockClass();
398 BT_ASSERT(msg
.isMessageIteratorInactivity());
399 return msg
.asMessageIteratorInactivity().clockSnapshot().clockClass();
403 /* Set the expectation or check it */
404 if (_mClkClsExpectation
== _ClkClsExpectation::ANY
) {
405 /* First message: set the expectation */
406 this->_setClkClsExpectation(clkCls
);
408 /* Make sure clock class is expected */
409 this->_makeSureClkClsIsExpected(msg
, clkCls
);
413 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger
& logger
) : _mLogger
{logger
}
417 bool MsgIter::_HeapComparator::operator()(
418 const UpstreamMsgIter
* const upstreamMsgIterA
,
419 const UpstreamMsgIter
* const upstreamMsgIterB
) const noexcept
421 /* The two messages to compare */
422 const auto msgA
= upstreamMsgIterA
->msg();
423 const auto msgB
= upstreamMsgIterB
->msg();
424 auto& msgTsA
= upstreamMsgIterA
->msgTs();
425 auto& msgTsB
= upstreamMsgIterB
->msgTs();
427 if (_mLogger
.wouldLogT()) {
428 BT_CPPLOGT("Comparing two messages: "
429 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
430 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
431 upstreamMsgIterA
->portName(), msgA
.type(), optMsgTsStr(msgTsA
),
432 upstreamMsgIterB
->portName(), msgB
.type(), optMsgTsStr(msgTsB
));
436 * Try to compare using timestamps.
438 * If both timestamps are set and their values are different, then
439 * use this to establish the ordering of the two messages.
441 * If one timestamp is set, but not the other, the latter always
442 * wins. This is because, for a given upstream message iterator, we
443 * need to consume all the messages having no timestamp so that we
444 * can reach a message with a timestamp to compare it.
446 * Otherwise, we'll fall back to using
447 * common_muxing_compare_messages().
449 if (G_LIKELY(msgTsA
&& msgTsB
)) {
450 if (*msgTsA
< *msgTsB
) {
452 * Return `true` because `_mHeap.top()` provides the
453 * "greatest" element. For us, the "greatest" message is
454 * the oldest one, that is, the one having the smallest
457 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
459 } else if (*msgTsA
> *msgTsB
) {
461 "Timestamp of message A is greater than timestamp of message B: oldest=B");
464 } else if (msgTsA
&& !msgTsB
) {
465 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
467 } else if (!msgTsA
&& msgTsB
) {
468 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
473 * Comparison failed using timestamps: determine an ordering using
474 * arbitrary properties, but in a deterministic way.
476 * common_muxing_compare_messages() returns less than 0 if the first
477 * message is considered older than the second, which corresponds to
478 * this comparator returning `true`.
480 const auto res
= common_muxing_compare_messages(msgA
.libObjPtr(), msgB
.libObjPtr()) < 0;
482 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
487 } /* namespace bt2mux */