cpp-common/bt2c/fmt.hpp: use `wise_enum::string_type` in `EnableIfIsWiseEnum` definition
[babeltrace.git] / src / plugins / utils / muxer / upstream-msg-iter.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #include <glib.h>
8
9 #include "cpp-common/bt2/optional-borrowed-object.hpp"
10 #include "cpp-common/bt2c/logging.hpp"
11 #include "cpp-common/vendor/fmt/core.h"
12 #include "cpp-common/vendor/fmt/format.h"
13
14 #include "upstream-msg-iter.hpp"
15
16 namespace bt2mux {
17
18 UpstreamMsgIter::UpstreamMsgIter(bt2::MessageIterator::Shared msgIter, std::string portName,
19 const bt2c::Logger& parentLogger) :
20 _mMsgIter {std::move(msgIter)},
21 _mLogger {parentLogger, fmt::format("{}/[{}]", parentLogger.tag(), portName)},
22 _mPortName {std::move(portName)}
23 {
24 BT_CPPLOGI("Created an upstream message iterator: this={}, port-name={}", fmt::ptr(this),
25 _mPortName);
26 }
27
28 namespace {
29
30 /*
31 * Returns the clock snapshot of `msg`, possibly missing.
32 */
33 bt2::OptionalBorrowedObject<bt2::ConstClockSnapshot> msgCs(const bt2::ConstMessage msg) noexcept
34 {
35 switch (msg.type()) {
36 case bt2::MessageType::Event:
37 if (msg.asEvent().streamClassDefaultClockClass()) {
38 return msg.asEvent().defaultClockSnapshot();
39 }
40
41 break;
42 case bt2::MessageType::PacketBeginning:
43 if (msg.asPacketBeginning().packet().stream().cls().packetsHaveBeginningClockSnapshot()) {
44 return msg.asPacketBeginning().defaultClockSnapshot();
45 }
46
47 break;
48 case bt2::MessageType::PacketEnd:
49 if (msg.asPacketEnd().packet().stream().cls().packetsHaveEndClockSnapshot()) {
50 return msg.asPacketEnd().defaultClockSnapshot();
51 }
52
53 break;
54 case bt2::MessageType::DiscardedEvents:
55 if (msg.asDiscardedEvents().stream().cls().discardedEventsHaveDefaultClockSnapshots()) {
56 return msg.asDiscardedEvents().beginningDefaultClockSnapshot();
57 }
58
59 break;
60 case bt2::MessageType::DiscardedPackets:
61 if (msg.asDiscardedPackets().stream().cls().discardedPacketsHaveDefaultClockSnapshots()) {
62 return msg.asDiscardedPackets().beginningDefaultClockSnapshot();
63 }
64
65 break;
66 case bt2::MessageType::MessageIteratorInactivity:
67 return msg.asMessageIteratorInactivity().clockSnapshot();
68 case bt2::MessageType::StreamBeginning:
69 if (msg.asStreamBeginning().streamClassDefaultClockClass()) {
70 return msg.asStreamBeginning().defaultClockSnapshot();
71 }
72
73 break;
74 case bt2::MessageType::StreamEnd:
75 if (msg.asStreamEnd().streamClassDefaultClockClass()) {
76 return msg.asStreamEnd().defaultClockSnapshot();
77 }
78
79 break;
80 default:
81 bt_common_abort();
82 }
83
84 return {};
85 }
86
87 } /* namespace */
88
89 UpstreamMsgIter::ReloadStatus UpstreamMsgIter::reload()
90 {
91 BT_ASSERT_DBG(!_mDiscardRequired);
92
93 if (G_UNLIKELY(!_mMsgs.msgs)) {
94 /*
95 * This will either:
96 *
97 * 1. Set `_mMsgs.msgs` to new messages (we'll return
98 * `ReloadStatus::MORE`).
99 *
100 * 2. Not set `_mMsgs.msgs` (ended, we'll return
101 * `ReloadStatus::NO_MORE`).
102 *
103 * 3. Throw.
104 */
105 this->_tryGetNewMsgs();
106 }
107
108 if (G_UNLIKELY(!_mMsgs.msgs)) {
109 /* Still none: no more */
110 _mMsgTs.reset();
111 return ReloadStatus::NoMore;
112 } else {
113 if (const auto cs = msgCs(this->msg())) {
114 _mMsgTs = cs->nsFromOrigin();
115 BT_CPPLOGD("Cached the timestamp of the current message: this={}, ts={}",
116 fmt::ptr(this), *_mMsgTs);
117 } else {
118 _mMsgTs.reset();
119 BT_CPPLOGD("Reset the timestamp of the current message: this={}", fmt::ptr(this));
120 }
121
122 _mDiscardRequired = true;
123 return ReloadStatus::More;
124 }
125 }
126
127 void UpstreamMsgIter::_tryGetNewMsgs()
128 {
129 BT_ASSERT_DBG(_mMsgIter);
130 BT_CPPLOGD("Calling the \"next\" method of the upstream message iterator: this={}",
131 fmt::ptr(this));
132
133 /*
134 * Replace with next batch!
135 *
136 * This may throw, in which case we'll keep our current
137 * `_mMsgs.msgs` (set), still requiring to get new messages the next
138 * time the user calls reload().
139 */
140 _mMsgs.msgs = _mMsgIter->next();
141
142 if (!_mMsgs.msgs) {
143 /*
144 * Don't destroy `*_mMsgIter` here because the user may still
145 * call seekBeginning() afterwards.
146 */
147 BT_CPPLOGD("End of upstream message iterator: this={}", fmt::ptr(this));
148 return;
149 }
150
151 _mMsgs.index = 0;
152 BT_CPPLOGD("Got {1} messages from upstream: this={0}, count={1}", fmt::ptr(this),
153 _mMsgs.msgs->length());
154 }
155
156 bool UpstreamMsgIter::canSeekBeginning()
157 {
158 return _mMsgIter->canSeekBeginning();
159 }
160
161 void UpstreamMsgIter::seekBeginning()
162 {
163 _mMsgIter->seekBeginning();
164 _mMsgs.msgs.reset();
165 _mMsgTs.reset();
166 _mDiscardRequired = false;
167 }
168
169 bool UpstreamMsgIter::canSeekForward() const noexcept
170 {
171 return _mMsgIter->canSeekForward();
172 }
173
174 } /* namespace bt2mux */
This page took 0.052146 seconds and 4 git commands to generate.