526be00726fd42edc72ddca20b4d0ea029c45608
[babeltrace.git] / src / plugins / utils / muxer / msg-iter.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #include <algorithm>
8
9 #include <glib.h>
10
11 #include <babeltrace2/babeltrace.h>
12
13 #include "common/common.h"
14 #include "cpp-common/bt2c/call.hpp"
15 #include "cpp-common/bt2s/make-unique.hpp"
16 #include "cpp-common/vendor/fmt/format.h"
17
18 #include "plugins/common/muxing/muxing.h"
19
20 #include "comp.hpp"
21 #include "msg-iter.hpp"
22
23 namespace bt2mux {
24
25 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
26 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
27 bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
28 _mHeap {_HeapComparator {_mLogger}}
29 {
30 /*
31 * Create one upstream message iterator for each connected
32 * input port.
33 */
34 auto canSeekForward = true;
35
36 for (const auto inputPort : this->_component()._inputPorts()) {
37 if (!inputPort.isConnected()) {
38 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
39 continue;
40 }
41
42 /*
43 * Create new upstream message iterator and immediately make it
44 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
45 * deal with it when downstream calls next()).
46 */
47 auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
48 this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
49
50 canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
51 _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
52 _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
53 }
54
55 /* Set the "can seek forward" configuration */
56 cfg.canSeekForward(canSeekForward);
57 }
58
59 namespace {
60
61 std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
62 {
63 if (ts) {
64 return fmt::to_string(*ts);
65 }
66
67 return "none";
68 }
69
70 } /* namespace */
71
72 void MsgIter::_next(bt2::ConstMessageArray& msgs)
73 {
74 /* Make sure all upstream message iterators are part of the heap */
75 this->_ensureFullHeap();
76
77 while (msgs.length() < msgs.capacity()) {
78 /* Empty heap? */
79 if (G_UNLIKELY(_mHeap.isEmpty())) {
80 /* No more upstream messages! */
81 return;
82 }
83
84 /*
85 * Retrieve the upstream message iterator having the oldest message.
86 */
87 auto& oldestUpstreamMsgIter = *_mHeap.top();
88
89 /* Validate the clock class of the oldest message */
90 this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
91
92 /* Append the oldest message and discard it */
93 msgs.append(oldestUpstreamMsgIter.msg().shared());
94
95 if (_mLogger.wouldLogD()) {
96 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
97 oldestUpstreamMsgIter.portName(),
98 optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
99 }
100
101 oldestUpstreamMsgIter.discard();
102
103 /*
104 * Immediately try to reload `oldestUpstreamMsgIter`.
105 *
106 * The possible outcomes are:
107 *
108 * There's an available message:
109 * Call `_mHeap.replaceTop()` to bring
110 * `oldestUpstreamMsgIter` back to the heap, performing a
111 * single heap rebalance.
112 *
113 * There isn't an available message (ended):
114 * Remove `oldestUpstreamMsgIter` from the heap.
115 *
116 * `bt2::TryAgain` is thrown:
117 * Remove `oldestUpstreamMsgIter` from the heap.
118 *
119 * Add `oldestUpstreamMsgIter` to the set of upstream
120 * message iterators to reload. The next call to _next()
121 * will move it to the heap again (if not ended) after
122 * having successfully called reload().
123 */
124 BT_CPPLOGD(
125 "Trying to reload upstream message iterator having the oldest message: port-name={}",
126 oldestUpstreamMsgIter.portName());
127
128 try {
129 if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
130 /* New current message: update heap */
131 _mHeap.replaceTop(&oldestUpstreamMsgIter);
132 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
133 oldestUpstreamMsgIter.portName(), _mHeap.len());
134 } else {
135 _mHeap.removeTop();
136 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
137 "port-name{}, heap-len={}",
138 oldestUpstreamMsgIter.portName(), _mHeap.len());
139 }
140 } catch (const bt2::TryAgain&) {
141 _mHeap.removeTop();
142 _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
143 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
144 "port-name={}, heap-len={}, to-reload-len={}",
145 oldestUpstreamMsgIter.portName(), _mHeap.len(),
146 _mUpstreamMsgItersToReload.size());
147 throw;
148 }
149 }
150 }
151
152 void MsgIter::_ensureFullHeap()
153 {
154 /*
155 * Always remove from `_mUpstreamMsgItersToReload` when reload()
156 * doesn't throw.
157 *
158 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
159 * then we don't need it anymore (remains alive in
160 * `_mUpstreamMsgIters`).
161 */
162 for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
163 it = _mUpstreamMsgItersToReload.erase(it)) {
164 auto& upstreamMsgIter = **it;
165
166 BT_CPPLOGD("Handling upstream message iterator to reload: "
167 "port-name={}, heap-len={}, to-reload-len={}",
168 upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
169
170 if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
171 /* New current message: move to heap */
172 _mHeap.insert(&upstreamMsgIter);
173 BT_CPPLOGD("More messages available; "
174 "inserted upstream message iterator into heap from \"to reload\" set: "
175 "port-name={}, heap-len={}",
176 upstreamMsgIter.portName(), _mHeap.len());
177 } else {
178 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
179 "port-name={}",
180 upstreamMsgIter.portName());
181 }
182 }
183 }
184
185 bool MsgIter::_canSeekBeginning()
186 {
187 /*
188 * We can only seek our beginning if all our upstream message
189 * iterators also can.
190 */
191 return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
192 [](UpstreamMsgIter::UP& upstreamMsgIter) {
193 return upstreamMsgIter->canSeekBeginning();
194 });
195 }
196
197 void MsgIter::_seekBeginning()
198 {
199 /*
200 * The current approach is that this operation is either successful
201 * (all upstream message iterators seek) or not. If it's not, then
202 * we don't keep any state that some sought and some didn't: we'll
203 * restart the whole process when the user tries to seek again.
204 *
205 * The first step is to clear all the containers of upstream message
206 * iterator pointers so that we can process what's in
207 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
208 * if any seeking fails below, the downstream user is required to
209 * try the "seek beginning" operation again and only call
210 * bt_message_iterator_next() if it was successful.
211 *
212 * This means if the first four upstream message iterators seek, and
213 * then the fifth one throws `bt2::TryAgain`, then the next time
214 * this method executes, the first four upstream message iterators
215 * will seek again. That being said, it's such an unlikely scenario
216 * that the simplicity outweighs performance concerns here.
217 */
218 _mHeap.clear();
219 _mUpstreamMsgItersToReload.clear();
220
221 /* Also reset clock class expectation */
222 _mClkClsExpectation = _ClkClsExpectation::ANY;
223 _mExpectedClkClsUuid.reset();
224
225 /* Make each upstream message iterator seek */
226 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
227 /* This may throw! */
228 upstreamMsgIter->seekBeginning();
229 }
230
231 /*
232 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
233 * next call to _next() will deal with those.
234 */
235 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
236 _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
237 }
238 }
239
240 namespace {
241
242 const char *msgTypeStr(const bt2::ConstMessage msg) noexcept
243 {
244 return bt_common_message_type_string(static_cast<bt_message_type>(msg.type()));
245 }
246
247 std::string optLogStr(const char * const str) noexcept
248 {
249 return str ? fmt::format("\"{}\"", str) : "(none)";
250 }
251
252 } /* namespace */
253
254 void MsgIter::_setClkClsExpectation(
255 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) noexcept
256 {
257 BT_ASSERT_DBG(_mClkClsExpectation == _ClkClsExpectation::ANY);
258
259 /* No initial clock class: also expect none afterwards */
260 if (!clkCls) {
261 _mClkClsExpectation = _ClkClsExpectation::NONE;
262 return;
263 }
264
265 /*
266 * This is the first clock class that this message iterator
267 * encounters. Its properties determine what to expect for the whole
268 * lifetime of the iterator.
269 */
270 if (clkCls->originIsUnixEpoch()) {
271 /* Expect clock classes having a Unix epoch origin*/
272 _mClkClsExpectation = _ClkClsExpectation::ORIG_IS_UNIX_EPOCH;
273 } else {
274 if (clkCls->uuid()) {
275 /*
276 * Expect clock classes not having a Unix epoch origin and
277 * with a specific UUID.
278 */
279 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID;
280 _mExpectedClkClsUuid = *clkCls->uuid();
281 } else {
282 /*
283 * Expect clock classes not having a Unix epoch origin and
284 * without a UUID.
285 */
286 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID;
287 }
288 }
289 }
290
291 void MsgIter::_makeSureClkClsIsExpected(
292 const bt2::ConstMessage msg,
293 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) const
294 {
295 BT_ASSERT_DBG(_mClkClsExpectation != _ClkClsExpectation::ANY);
296
297 if (!clkCls) {
298 if (_mClkClsExpectation != _ClkClsExpectation::NONE) {
299 /*
300 * `msg` is a stream beginning message because a message
301 * iterator inactivity message always has a clock class.
302 */
303 const auto streamCls = msg.asStreamBeginning().stream().cls();
304
305 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
306 "Expecting a clock class, but got none: "
307 "stream-class-addr={}, stream-class-name=\"{}\", "
308 "stream-class-id={}",
309 static_cast<const void *>(streamCls.libObjPtr()),
310 optLogStr(streamCls.name()), streamCls.id());
311 }
312
313 return;
314 }
315
316 const auto clkClsAddr = static_cast<const void *>(clkCls->libObjPtr());
317
318 switch (_mClkClsExpectation) {
319 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH:
320 if (!clkCls->originIsUnixEpoch()) {
321 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
322 "Expecting a clock class having a Unix epoch origin, "
323 "but got one not having a Unix epoch origin: "
324 "clock-class-addr={}, clock-class-name={}",
325 clkClsAddr, optLogStr(clkCls->name()));
326 }
327
328 break;
329 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID:
330 BT_ASSERT_DBG(!_mExpectedClkClsUuid);
331
332 if (clkCls->originIsUnixEpoch()) {
333 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
334 bt2::Error,
335 "Expecting a clock class not having a Unix epoch origin, "
336 "but got one having a Unix epoch origin: "
337 "clock-class-addr={}, clock-class-name={}",
338 clkClsAddr, optLogStr(clkCls->name()));
339 }
340
341 if (clkCls->uuid()) {
342 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
343 bt2::Error,
344 "Expecting a clock class without a UUID, but got one with a UUID: "
345 "clock-class-addr={}, clock-class-name={}, uuid={}",
346 clkClsAddr, optLogStr(clkCls->name()), clkCls->uuid()->str());
347 }
348
349 break;
350 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID:
351 BT_ASSERT_DBG(_mExpectedClkClsUuid);
352
353 if (clkCls->originIsUnixEpoch()) {
354 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
355 bt2::Error,
356 "Expecting a clock class not having a Unix epoch origin, "
357 "but got one having a Unix epoch origin: "
358 "clock-class-addr={}, clock-class-name={}",
359 clkClsAddr, optLogStr(clkCls->name()));
360 }
361
362 if (!clkCls->uuid()) {
363 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
364 bt2::Error,
365 "Expecting a clock class with a UUID, but got one without a UUID: "
366 "clock-class-addr={}, clock-class-name={}",
367 clkClsAddr, optLogStr(clkCls->name()));
368 }
369
370 if (*clkCls->uuid() != bt2c::UuidView {*_mExpectedClkClsUuid}) {
371 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
372 "Expecting a clock class with a specific UUID, "
373 "but got one with a different UUID: "
374 "clock-class-addr={}, clock-class-name={}, "
375 "expected-uuid=\"{}\", uuid=\"{}\"",
376 clkClsAddr, optLogStr(clkCls->name()),
377 _mExpectedClkClsUuid->str(), clkCls->uuid()->str());
378 }
379
380 break;
381 case _ClkClsExpectation::NONE:
382 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
383 "Expecting no clock class, but got one: "
384 "clock-class-addr={}, clock-class-name={}",
385 clkClsAddr, optLogStr(clkCls->name()));
386 break;
387 default:
388 bt_common_abort();
389 }
390 }
391
392 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
393 {
394 if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
395 /*
396 * We don't care about the other types: all the messages related
397 * to a given stream shared the same default clock class, if
398 * any.
399 */
400 return;
401 }
402
403 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msgTypeStr(msg));
404
405 /* Get the clock class, if any, of `msg` */
406 const auto clkCls = bt2c::call([msg]() -> bt2::OptionalBorrowedObject<bt2::ConstClockClass> {
407 if (msg.isStreamBeginning()) {
408 return msg.asStreamBeginning().stream().cls().defaultClockClass();
409 } else {
410 BT_ASSERT(msg.isMessageIteratorInactivity());
411 return msg.asMessageIteratorInactivity().clockSnapshot().clockClass();
412 }
413 });
414
415 /* Set the expectation or check it */
416 if (_mClkClsExpectation == _ClkClsExpectation::ANY) {
417 /* First message: set the expectation */
418 this->_setClkClsExpectation(clkCls);
419 } else {
420 /* Make sure clock class is expected */
421 this->_makeSureClkClsIsExpected(msg, clkCls);
422 }
423 }
424
425 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger) : _mLogger {logger}
426 {
427 }
428
429 bool MsgIter::_HeapComparator::operator()(
430 const UpstreamMsgIter * const upstreamMsgIterA,
431 const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
432 {
433 /* The two messages to compare */
434 const auto msgA = upstreamMsgIterA->msg();
435 const auto msgB = upstreamMsgIterB->msg();
436 auto& msgTsA = upstreamMsgIterA->msgTs();
437 auto& msgTsB = upstreamMsgIterB->msgTs();
438
439 if (_mLogger.wouldLogT()) {
440 BT_CPPLOGT("Comparing two messages: "
441 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
442 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
443 upstreamMsgIterA->portName(), msgTypeStr(msgA), optMsgTsStr(msgTsA),
444 upstreamMsgIterB->portName(), msgTypeStr(msgB), optMsgTsStr(msgTsB));
445 }
446
447 /*
448 * Try to compare using timestamps.
449 *
450 * If both timestamps are set and their values are different, then
451 * use this to establish the ordering of the two messages.
452 *
453 * If one timestamp is set, but not the other, the latter always
454 * wins. This is because, for a given upstream message iterator, we
455 * need to consume all the messages having no timestamp so that we
456 * can reach a message with a timestamp to compare it.
457 *
458 * Otherwise, we'll fall back to using
459 * common_muxing_compare_messages().
460 */
461 if (G_LIKELY(msgTsA && msgTsB)) {
462 if (*msgTsA < *msgTsB) {
463 /*
464 * Return `true` because `_mHeap.top()` provides the
465 * "greatest" element. For us, the "greatest" message is
466 * the oldest one, that is, the one having the smallest
467 * timestamp.
468 */
469 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
470 return true;
471 } else if (*msgTsA > *msgTsB) {
472 BT_CPPLOGT_STR(
473 "Timestamp of message A is greater than timestamp of message B: oldest=B");
474 return false;
475 }
476 } else if (msgTsA && !msgTsB) {
477 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
478 return false;
479 } else if (!msgTsA && msgTsB) {
480 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
481 return true;
482 }
483
484 /*
485 * Comparison failed using timestamps: determine an ordering using
486 * arbitrary properties, but in a deterministic way.
487 *
488 * common_muxing_compare_messages() returns less than 0 if the first
489 * message is considered older than the second, which corresponds to
490 * this comparator returning `true`.
491 */
492 const auto res = common_muxing_compare_messages(msgA.libObjPtr(), msgB.libObjPtr()) < 0;
493
494 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
495 res ? "A" : "B");
496 return res;
497 }
498
499 } /* namespace bt2mux */
This page took 0.041295 seconds and 3 git commands to generate.