Commit | Line | Data |
---|---|---|
7cdc2bab | 1 | /* |
0235b0db | 2 | * SPDX-License-Identifier: MIT |
7cdc2bab | 3 | * |
0235b0db MJ |
4 | * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com> |
5 | * Copyright 2016 Philippe Proulx <pproulx@efficios.com> | |
6 | * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
7 | * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation | |
7cdc2bab MD |
8 | */ |
9 | ||
27a14e13 SM |
10 | #define BT_CLOG_CFG logCfg |
11 | #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/DS" | |
020bc26f | 12 | |
3c22a242 | 13 | #include <inttypes.h> |
7cdc2bab | 14 | #include <stdio.h> |
7cdc2bab | 15 | #include <stdlib.h> |
dff1c223 | 16 | #include <sstream> |
3c22a242 | 17 | |
7cdc2bab | 18 | #include <glib.h> |
3c22a242 | 19 | |
3fadfbc0 | 20 | #include <babeltrace2/babeltrace.h> |
3c22a242 | 21 | |
1908a0bc | 22 | #include "cpp-common/make-unique.hpp" |
578e048b | 23 | #include "common/assert.h" |
3c22a242 | 24 | #include "compat/mman.h" |
373a30a3 | 25 | #include "../common/src/pkt-props.hpp" |
2dbef7b6 | 26 | #include "cpp-common/make-unique.hpp" |
087cd0f5 | 27 | #include "data-stream.hpp" |
373a30a3 | 28 | #include "cpp-common/exc.hpp" |
27a14e13 | 29 | #include "cpp-common/cfg-logging-error-reporting.hpp" |
373a30a3 | 30 | #include "cpp-common/cfg-logging-error-reporting-throw.hpp" |
7cdc2bab | 31 | |
4164020e | 32 | #define STREAM_NAME_PREFIX "stream-" |
14f28187 | 33 | |
373a30a3 SM |
34 | namespace ctf { |
35 | namespace src { | |
36 | namespace live { | |
37 | ||
38 | Buf CtfLiveMedium::buf(bt2_common::DataLen requestedOffsetInStream, bt2_common::DataLen minSize) | |
7cdc2bab | 39 | { |
373a30a3 SM |
40 | const bt2_common::LogCfg& logCfg = _mLiveStreamIter.logCfg; |
41 | BT_CLOGD("CtfLiveMedium::buf called: stream-id=%" PRId64 | |
42 | ", offset-bytes=%llu, min-size-bytes=%llu", | |
43 | _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1, | |
44 | requestedOffsetInStream.bytes(), minSize.bytes()); | |
45 | ||
46 | if (_mLiveStreamIter.has_stream_hung_up) | |
47 | throw NoData {}; | |
48 | ||
49 | BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream); | |
50 | bt2_common::DataLen requestedOffsetInPacket = | |
51 | requestedOffsetInStream - _mCurPktBegOffsetInStream; | |
52 | ||
53 | BT_ASSERT(_mLiveStreamIter.curPktInfo); | |
54 | ||
55 | if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) { | |
56 | _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len; | |
57 | _mLiveStreamIter.curPktInfo.reset(); | |
58 | lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); | |
59 | throw bt2_common::TryAgain {}; | |
4164020e SM |
60 | } |
61 | ||
373a30a3 SM |
62 | bt2_common::DataLen requestedOffsetInRelay = |
63 | _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket; | |
64 | bt2_common::DataLen lenUntilEndOfPacket = | |
65 | _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket; | |
66 | ||
67 | bt2_common::DataLen maxReqLen = bt2_common::DataLen::fromBytes( | |
68 | _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size); | |
69 | bt2_common::DataLen reqLen = std::min(lenUntilEndOfPacket, maxReqLen); | |
70 | uint64_t recvLen; | |
71 | ||
72 | _mBuf.resize(reqLen.bytes()); | |
73 | ||
74 | lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes( | |
75 | _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(), | |
76 | requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen); | |
77 | switch (status) { | |
78 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK: | |
79 | _mBuf.resize(recvLen); | |
80 | break; | |
81 | ||
82 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN: | |
83 | BT_CLOGD("CtfLiveMedium::buf try again"); | |
84 | throw bt2_common::TryAgain(); | |
85 | ||
86 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF: | |
87 | BT_CLOGD("CtfLiveMedium::buf eof"); | |
88 | throw NoData(); | |
89 | ||
90 | case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR: | |
91 | BT_CLOGD("CtfLiveMedium::buf error"); | |
92 | throw bt2_common::Error(); | |
4164020e SM |
93 | } |
94 | ||
373a30a3 | 95 | Buf buf {_mBuf.data(), bt2_common::DataLen::fromBytes(_mBuf.size())}; |
a77756b9 | 96 | |
373a30a3 SM |
97 | BT_CLOGD("CtfLiveMedium::buf returns: stream-id=%" PRId64 ", buf-addr=%p, buf-size-bytes=%llu", |
98 | _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1, buf.addr(), | |
99 | buf.size().bytes()); | |
a77756b9 | 100 | |
373a30a3 | 101 | return buf; |
7cdc2bab MD |
102 | } |
103 | ||
373a30a3 SM |
104 | } /* namespace live */ |
105 | } /* namespace src */ | |
106 | } /* namespace ctf */ | |
5f16c381 | 107 | |
373a30a3 SM |
108 | lttng_live_iterator_status |
109 | lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter) | |
110 | { | |
111 | BT_ASSERT(!liveStreamIter->msg_iter); | |
112 | BT_ASSERT(!liveStreamIter->stream); | |
113 | lttng_live_trace *trace = liveStreamIter->trace; | |
114 | lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter; | |
115 | ||
116 | ctf::src::Medium::UP tempMedium = | |
117 | bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter); | |
118 | const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls(); | |
119 | BT_ASSERT(ctfTc); | |
120 | ctf::src::PktProps pktProps = ctf::src::readPktProps( | |
121 | *ctfTc, std::move(tempMedium), bt2_common::DataLen::fromBytes(0), liveStreamIter->logCfg); | |
122 | ||
123 | nonstd::optional<bt2::TraceClass> tc = ctfTc->libCls(); | |
124 | BT_ASSERT(tc); | |
125 | BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set); | |
126 | BT_ASSERT(trace->trace); | |
127 | ||
128 | const bt2_common::LogCfg& logCfg = liveStreamIter->logCfg; | |
129 | nonstd::optional<bt2::StreamClass> sc = | |
130 | tc->streamClassById(liveStreamIter->ctf_stream_class_id.value); | |
131 | if (!sc) { | |
132 | BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "No stream class with id %" PRId64, | |
133 | liveStreamIter->ctf_stream_class_id.value); | |
4164020e | 134 | } |
7cdc2bab | 135 | |
373a30a3 SM |
136 | // FIXME: in the original, there is a fall back if the data stream id is not available. |
137 | bt_stream *streamPtr = bt_stream_create_with_id(sc->libObjPtr(), (*trace->trace)->libObjPtr(), | |
138 | *pktProps.dataStreamId); | |
139 | BT_ASSERT(streamPtr); | |
140 | liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr); | |
141 | (*liveStreamIter->stream)->name(liveStreamIter->name); | |
142 | ||
143 | ctf::src::Medium::UP medium = | |
144 | bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter); | |
145 | liveStreamIter->msg_iter.emplace( | |
146 | liveMsgIter->self_msg_iter, *ctfTc, liveStreamIter->trace->metadata->metadataStreamUuid(), | |
147 | **liveStreamIter->stream, std::move(medium), ctf::src::MsgIterQuirks {}, logCfg); | |
148 | return LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab MD |
149 | } |
150 | ||
7cdc2bab | 151 | BT_HIDDEN |
4164020e SM |
152 | enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session, |
153 | bt_self_message_iterator *self_msg_iter) | |
7cdc2bab | 154 | { |
27a14e13 | 155 | const bt2_common::LogCfg& logCfg = session->logCfg; |
4164020e SM |
156 | |
157 | if (!session->lazy_stream_msg_init) { | |
158 | return LTTNG_LIVE_ITERATOR_STATUS_OK; | |
159 | } | |
160 | ||
27a14e13 SM |
161 | BT_CLOGD("Lazily initializing self message iterator for live session: " |
162 | "session-id=%" PRIu64 ", self-msg-iter-addr=%p", | |
163 | session->id, self_msg_iter); | |
4164020e | 164 | |
caec6d6f | 165 | for (lttng_live_trace::UP& trace : session->traces) { |
2dbef7b6 | 166 | for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) { |
4164020e SM |
167 | if (stream_iter->msg_iter) { |
168 | continue; | |
169 | } | |
2dbef7b6 | 170 | |
373a30a3 | 171 | const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls(); |
27a14e13 SM |
172 | BT_CLOGD("Creating CTF message iterator: " |
173 | "session-id=%" PRIu64 ", ctf-tc-addr=%p, " | |
174 | "stream-iter-name=%s, self-msg-iter-addr=%p", | |
373a30a3 | 175 | session->id, ctfTraceCls, stream_iter->name.c_str(), self_msg_iter); |
4164020e SM |
176 | } |
177 | } | |
178 | ||
179 | session->lazy_stream_msg_init = false; | |
180 | ||
181 | return LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab MD |
182 | } |
183 | ||
184 | BT_HIDDEN | |
4164020e SM |
185 | struct lttng_live_stream_iterator * |
186 | lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id, | |
187 | uint64_t stream_id, bt_self_message_iterator *self_msg_iter) | |
7cdc2bab | 188 | { |
4164020e | 189 | struct lttng_live_trace *trace; |
dff1c223 | 190 | std::stringstream nameSs; |
4164020e SM |
191 | |
192 | BT_ASSERT(session); | |
193 | BT_ASSERT(session->lttng_live_msg_iter); | |
194 | BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp); | |
27a14e13 SM |
195 | |
196 | const bt2_common::LogCfg& logCfg = session->logCfg; | |
4164020e | 197 | |
1908a0bc SM |
198 | lttng_live_stream_iterator::UP stream_iter = |
199 | bt2_common::makeUnique<lttng_live_stream_iterator>(logCfg); | |
4164020e SM |
200 | trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id); |
201 | if (!trace) { | |
27a14e13 | 202 | BT_CLOGE_APPEND_CAUSE("Failed to borrow CTF trace."); |
1908a0bc | 203 | return nullptr; |
4164020e SM |
204 | } |
205 | ||
206 | stream_iter->trace = trace; | |
207 | stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; | |
208 | stream_iter->viewer_stream_id = stream_id; | |
209 | ||
210 | stream_iter->ctf_stream_class_id.is_set = false; | |
211 | stream_iter->ctf_stream_class_id.value = UINT64_MAX; | |
212 | ||
213 | stream_iter->last_inactivity_ts.is_set = false; | |
214 | stream_iter->last_inactivity_ts.value = 0; | |
215 | ||
dff1c223 SM |
216 | nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id; |
217 | stream_iter->name = nameSs.str(); | |
1908a0bc SM |
218 | |
219 | lttng_live_stream_iterator *ret = stream_iter.get(); | |
2dbef7b6 | 220 | trace->stream_iterators.emplace_back(std::move(stream_iter)); |
4164020e SM |
221 | |
222 | /* Track the number of active stream iterator. */ | |
223 | session->lttng_live_msg_iter->active_stream_iter++; | |
224 | ||
1908a0bc | 225 | return ret; |
7cdc2bab MD |
226 | } |
227 | ||
373a30a3 SM |
228 | void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter, |
229 | uint64_t ctfStreamClsId) | |
230 | { | |
231 | if (streamIter->ctf_stream_class_id.is_set) { | |
232 | BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId); | |
233 | return; | |
234 | } else { | |
235 | streamIter->ctf_stream_class_id.value = ctfStreamClsId; | |
236 | streamIter->ctf_stream_class_id.is_set = true; | |
237 | } | |
238 | } | |
239 | ||
f58d2476 | 240 | lttng_live_stream_iterator::~lttng_live_stream_iterator() |
7cdc2bab | 241 | { |
4164020e | 242 | /* Track the number of active stream iterator. */ |
f58d2476 SM |
243 | this->trace->session->lttng_live_msg_iter->active_stream_iter--; |
244 | } |