flt.utils.muxer: don't reset clock expectation when seeking beginning
[babeltrace.git] / src / plugins / utils / muxer / upstream-msg-iter.hpp
1
2 /*
3 * SPDX-License-Identifier: MIT
4 *
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
7 */
8
9 #ifndef BABELTRACE_PLUGINS_UTILS_MUXER_UPSTREAM_MSG_ITER_HPP
10 #define BABELTRACE_PLUGINS_UTILS_MUXER_UPSTREAM_MSG_ITER_HPP
11
12 #include <memory>
13
14 #include "common/assert.h"
15 #include "cpp-common/bt2/message-array.hpp"
16 #include "cpp-common/bt2/message-iterator.hpp"
17 #include "cpp-common/bt2c/logging.hpp"
18 #include "cpp-common/bt2s/optional.hpp"
19
20 namespace bt2mux {
21
22 /*
23 * An instance of this wraps an upstream libbabeltrace2 message
24 * iterator, keeping an internal array of receives messages, and making
25 * the oldest one available (msg() method).
26 */
27 class UpstreamMsgIter final
28 {
29 public:
30 /* Unique pointer to upstream message iterator */
31 using UP = std::unique_ptr<UpstreamMsgIter>;
32
33 /* Return type of reload() */
34 enum class ReloadStatus
35 {
36 MORE,
37 NO_MORE,
38 };
39
40 /*
41 * Builds an upstream message iterator wrapper using the
42 * libbabeltrace2 message iterator `msgIter`.
43 *
44 * This constructor doesn't immediately gets the next messages from
45 * `*msgIter` (you always need to call reload() before you call
46 * msg()), therefore it won't throw `bt2::Error` or `bt2::TryAgain`.
47 */
48 explicit UpstreamMsgIter(bt2::MessageIterator::Shared msgIter, std::string portName,
49 const bt2c::Logger& parentLogger);
50
51 /* Some protection */
52 UpstreamMsgIter(const UpstreamMsgIter&) = delete;
53 UpstreamMsgIter& operator=(const UpstreamMsgIter&) = delete;
54
55 /*
56 * Current message.
57 *
58 * Before you call this method:
59 *
60 * 1. If needed, you must call discard().
61 *
62 * This is not the case immediately after construction and
63 * immediately after seeking.
64 *
65 * 2. You must call reload() successfully (not ended).
66 *
67 * This is always the case.
68 *
69 * This makes it possible to build an `UpstreamMsgIter` instance
70 * without libbabeltrace2 message iterator exceptions.
71 */
72 bt2::ConstMessage msg() const noexcept
73 {
74 BT_ASSERT_DBG(_mMsgs.msgs && _mMsgs.index < _mMsgs.msgs->length());
75 return (*_mMsgs.msgs)[_mMsgs.index];
76 }
77
78 /*
79 * Timestamp, if any, of the current message.
80 *
81 * It must be valid to call msg() when you call this method.
82 */
83 const bt2s::optional<std::int64_t> msgTs() const noexcept
84 {
85 return _mMsgTs;
86 }
87
88 /*
89 * Discards the current message, making this upstream message
90 * iterator ready for a reload (reload()).
91 *
92 * You may only call reload() or seekBeginning() after having called
93 * this.
94 */
95 void discard() noexcept
96 {
97 BT_ASSERT_DBG(_mMsgs.msgs && _mMsgs.index < _mMsgs.msgs->length());
98 BT_ASSERT_DBG(_mDiscardRequired);
99 _mDiscardRequired = false;
100 ++_mMsgs.index;
101
102 if (_mMsgs.index == _mMsgs.msgs->length()) {
103 _mMsgs.msgs.reset();
104 }
105 }
106
107 /*
108 * Retrieves the next message, making it available afterwards
109 * through the msg() method.
110 *
111 * You must have called discard() to discard the current message, if
112 * any, before you call this method.
113 *
114 * This method may throw anything bt2::MessageIterator::next() may
115 * throw.
116 *
117 * If this method returns `ReloadStatus::NO_MORE`, then the
118 * underlying libbabeltrace2 message iterator is ended, meaning you
119 * may not call msg(), msgTs(), or reload() again for this message
120 * iterator until you successfully call seekBeginning().
121 */
122 ReloadStatus reload();
123
124 /*
125 * Forwards to bt2::MessageIterator::canSeekBeginning().
126 */
127 bool canSeekBeginning();
128
129 /*
130 * Forwards to bt2::MessageIterator::seekBeginning().
131 *
132 * On success, you may call reload() afterwards. With any exception,
133 * you must call this method again, successfully, before you may
134 * call reload().
135 */
136 void seekBeginning();
137
138 /*
139 * Forwards to bt2::MessageIterator::canSeekForward().
140 */
141 bool canSeekForward() const noexcept;
142
143 /*
144 * Name of the input port on which the libbabeltrace2 message
145 * iterator was created.
146 */
147 const std::string& portName() const noexcept
148 {
149 return _mPortName;
150 }
151
152 private:
153 /*
154 * Tries to get new messages into `_mMsgs.msgs`.
155 */
156 void _tryGetNewMsgs();
157
158 /* Actual upstream message iterator */
159 bt2::MessageIterator::Shared _mMsgIter;
160
161 /*
162 * Currently contained messages.
163 *
164 * `index` is the index of the current message (msg()/msgTs())
165 * within `msgs`.
166 */
167 struct
168 {
169 bt2s::optional<bt2::ConstMessageArray> msgs;
170 std::size_t index;
171 } _mMsgs;
172
173 /* Timestamp of the current message, if any */
174 bt2s::optional<std::int64_t> _mMsgTs;
175
176 /*
177 * Only relevant in debug mode: true if a call to discard() is
178 * required before calling reload().
179 */
180 bool _mDiscardRequired = false;
181
182 bt2c::Logger _mLogger;
183 std::string _mPortName;
184 };
185
186 } /* namespace bt2mux */
187
188 #endif /* BABELTRACE_PLUGINS_UTILS_MUXER_UPSTREAM_MSG_ITER_HPP */
This page took 0.034755 seconds and 4 git commands to generate.