Commit | Line | Data |
---|---|---|
7cdc2bab | 1 | /* |
0235b0db | 2 | * SPDX-License-Identifier: MIT |
7cdc2bab | 3 | * |
0235b0db MJ |
4 | * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com> |
5 | * Copyright 2016 Philippe Proulx <pproulx@efficios.com> | |
6 | * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
7 | * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation | |
7cdc2bab MD |
8 | */ |
9 | ||
e28ca558 SM |
10 | #include <sstream> |
11 | ||
5656cea5 PP |
12 | #include <babeltrace2/babeltrace.h> |
13 | ||
578e048b | 14 | #include "common/assert.h" |
83ad336c | 15 | #include "compat/mman.h" /* IWYU pragma: keep */ |
6eeff3ca | 16 | #include "cpp-common/bt2s/make-unique.hpp" |
0f5c5d5c | 17 | #include "cpp-common/vendor/fmt/format.h" |
c802cacb | 18 | |
81c7f242 | 19 | #include "../common/src/pkt-props.hpp" |
087cd0f5 | 20 | #include "data-stream.hpp" |
7cdc2bab | 21 | |
4164020e | 22 | #define STREAM_NAME_PREFIX "stream-" |
14f28187 | 23 | |
81c7f242 SM |
24 | using namespace bt2c::literals::datalen; |
25 | ||
26 | namespace ctf { | |
27 | namespace src { | |
28 | namespace live { | |
29 | ||
30 | Buf CtfLiveMedium::buf(bt2c::DataLen requestedOffsetInStream, bt2c::DataLen minSize) | |
7cdc2bab | 31 | { |
81c7f242 SM |
32 | BT_CPPLOGD("CtfLiveMedium::buf called: stream-id={}, offset-bytes={}, min-size-bytes={}", |
33 | _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1, | |
34 | requestedOffsetInStream.bytes(), minSize.bytes()); | |
4164020e | 35 | |
81c7f242 SM |
36 | if (_mLiveStreamIter.has_stream_hung_up) |
37 | throw NoData {}; | |
4164020e | 38 | |
81c7f242 SM |
39 | BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream); |
40 | auto requestedOffsetInPacket = requestedOffsetInStream - _mCurPktBegOffsetInStream; | |
60323499 | 41 | |
81c7f242 | 42 | BT_ASSERT(_mLiveStreamIter.curPktInfo); |
60323499 | 43 | |
81c7f242 SM |
44 | if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) { |
45 | _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len; | |
46 | _mLiveStreamIter.curPktInfo.reset(); | |
47 | lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); | |
48 | throw bt2c::TryAgain {}; | |
60323499 SM |
49 | } |
50 | ||
81c7f242 SM |
51 | auto requestedOffsetInRelay = |
52 | _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket; | |
53 | auto lenUntilEndOfPacket = _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket; | |
60323499 | 54 | |
81c7f242 SM |
55 | auto maxReqLen = bt2c::DataLen::fromBytes( |
56 | _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size); | |
57 | auto reqLen = std::min(lenUntilEndOfPacket, maxReqLen); | |
58 | uint64_t recvLen; | |
7cdc2bab | 59 | |
81c7f242 | 60 | _mBuf.resize(reqLen.bytes()); |
4164020e | 61 | |
81c7f242 SM |
62 | lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes( |
63 | _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(), | |
64 | requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen); | |
65 | switch (status) { | |
66 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK: | |
67 | _mBuf.resize(recvLen); | |
68 | break; | |
69 | ||
70 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN: | |
71 | BT_CPPLOGD("CtfLiveMedium::buf try again"); | |
72 | throw bt2c::TryAgain(); | |
4164020e | 73 | |
81c7f242 SM |
74 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF: |
75 | BT_CPPLOGD("CtfLiveMedium::buf eof"); | |
76 | throw NoData(); | |
e28ca558 | 77 | |
81c7f242 SM |
78 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR: |
79 | BT_CPPLOGD("CtfLiveMedium::buf error"); | |
80 | throw bt2c::Error(); | |
4164020e | 81 | } |
7cdc2bab | 82 | |
81c7f242 SM |
83 | const Buf buf {_mBuf.data(), bt2c::DataLen::fromBytes(_mBuf.size())}; |
84 | ||
85 | BT_CPPLOGD("CtfLiveMedium::buf returns: stream-id={}, buf-addr={}, buf-size-bytes={}", | |
86 | _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1, fmt::ptr(buf.addr()), | |
87 | buf.size().bytes()); | |
88 | ||
89 | return buf; | |
7cdc2bab MD |
90 | } |
91 | ||
81c7f242 SM |
92 | } /* namespace live */ |
93 | } /* namespace src */ | |
94 | } /* namespace ctf */ | |
95 | ||
96 | lttng_live_iterator_status | |
97 | lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter) | |
98 | { | |
99 | BT_ASSERT(!liveStreamIter->msg_iter); | |
100 | BT_ASSERT(!liveStreamIter->stream); | |
101 | lttng_live_trace *trace = liveStreamIter->trace; | |
102 | lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter; | |
103 | ||
104 | auto tempMedium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter); | |
105 | const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls(); | |
106 | BT_ASSERT(ctfTc); | |
107 | ctf::src::PktProps pktProps = | |
108 | ctf::src::readPktProps(*ctfTc, std::move(tempMedium), 0_bytes, liveStreamIter->logger); | |
109 | ||
110 | bt2::OptionalBorrowedObject<bt2::TraceClass> tc = ctfTc->libCls(); | |
111 | BT_ASSERT(tc); | |
112 | BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set); | |
113 | BT_ASSERT(trace->trace); | |
114 | ||
115 | auto sc = tc->streamClassById(liveStreamIter->ctf_stream_class_id.value); | |
116 | if (!sc) { | |
117 | BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(liveStreamIter->logger, bt2::Error, | |
118 | "No stream class with id {}", | |
119 | liveStreamIter->ctf_stream_class_id.value); | |
120 | } | |
121 | ||
122 | bt_stream *streamPtr; | |
123 | if (pktProps.dataStreamId) { | |
124 | streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(), | |
125 | *pktProps.dataStreamId); | |
126 | } else { | |
127 | /* | |
128 | * No stream instance ID in the stream. It's possible | |
129 | * to encounter this situation with older version of | |
130 | * LTTng. In these cases, use the viewer_stream_id that | |
131 | * is unique for a live viewer session. | |
132 | */ | |
133 | streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(), | |
134 | liveStreamIter->viewer_stream_id); | |
135 | } | |
136 | BT_ASSERT(streamPtr); | |
137 | liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr); | |
138 | liveStreamIter->stream->name(liveStreamIter->name); | |
139 | ||
140 | auto medium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter); | |
4d6634b8 | 141 | liveStreamIter->msg_iter.emplace(liveMsgIter->selfMsgIter, *ctfTc, |
81c7f242 SM |
142 | liveStreamIter->trace->metadata->metadataStreamUuid(), |
143 | *liveStreamIter->stream, std::move(medium), | |
144 | ctf::src::MsgIterQuirks {}, liveStreamIter->logger); | |
145 | return LTTNG_LIVE_ITERATOR_STATUS_OK; | |
146 | } | |
7cdc2bab | 147 | |
4164020e | 148 | enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session, |
4d6634b8 | 149 | const bt2::SelfMessageIterator selfMsgIter) |
7cdc2bab | 150 | { |
4164020e SM |
151 | if (!session->lazy_stream_msg_init) { |
152 | return LTTNG_LIVE_ITERATOR_STATUS_OK; | |
153 | } | |
154 | ||
0f5c5d5c SM |
155 | BT_CPPLOGD_SPEC(session->logger, |
156 | "Lazily initializing self message iterator for live session: " | |
157 | "session-id={}, self-msg-iter-addr={}", | |
4d6634b8 | 158 | session->id, fmt::ptr(selfMsgIter.libObjPtr())); |
4164020e | 159 | |
6dcda123 | 160 | for (lttng_live_trace::UP& trace : session->traces) { |
db00b877 | 161 | for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) { |
4164020e SM |
162 | if (stream_iter->msg_iter) { |
163 | continue; | |
164 | } | |
db00b877 | 165 | |
81c7f242 SM |
166 | const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls(); |
167 | BT_CPPLOGD_SPEC(session->logger, | |
0f5c5d5c SM |
168 | "Creating CTF message iterator: session-id={}, ctf-tc-addr={}, " |
169 | "stream-iter-name={}, self-msg-iter-addr={}", | |
81c7f242 | 170 | session->id, fmt::ptr(ctfTraceCls), stream_iter->name.c_str(), |
4d6634b8 | 171 | fmt::ptr(selfMsgIter.libObjPtr())); |
4164020e SM |
172 | } |
173 | } | |
174 | ||
175 | session->lazy_stream_msg_init = false; | |
176 | ||
177 | return LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab MD |
178 | } |
179 | ||
4164020e SM |
180 | struct lttng_live_stream_iterator * |
181 | lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id, | |
81c7f242 | 182 | uint64_t stream_id) |
7cdc2bab | 183 | { |
e28ca558 | 184 | std::stringstream nameSs; |
4164020e SM |
185 | |
186 | BT_ASSERT(session); | |
187 | BT_ASSERT(session->lttng_live_msg_iter); | |
188 | BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp); | |
4164020e | 189 | |
81c7f242 | 190 | const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id); |
4164020e | 191 | if (!trace) { |
0f5c5d5c | 192 | BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow CTF trace."); |
6eeff3ca | 193 | return nullptr; |
4164020e SM |
194 | } |
195 | ||
6eeff3ca SM |
196 | auto stream_iter = bt2s::make_unique<lttng_live_stream_iterator>(session->logger); |
197 | ||
4164020e SM |
198 | stream_iter->trace = trace; |
199 | stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; | |
200 | stream_iter->viewer_stream_id = stream_id; | |
201 | ||
202 | stream_iter->ctf_stream_class_id.is_set = false; | |
203 | stream_iter->ctf_stream_class_id.value = UINT64_MAX; | |
204 | ||
205 | stream_iter->last_inactivity_ts.is_set = false; | |
206 | stream_iter->last_inactivity_ts.value = 0; | |
207 | ||
e28ca558 SM |
208 | nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id; |
209 | stream_iter->name = nameSs.str(); | |
6eeff3ca SM |
210 | |
211 | const auto ret = stream_iter.get(); | |
db00b877 | 212 | trace->stream_iterators.emplace_back(std::move(stream_iter)); |
4164020e SM |
213 | |
214 | /* Track the number of active stream iterator. */ | |
215 | session->lttng_live_msg_iter->active_stream_iter++; | |
216 | ||
6eeff3ca | 217 | return ret; |
7cdc2bab MD |
218 | } |
219 | ||
81c7f242 SM |
220 | void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter, |
221 | uint64_t ctfStreamClsId) | |
222 | { | |
223 | if (streamIter->ctf_stream_class_id.is_set) { | |
224 | BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId); | |
225 | return; | |
226 | } else { | |
227 | streamIter->ctf_stream_class_id.value = ctfStreamClsId; | |
228 | streamIter->ctf_stream_class_id.is_set = true; | |
229 | } | |
230 | } | |
231 | ||
ce4ee876 | 232 | lttng_live_stream_iterator::~lttng_live_stream_iterator() |
7cdc2bab | 233 | { |
4164020e | 234 | /* Track the number of active stream iterator. */ |
ce4ee876 SM |
235 | this->trace->session->lttng_live_msg_iter->active_stream_iter--; |
236 | } |