2 * SPDX-License-Identifier: MIT
4 * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com>
9 #include "cpp-common/bt2/optional-borrowed-object.hpp"
10 #include "cpp-common/bt2c/logging.hpp"
11 #include "cpp-common/vendor/fmt/core.h"
12 #include "cpp-common/vendor/fmt/format.h"
14 #include "upstream-msg-iter.hpp"
/*
 * Builds an upstream message iterator wrapper.
 *
 * `msgIter` is the shared message iterator to wrap, `portName` is the
 * name of the port it is associated with, and `parentLogger` is the
 * logger from which the logger of this object is derived.
 *
 * NOTE(review): the opening brace of the body and the end of the
 * logging statement are not visible in this excerpt.
 */
18 UpstreamMsgIter::UpstreamMsgIter(bt2::MessageIterator::Shared msgIter
, std::string portName
,
19 const bt2c::Logger
& parentLogger
) :
/* Take over the by-value shared message iterator (moved, not copied). */
20 _mMsgIter
{std::move(msgIter
)},
/*
 * The logger tag is the parent tag followed by `/[<port name>]`.
 *
 * `portName` is read here and moved from in the `_mPortName`
 * initializer below. NOTE(review): this relies on `_mLogger` being
 * declared before `_mPortName` in the class, since members are
 * initialized in declaration order; confirm in the header.
 */
