abfd61042e17732063e1dd885a0f210f8dda39f6
[babeltrace.git] / src / plugins / utils / muxer / msg-iter.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #include <algorithm>
8
9 #include <glib.h>
10
11 #include <babeltrace2/babeltrace.h>
12
13 #include "cpp-common/bt2c/fmt.hpp" /* IWYU pragma: keep */
14 #include "cpp-common/bt2s/make-unique.hpp"
15 #include "cpp-common/vendor/fmt/format.h"
16
17 #include "plugins/common/muxing/muxing.hpp"
18
19 #include "comp.hpp"
20 #include "msg-iter.hpp"
21
22 namespace bt2mux {
23
24 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
25 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
26 bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
27 _mHeap {_HeapComparator {_mLogger, selfMsgIter.component().graphMipVersion()}}
28 {
29 /*
30 * Create one upstream message iterator for each connected
31 * input port.
32 */
33 auto canSeekForward = true;
34
35 for (const auto inputPort : this->_component()._inputPorts()) {
36 if (!inputPort.isConnected()) {
37 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
38 continue;
39 }
40
41 /*
42 * Create new upstream message iterator and immediately make it
43 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
44 * deal with it when downstream calls next()).
45 */
46 auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
47 this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
48
49 canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
50 _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
51 _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
52 }
53
54 /* Set the "can seek forward" configuration */
55 cfg.canSeekForward(canSeekForward);
56 }
57
58 namespace {
59
60 std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
61 {
62 if (ts) {
63 return fmt::to_string(*ts);
64 }
65
66 return "none";
67 }
68
69 } /* namespace */
70
71 void MsgIter::_next(bt2::ConstMessageArray& msgs)
72 {
73 /* Make sure all upstream message iterators are part of the heap */
74 this->_ensureFullHeap();
75
76 while (msgs.length() < msgs.capacity()) {
77 /* Empty heap? */
78 if (G_UNLIKELY(_mHeap.isEmpty())) {
79 /* No more upstream messages! */
80 return;
81 }
82
83 /*
84 * Retrieve the upstream message iterator having the oldest message.
85 */
86 auto& oldestUpstreamMsgIter = *_mHeap.top();
87
88 /* Validate the clock class of the oldest message */
89 this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
90
91 /* Append the oldest message and discard it */
92 msgs.append(oldestUpstreamMsgIter.msg().shared());
93
94 if (_mLogger.wouldLogD()) {
95 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
96 oldestUpstreamMsgIter.portName(),
97 optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
98 }
99
100 oldestUpstreamMsgIter.discard();
101
102 /*
103 * Immediately try to reload `oldestUpstreamMsgIter`.
104 *
105 * The possible outcomes are:
106 *
107 * There's an available message:
108 * Call `_mHeap.replaceTop()` to bring
109 * `oldestUpstreamMsgIter` back to the heap, performing a
110 * single heap rebalance.
111 *
112 * There isn't an available message (ended):
113 * Remove `oldestUpstreamMsgIter` from the heap.
114 *
115 * `bt2::TryAgain` is thrown:
116 * Remove `oldestUpstreamMsgIter` from the heap.
117 *
118 * Add `oldestUpstreamMsgIter` to the set of upstream
119 * message iterators to reload. The next call to _next()
120 * will move it to the heap again (if not ended) after
121 * having successfully called reload().
122 */
123 BT_CPPLOGD(
124 "Trying to reload upstream message iterator having the oldest message: port-name={}",
125 oldestUpstreamMsgIter.portName());
126
127 try {
128 if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::More)) {
129 /* New current message: update heap */
130 _mHeap.replaceTop(&oldestUpstreamMsgIter);
131 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
132 oldestUpstreamMsgIter.portName(), _mHeap.len());
133 } else {
134 _mHeap.removeTop();
135 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
136 "port-name{}, heap-len={}",
137 oldestUpstreamMsgIter.portName(), _mHeap.len());
138 }
139 } catch (const bt2::TryAgain&) {
140 _mHeap.removeTop();
141 _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
142 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
143 "port-name={}, heap-len={}, to-reload-len={}",
144 oldestUpstreamMsgIter.portName(), _mHeap.len(),
145 _mUpstreamMsgItersToReload.size());
146 throw;
147 }
148 }
149 }
150
151 void MsgIter::_ensureFullHeap()
152 {
153 /*
154 * Always remove from `_mUpstreamMsgItersToReload` when reload()
155 * doesn't throw.
156 *
157 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
158 * then we don't need it anymore (remains alive in
159 * `_mUpstreamMsgIters`).
160 */
161 for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
162 it = _mUpstreamMsgItersToReload.erase(it)) {
163 auto& upstreamMsgIter = **it;
164
165 BT_CPPLOGD("Handling upstream message iterator to reload: "
166 "port-name={}, heap-len={}, to-reload-len={}",
167 upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
168
169 if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::More)) {
170 /* New current message: move to heap */
171 _mHeap.insert(&upstreamMsgIter);
172 BT_CPPLOGD("More messages available; "
173 "inserted upstream message iterator into heap from \"to reload\" set: "
174 "port-name={}, heap-len={}",
175 upstreamMsgIter.portName(), _mHeap.len());
176 } else {
177 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
178 "port-name={}",
179 upstreamMsgIter.portName());
180 }
181 }
182 }
183
184 bool MsgIter::_canSeekBeginning()
185 {
186 /*
187 * We can only seek our beginning if all our upstream message
188 * iterators also can.
189 */
190 return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
191 [](UpstreamMsgIter::UP& upstreamMsgIter) {
192 return upstreamMsgIter->canSeekBeginning();
193 });
194 }
195
196 void MsgIter::_seekBeginning()
197 {
198 /*
199 * The current approach is that this operation is either successful
200 * (all upstream message iterators seek) or not. If it's not, then
201 * we don't keep any state that some sought and some didn't: we'll
202 * restart the whole process when the user tries to seek again.
203 *
204 * The first step is to clear all the containers of upstream message
205 * iterator pointers so that we can process what's in
206 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
207 * if any seeking fails below, the downstream user is required to
208 * try the "seek beginning" operation again and only call
209 * bt_message_iterator_next() if it was successful.
210 *
211 * This means if the first four upstream message iterators seek, and
212 * then the fifth one throws `bt2::TryAgain`, then the next time
213 * this method executes, the first four upstream message iterators
214 * will seek again. That being said, it's such an unlikely scenario
215 * that the simplicity outweighs performance concerns here.
216 */
217 _mHeap.clear();
218 _mUpstreamMsgItersToReload.clear();
219
220 /* Make each upstream message iterator seek */
221 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
222 /* This may throw! */
223 upstreamMsgIter->seekBeginning();
224 }
225
226 /*
227 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
228 * next call to _next() will deal with those.
229 */
230 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
231 _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
232 }
233 }
234
235 namespace {
236
237 std::string formatClkClsOrigin(const bt2::ClockOriginView clkClsOrigin, const char * const prefix,
238 const std::uint64_t graphMipVersion)
239 {
240 if (graphMipVersion == 0) {
241 return fmt::format("{}clock-class-origin-is-unix-epoch={}", prefix,
242 clkClsOrigin.isUnixEpoch());
243 } else {
244 return fmt::format("{0}clock-class-origin-ns={1}, {0}clock-class-origin-name={2}, "
245 "{0}clock-class-origin-uid={3}",
246 prefix, clkClsOrigin.nameSpace(), clkClsOrigin.name(),
247 clkClsOrigin.uid());
248 }
249 }
250
251 std::string formatClkClsId(const bt2::ConstClockClass clkCls, const char * const prefix,
252 const std::uint64_t graphMipVersion)
253 {
254 if (graphMipVersion == 0) {
255 if (const auto uuid = clkCls.uuid()) {
256 return fmt::format("{}clock-class-uuid={}", prefix, *uuid);
257 } else {
258 return fmt::format("{}clock-class-uuid=(none)", prefix);
259 }
260 } else {
261 return fmt::format("{0}clock-class-ns={1}, {0}clock-class-name={2}, {0}clock-class-uid={3}",
262 prefix, clkCls.nameSpace(), clkCls.name(), clkCls.uid());
263 }
264 }
265
266 std::string formatClkCls(const bt2::ConstClockClass clkCls, const char * const prefix,
267 const std::uint64_t graphMipVersion)
268 {
269 if (graphMipVersion == 0) {
270 return fmt::format("{}clock-class-addr={}, {}clock-class-name={}, {}, {}", prefix,
271 fmt::ptr(clkCls.libObjPtr()), prefix, clkCls.name(),
272 formatClkClsId(clkCls, prefix, graphMipVersion),
273 formatClkClsOrigin(clkCls.origin(), prefix, graphMipVersion));
274 } else {
275 return fmt::format("{}clock-class-addr={}, {}, {}", prefix, fmt::ptr(clkCls.libObjPtr()),
276 formatClkClsId(clkCls, prefix, graphMipVersion),
277 formatClkClsOrigin(clkCls.origin(), prefix, graphMipVersion));
278 }
279 }
280
281 std::string formatStreamCls(const bt2ccv::ClockCorrelationError& error,
282 const bool withTrailingComma)
283 {
284 if (const auto streamCls = error.streamCls()) {
285 return fmt::format("stream-class-addr={}, stream-class-name=\"{}\", stream-class-id={}{}",
286 fmt::ptr(streamCls->libObjPtr()), streamCls->name(), streamCls->id(),
287 withTrailingComma ? ", " : "");
288 } else {
289 return std::string {};
290 }
291 }
292
293 } /* namespace */
294
295 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
296 {
297 if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
298 /*
299 * We don't care about the other types: all the messages related
300 * to a given stream shared the same default clock class, if
301 * any.
302 */
303 return;
304 }
305
306 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg.type());
307
308 try {
309 _mClkCorrValidator.validate(msg, this->_component()._graphMipVersion());
310 } catch (const bt2ccv::ClockCorrelationError& error) {
311 using Type = bt2ccv::ClockCorrelationError::Type;
312
313 const auto actualClkCls = error.actualClockCls();
314 const auto refClkCls = error.refClockCls();
315 const auto graphMipVersion = this->_component()._graphMipVersion();
316 const auto formatExpClkClsOrigin = [&] {
317 return formatClkClsOrigin(refClkCls->origin(), "expected-", graphMipVersion);
318 };
319 const auto clkCls = [&] {
320 return formatClkClsId(*refClkCls, "expected", graphMipVersion);
321 };
322 const auto formatActClkCls = [&] {
323 return formatClkCls(*actualClkCls, "", graphMipVersion);
324 };
325 const auto formatExpClkCls = [&] {
326 return formatClkCls(*refClkCls, "expected-", graphMipVersion);
327 };
328
329 switch (error.type()) {
330 case Type::ExpectingNoClockClassGotOne:
331 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "Expecting no clock class, got one: {}{}",
332 formatStreamCls(error, true), formatActClkCls());
333
334 case Type::ExpectingOriginKnownGotNoClockClass:
335 if (graphMipVersion == 0) {
336 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
337 bt2::Error, "Expecting a clock class with a Unix epoch origin, got none: {}",
338 formatStreamCls(error, false));
339 } else {
340 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
341 bt2::Error, "Expecting a clock class with a known origin, got none: {}{}",
342 formatStreamCls(error, true), formatExpClkClsOrigin());
343 }
344
345 case Type::ExpectingOriginKnownGotUnknownOrigin:
346 if (graphMipVersion == 0) {
347 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
348 bt2::Error,
349 "Expecting a clock class with a Unix epoch origin, got one with an unknown "
350 "origin: {}{}",
351 formatStreamCls(error, true), formatActClkCls());
352 } else {
353 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
354 bt2::Error,
355 "Expecting a clock class with a known origin, got one with an unknown origin: "
356 "{}{}, {}",
357 formatStreamCls(error, true), formatActClkCls(), formatExpClkClsOrigin());
358 }
359
360 case Type::ExpectingOriginKnownGotOtherOrigin:
361 BT_ASSERT(graphMipVersion > 0);
362 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
363 bt2::Error,
364 "Expecting a clock class with a known origin, got one with a wrong origin: {}{}, {}",
365 formatStreamCls(error, true), formatActClkCls(), formatExpClkClsOrigin());
366
367 case Type::ExpectingOriginUnknownWithIdGotNoClockClass:
368 if (graphMipVersion == 0) {
369 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
370 bt2::Error,
371 "Expecting a clock class with an unknown origin and a specific UUID, got none: {}{}",
372 formatStreamCls(error, true), clkCls());
373 } else {
374 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
375 bt2::Error,
376 "Expecting a clock class with an unknown origin and a specific identity, got none: {}{}",
377 formatStreamCls(error, true), clkCls());
378 }
379
380 case Type::ExpectingOriginUnknownWithIdGotKnownOrigin:
381 if (graphMipVersion == 0) {
382 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
383 bt2::Error,
384 "Expecting a clock class with an unknown origin and a specific UUID, got one "
385 "with a Unix epoch origin: {}{}, {}",
386 formatStreamCls(error, true), formatActClkCls(), clkCls());
387 } else {
388 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
389 bt2::Error,
390 "Expecting a clock class with an unknown origin and a specific identity, got one "
391 "with a known origin: {}{}, {}",
392 formatStreamCls(error, true), formatActClkCls(), clkCls());
393 }
394
395 case Type::ExpectingOriginUnknownWithIdGotWithoutId:
396 if (graphMipVersion == 0) {
397 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
398 bt2::Error,
399 "Expecting a clock class with an unknown origin and a specific UUID, got one "
400 "without a UUID: {}{}, {}",
401 formatStreamCls(error, true), formatActClkCls(), clkCls());
402 } else {
403 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
404 bt2::Error,
405 "Expecting a clock class with an unknown origin and a specific identity, got one "
406 "without identity: {}{}, {}",
407 formatStreamCls(error, true), formatActClkCls(), clkCls());
408 }
409
410 case Type::ExpectingOriginUnknownWithIdGotOtherId:
411 if (graphMipVersion == 0) {
412 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
413 bt2::Error,
414 "Expecting a clock class with an unknown origin and a specific UUID, got one with "
415 "a different UUID: {}{}, {}",
416 formatStreamCls(error, true), formatActClkCls(), clkCls());
417 } else {
418 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
419 bt2::Error,
420 "Expecting a clock class with an unknown origin and a specific identity, got one with "
421 "a different identity: {}{}, {}",
422 formatStreamCls(error, true), formatActClkCls(), clkCls());
423 }
424
425 case Type::ExpectingOriginUnknownWithoutIdGotNoClockClass:
426 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "Expecting a clock class, got none: {}{}",
427 formatStreamCls(error, true), formatExpClkCls());
428
429 case Type::ExpectingOriginUnknownWithoutIdGotOtherClockClass:
430 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "Unexpected clock class: {}{}, {}",
431 formatStreamCls(error, true), formatActClkCls(),
432 formatExpClkCls());
433 }
434 }
435 }
436
437 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger,
438 const std::uint64_t graphMipVersion) :
439 _mLogger {logger},
440 _mMsgComparator {graphMipVersion}
441 {
442 }
443
444 bool MsgIter::_HeapComparator::operator()(
445 const UpstreamMsgIter * const upstreamMsgIterA,
446 const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
447 {
448 /* The two messages to compare */
449 const auto msgA = upstreamMsgIterA->msg();
450 const auto msgB = upstreamMsgIterB->msg();
451 auto& msgTsA = upstreamMsgIterA->msgTs();
452 auto& msgTsB = upstreamMsgIterB->msgTs();
453
454 if (_mLogger.wouldLogT()) {
455 BT_CPPLOGT("Comparing two messages: "
456 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
457 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
458 upstreamMsgIterA->portName(), msgA.type(), optMsgTsStr(msgTsA),
459 upstreamMsgIterB->portName(), msgB.type(), optMsgTsStr(msgTsB));
460 }
461
462 /*
463 * Try to compare using timestamps.
464 *
465 * If both timestamps are set and their values are different, then
466 * use this to establish the ordering of the two messages.
467 *
468 * If one timestamp is set, but not the other, the latter always
469 * wins. This is because, for a given upstream message iterator, we
470 * need to consume all the messages having no timestamp so that we
471 * can reach a message with a timestamp to compare it.
472 *
473 * Otherwise, we'll fall back to using
474 * common_muxing_compare_messages().
475 */
476 if (G_LIKELY(msgTsA && msgTsB)) {
477 if (*msgTsA < *msgTsB) {
478 /*
479 * Return `true` because `_mHeap.top()` provides the
480 * "greatest" element. For us, the "greatest" message is
481 * the oldest one, that is, the one having the smallest
482 * timestamp.
483 */
484 BT_CPPLOGT("Timestamp of message A is less than timestamp of message B: oldest=A");
485 return true;
486 } else if (*msgTsA > *msgTsB) {
487 BT_CPPLOGT("Timestamp of message A is greater than timestamp of message B: oldest=B");
488 return false;
489 }
490 } else if (msgTsA && !msgTsB) {
491 BT_CPPLOGT("Message A has a timestamp, but message B has none: oldest=B");
492 return false;
493 } else if (!msgTsA && msgTsB) {
494 BT_CPPLOGT("Message B has a timestamp, but message A has none: oldest=A");
495 return true;
496 }
497
498 /*
499 * Comparison failed using timestamps: determine an ordering using
500 * arbitrary properties, but in a deterministic way.
501 *
502 * common_muxing_compare_messages() returns less than 0 if the first
503 * message is considered older than the second, which corresponds to
504 * this comparator returning `true`.
505 */
506 const auto res = _mMsgComparator.compare(msgA, msgB) < 0;
507
508 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
509 res ? "A" : "B");
510 return res;
511 }
512
513 } /* namespace bt2mux */
This page took 0.043475 seconds and 5 git commands to generate.