flt.utils.muxer: use clock correlation validator
[babeltrace.git] / src / plugins / utils / muxer / msg-iter.cpp
CommitLineData
fca1d0f5
PP
1/*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
5 */
6
7#include <algorithm>
8
9#include <glib.h>
10
11#include <babeltrace2/babeltrace.h>
12
62e8199a 13#include "cpp-common/bt2c/fmt.hpp"
fca1d0f5
PP
14#include "cpp-common/bt2s/make-unique.hpp"
15#include "cpp-common/vendor/fmt/format.h"
16
17#include "plugins/common/muxing/muxing.h"
18
19#include "comp.hpp"
20#include "msg-iter.hpp"
21
22namespace bt2mux {
23
24MsgIter::MsgIter(const bt2::SelfMessageIterator selfMsgIter,
25 const bt2::SelfMessageIteratorConfiguration cfg, bt2::SelfComponentOutputPort) :
26 bt2::UserMessageIterator<MsgIter, Comp> {selfMsgIter, "MSG-ITER"},
27 _mHeap {_HeapComparator {_mLogger}}
28{
29 /*
30 * Create one upstream message iterator for each connected
31 * input port.
32 */
33 auto canSeekForward = true;
34
35 for (const auto inputPort : this->_component()._inputPorts()) {
36 if (!inputPort.isConnected()) {
37 BT_CPPLOGI("Ignoring disconnected port: name={}", inputPort.name());
38 continue;
39 }
40
41 /*
42 * Create new upstream message iterator and immediately make it
43 * part of `_mUpstreamMsgItersToReload` (_ensureFullHeap() will
44 * deal with it when downstream calls next()).
45 */
46 auto upstreamMsgIter = bt2s::make_unique<UpstreamMsgIter>(
47 this->_createMessageIterator(inputPort), inputPort.name(), _mLogger);
48
49 canSeekForward = canSeekForward && upstreamMsgIter->canSeekForward();
50 _mUpstreamMsgItersToReload.emplace_back(upstreamMsgIter.get());
51 _mUpstreamMsgIters.push_back(std::move(upstreamMsgIter));
52 }
53
54 /* Set the "can seek forward" configuration */
55 cfg.canSeekForward(canSeekForward);
56}
57
58namespace {
59
60std::string optMsgTsStr(const bt2s::optional<std::int64_t>& ts)
61{
62 if (ts) {
63 return fmt::to_string(*ts);
64 }
65
66 return "none";
67}
68
69} /* namespace */
70
71void MsgIter::_next(bt2::ConstMessageArray& msgs)
72{
73 /* Make sure all upstream message iterators are part of the heap */
74 this->_ensureFullHeap();
75
76 while (msgs.length() < msgs.capacity()) {
77 /* Empty heap? */
78 if (G_UNLIKELY(_mHeap.isEmpty())) {
79 /* No more upstream messages! */
80 return;
81 }
82
83 /*
84 * Retrieve the upstream message iterator having the oldest message.
85 */
86 auto& oldestUpstreamMsgIter = *_mHeap.top();
87
88 /* Validate the clock class of the oldest message */
89 this->_validateMsgClkCls(oldestUpstreamMsgIter.msg());
90
91 /* Append the oldest message and discard it */
92 msgs.append(oldestUpstreamMsgIter.msg().shared());
93
94 if (_mLogger.wouldLogD()) {
95 BT_CPPLOGD("Appended message to array: port-name={}, ts={}",
96 oldestUpstreamMsgIter.portName(),
97 optMsgTsStr(oldestUpstreamMsgIter.msgTs()));
98 }
99
100 oldestUpstreamMsgIter.discard();
101
102 /*
103 * Immediately try to reload `oldestUpstreamMsgIter`.
104 *
105 * The possible outcomes are:
106 *
107 * There's an available message:
108 * Call `_mHeap.replaceTop()` to bring
109 * `oldestUpstreamMsgIter` back to the heap, performing a
110 * single heap rebalance.
111 *
112 * There isn't an available message (ended):
113 * Remove `oldestUpstreamMsgIter` from the heap.
114 *
115 * `bt2::TryAgain` is thrown:
116 * Remove `oldestUpstreamMsgIter` from the heap.
117 *
118 * Add `oldestUpstreamMsgIter` to the set of upstream
119 * message iterators to reload. The next call to _next()
120 * will move it to the heap again (if not ended) after
121 * having successfully called reload().
122 */
123 BT_CPPLOGD(
124 "Trying to reload upstream message iterator having the oldest message: port-name={}",
125 oldestUpstreamMsgIter.portName());
126
127 try {
128 if (G_LIKELY(oldestUpstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
129 /* New current message: update heap */
130 _mHeap.replaceTop(&oldestUpstreamMsgIter);
131 BT_CPPLOGD("More messages available; updated heap: port-name={}, heap-len={}",
132 oldestUpstreamMsgIter.portName(), _mHeap.len());
133 } else {
134 _mHeap.removeTop();
135 BT_CPPLOGD("Upstream message iterator has no more messages; removed from heap: "
136 "port-name{}, heap-len={}",
137 oldestUpstreamMsgIter.portName(), _mHeap.len());
138 }
139 } catch (const bt2::TryAgain&) {
140 _mHeap.removeTop();
141 _mUpstreamMsgItersToReload.push_back(&oldestUpstreamMsgIter);
142 BT_CPPLOGD("Moved upstream message iterator from heap to \"to reload\" set: "
143 "port-name={}, heap-len={}, to-reload-len={}",
144 oldestUpstreamMsgIter.portName(), _mHeap.len(),
145 _mUpstreamMsgItersToReload.size());
146 throw;
147 }
148 }
149}
150
151void MsgIter::_ensureFullHeap()
152{
153 /*
154 * Always remove from `_mUpstreamMsgItersToReload` when reload()
155 * doesn't throw.
156 *
157 * If reload() returns `UpstreamMsgIter::ReloadStatus::NO_MORE`,
158 * then we don't need it anymore (remains alive in
159 * `_mUpstreamMsgIters`).
160 */
161 for (auto it = _mUpstreamMsgItersToReload.begin(); it != _mUpstreamMsgItersToReload.end();
162 it = _mUpstreamMsgItersToReload.erase(it)) {
163 auto& upstreamMsgIter = **it;
164
165 BT_CPPLOGD("Handling upstream message iterator to reload: "
166 "port-name={}, heap-len={}, to-reload-len={}",
167 upstreamMsgIter.portName(), _mHeap.len(), _mUpstreamMsgItersToReload.size());
168
169 if (G_LIKELY(upstreamMsgIter.reload() == UpstreamMsgIter::ReloadStatus::MORE)) {
170 /* New current message: move to heap */
171 _mHeap.insert(&upstreamMsgIter);
172 BT_CPPLOGD("More messages available; "
173 "inserted upstream message iterator into heap from \"to reload\" set: "
174 "port-name={}, heap-len={}",
175 upstreamMsgIter.portName(), _mHeap.len());
176 } else {
177 BT_CPPLOGD("Not inserting upstream message iterator into heap (no more messages): "
178 "port-name={}",
179 upstreamMsgIter.portName());
180 }
181 }
182}
183
184bool MsgIter::_canSeekBeginning()
185{
186 /*
187 * We can only seek our beginning if all our upstream message
188 * iterators also can.
189 */
190 return std::all_of(_mUpstreamMsgIters.begin(), _mUpstreamMsgIters.end(),
191 [](UpstreamMsgIter::UP& upstreamMsgIter) {
192 return upstreamMsgIter->canSeekBeginning();
193 });
194}
195
196void MsgIter::_seekBeginning()
197{
198 /*
199 * The current approach is that this operation is either successful
200 * (all upstream message iterators seek) or not. If it's not, then
201 * we don't keep any state that some sought and some didn't: we'll
202 * restart the whole process when the user tries to seek again.
203 *
204 * The first step is to clear all the containers of upstream message
205 * iterator pointers so that we can process what's in
206 * `_mUpstreamMsgIters` only. This is irreversible, but it's okay:
207 * if any seeking fails below, the downstream user is required to
208 * try the "seek beginning" operation again and only call
209 * bt_message_iterator_next() if it was successful.
210 *
211 * This means if the first four upstream message iterators seek, and
212 * then the fifth one throws `bt2::TryAgain`, then the next time
213 * this method executes, the first four upstream message iterators
214 * will seek again. That being said, it's such an unlikely scenario
215 * that the simplicity outweighs performance concerns here.
216 */
217 _mHeap.clear();
218 _mUpstreamMsgItersToReload.clear();
219
fca1d0f5
PP
220 /* Make each upstream message iterator seek */
221 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
222 /* This may throw! */
223 upstreamMsgIter->seekBeginning();
224 }
225
226 /*
227 * All sought successfully: fill `_mUpstreamMsgItersToReload`; the
228 * next call to _next() will deal with those.
229 */
230 for (auto& upstreamMsgIter : _mUpstreamMsgIters) {
231 _mUpstreamMsgItersToReload.push_back(upstreamMsgIter.get());
232 }
233}
234
df8863ad 235void MsgIter::_validateMsgClkCls(const bt2::ConstMessage msg)
fca1d0f5 236{
df8863ad
SM
237 if (G_LIKELY(!msg.isStreamBeginning() && !msg.isMessageIteratorInactivity())) {
238 /*
239 * We don't care about the other types: all the messages related
240 * to a given stream shared the same default clock class, if
241 * any.
242 */
fca1d0f5
PP
243 return;
244 }
245
df8863ad 246 BT_CPPLOGD("Validating the clock class of a message: msg-type={}", msg.type());
fca1d0f5 247
df8863ad
SM
248 try {
249 _mClkCorrValidator.validate(msg);
250 } catch (const bt2ccv::ClockCorrelationError& error) {
251 using Type = bt2ccv::ClockCorrelationError::Type;
252
253 const auto actualClockCls = error.actualClockCls();
254
255 switch (error.type()) {
256 case Type::EXPECTING_NO_CLOCK_CLASS_GOT_ONE:
257 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
258 "Expecting no clock class, but got one: "
259 "clock-class-addr={}, clock-class-name={}",
260 fmt::ptr(actualClockCls->libObjPtr()),
261 actualClockCls->name());
fca1d0f5 262
df8863ad
SM
263 case Type::EXPECTING_ORIGIN_UNIX_GOT_NONE:
264 case Type::EXPECTING_ORIGIN_UUID_GOT_NONE:
265 case Type::EXPECTING_ORIGIN_NO_UUID_GOT_NONE:
266 {
267 const auto streamCls = *error.streamCls();
fca1d0f5
PP
268
269 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
270 "Expecting a clock class, but got none: "
271 "stream-class-addr={}, stream-class-name=\"{}\", "
272 "stream-class-id={}",
df8863ad
SM
273 fmt::ptr(streamCls.libObjPtr()), streamCls.name(),
274 streamCls.id());
fca1d0f5
PP
275 }
276
df8863ad 277 case Type::EXPECTING_ORIGIN_UNIX_GOT_OTHER:
fca1d0f5
PP
278 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
279 "Expecting a clock class having a Unix epoch origin, "
280 "but got one not having a Unix epoch origin: "
281 "clock-class-addr={}, clock-class-name={}",
df8863ad
SM
282 fmt::ptr(actualClockCls->libObjPtr()),
283 actualClockCls->name());
fca1d0f5 284
df8863ad 285 case Type::EXPECTING_ORIGIN_UUID_GOT_UNIX:
fca1d0f5
PP
286 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
287 bt2::Error,
288 "Expecting a clock class not having a Unix epoch origin, "
289 "but got one having a Unix epoch origin: "
290 "clock-class-addr={}, clock-class-name={}",
df8863ad 291 fmt::ptr(actualClockCls->libObjPtr()), actualClockCls->name());
fca1d0f5 292
df8863ad 293 case Type::EXPECTING_ORIGIN_UUID_GOT_NO_UUID:
fca1d0f5
PP
294 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
295 bt2::Error,
296 "Expecting a clock class with a UUID, but got one without a UUID: "
297 "clock-class-addr={}, clock-class-name={}",
df8863ad 298 fmt::ptr(actualClockCls->libObjPtr()), actualClockCls->name());
fca1d0f5 299
df8863ad 300 case Type::EXPECTING_ORIGIN_UUID_GOT_OTHER_UUID:
fca1d0f5
PP
301 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error,
302 "Expecting a clock class with a specific UUID, "
303 "but got one with a different UUID: "
304 "clock-class-addr={}, clock-class-name={}, "
305 "expected-uuid=\"{}\", uuid=\"{}\"",
df8863ad
SM
306 fmt::ptr(actualClockCls->libObjPtr()),
307 actualClockCls->name(), *error.expectedUuid(),
308 *actualClockCls->uuid());
fca1d0f5 309
df8863ad
SM
310 case Type::EXPECTING_ORIGIN_NO_UUID_GOT_OTHER:
311 {
312 const auto expectedClockCls = error.expectedClockCls();
fca1d0f5 313
df8863ad
SM
314 BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
315 bt2::Error,
316 "Unexpected clock class: "
317 "expected-clock-class-addr={}, expected-clock-class-name={}, "
318 "actual-clock-class-addr={}, actual-clock-class-name={}",
319 fmt::ptr(expectedClockCls->libObjPtr()), expectedClockCls->name(),
320 fmt::ptr(actualClockCls->libObjPtr()), actualClockCls->name());
321 }
fca1d0f5 322 }
fca1d0f5
PP
323 }
324}
325
326MsgIter::_HeapComparator::_HeapComparator(const bt2c::Logger& logger) : _mLogger {logger}
327{
328}
329
330bool MsgIter::_HeapComparator::operator()(
331 const UpstreamMsgIter * const upstreamMsgIterA,
332 const UpstreamMsgIter * const upstreamMsgIterB) const noexcept
333{
334 /* The two messages to compare */
335 const auto msgA = upstreamMsgIterA->msg();
336 const auto msgB = upstreamMsgIterB->msg();
337 auto& msgTsA = upstreamMsgIterA->msgTs();
338 auto& msgTsB = upstreamMsgIterB->msgTs();
339
340 if (_mLogger.wouldLogT()) {
341 BT_CPPLOGT("Comparing two messages: "
342 "port-name-a={}, msg-a-type={}, msg-a-ts={}, "
343 "port-name-b={}, msg-b-type={}, msg-b-ts={}",
62e8199a
SM
344 upstreamMsgIterA->portName(), msgA.type(), optMsgTsStr(msgTsA),
345 upstreamMsgIterB->portName(), msgB.type(), optMsgTsStr(msgTsB));
fca1d0f5
PP
346 }
347
348 /*
349 * Try to compare using timestamps.
350 *
351 * If both timestamps are set and their values are different, then
352 * use this to establish the ordering of the two messages.
353 *
354 * If one timestamp is set, but not the other, the latter always
355 * wins. This is because, for a given upstream message iterator, we
356 * need to consume all the messages having no timestamp so that we
357 * can reach a message with a timestamp to compare it.
358 *
359 * Otherwise, we'll fall back to using
360 * common_muxing_compare_messages().
361 */
362 if (G_LIKELY(msgTsA && msgTsB)) {
363 if (*msgTsA < *msgTsB) {
364 /*
365 * Return `true` because `_mHeap.top()` provides the
366 * "greatest" element. For us, the "greatest" message is
367 * the oldest one, that is, the one having the smallest
368 * timestamp.
369 */
370 BT_CPPLOGT_STR("Timestamp of message A is less than timestamp of message B: oldest=A");
371 return true;
372 } else if (*msgTsA > *msgTsB) {
373 BT_CPPLOGT_STR(
374 "Timestamp of message A is greater than timestamp of message B: oldest=B");
375 return false;
376 }
377 } else if (msgTsA && !msgTsB) {
378 BT_CPPLOGT_STR("Message A has a timestamp, but message B has none: oldest=B");
379 return false;
380 } else if (!msgTsA && msgTsB) {
381 BT_CPPLOGT_STR("Message B has a timestamp, but message A has none: oldest=A");
382 return true;
383 }
384
385 /*
386 * Comparison failed using timestamps: determine an ordering using
387 * arbitrary properties, but in a deterministic way.
388 *
389 * common_muxing_compare_messages() returns less than 0 if the first
390 * message is considered older than the second, which corresponds to
391 * this comparator returning `true`.
392 */
393 const auto res = common_muxing_compare_messages(msgA.libObjPtr(), msgB.libObjPtr()) < 0;
394
395 BT_CPPLOGT("Timestamps are considered equal; comparing other properties: oldest={}",
396 res ? "A" : "B");
397 return res;
398}
399
400} /* namespace bt2mux */
This page took 0.040722 seconds and 4 git commands to generate.