flt.utils.muxer: don't reset clock expectation when seeking beginning
[babeltrace.git] / src / plugins / utils / muxer / msg-iter.cpp
CommitLineData
fca1d0f5
PP
1/*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
5 */
6
7#include <algorithm>
8
9#include <glib.h>
10
11#include <babeltrace2/babeltrace.h>
12
13#include "common/common.h"
14#include "cpp-common/bt2c/call.hpp"
62e8199a 15#include "cpp-common/bt2c/fmt.hpp"
fca1d0f5
PP
16#include "cpp-common/bt2s/make-unique.hpp"
17#include "cpp-common/vendor/fmt/format.h"
18
19#include "plugins/common/muxing/muxing.h"
20
21#include "comp.hpp"
22#include "msg-iter.hpp"
23
24namespace bt2mux {
25
26MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
27 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
28 bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
29 _mHeap {_HeapComparator {_mLogger}}
30{
31 /*
32 * Create one upstream message iterator for each connected
33 * input port.
34 */
35 auto canSeekForward = true;
36
37 for (const auto inputPort : this->_component()._inputPorts()) {
38 if (!inputPort.isConnected()) {
39 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
40 continue;
41 }
42
43 /*
44 * Create new upstream message iterator and immediately make it
45 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
46 * deal with it when downstream calls next()).
47 */
48 auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
49 this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
50
51 canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
52 _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
53 _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
54 }
55
56 /* Set the "can seek forward" configuration */
57 cfg.canSeekForward(canSeekForward);
58}
59
60namespace {
61
62std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
63{
64 if (ts) {
65 return fmt::to_string(*ts);
66 }
67
68 return "none";
69}
70
71} /* namespace */
72
73void MsgIter::_next(bt2::ConstMessageArray& msgs)
74{
75 /* Make sure all upstream message iterators are part of the heap */
76 this->_ensureFullHeap();
77
78 while (msgs.length() < msgs.capacity()) {
79 /* Empty heap? */
80 if (G_UNLIKELY(_mHeap.isEmpty())) {
81 /* No more upstream messages! */
82 return;
83 }
84
85 /*
86 * Retrieve the upstream message iterator having the oldest message.
87 */
88 auto& oldestUpstreamMsgIter = *_mHeap.top();
89
90 /* Validate the clock class of the oldest message */
91 this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
92
93 /* Append the oldest message and discard it */
94 msgs.append(oldestUpstreamMsgIter.msg().shared());
95
96 if (_mLogger.wouldLogD()) {
97 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
98 oldestUpstreamMsgIter.portName(),
99 optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
100 }
101
102 oldestUpstreamMsgIter.discard();
103
104 /*
105 * Immediately try to reload `oldestUpstreamMsgIter`.
106 *
107 * The possible outcomes are:
108 *
109 * There's an available message:
110 * Call `_mHeap.replaceTop()` to bring
111 * `oldestUpstreamMsgIter` back to the heap, performing a
112 * single heap rebalance.
113 *
114 * There isn't an available message (ended):
115 * Remove `oldestUpstreamMsgIter` from the heap.
116 *
117 * `bt2::TryAgain` is thrown:
118 * Remove `oldestUpstreamMsgIter` from the heap.
119 *
120 * Add `oldestUpstreamMsgIter` to the set of upstream
121 * message iterators to reload. The next call to _next()
122 * will move it to the heap again (if not ended) after
123 * having successfully called reload().
124 */
125 BT_CPPLOGD(
126 "Trying to reload upstream message iterator having the oldest message: port-name={}",
127 oldestUpstreamMsgIter.portName());
128
129 try {
130 if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
131 /* New current message: update heap */
132 _mHeap.replaceTop(&oldestUpstreamMsgIter);
133 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
134 oldestUpstreamMsgIter.portName(), _mHeap.len());
135 } else {
136 _mHeap.removeTop();
137 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
138 "port-name{}, heap-len={}",
139 oldestUpstreamMsgIter.portName(), _mHeap.len());
140 }
141 } catch (const bt2::TryAgain&) {
142 _mHeap.removeTop();
143 _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
144 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
145 "port-name={}, heap-len={}, to-reload-len={}",
146 oldestUpstreamMsgIter.portName(), _mHeap.len(),
147 _mUpstreamMsgItersToReload.size());
148 throw;
149 }
150 }
151}
152
153void MsgIter::_ensureFullHeap()
154{
155 /*
156 * Always remove from `_mUpstreamMsgItersToReload` when reload()
157 * doesn't throw.
158 *
159 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
160 * then we don't need it anymore (remains alive in
161 * `_mUpstreamMsgIters`).
162 */
163 for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
164 it = _mUpstreamMsgItersToReload.erase(it)) {
165 auto& upstreamMsgIter = **it;
166
167 BT_CPPLOGD("Handling upstream message iterator to reload: "
168 "port-name={}, heap-len={}, to-reload-len={}",
169 upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
170
171 if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
172 /* New current message: move to heap */
173 _mHeap.insert(&upstreamMsgIter);
174 BT_CPPLOGD("More messages available; "
175 "inserted upstream message iterator into heap from \"to reload\" set: "
176 "port-name={}, heap-len={}",
177 upstreamMsgIter.portName(), _mHeap.len());
178 } else {
179 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
180 "port-name={}",
181 upstreamMsgIter.portName());
182 }
183 }
184}
185
186bool MsgIter::_canSeekBeginning()
187{
188 /*
189 * We can only seek our beginning if all our upstream message
190 * iterators also can.
191 */
192 return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
193 [](UpstreamMsgIter::UP& upstreamMsgIter) {
194 return upstreamMsgIter->canSeekBeginning();
195 });
196}
197
198void MsgIter::_seekBeginning()
199{
200 /*
201 * The current approach is that this operation is either successful
202 * (all upstream message iterators seek) or not. If it's not, then
203 * we don't keep any state that some sought and some didn't: we'll
204 * restart the whole process when the user tries to seek again.
205 *
206 * The first step is to clear all the containers of upstream message
207 * iterator pointers so that we can process what's in
208 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
209 * if any seeking fails below, the downstream user is required to
210 * try the "seek beginning" operation again and only call
211 * bt_message_iterator_next() if it was successful.
212 *
213 * This means if the first four upstream message iterators seek, and
214 * then the fifth one throws `bt2::TryAgain`, then the next time
215 * this method executes, the first four upstream message iterators
216 * will seek again. That being said, it's such an unlikely scenario
217 * that the simplicity outweighs performance concerns here.
218 */
219 _mHeap.clear();
220 _mUpstreamMsgItersToReload.clear();
221
fca1d0f5
PP
222 /* Make each upstream message iterator seek */
223 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
224 /* This may throw! */
225 upstreamMsgIter->seekBeginning();
226 }
227
228 /*
229 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
230 * next call to _next() will deal with those.
231 */
232 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
233 _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
234 }
235}
236
237namespace {
238
fca1d0f5
PP
239std::string optLogStr(const char * const str) noexcept
240{
241 return str ? fmt::format("\"{}\"", str) : "(none)";
242}
243
244} /* namespace */
245
246void MsgIter::_setClkClsExpectation(
247 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) noexcept
248{
249 BT_ASSERT_DBG(_mClkClsExpectation == _ClkClsExpectation::ANY);
250
251 /* No initial clock class: also expect none afterwards */
252 if (!clkCls) {
253 _mClkClsExpectation = _ClkClsExpectation::NONE;
254 return;
255 }
256
257 /*
258 * This is the first clock class that this message iterator
259 * encounters. Its properties determine what to expect for the whole
260 * lifetime of the iterator.
261 */
262 if (clkCls->originIsUnixEpoch()) {
263 /* Expect clock classes having a Unix epoch origin*/
264 _mClkClsExpectation = _ClkClsExpectation::ORIG_IS_UNIX_EPOCH;
265 } else {
266 if (clkCls->uuid()) {
267 /*
268 * Expect clock classes not having a Unix epoch origin and
269 * with a specific UUID.
270 */
271 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID;
272 _mExpectedClkClsUuid = *clkCls->uuid();
273 } else {
274 /*
275 * Expect clock classes not having a Unix epoch origin and
276 * without a UUID.
277 */
278 _mClkClsExpectation = _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID;
0f635209 279 _mExpectedClkCls = clkCls->shared();
fca1d0f5
PP
280 }
281 }
282}
283
284void MsgIter::_makeSureClkClsIsExpected(
285 const bt2::ConstMessage msg,
286 const bt2::OptionalBorrowedObject<bt2::ConstClockClass> clkCls) const
287{
288 BT_ASSERT_DBG(_mClkClsExpectation != _ClkClsExpectation::ANY);
289
290 if (!clkCls) {
291 if (_mClkClsExpectation != _ClkClsExpectation::NONE) {
292 /*
293 * `msg` is a stream beginning message because a message
294 * iterator inactivity message always has a clock class.
295 */
296 const auto streamCls = msg.asStreamBeginning().stream().cls();
297
298 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
299 "Expecting a clock class, but got none: "
300 "stream-class-addr={}, stream-class-name=\"{}\", "
301 "stream-class-id={}",
302 static_cast<const void *>(streamCls.libObjPtr()),
303 optLogStr(streamCls.name()), streamCls.id());
304 }
305
306 return;
307 }
308
309 const auto clkClsAddr = static_cast<const void *>(clkCls->libObjPtr());
310
311 switch (_mClkClsExpectation) {
312 case _ClkClsExpectation::ORIG_IS_UNIX_EPOCH:
313 if (!clkCls->originIsUnixEpoch()) {
314 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
315 "Expecting a clock class having a Unix epoch origin, "
316 "but got one not having a Unix epoch origin: "
317 "clock-class-addr={}, clock-class-name={}",
318 clkClsAddr, optLogStr(clkCls->name()));
319 }
320
321 break;
322 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_NO_UUID:
323 BT_ASSERT_DBG(!_mExpectedClkClsUuid);
0f635209 324 BT_ASSERT_DBG(_mExpectedClkCls);
fca1d0f5 325
0f635209 326 if (clkCls->libObjPtr() != _mExpectedClkCls->libObjPtr()) {
fca1d0f5
PP
327 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
328 bt2::Error,
0f635209
SM
329 "Unexpected clock class: "
330 "expected-clock-class-addr={}, expected-clock-class-name={}, "
331 "actual-clock-class-addr={}, actual-clock-class-name={}",
332 fmt::ptr(_mExpectedClkCls->libObjPtr()), optLogStr(_mExpectedClkCls->name()),
fca1d0f5
PP
333 clkClsAddr, optLogStr(clkCls->name()));
334 }
335
fca1d0f5
PP
336 break;
337 case _ClkClsExpectation::ORIG_ISNT_UNIX_EPOCH_AND_SPEC_UUID:
338 BT_ASSERT_DBG(_mExpectedClkClsUuid);
0f635209 339 BT_ASSERT_DBG(!_mExpectedClkCls);
fca1d0f5
PP
340
341 if (clkCls->originIsUnixEpoch()) {
342 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
343 bt2::Error,
344 "Expecting a clock class not having a Unix epoch origin, "
345 "but got one having a Unix epoch origin: "
346 "clock-class-addr={}, clock-class-name={}",
347 clkClsAddr, optLogStr(clkCls->name()));
348 }
349
350 if (!clkCls->uuid()) {
351 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
352 bt2::Error,
353 "Expecting a clock class with a UUID, but got one without a UUID: "
354 "clock-class-addr={}, clock-class-name={}",
355 clkClsAddr, optLogStr(clkCls->name()));
356 }
357
358 if (*clkCls->uuid() != bt2c::UuidView {*_mExpectedClkClsUuid}) {
359 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
360 "Expecting a clock class with a specific UUID, "
361 "but got one with a different UUID: "
362 "clock-class-addr={}, clock-class-name={}, "
363 "expected-uuid=\"{}\", uuid=\"{}\"",
364 clkClsAddr, optLogStr(clkCls->name()),
365 _mExpectedClkClsUuid->str(), clkCls->uuid()->str());
366 }
367
368 break;
369 case _ClkClsExpectation::NONE:
370 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
371 "Expecting no clock class, but got one: "
372 "clock-class-addr={}, clock-class-name={}",
373 clkClsAddr, optLogStr(clkCls->name()));
374 break;
375 default:
376 bt_common_abort();
377 }
378}
379
380void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
381{
382 if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
383 /*
384 * We don't care about the other types: all the messages related
385 * to a given stream shared the same default clock class, if
386 * any.
387 */
388 return;
389 }
390
62e8199a 391 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg.type());
fca1d0f5
PP
392
393 /* Get the clock class, if any, of `msg` */
394 const auto clkCls = bt2c::call([msg]() -> bt2::OptionalBorrowedObject<bt2::ConstClockClass> {
395 if (msg.isStreamBeginning()) {
8de58945 396 return msg.asStreamBeginning().streamClassDefaultClockClass();
fca1d0f5
PP
397 } else {
398 BT_ASSERT(msg.isMessageIteratorInactivity());
399 return msg.asMessageIteratorInactivity().clockSnapshot().clockClass();
400 }
401 });
402
403 /* Set the expectation or check it */
404 if (_mClkClsExpectation == _ClkClsExpectation::ANY) {
405 /* First message: set the expectation */
406 this->_setClkClsExpectation(clkCls);
407 } else {
408 /* Make sure clock class is expected */
409 this->_makeSureClkClsIsExpected(msg, clkCls);
410 }
411}
412
413MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger) : _mLogger {logger}
414{
415}
416
417bool MsgIter::_HeapComparator::operator()(
418 const UpstreamMsgIter * const upstreamMsgIterA,
419 const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
420{
421 /* The two messages to compare */
422 const auto msgA = upstreamMsgIterA->msg();
423 const auto msgB = upstreamMsgIterB->msg();
424 auto& msgTsA = upstreamMsgIterA->msgTs();
425 auto& msgTsB = upstreamMsgIterB->msgTs();
426
427 if (_mLogger.wouldLogT()) {
428 BT_CPPLOGT("Comparing two messages: "
429 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
430 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
62e8199a
SM
431 upstreamMsgIterA->portName(), msgA.type(), optMsgTsStr(msgTsA),
432 upstreamMsgIterB->portName(), msgB.type(), optMsgTsStr(msgTsB));
fca1d0f5
PP
433 }
434
435 /*
436 * Try to compare using timestamps.
437 *
438 * If both timestamps are set and their values are different, then
439 * use this to establish the ordering of the two messages.
440 *
441 * If one timestamp is set, but not the other, the latter always
442 * wins. This is because, for a given upstream message iterator, we
443 * need to consume all the messages having no timestamp so that we
444 * can reach a message with a timestamp to compare it.
445 *
446 * Otherwise, we'll fall back to using
447 * common_muxing_compare_messages().
448 */
449 if (G_LIKELY(msgTsA && msgTsB)) {
450 if (*msgTsA < *msgTsB) {
451 /*
452 * Return `true` because `_mHeap.top()` provides the
453 * "greatest" element. For us, the "greatest" message is
454 * the oldest one, that is, the one having the smallest
455 * timestamp.
456 */
457 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
458 return true;
459 } else if (*msgTsA > *msgTsB) {
460 BT_CPPLOGT_STR(
461 "Timestamp of message A is greater than timestamp of message B: oldest=B");
462 return false;
463 }
464 } else if (msgTsA && !msgTsB) {
465 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
466 return false;
467 } else if (!msgTsA && msgTsB) {
468 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
469 return true;
470 }
471
472 /*
473 * Comparison failed using timestamps: determine an ordering using
474 * arbitrary properties, but in a deterministic way.
475 *
476 * common_muxing_compare_messages() returns less than 0 if the first
477 * message is considered older than the second, which corresponds to
478 * this comparator returning `true`.
479 */
480 const auto res = common_muxing_compare_messages(msgA.libObjPtr(), msgB.libObjPtr()) < 0;
481
482 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
483 res ? "A" : "B");
484 return res;
485}
486
487} /* namespace bt2mux */
This page took 0.043623 seconds and 4 git commands to generate.