21 _mLogger
{parentLogger
, fmt::format("{}/[{}]", parentLogger
.tag(), portName
)},
/* Keep the port name; the parameter is moved from at this point. */
22 _mPortName
{std::move(portName
)}
/* Log the creation with the address of this object and the port name. */
24 BT_CPPLOGI("Created an upstream message iterator: this={}, port-name={}", fmt::ptr(this),
31 * Returns the clock snapshot of `msg`, possibly missing.
/*
 * Maps a message to its clock snapshot, depending on the message type.
 *
 * For each visible message type, the clock snapshot is returned only
 * when the guard of that case holds (the stream class has a default
 * clock class, or the relevant packet or discarded-item clock snapshot
 * property is enabled). The message iterator inactivity case returns
 * its clock snapshot without any guard.
 *
 * NOTE(review): the `switch` header, the paths taken when a guard is
 * false, any default case, and the final return statement are not
 * visible in this excerpt; presumably an empty optional is returned
 * when no clock snapshot applies. Confirm against the full file.
 */
33 bt2::OptionalBorrowedObject
<bt2::ConstClockSnapshot
> msgCs(const bt2::ConstMessage msg
) noexcept
/* Event: only if the stream class of the event has a default clock class. */
36 case bt2::MessageType::EVENT
:
37 if (msg
.asEvent().streamClassDefaultClockClass()) {
38 return msg
.asEvent().defaultClockSnapshot();
/* Packet beginning: only if packets of the stream class have a beginning clock snapshot. */
42 case bt2::MessageType::PACKET_BEGINNING
:
43 if (msg
.asPacketBeginning().packet().stream().cls().packetsHaveBeginningClockSnapshot()) {
44 return msg
.asPacketBeginning().defaultClockSnapshot();
/* Packet end: only if packets of the stream class have an end clock snapshot. */
48 case bt2::MessageType::PACKET_END
:
49 if (msg
.asPacketEnd().packet().stream().cls().packetsHaveEndClockSnapshot()) {
50 return msg
.asPacketEnd().defaultClockSnapshot();
/* Discarded events: the beginning (not the end) default clock snapshot is used. */
54 case bt2::MessageType::DISCARDED_EVENTS
:
55 if (msg
.asDiscardedEvents().stream().cls().discardedEventsHaveDefaultClockSnapshots()) {
56 return msg
.asDiscardedEvents().beginningDefaultClockSnapshot();
/* Discarded packets: the beginning (not the end) default clock snapshot is used. */
60 case bt2::MessageType::DISCARDED_PACKETS
:
61 if (msg
.asDiscardedPackets().stream().cls().discardedPacketsHaveDefaultClockSnapshots()) {
62 return msg
.asDiscardedPackets().beginningDefaultClockSnapshot();
/* Message iterator inactivity: returned unconditionally. */
66 case bt2::MessageType::MESSAGE_ITERATOR_INACTIVITY
:
67 return msg
.asMessageIteratorInactivity().clockSnapshot();
/* Stream beginning: only if the stream class has a default clock class. */
68 case bt2::MessageType::STREAM_BEGINNING
:
69 if (msg
.asStreamBeginning().stream().cls().defaultClockClass()) {
70 return msg
.asStreamBeginning().defaultClockSnapshot();
/* Stream end: only if the stream class has a default clock class. */
74 case bt2::MessageType::STREAM_END
:
75 if (msg
.asStreamEnd().stream().cls().defaultClockClass()) {
76 return msg
.asStreamEnd().defaultClockSnapshot();
/*
 * Makes a current message available, fetching a new batch from
 * upstream if `_mMsgs.msgs` is not set.
 *
 * Returns `ReloadStatus::NO_MORE` if `_mMsgs.msgs` is still not set
 * after trying to get new messages, and `ReloadStatus::MORE` otherwise.
 * In the latter case, the timestamp of the current message is cached
 * in `_mMsgTs` when the message has a clock snapshot, and
 * `_mDiscardRequired` is set.
 *
 * Precondition (checked with BT_ASSERT_DBG): `_mDiscardRequired` is
 * false on entry.
 *
 * NOTE(review): braces, the `else` branches, and any statement which
 * resets `_mMsgTs` are not visible in this excerpt.
 */
89 UpstreamMsgIter::ReloadStatus
UpstreamMsgIter::reload()
91 BT_ASSERT_DBG(!_mDiscardRequired
);
/* No batch currently held: try to get one from upstream. */
93 if (G_UNLIKELY(!_mMsgs
.msgs
)) {
97 * 1. Set `_mMsgs.msgs` to new messages (we'll return
98 * `ReloadStatus::MORE`).
100 * 2. Not set `_mMsgs.msgs` (ended, we'll return
101 * `ReloadStatus::NO_MORE`).
105 this->_tryGetNewMsgs();
/* Check again: _tryGetNewMsgs() may have left `_mMsgs.msgs` unset. */
108 if (G_UNLIKELY(!_mMsgs
.msgs
)) {
109 /* Still none: no more */
111 return ReloadStatus::NO_MORE
;
/*
 * Cache the timestamp (nanoseconds from origin) of the current message
 * if msgCs() yields a clock snapshot for it.
 */
113 if (const auto cs
= msgCs(this->msg())) {
114 _mMsgTs
= cs
->nsFromOrigin();
115 BT_CPPLOGD("Cached the timestamp of the current message: this={}, ts={}",
116 fmt::ptr(this), *_mMsgTs
);
/*
 * No clock snapshot for the current message.
 * NOTE(review): the statement which actually resets `_mMsgTs` is not
 * visible in this excerpt; only the log statement is.
 */
119 BT_CPPLOGD("Reset the timestamp of the current message: this={}", fmt::ptr(this));
/* Flag checked by the assertion at the top of the next reload() call. */
122 _mDiscardRequired
= true;
123 return ReloadStatus::MORE
;
/*
 * Replaces `_mMsgs.msgs` with the result of calling next() on the
 * upstream message iterator.
 *
 * As the existing comments state, next() may throw, in which case the
 * assignment does not happen and `_mMsgs.msgs` keeps its current value.
 *
 * Precondition (checked with BT_ASSERT_DBG): `_mMsgIter` is set.
 *
 * NOTE(review): the condition which selects between the two final log
 * statements, any early return, and any reset of the current message
 * index are not visible in this excerpt. The log messages indicate
 * that the first one covers the end of the upstream message iterator
 * and the second one covers a newly received batch.
 */
127 void UpstreamMsgIter::_tryGetNewMsgs()
129 BT_ASSERT_DBG(_mMsgIter
);
130 BT_CPPLOGD("Calling the \"next\" method of the upstream message iterator: this={}",
134 * Replace with next batch!
136 * This may throw, in which case we'll keep our current
137 * `_mMsgs.msgs` (set), still requiring to get new messages the next
138 * time the user calls reload().
140 _mMsgs
.msgs
= _mMsgIter
->next();
144 * Don't destroy `*_mMsgIter` here because the user may still
145 * call seekBeginning() afterwards.
147 BT_CPPLOGD("End of upstream message iterator: this={}", fmt::ptr(this));
/*
 * Positional format fields: {0} is the address of this object and {1}
 * is the number of messages of the new batch (used twice).
 */
152 BT_CPPLOGD("Got {1} messages from upstream: this={0}, count={1}", fmt::ptr(this),
153 _mMsgs
.msgs
->length());
/*
 * Returns whether or not the wrapped upstream message iterator can
 * seek its beginning; this forwards to canSeekBeginning() of
 * `*_mMsgIter`.
 */
156 bool UpstreamMsgIter::canSeekBeginning()
158 return _mMsgIter
->canSeekBeginning();
/*
 * Makes the wrapped upstream message iterator seek its beginning, then
 * clears the `_mDiscardRequired` flag so that reload() may be called
 * again (reload() asserts that this flag is not set).
 *
 * NOTE(review): the embedded line numbering skips lines between the
 * two visible statements, so other state may also be reset here;
 * confirm against the full file.
 */
161 void UpstreamMsgIter::seekBeginning()
163 _mMsgIter
->seekBeginning();
166 _mDiscardRequired
= false;
/*
 * Returns whether or not the wrapped upstream message iterator can
 * seek forward; this forwards to canSeekForward() of `*_mMsgIter`.
 */
169 bool UpstreamMsgIter::canSeekForward() const noexcept
171 return _mMsgIter
->canSeekForward();
174 } /* namespace bt2mux */