2 * SPDX-License-Identifier: MIT
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
11 #include <babeltrace2/babeltrace.h>
13 #include "cpp-common/bt2c/fmt.hpp"
14 #include "cpp-common/bt2s/make-unique.hpp"
15 #include "cpp-common/vendor/fmt/format.h"
17 #include "plugins/common/muxing/muxing.h"
20 #include "msg-iter.hpp"
24 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter
,
25 const bt2::SelfMessageIteratorConfiguration cfg
, bt2::SelfComponentOutputPort
) :
26 bt2::UserMessageIterator
<MsgIter
, Comp
> {selfMsgIter
, "MSG-ITER"},
27 _mHeap
{_HeapComparator
{_mLogger
}}
30 * Create one upstream message iterator for each connected
33 auto canSeekForward
= true;
35 for (const auto inputPort
: this->_component()._inputPorts()) {
36 if (!inputPort
.isConnected()) {
37 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort
.name());
42 * Create new upstream message iterator and immediately make it
43 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
44 * deal with it when downstream calls next()).
46 auto upstreamMsgIter
= bt2s::make_unique
<UpstreamMsgIter
>(
47 this->_createMessageIterator(inputPort
), inputPort
.name(), _mLogger
);
49 canSeekForward
= canSeekForward
&& upstreamMsgIter
->canSeekForward();
50 _mUpstreamMsgItersToReload
.emplace_back(upstreamMsgIter
.get());
51 _mUpstreamMsgIters
.push_back(std::move(upstreamMsgIter
));
54 /* Set the "can seek forward" configuration */
55 cfg
.canSeekForward(canSeekForward
);
60 std::string
optMsgTsStr(const bt2s::optional
<std::int64_t>& ts
)
63 return fmt::to_string(*ts
);
71 void MsgIter::_next(bt2::ConstMessageArray
& msgs
)
73 /* Make sure all upstream message iterators are part of the heap */
74 this->_ensureFullHeap();
76 while (msgs
.length() < msgs
.capacity()) {
78 if (G_UNLIKELY(_mHeap
.isEmpty())) {
79 /* No more upstream messages! */
84 * Retrieve the upstream message iterator having the oldest message.
86 auto& oldestUpstreamMsgIter
= *_mHeap
.top();
88 /* Validate the clock class of the oldest message */
89 this->_validateMsgClkCls(oldestUpstreamMsgIter
.msg());
91 /* Append the oldest message and discard it */
92 msgs
.append(oldestUpstreamMsgIter
.msg().shared());
94 if (_mLogger
.wouldLogD()) {
95 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
96 oldestUpstreamMsgIter
.portName(),
97 optMsgTsStr(oldestUpstreamMsgIter
.msgTs()));
100 oldestUpstreamMsgIter
.discard();
103 * Immediately try to reload `oldestUpstreamMsgIter`.
105 * The possible outcomes are:
107 * There's an available message:
108 * Call `_mHeap.replaceTop()` to bring
109 * `oldestUpstreamMsgIter` back to the heap, performing a
110 * single heap rebalance.
112 * There isn't an available message (ended):
113 * Remove `oldestUpstreamMsgIter` from the heap.
115 * `bt2::TryAgain` is thrown:
116 * Remove `oldestUpstreamMsgIter` from the heap.
118 * Add `oldestUpstreamMsgIter` to the set of upstream
119 * message iterators to reload. The next call to _next()
120 * will move it to the heap again (if not ended) after
121 * having successfully called reload().
124 "Trying to reload upstream message iterator having the oldest message: port-name={}",
125 oldestUpstreamMsgIter
.portName());
128 if (G_LIKELY(oldestUpstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::More
)) {
129 /* New current message: update heap */
130 _mHeap
.replaceTop(&oldestUpstreamMsgIter
);
131 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
132 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
135 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
136 "port-name{}, heap-len={}",
137 oldestUpstreamMsgIter
.portName(), _mHeap
.len());
139 } catch (const bt2::TryAgain
&) {
141 _mUpstreamMsgItersToReload
.push_back(&oldestUpstreamMsgIter
);
142 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
143 "port-name={}, heap-len={}, to-reload-len={}",
144 oldestUpstreamMsgIter
.portName(), _mHeap
.len(),
145 _mUpstreamMsgItersToReload
.size());
151 void MsgIter::_ensureFullHeap()
154 * Always remove from `_mUpstreamMsgItersToReload` when reload()
157 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
158 * then we don't need it anymore (remains alive in
159 * `_mUpstreamMsgIters`).
161 for (auto it
= _mUpstreamMsgItersToReload
.begin(); it
!= _mUpstreamMsgItersToReload
.end();
162 it
= _mUpstreamMsgItersToReload
.erase(it
)) {
163 auto& upstreamMsgIter
= **it
;
165 BT_CPPLOGD("Handling upstream message iterator to reload: "
166 "port-name={}, heap-len={}, to-reload-len={}",
167 upstreamMsgIter
.portName(), _mHeap
.len(), _mUpstreamMsgItersToReload
.size());
169 if (G_LIKELY(upstreamMsgIter
.reload() == UpstreamMsgIter::ReloadStatus::More
)) {
170 /* New current message: move to heap */
171 _mHeap
.insert(&upstreamMsgIter
);
172 BT_CPPLOGD("More messages available; "
173 "inserted upstream message iterator into heap from \"to reload\" set: "
174 "port-name={}, heap-len={}",
175 upstreamMsgIter
.portName(), _mHeap
.len());
177 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
179 upstreamMsgIter
.portName());
184 bool MsgIter::_canSeekBeginning()
187 * We can only seek our beginning if all our upstream message
188 * iterators also can.
190 return std::all_of(_mUpstreamMsgIters
.begin(), _mUpstreamMsgIters
.end(),
191 [](UpstreamMsgIter::UP
& upstreamMsgIter
) {
192 return upstreamMsgIter
->canSeekBeginning();
196 void MsgIter::_seekBeginning()
199 * The current approach is that this operation is either successful
200 * (all upstream message iterators seek) or not. If it's not, then
201 * we don't keep any state that some sought and some didn't: we'll
202 * restart the whole process when the user tries to seek again.
204 * The first step is to clear all the containers of upstream message
205 * iterator pointers so that we can process what's in
206 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
207 * if any seeking fails below, the downstream user is required to
208 * try the "seek beginning" operation again and only call
209 * bt_message_iterator_next() if it was successful.
211 * This means if the first four upstream message iterators seek, and
212 * then the fifth one throws `bt2::TryAgain`, then the next time
213 * this method executes, the first four upstream message iterators
214 * will seek again. That being said, it's such an unlikely scenario
215 * that the simplicity outweighs performance concerns here.
218 _mUpstreamMsgItersToReload
.clear();
220 /* Make each upstream message iterator seek */
221 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
222 /* This may throw! */
223 upstreamMsgIter
->seekBeginning();
227 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
228 * next call to _next() will deal with those.
230 for (auto& upstreamMsgIter
: _mUpstreamMsgIters
) {
231 _mUpstreamMsgItersToReload
.push_back(upstreamMsgIter
.get());
235 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg
)
237 if (G_LIKELY(!msg
.isStreamBeginning() && !msg
.isMessageIteratorInactivity())) {
239 * We don't care about the other types: all the messages related
240 * to a given stream shared the same default clock class, if
246 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg
.type());
249 _mClkCorrValidator
.validate(msg
);
250 } catch (const bt2ccv::ClockCorrelationError
& error
) {
251 using Type
= bt2ccv::ClockCorrelationError::Type
;
253 const auto actualClockCls
= error
.actualClockCls();
255 switch (error
.type()) {
256 case Type::ExpectingNoClockClassGotOne
:
257 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
258 "Expecting no clock class, but got one: "
259 "clock-class-addr={}, clock-class-name={}",
260 fmt::ptr(actualClockCls
->libObjPtr()),
261 actualClockCls
->name());
263 case Type::ExpectingOriginUnixGotNone
:
264 case Type::ExpectingOriginUuidGotNone
:
265 case Type::ExpectingOriginNoUuidGotNone
:
267 const auto streamCls
= *error
.streamCls();
269 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
270 "Expecting a clock class, but got none: "
271 "stream-class-addr={}, stream-class-name=\"{}\", "
272 "stream-class-id={}",
273 fmt::ptr(streamCls
.libObjPtr()), streamCls
.name(),
277 case Type::ExpectingOriginUnixGotOther
:
278 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
279 "Expecting a clock class having a Unix epoch origin, "
280 "but got one not having a Unix epoch origin: "
281 "clock-class-addr={}, clock-class-name={}",
282 fmt::ptr(actualClockCls
->libObjPtr()),
283 actualClockCls
->name());
285 case Type::ExpectingOriginUuidGotUnix
:
286 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
288 "Expecting a clock class not having a Unix epoch origin, "
289 "but got one having a Unix epoch origin: "
290 "clock-class-addr={}, clock-class-name={}",
291 fmt::ptr(actualClockCls
->libObjPtr()), actualClockCls
->name());
293 case Type::ExpectingOriginUuidGotNoUuid
:
294 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
296 "Expecting a clock class with a UUID, but got one without a UUID: "
297 "clock-class-addr={}, clock-class-name={}",
298 fmt::ptr(actualClockCls
->libObjPtr()), actualClockCls
->name());
300 case Type::ExpectingOriginUuidGotOtherUuid
:
301 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error
,
302 "Expecting a clock class with a specific UUID, "
303 "but got one with a different UUID: "
304 "clock-class-addr={}, clock-class-name={}, "
305 "expected-uuid=\"{}\", uuid=\"{}\"",
306 fmt::ptr(actualClockCls
->libObjPtr()),
307 actualClockCls
->name(), *error
.expectedUuid(),
308 *actualClockCls
->uuid());
310 case Type::ExpectingOriginNoUuidGotOther
:
312 const auto expectedClockCls
= error
.expectedClockCls();
314 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
316 "Unexpected clock class: "
317 "expected-clock-class-addr={}, expected-clock-class-name={}, "
318 "actual-clock-class-addr={}, actual-clock-class-name={}",
319 fmt::ptr(expectedClockCls
->libObjPtr()), expectedClockCls
->name(),
320 fmt::ptr(actualClockCls
->libObjPtr()), actualClockCls
->name());
326 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger
& logger
) : _mLogger
{logger
}
330 bool MsgIter::_HeapComparator::operator()(
331 const UpstreamMsgIter
* const upstreamMsgIterA
,
332 const UpstreamMsgIter
* const upstreamMsgIterB
) const noexcept
334 /* The two messages to compare */
335 const auto msgA
= upstreamMsgIterA
->msg();
336 const auto msgB
= upstreamMsgIterB
->msg();
337 auto& msgTsA
= upstreamMsgIterA
->msgTs();
338 auto& msgTsB
= upstreamMsgIterB
->msgTs();
340 if (_mLogger
.wouldLogT()) {
341 BT_CPPLOGT("Comparing two messages: "
342 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
343 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
344 upstreamMsgIterA
->portName(), msgA
.type(), optMsgTsStr(msgTsA
),
345 upstreamMsgIterB
->portName(), msgB
.type(), optMsgTsStr(msgTsB
));
349 * Try to compare using timestamps.
351 * If both timestamps are set and their values are different, then
352 * use this to establish the ordering of the two messages.
354 * If one timestamp is set, but not the other, the latter always
355 * wins. This is because, for a given upstream message iterator, we
356 * need to consume all the messages having no timestamp so that we
357 * can reach a message with a timestamp to compare it.
359 * Otherwise, we'll fall back to using
360 * common_muxing_compare_messages().
362 if (G_LIKELY(msgTsA
&& msgTsB
)) {
363 if (*msgTsA
< *msgTsB
) {
365 * Return `true` because `_mHeap.top()` provides the
366 * "greatest" element. For us, the "greatest" message is
367 * the oldest one, that is, the one having the smallest
370 BT_CPPLOGT("Timestamp of message A is less than timestamp of message B: oldest=A");
372 } else if (*msgTsA
> *msgTsB
) {
373 BT_CPPLOGT("Timestamp of message A is greater than timestamp of message B: oldest=B");
376 } else if (msgTsA
&& !msgTsB
) {
377 BT_CPPLOGT("Message A has a timestamp, but message B has none: oldest=B");
379 } else if (!msgTsA
&& msgTsB
) {
380 BT_CPPLOGT("Message B has a timestamp, but message A has none: oldest=A");
385 * Comparison failed using timestamps: determine an ordering using
386 * arbitrary properties, but in a deterministic way.
388 * common_muxing_compare_messages() returns less than 0 if the first
389 * message is considered older than the second, which corresponds to
390 * this comparator returning `true`.
392 const auto res
= common_muxing_compare_messages(msgA
.libObjPtr(), msgB
.libObjPtr()) < 0;
394 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
399 } /* namespace bt2mux */