2121a0f222595687d4164226311163af024c3acd
[babeltrace.git] / src / plugins / utils / muxer / msg-iter.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #include <algorithm>
8
9 #include <glib.h>
10
11 #include <babeltrace2/babeltrace.h>
12
13 #include "common/common.h"
14 #include "cpp-common/bt2c/call.hpp"
15 #include "cpp-common/bt2c/fmt.hpp"
16 #include "cpp-common/bt2s/make-unique.hpp"
17 #include "cpp-common/vendor/fmt/format.h"
18
19 #include "plugins/common/muxing/muxing.h"
20
21 #include "comp.hpp"
22 #include "msg-iter.hpp"
23
24 namespace bt2mux {
25
26 MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
27 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
28 bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
29 _mHeap {_HeapComparator {_mLogger}}
30 {
31 /*
32 * Create one upstream message iterator for each connected
33 * input port.
34 */
35 auto canSeekForward = true;
36
37 for (const auto inputPort : this->_component()._inputPorts()) {
38 if (!inputPort.isConnected()) {
39 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
40 continue;
41 }
42
43 /*
44 * Create new upstream message iterator and immediately make it
45 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
46 * deal with it when downstream calls next()).
47 */
48 auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
49 this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
50
51 canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
52 _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
53 _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
54 }
55
56 /* Set the "can seek forward" configuration */
57 cfg.canSeekForward(canSeekForward);
58 }
59
60 namespace {
61
62 std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
63 {
64 if (ts) {
65 return fmt::to_string(*ts);
66 }
67
68 return "none";
69 }
70
71 } /* namespace */
72
73 void MsgIter::_next(bt2::ConstMessageArray& msgs)
74 {
75 /* Make sure all upstream message iterators are part of the heap */
76 this->_ensureFullHeap();
77
78 while (msgs.length() < msgs.capacity()) {
79 /* Empty heap? */
80 if (G_UNLIKELY(_mHeap.isEmpty())) {
81 /* No more upstream messages! */
82 return;
83 }
84
85 /*
86 * Retrieve the upstream message iterator having the oldest message.
87 */
88 auto& oldestUpstreamMsgIter = *_mHeap.top();
89
90 /* Validate the clock class of the oldest message */
91 this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
92
93 /* Append the oldest message and discard it */
94 msgs.append(oldestUpstreamMsgIter.msg().shared());
95
96 if (_mLogger.wouldLogD()) {
97 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
98 oldestUpstreamMsgIter.portName(),
99 optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
100 }
101
102 oldestUpstreamMsgIter.discard();
103
104 /*
105 * Immediately try to reload `oldestUpstreamMsgIter`.
106 *
107 * The possible outcomes are:
108 *
109 * There's an available message:
110 * Call `_mHeap.replaceTop()` to bring
111 * `oldestUpstreamMsgIter` back to the heap, performing a
112 * single heap rebalance.
113 *
114 * There isn't an available message (ended):
115 * Remove `oldestUpstreamMsgIter` from the heap.
116 *
117 * `bt2::TryAgain` is thrown:
118 * Remove `oldestUpstreamMsgIter` from the heap.
119 *
120 * Add `oldestUpstreamMsgIter` to the set of upstream
121 * message iterators to reload. The next call to _next()
122 * will move it to the heap again (if not ended) after
123 * having successfully called reload().
124 */
125 BT_CPPLOGD(
126 "Trying to reload upstream message iterator having the oldest message: port-name={}",
127 oldestUpstreamMsgIter.portName());
128
129 try {
130 if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
131 /* New current message: update heap */
132 _mHeap.replaceTop(&oldestUpstreamMsgIter);
133 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
134 oldestUpstreamMsgIter.portName(), _mHeap.len());
135 } else {
136 _mHeap.removeTop();
137 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
138 "port-name{}, heap-len={}",
139 oldestUpstreamMsgIter.portName(), _mHeap.len());
140 }
141 } catch (const bt2::TryAgain&) {
142 _mHeap.removeTop();
143 _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
144 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
145 "port-name={}, heap-len={}, to-reload-len={}",
146 oldestUpstreamMsgIter.portName(), _mHeap.len(),
147 _mUpstreamMsgItersToReload.size());
148 throw;
149 }
150 }
151 }
152
153 void MsgIter::_ensureFullHeap()
154 {
155 /*
156 * Always remove from `_mUpstreamMsgItersToReload` when reload()
157 * doesn't throw.
158 *
159 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
160 * then we don't need it anymore (remains alive in
161 * `_mUpstreamMsgIters`).
162 */
163 for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
164 it = _mUpstreamMsgItersToReload.erase(it)) {
165 auto& upstreamMsgIter = **it;
166
167 BT_CPPLOGD("Handling upstream message iterator to reload: "
168 "port-name={}, heap-len={}, to-reload-len={}",
169 upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
170
171 if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
172 /* New current message: move to heap */
173 _mHeap.insert(&upstreamMsgIter);
174 BT_CPPLOGD("More messages available; "
175 "inserted upstream message iterator into heap from \"to reload\" set: "
176 "port-name={}, heap-len={}",
177 upstreamMsgIter.portName(), _mHeap.len());
178 } else {
179 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
180 "port-name={}",
181 upstreamMsgIter.portName());
182 }
183 }
184 }
185
186 bool MsgIter::_canSeekBeginning()
187 {
188 /*
189 * We can only seek our beginning if all our upstream message
190 * iterators also can.
191 */
192 return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
193 [](UpstreamMsgIter::UP& upstreamMsgIter) {
194 return upstreamMsgIter->canSeekBeginning();
195 });
196 }
197
198 void MsgIter::_seekBeginning()
199 {
200 /*
201 * The current approach is that this operation is either successful
202 * (all upstream message iterators seek) or not. If it's not, then
203 * we don't keep any state that some sought and some didn't: we'll
204 * restart the whole process when the user tries to seek again.
205 *
206 * The first step is to clear all the containers of upstream message
207 * iterator pointers so that we can process what's in
208 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
209 * if any seeking fails below, the downstream user is required to
210 * try the "seek beginning" operation again and only call
211 * bt_message_iterator_next() if it was successful.
212 *
213 * This means if the first four upstream message iterators seek, and
214 * then the fifth one throws `bt2::TryAgain`, then the next time
215 * this method executes, the first four upstream message iterators
216 * will seek again. That being said, it's such an unlikely scenario
217 * that the simplicity outweighs performance concerns here.
218 */
219 _mHeap.clear();
220 _mUpstreamMsgItersToReload.clear();
221
222 /* Also reset clock class expectation */
223 _mClkClsExpectation = _ClkClsExpectation::ANY;
224 _mExpectedClkClsUuid.reset();
225
226 /* Make each upstream message iterator seek */
227 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
228 /* This may throw! */
229 upstreamMsgIter->seekBeginning();
230 }
231
232 /*
233 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
234 * next call to _next() will deal with those.
235 */
236 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
237 _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
238 }
239 }
240
namespace {

/*
 * Returns `str` for logging purposes: surrounded with double quotes
 * when it's not null, or `(none)` (without quotes) otherwise.
 *
 * This function isn't `noexcept` because building the returned string
 * allocates memory, and therefore may throw.
 */
std::string optLogStr(const char * const str)
{
    if (!str) {
        return "(none)";
    }

    std::string quoted = "\"";

    quoted += str;
    quoted += '"';
    return quoted;
}

} /* namespace */
249
250 void MsgIter::_setClkClsExpectation(
251 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) noexcept
252 {
253 BT_ASSERT_DBG(_mClkClsExpectation == _ClkClsExpectation::ANY);
254
255 /* No initial clock class: also expect none afterwards */
256 if (!clkCls) {
257 _mClkClsExpectation = _ClkClsExpectation::NONE;
258 return;
259 }
260
261 /*
262 * This is the first clock class that this message iterator
263 * encounters. Its properties determine what to expect for the whole
264 * lifetime of the iterator.
265 */
266 if (clkCls->originIsUnixEpoch()) {
267 /* Expect clock classes having a Unix epoch origin*/
268 _mClkClsExpectation = _ClkClsExpectation::ORIG_IS_UNIX_EPOCH;
269 } else {
270 if (clkCls->uuid()) {
271 /*
272 * Expect clock classes not having a Unix epoch origin and
273 * with a specific UUID.
274 */
275 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID;
276 _mExpectedClkClsUuid = *clkCls->uuid();
277 } else {
278 /*
279 * Expect clock classes not having a Unix epoch origin and
280 * without a UUID.
281 */
282 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID;
283 }
284 }
285 }
286
287 void MsgIter::_makeSureClkClsIsExpected(
288 const bt2::ConstMessage msg,
289 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) const
290 {
291 BT_ASSERT_DBG(_mClkClsExpectation != _ClkClsExpectation::ANY);
292
293 if (!clkCls) {
294 if (_mClkClsExpectation != _ClkClsExpectation::NONE) {
295 /*
296 * `msg` is a stream beginning message because a message
297 * iterator inactivity message always has a clock class.
298 */
299 const auto streamCls = msg.asStreamBeginning().stream().cls();
300
301 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
302 "Expecting a clock class, but got none: "
303 "stream-class-addr={}, stream-class-name=\"{}\", "
304 "stream-class-id={}",
305 static_cast<const void *>(streamCls.libObjPtr()),
306 optLogStr(streamCls.name()), streamCls.id());
307 }
308
309 return;
310 }
311
312 const auto clkClsAddr = static_cast<const void *>(clkCls->libObjPtr());
313
314 switch (_mClkClsExpectation) {
315 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH:
316 if (!clkCls->originIsUnixEpoch()) {
317 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
318 "Expecting a clock class having a Unix epoch origin, "
319 "but got one not having a Unix epoch origin: "
320 "clock-class-addr={}, clock-class-name={}",
321 clkClsAddr, optLogStr(clkCls->name()));
322 }
323
324 break;
325 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID:
326 BT_ASSERT_DBG(!_mExpectedClkClsUuid);
327
328 if (clkCls->originIsUnixEpoch()) {
329 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
330 bt2::Error,
331 "Expecting a clock class not having a Unix epoch origin, "
332 "but got one having a Unix epoch origin: "
333 "clock-class-addr={}, clock-class-name={}",
334 clkClsAddr, optLogStr(clkCls->name()));
335 }
336
337 if (clkCls->uuid()) {
338 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
339 bt2::Error,
340 "Expecting a clock class without a UUID, but got one with a UUID: "
341 "clock-class-addr={}, clock-class-name={}, uuid={}",
342 clkClsAddr, optLogStr(clkCls->name()), clkCls->uuid()->str());
343 }
344
345 break;
346 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID:
347 BT_ASSERT_DBG(_mExpectedClkClsUuid);
348
349 if (clkCls->originIsUnixEpoch()) {
350 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
351 bt2::Error,
352 "Expecting a clock class not having a Unix epoch origin, "
353 "but got one having a Unix epoch origin: "
354 "clock-class-addr={}, clock-class-name={}",
355 clkClsAddr, optLogStr(clkCls->name()));
356 }
357
358 if (!clkCls->uuid()) {
359 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
360 bt2::Error,
361 "Expecting a clock class with a UUID, but got one without a UUID: "
362 "clock-class-addr={}, clock-class-name={}",
363 clkClsAddr, optLogStr(clkCls->name()));
364 }
365
366 if (*clkCls->uuid() != bt2c::UuidView {*_mExpectedClkClsUuid}) {
367 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
368 "Expecting a clock class with a specific UUID, "
369 "but got one with a different UUID: "
370 "clock-class-addr={}, clock-class-name={}, "
371 "expected-uuid=\"{}\", uuid=\"{}\"",
372 clkClsAddr, optLogStr(clkCls->name()),
373 _mExpectedClkClsUuid->str(), clkCls->uuid()->str());
374 }
375
376 break;
377 case _ClkClsExpectation::NONE:
378 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
379 "Expecting no clock class, but got one: "
380 "clock-class-addr={}, clock-class-name={}",
381 clkClsAddr, optLogStr(clkCls->name()));
382 break;
383 default:
384 bt_common_abort();
385 }
386 }
387
388 void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
389 {
390 if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
391 /*
392 * We don't care about the other types: all the messages related
393 * to a given stream shared the same default clock class, if
394 * any.
395 */
396 return;
397 }
398
399 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg.type());
400
401 /* Get the clock class, if any, of `msg` */
402 const auto clkCls = bt2c::call([msg]() -> bt2::OptionalBorrowedObject<bt2::ConstClockClass> {
403 if (msg.isStreamBeginning()) {
404 return msg.asStreamBeginning().streamClassDefaultClockClass();
405 } else {
406 BT_ASSERT(msg.isMessageIteratorInactivity());
407 return msg.asMessageIteratorInactivity().clockSnapshot().clockClass();
408 }
409 });
410
411 /* Set the expectation or check it */
412 if (_mClkClsExpectation == _ClkClsExpectation::ANY) {
413 /* First message: set the expectation */
414 this->_setClkClsExpectation(clkCls);
415 } else {
416 /* Make sure clock class is expected */
417 this->_makeSureClkClsIsExpected(msg, clkCls);
418 }
419 }
420
421 MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger) : _mLogger {logger}
422 {
423 }
424
425 bool MsgIter::_HeapComparator::operator()(
426 const UpstreamMsgIter * const upstreamMsgIterA,
427 const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
428 {
429 /* The two messages to compare */
430 const auto msgA = upstreamMsgIterA->msg();
431 const auto msgB = upstreamMsgIterB->msg();
432 auto& msgTsA = upstreamMsgIterA->msgTs();
433 auto& msgTsB = upstreamMsgIterB->msgTs();
434
435 if (_mLogger.wouldLogT()) {
436 BT_CPPLOGT("Comparing two messages: "
437 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
438 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
439 upstreamMsgIterA->portName(), msgA.type(), optMsgTsStr(msgTsA),
440 upstreamMsgIterB->portName(), msgB.type(), optMsgTsStr(msgTsB));
441 }
442
443 /*
444 * Try to compare using timestamps.
445 *
446 * If both timestamps are set and their values are different, then
447 * use this to establish the ordering of the two messages.
448 *
449 * If one timestamp is set, but not the other, the latter always
450 * wins. This is because, for a given upstream message iterator, we
451 * need to consume all the messages having no timestamp so that we
452 * can reach a message with a timestamp to compare it.
453 *
454 * Otherwise, we'll fall back to using
455 * common_muxing_compare_messages().
456 */
457 if (G_LIKELY(msgTsA && msgTsB)) {
458 if (*msgTsA < *msgTsB) {
459 /*
460 * Return `true` because `_mHeap.top()` provides the
461 * "greatest" element. For us, the "greatest" message is
462 * the oldest one, that is, the one having the smallest
463 * timestamp.
464 */
465 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
466 return true;
467 } else if (*msgTsA > *msgTsB) {
468 BT_CPPLOGT_STR(
469 "Timestamp of message A is greater than timestamp of message B: oldest=B");
470 return false;
471 }
472 } else if (msgTsA && !msgTsB) {
473 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
474 return false;
475 } else if (!msgTsA && msgTsB) {
476 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
477 return true;
478 }
479
480 /*
481 * Comparison failed using timestamps: determine an ordering using
482 * arbitrary properties, but in a deterministic way.
483 *
484 * common_muxing_compare_messages() returns less than 0 if the first
485 * message is considered older than the second, which corresponds to
486 * this comparator returning `true`.
487 */
488 const auto res = common_muxing_compare_messages(msgA.libObjPtr(), msgB.libObjPtr()) < 0;
489
490 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
491 res ? "A" : "B");
492 return res;
493 }
494
495 } /* namespace bt2mux */
This page took 0.040062 seconds and 4 git commands to generate.