| 1 | /* |
| 2 | * SPDX-License-Identifier: MIT |
| 3 | * |
| 4 | * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com> |
| 5 | */ |
| 6 | |
| 7 | #include <glib.h> |
| 8 | |
| 9 | #include "cpp-common/bt2/optional-borrowed-object.hpp" |
| 10 | #include "cpp-common/bt2c/logging.hpp" |
| 11 | #include "cpp-common/vendor/fmt/core.h" |
| 12 | #include "cpp-common/vendor/fmt/format.h" |
| 13 | |
| 14 | #include "upstream-msg-iter.hpp" |
| 15 | |
| 16 | namespace bt2mux { |
| 17 | |
| 18 | UpstreamMsgIter::UpstreamMsgIter(bt2::MessageIterator::Shared msgIter, std::string portName, |
| 19 | const bt2c::Logger& parentLogger) : |
| 20 | _mMsgIter {std::move(msgIter)}, |
| 21 | _mLogger {parentLogger, fmt::format("{}/[{}]", parentLogger.tag(), portName)}, |
| 22 | _mPortName {std::move(portName)} |
| 23 | { |
| 24 | BT_CPPLOGI("Created an upstream message iterator: this={}, port-name={}", fmt::ptr(this), |
| 25 | _mPortName); |
| 26 | } |
| 27 | |
| 28 | namespace { |
| 29 | |
| 30 | /* |
| 31 | * Returns the clock snapshot of `msg`, possibly missing. |
| 32 | */ |
| 33 | bt2::OptionalBorrowedObject<bt2::ConstClockSnapshot> msgCs(const bt2::ConstMessage msg) noexcept |
| 34 | { |
| 35 | switch (msg.type()) { |
| 36 | case bt2::MessageType::Event: |
| 37 | if (msg.asEvent().streamClassDefaultClockClass()) { |
| 38 | return msg.asEvent().defaultClockSnapshot(); |
| 39 | } |
| 40 | |
| 41 | break; |
| 42 | case bt2::MessageType::PacketBeginning: |
| 43 | if (msg.asPacketBeginning().packet().stream().cls().packetsHaveBeginningClockSnapshot()) { |
| 44 | return msg.asPacketBeginning().defaultClockSnapshot(); |
| 45 | } |
| 46 | |
| 47 | break; |
| 48 | case bt2::MessageType::PacketEnd: |
| 49 | if (msg.asPacketEnd().packet().stream().cls().packetsHaveEndClockSnapshot()) { |
| 50 | return msg.asPacketEnd().defaultClockSnapshot(); |
| 51 | } |
| 52 | |
| 53 | break; |
| 54 | case bt2::MessageType::DiscardedEvents: |
| 55 | if (msg.asDiscardedEvents().stream().cls().discardedEventsHaveDefaultClockSnapshots()) { |
| 56 | return msg.asDiscardedEvents().beginningDefaultClockSnapshot(); |
| 57 | } |
| 58 | |
| 59 | break; |
| 60 | case bt2::MessageType::DiscardedPackets: |
| 61 | if (msg.asDiscardedPackets().stream().cls().discardedPacketsHaveDefaultClockSnapshots()) { |
| 62 | return msg.asDiscardedPackets().beginningDefaultClockSnapshot(); |
| 63 | } |
| 64 | |
| 65 | break; |
| 66 | case bt2::MessageType::MessageIteratorInactivity: |
| 67 | return msg.asMessageIteratorInactivity().clockSnapshot(); |
| 68 | case bt2::MessageType::StreamBeginning: |
| 69 | if (msg.asStreamBeginning().streamClassDefaultClockClass()) { |
| 70 | return msg.asStreamBeginning().defaultClockSnapshot(); |
| 71 | } |
| 72 | |
| 73 | break; |
| 74 | case bt2::MessageType::StreamEnd: |
| 75 | if (msg.asStreamEnd().streamClassDefaultClockClass()) { |
| 76 | return msg.asStreamEnd().defaultClockSnapshot(); |
| 77 | } |
| 78 | |
| 79 | break; |
| 80 | default: |
| 81 | bt_common_abort(); |
| 82 | } |
| 83 | |
| 84 | return {}; |
| 85 | } |
| 86 | |
| 87 | } /* namespace */ |
| 88 | |
| 89 | UpstreamMsgIter::ReloadStatus UpstreamMsgIter::reload() |
| 90 | { |
| 91 | BT_ASSERT_DBG(!_mDiscardRequired); |
| 92 | |
| 93 | if (G_UNLIKELY(!_mMsgs.msgs)) { |
| 94 | /* |
| 95 | * This will either: |
| 96 | * |
| 97 | * 1. Set `_mMsgs.msgs` to new messages (we'll return |
| 98 | * `ReloadStatus::MORE`). |
| 99 | * |
| 100 | * 2. Not set `_mMsgs.msgs` (ended, we'll return |
| 101 | * `ReloadStatus::NO_MORE`). |
| 102 | * |
| 103 | * 3. Throw. |
| 104 | */ |
| 105 | this->_tryGetNewMsgs(); |
| 106 | } |
| 107 | |
| 108 | if (G_UNLIKELY(!_mMsgs.msgs)) { |
| 109 | /* Still none: no more */ |
| 110 | _mMsgTs.reset(); |
| 111 | return ReloadStatus::NoMore; |
| 112 | } else { |
| 113 | if (const auto cs = msgCs(this->msg())) { |
| 114 | _mMsgTs = cs->nsFromOrigin(); |
| 115 | BT_CPPLOGD("Cached the timestamp of the current message: this={}, ts={}", |
| 116 | fmt::ptr(this), *_mMsgTs); |
| 117 | } else { |
| 118 | _mMsgTs.reset(); |
| 119 | BT_CPPLOGD("Reset the timestamp of the current message: this={}", fmt::ptr(this)); |
| 120 | } |
| 121 | |
| 122 | _mDiscardRequired = true; |
| 123 | return ReloadStatus::More; |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | void UpstreamMsgIter::_tryGetNewMsgs() |
| 128 | { |
| 129 | BT_ASSERT_DBG(_mMsgIter); |
| 130 | BT_CPPLOGD("Calling the \"next\" method of the upstream message iterator: this={}", |
| 131 | fmt::ptr(this)); |
| 132 | |
| 133 | /* |
| 134 | * Replace with next batch! |
| 135 | * |
| 136 | * This may throw, in which case we'll keep our current |
| 137 | * `_mMsgs.msgs` (set), still requiring to get new messages the next |
| 138 | * time the user calls reload(). |
| 139 | */ |
| 140 | _mMsgs.msgs = _mMsgIter->next(); |
| 141 | |
| 142 | if (!_mMsgs.msgs) { |
| 143 | /* |
| 144 | * Don't destroy `*_mMsgIter` here because the user may still |
| 145 | * call seekBeginning() afterwards. |
| 146 | */ |
| 147 | BT_CPPLOGD("End of upstream message iterator: this={}", fmt::ptr(this)); |
| 148 | return; |
| 149 | } |
| 150 | |
| 151 | _mMsgs.index = 0; |
| 152 | BT_CPPLOGD("Got {1} messages from upstream: this={0}, count={1}", fmt::ptr(this), |
| 153 | _mMsgs.msgs->length()); |
| 154 | } |
| 155 | |
| 156 | bool UpstreamMsgIter::canSeekBeginning() |
| 157 | { |
| 158 | return _mMsgIter->canSeekBeginning(); |
| 159 | } |
| 160 | |
| 161 | void UpstreamMsgIter::seekBeginning() |
| 162 | { |
| 163 | _mMsgIter->seekBeginning(); |
| 164 | _mMsgs.msgs.reset(); |
| 165 | _mMsgTs.reset(); |
| 166 | _mDiscardRequired = false; |
| 167 | } |
| 168 | |
| 169 | bool UpstreamMsgIter::canSeekForward() const noexcept |
| 170 | { |
| 171 | return _mMsgIter->canSeekForward(); |
| 172 | } |
| 173 | |
| 174 | } /* namespace bt2mux */ |