Fix: flt.utils.muxer: reject two different clock classes with unknown origin and...
[babeltrace.git] / src / plugins / utils / muxer / msg-iter.cpp
CommitLineData
fca1d0f5
PP
1/*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
5 */
6
7#include <algorithm>
8
9#include <glib.h>
10
11#include <babeltrace2/babeltrace.h>
12
13#include "common/common.h"
14#include "cpp-common/bt2c/call.hpp"
62e8199a 15#include "cpp-common/bt2c/fmt.hpp"
fca1d0f5
PP
16#include "cpp-common/bt2s/make-unique.hpp"
17#include "cpp-common/vendor/fmt/format.h"
18
19#include "plugins/common/muxing/muxing.h"
20
21#include "comp.hpp"
22#include "msg-iter.hpp"
23
24namespace bt2mux {
25
26MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
27 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
28 bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
29 _mHeap {_HeapComparator {_mLogger}}
30{
31 /*
32 * Create one upstream message iterator for each connected
33 * input port.
34 */
35 auto canSeekForward = true;
36
37 for (const auto inputPort : this->_component()._inputPorts()) {
38 if (!inputPort.isConnected()) {
39 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
40 continue;
41 }
42
43 /*
44 * Create new upstream message iterator and immediately make it
45 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
46 * deal with it when downstream calls next()).
47 */
48 auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
49 this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
50
51 canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
52 _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
53 _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
54 }
55
56 /* Set the "can seek forward" configuration */
57 cfg.canSeekForward(canSeekForward);
58}
59
60namespace {
61
62std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
63{
64 if (ts) {
65 return fmt::to_string(*ts);
66 }
67
68 return "none";
69}
70
71} /* namespace */
72
73void MsgIter::_next(bt2::ConstMessageArray& msgs)
74{
75 /* Make sure all upstream message iterators are part of the heap */
76 this->_ensureFullHeap();
77
78 while (msgs.length() < msgs.capacity()) {
79 /* Empty heap? */
80 if (G_UNLIKELY(_mHeap.isEmpty())) {
81 /* No more upstream messages! */
82 return;
83 }
84
85 /*
86 * Retrieve the upstream message iterator having the oldest message.
87 */
88 auto& oldestUpstreamMsgIter = *_mHeap.top();
89
90 /* Validate the clock class of the oldest message */
91 this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
92
93 /* Append the oldest message and discard it */
94 msgs.append(oldestUpstreamMsgIter.msg().shared());
95
96 if (_mLogger.wouldLogD()) {
97 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
98 oldestUpstreamMsgIter.portName(),
99 optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
100 }
101
102 oldestUpstreamMsgIter.discard();
103
104 /*
105 * Immediately try to reload `oldestUpstreamMsgIter`.
106 *
107 * The possible outcomes are:
108 *
109 * There's an available message:
110 * Call `_mHeap.replaceTop()` to bring
111 * `oldestUpstreamMsgIter` back to the heap, performing a
112 * single heap rebalance.
113 *
114 * There isn't an available message (ended):
115 * Remove `oldestUpstreamMsgIter` from the heap.
116 *
117 * `bt2::TryAgain` is thrown:
118 * Remove `oldestUpstreamMsgIter` from the heap.
119 *
120 * Add `oldestUpstreamMsgIter` to the set of upstream
121 * message iterators to reload. The next call to _next()
122 * will move it to the heap again (if not ended) after
123 * having successfully called reload().
124 */
125 BT_CPPLOGD(
126 "Trying to reload upstream message iterator having the oldest message: port-name={}",
127 oldestUpstreamMsgIter.portName());
128
129 try {
130 if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
131 /* New current message: update heap */
132 _mHeap.replaceTop(&oldestUpstreamMsgIter);
133 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
134 oldestUpstreamMsgIter.portName(), _mHeap.len());
135 } else {
136 _mHeap.removeTop();
137 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
138 "port-name{}, heap-len={}",
139 oldestUpstreamMsgIter.portName(), _mHeap.len());
140 }
141 } catch (const bt2::TryAgain&) {
142 _mHeap.removeTop();
143 _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
144 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
145 "port-name={}, heap-len={}, to-reload-len={}",
146 oldestUpstreamMsgIter.portName(), _mHeap.len(),
147 _mUpstreamMsgItersToReload.size());
148 throw;
149 }
150 }
151}
152
153void MsgIter::_ensureFullHeap()
154{
155 /*
156 * Always remove from `_mUpstreamMsgItersToReload` when reload()
157 * doesn't throw.
158 *
159 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
160 * then we don't need it anymore (remains alive in
161 * `_mUpstreamMsgIters`).
162 */
163 for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
164 it = _mUpstreamMsgItersToReload.erase(it)) {
165 auto& upstreamMsgIter = **it;
166
167 BT_CPPLOGD("Handling upstream message iterator to reload: "
168 "port-name={}, heap-len={}, to-reload-len={}",
169 upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
170
171 if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
172 /* New current message: move to heap */
173 _mHeap.insert(&upstreamMsgIter);
174 BT_CPPLOGD("More messages available; "
175 "inserted upstream message iterator into heap from \"to reload\" set: "
176 "port-name={}, heap-len={}",
177 upstreamMsgIter.portName(), _mHeap.len());
178 } else {
179 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
180 "port-name={}",
181 upstreamMsgIter.portName());
182 }
183 }
184}
185
186bool MsgIter::_canSeekBeginning()
187{
188 /*
189 * We can only seek our beginning if all our upstream message
190 * iterators also can.
191 */
192 return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
193 [](UpstreamMsgIter::UP& upstreamMsgIter) {
194 return upstreamMsgIter->canSeekBeginning();
195 });
196}
197
198void MsgIter::_seekBeginning()
199{
200 /*
201 * The current approach is that this operation is either successful
202 * (all upstream message iterators seek) or not. If it's not, then
203 * we don't keep any state that some sought and some didn't: we'll
204 * restart the whole process when the user tries to seek again.
205 *
206 * The first step is to clear all the containers of upstream message
207 * iterator pointers so that we can process what's in
208 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
209 * if any seeking fails below, the downstream user is required to
210 * try the "seek beginning" operation again and only call
211 * bt_message_iterator_next() if it was successful.
212 *
213 * This means if the first four upstream message iterators seek, and
214 * then the fifth one throws `bt2::TryAgain`, then the next time
215 * this method executes, the first four upstream message iterators
216 * will seek again. That being said, it's such an unlikely scenario
217 * that the simplicity outweighs performance concerns here.
218 */
219 _mHeap.clear();
220 _mUpstreamMsgItersToReload.clear();
221
222 /* Also reset clock class expectation */
223 _mClkClsExpectation = _ClkClsExpectation::ANY;
224 _mExpectedClkClsUuid.reset();
225
226 /* Make each upstream message iterator seek */
227 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
228 /* This may throw! */
229 upstreamMsgIter->seekBeginning();
230 }
231
232 /*
233 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
234 * next call to _next() will deal with those.
235 */
236 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
237 _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
238 }
239}
240
241namespace {
242
fca1d0f5
PP
243std::string optLogStr(const char * const str) noexcept
244{
245 return str ? fmt::format("\"{}\"", str) : "(none)";
246}
247
248} /* namespace */
249
250void MsgIter::_setClkClsExpectation(
251 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) noexcept
252{
253 BT_ASSERT_DBG(_mClkClsExpectation == _ClkClsExpectation::ANY);
254
255 /* No initial clock class: also expect none afterwards */
256 if (!clkCls) {
257 _mClkClsExpectation = _ClkClsExpectation::NONE;
258 return;
259 }
260
261 /*
262 * This is the first clock class that this message iterator
263 * encounters. Its properties determine what to expect for the whole
264 * lifetime of the iterator.
265 */
266 if (clkCls->originIsUnixEpoch()) {
267 /* Expect clock classes having a Unix epoch origin*/
268 _mClkClsExpectation = _ClkClsExpectation::ORIG_IS_UNIX_EPOCH;
269 } else {
270 if (clkCls->uuid()) {
271 /*
272 * Expect clock classes not having a Unix epoch origin and
273 * with a specific UUID.
274 */
275 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID;
276 _mExpectedClkClsUuid = *clkCls->uuid();
277 } else {
278 /*
279 * Expect clock classes not having a Unix epoch origin and
280 * without a UUID.
281 */
282 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID;
0f635209 283 _mExpectedClkCls = clkCls->shared();
fca1d0f5
PP
284 }
285 }
286}
287
288void MsgIter::_makeSureClkClsIsExpected(
289 const bt2::ConstMessage msg,
290 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) const
291{
292 BT_ASSERT_DBG(_mClkClsExpectation != _ClkClsExpectation::ANY);
293
294 if (!clkCls) {
295 if (_mClkClsExpectation != _ClkClsExpectation::NONE) {
296 /*
297 * `msg` is a stream beginning message because a message
298 * iterator inactivity message always has a clock class.
299 */
300 const auto streamCls = msg.asStreamBeginning().stream().cls();
301
302 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
303 "Expecting a clock class, but got none: "
304 "stream-class-addr={}, stream-class-name=\"{}\", "
305 "stream-class-id={}",
306 static_cast<const void *>(streamCls.libObjPtr()),
307 optLogStr(streamCls.name()), streamCls.id());
308 }
309
310 return;
311 }
312
313 const auto clkClsAddr = static_cast<const void *>(clkCls->libObjPtr());
314
315 switch (_mClkClsExpectation) {
316 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH:
317 if (!clkCls->originIsUnixEpoch()) {
318 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
319 "Expecting a clock class having a Unix epoch origin, "
320 "but got one not having a Unix epoch origin: "
321 "clock-class-addr={}, clock-class-name={}",
322 clkClsAddr, optLogStr(clkCls->name()));
323 }
324
325 break;
326 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID:
327 BT_ASSERT_DBG(!_mExpectedClkClsUuid);
0f635209 328 BT_ASSERT_DBG(_mExpectedClkCls);
fca1d0f5 329
0f635209 330 if (clkCls->libObjPtr() != _mExpectedClkCls->libObjPtr()) {
fca1d0f5
PP
331 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
332 bt2::Error,
0f635209
SM
333 "Unexpected clock class: "
334 "expected-clock-class-addr={}, expected-clock-class-name={}, "
335 "actual-clock-class-addr={}, actual-clock-class-name={}",
336 fmt::ptr(_mExpectedClkCls->libObjPtr()), optLogStr(_mExpectedClkCls->name()),
fca1d0f5
PP
337 clkClsAddr, optLogStr(clkCls->name()));
338 }
339
fca1d0f5
PP
340 break;
341 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID:
342 BT_ASSERT_DBG(_mExpectedClkClsUuid);
0f635209 343 BT_ASSERT_DBG(!_mExpectedClkCls);
fca1d0f5
PP
344
345 if (clkCls->originIsUnixEpoch()) {
346 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
347 bt2::Error,
348 "Expecting a clock class not having a Unix epoch origin, "
349 "but got one having a Unix epoch origin: "
350 "clock-class-addr={}, clock-class-name={}",
351 clkClsAddr, optLogStr(clkCls->name()));
352 }
353
354 if (!clkCls->uuid()) {
355 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
356 bt2::Error,
357 "Expecting a clock class with a UUID, but got one without a UUID: "
358 "clock-class-addr={}, clock-class-name={}",
359 clkClsAddr, optLogStr(clkCls->name()));
360 }
361
362 if (*clkCls->uuid() != bt2c::UuidView {*_mExpectedClkClsUuid}) {
363 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
364 "Expecting a clock class with a specific UUID, "
365 "but got one with a different UUID: "
366 "clock-class-addr={}, clock-class-name={}, "
367 "expected-uuid=\"{}\", uuid=\"{}\"",
368 clkClsAddr, optLogStr(clkCls->name()),
369 _mExpectedClkClsUuid->str(), clkCls->uuid()->str());
370 }
371
372 break;
373 case _ClkClsExpectation::NONE:
374 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
375 "Expecting no clock class, but got one: "
376 "clock-class-addr={}, clock-class-name={}",
377 clkClsAddr, optLogStr(clkCls->name()));
378 break;
379 default:
380 bt_common_abort();
381 }
382}
383
384void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
385{
386 if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
387 /*
388 * We don't care about the other types: all the messages related
389 * to a given stream shared the same default clock class, if
390 * any.
391 */
392 return;
393 }
394
62e8199a 395 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg.type());
fca1d0f5
PP
396
397 /* Get the clock class, if any, of `msg` */
398 const auto clkCls = bt2c::call([msg]() -> bt2::OptionalBorrowedObject<bt2::ConstClockClass> {
399 if (msg.isStreamBeginning()) {
8de58945 400 return msg.asStreamBeginning().streamClassDefaultClockClass();
fca1d0f5
PP
401 } else {
402 BT_ASSERT(msg.isMessageIteratorInactivity());
403 return msg.asMessageIteratorInactivity().clockSnapshot().clockClass();
404 }
405 });
406
407 /* Set the expectation or check it */
408 if (_mClkClsExpectation == _ClkClsExpectation::ANY) {
409 /* First message: set the expectation */
410 this->_setClkClsExpectation(clkCls);
411 } else {
412 /* Make sure clock class is expected */
413 this->_makeSureClkClsIsExpected(msg, clkCls);
414 }
415}
416
417MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger) : _mLogger {logger}
418{
419}
420
421bool MsgIter::_HeapComparator::operator()(
422 const UpstreamMsgIter * const upstreamMsgIterA,
423 const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
424{
425 /* The two messages to compare */
426 const auto msgA = upstreamMsgIterA->msg();
427 const auto msgB = upstreamMsgIterB->msg();
428 auto& msgTsA = upstreamMsgIterA->msgTs();
429 auto& msgTsB = upstreamMsgIterB->msgTs();
430
431 if (_mLogger.wouldLogT()) {
432 BT_CPPLOGT("Comparing two messages: "
433 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
434 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
62e8199a
SM
435 upstreamMsgIterA->portName(), msgA.type(), optMsgTsStr(msgTsA),
436 upstreamMsgIterB->portName(), msgB.type(), optMsgTsStr(msgTsB));
fca1d0f5
PP
437 }
438
439 /*
440 * Try to compare using timestamps.
441 *
442 * If both timestamps are set and their values are different, then
443 * use this to establish the ordering of the two messages.
444 *
445 * If one timestamp is set, but not the other, the latter always
446 * wins. This is because, for a given upstream message iterator, we
447 * need to consume all the messages having no timestamp so that we
448 * can reach a message with a timestamp to compare it.
449 *
450 * Otherwise, we'll fall back to using
451 * common_muxing_compare_messages().
452 */
453 if (G_LIKELY(msgTsA && msgTsB)) {
454 if (*msgTsA < *msgTsB) {
455 /*
456 * Return `true` because `_mHeap.top()` provides the
457 * "greatest" element. For us, the "greatest" message is
458 * the oldest one, that is, the one having the smallest
459 * timestamp.
460 */
461 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
462 return true;
463 } else if (*msgTsA > *msgTsB) {
464 BT_CPPLOGT_STR(
465 "Timestamp of message A is greater than timestamp of message B: oldest=B");
466 return false;
467 }
468 } else if (msgTsA && !msgTsB) {
469 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
470 return false;
471 } else if (!msgTsA && msgTsB) {
472 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
473 return true;
474 }
475
476 /*
477 * Comparison failed using timestamps: determine an ordering using
478 * arbitrary properties, but in a deterministic way.
479 *
480 * common_muxing_compare_messages() returns less than 0 if the first
481 * message is considered older than the second, which corresponds to
482 * this comparator returning `true`.
483 */
484 const auto res = common_muxing_compare_messages(msgA.libObjPtr(), msgB.libObjPtr()) < 0;
485
486 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
487 res ? "A" : "B");
488 return res;
489}
490
491} /* namespace bt2mux */
This page took 0.04363 seconds and 4 git commands to generate.