src.ctf.lttng-live: use the new metadata stream parser and message iterator
[deliverable/babeltrace.git] / src / plugins / ctf / lttng-live / data-stream.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Philippe Proulx <pproulx@efficios.com>
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7cdc2bab
MD
8 */
9
27a14e13
SM
10#define BT_CLOG_CFG logCfg
11#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/DS"
020bc26f 12
3c22a242 13#include <inttypes.h>
7cdc2bab 14#include <stdio.h>
7cdc2bab 15#include <stdlib.h>
dff1c223 16#include <sstream>
3c22a242 17
7cdc2bab 18#include <glib.h>
3c22a242 19
3fadfbc0 20#include <babeltrace2/babeltrace.h>
3c22a242 21
1908a0bc 22#include "cpp-common/make-unique.hpp"
578e048b 23#include "common/assert.h"
3c22a242 24#include "compat/mman.h"
373a30a3 25#include "../common/src/pkt-props.hpp"
2dbef7b6 26#include "cpp-common/make-unique.hpp"
087cd0f5 27#include "data-stream.hpp"
373a30a3 28#include "cpp-common/exc.hpp"
27a14e13 29#include "cpp-common/cfg-logging-error-reporting.hpp"
373a30a3 30#include "cpp-common/cfg-logging-error-reporting-throw.hpp"
7cdc2bab 31
4164020e 32#define STREAM_NAME_PREFIX "stream-"
14f28187 33
373a30a3
SM
34namespace ctf {
35namespace src {
36namespace live {
37
38Buf CtfLiveMedium::buf(bt2_common::DataLen requestedOffsetInStream, bt2_common::DataLen minSize)
7cdc2bab 39{
373a30a3
SM
40 const bt2_common::LogCfg& logCfg = _mLiveStreamIter.logCfg;
41 BT_CLOGD("CtfLiveMedium::buf called: stream-id=%" PRId64
42 ", offset-bytes=%llu, min-size-bytes=%llu",
43 _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1,
44 requestedOffsetInStream.bytes(), minSize.bytes());
45
46 if (_mLiveStreamIter.has_stream_hung_up)
47 throw NoData {};
48
49 BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream);
50 bt2_common::DataLen requestedOffsetInPacket =
51 requestedOffsetInStream - _mCurPktBegOffsetInStream;
52
53 BT_ASSERT(_mLiveStreamIter.curPktInfo);
54
55 if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) {
56 _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len;
57 _mLiveStreamIter.curPktInfo.reset();
58 lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
59 throw bt2_common::TryAgain {};
4164020e
SM
60 }
61
373a30a3
SM
62 bt2_common::DataLen requestedOffsetInRelay =
63 _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket;
64 bt2_common::DataLen lenUntilEndOfPacket =
65 _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket;
66
67 bt2_common::DataLen maxReqLen = bt2_common::DataLen::fromBytes(
68 _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size);
69 bt2_common::DataLen reqLen = std::min(lenUntilEndOfPacket, maxReqLen);
70 uint64_t recvLen;
71
72 _mBuf.resize(reqLen.bytes());
73
74 lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes(
75 _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(),
76 requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen);
77 switch (status) {
78 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK:
79 _mBuf.resize(recvLen);
80 break;
81
82 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN:
83 BT_CLOGD("CtfLiveMedium::buf try again");
84 throw bt2_common::TryAgain();
85
86 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF:
87 BT_CLOGD("CtfLiveMedium::buf eof");
88 throw NoData();
89
90 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR:
91 BT_CLOGD("CtfLiveMedium::buf error");
92 throw bt2_common::Error();
4164020e
SM
93 }
94
373a30a3 95 Buf buf {_mBuf.data(), bt2_common::DataLen::fromBytes(_mBuf.size())};
a77756b9 96
373a30a3
SM
97 BT_CLOGD("CtfLiveMedium::buf returns: stream-id=%" PRId64 ", buf-addr=%p, buf-size-bytes=%llu",
98 _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1, buf.addr(),
99 buf.size().bytes());
a77756b9 100
373a30a3 101 return buf;
7cdc2bab
MD
102}
103
373a30a3
SM
104} /* namespace live */
105} /* namespace src */
106} /* namespace ctf */
5f16c381 107
373a30a3
SM
108lttng_live_iterator_status
109lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter)
110{
111 BT_ASSERT(!liveStreamIter->msg_iter);
112 BT_ASSERT(!liveStreamIter->stream);
113 lttng_live_trace *trace = liveStreamIter->trace;
114 lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter;
115
116 ctf::src::Medium::UP tempMedium =
117 bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
118 const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls();
119 BT_ASSERT(ctfTc);
120 ctf::src::PktProps pktProps = ctf::src::readPktProps(
121 *ctfTc, std::move(tempMedium), bt2_common::DataLen::fromBytes(0), liveStreamIter->logCfg);
122
123 nonstd::optional<bt2::TraceClass> tc = ctfTc->libCls();
124 BT_ASSERT(tc);
125 BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set);
126 BT_ASSERT(trace->trace);
127
128 const bt2_common::LogCfg& logCfg = liveStreamIter->logCfg;
129 nonstd::optional<bt2::StreamClass> sc =
130 tc->streamClassById(liveStreamIter->ctf_stream_class_id.value);
131 if (!sc) {
132 BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "No stream class with id %" PRId64,
133 liveStreamIter->ctf_stream_class_id.value);
4164020e 134 }
7cdc2bab 135
373a30a3
SM
136 // FIXME: in the original, there is a fall back if the data stream id is not available.
137 bt_stream *streamPtr = bt_stream_create_with_id(sc->libObjPtr(), (*trace->trace)->libObjPtr(),
138 *pktProps.dataStreamId);
139 BT_ASSERT(streamPtr);
140 liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr);
141 (*liveStreamIter->stream)->name(liveStreamIter->name);
142
143 ctf::src::Medium::UP medium =
144 bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
145 liveStreamIter->msg_iter.emplace(
146 liveMsgIter->self_msg_iter, *ctfTc, liveStreamIter->trace->metadata->metadataStreamUuid(),
147 **liveStreamIter->stream, std::move(medium), ctf::src::MsgIterQuirks {}, logCfg);
148 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
149}
150
7cdc2bab 151BT_HIDDEN
4164020e
SM
152enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
153 bt_self_message_iterator *self_msg_iter)
7cdc2bab 154{
27a14e13 155 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e
SM
156
157 if (!session->lazy_stream_msg_init) {
158 return LTTNG_LIVE_ITERATOR_STATUS_OK;
159 }
160
27a14e13
SM
161 BT_CLOGD("Lazily initializing self message iterator for live session: "
162 "session-id=%" PRIu64 ", self-msg-iter-addr=%p",
163 session->id, self_msg_iter);
4164020e 164
caec6d6f 165 for (lttng_live_trace::UP& trace : session->traces) {
2dbef7b6 166 for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) {
4164020e
SM
167 if (stream_iter->msg_iter) {
168 continue;
169 }
2dbef7b6 170
373a30a3 171 const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls();
27a14e13
SM
172 BT_CLOGD("Creating CTF message iterator: "
173 "session-id=%" PRIu64 ", ctf-tc-addr=%p, "
174 "stream-iter-name=%s, self-msg-iter-addr=%p",
373a30a3 175 session->id, ctfTraceCls, stream_iter->name.c_str(), self_msg_iter);
4164020e
SM
176 }
177 }
178
179 session->lazy_stream_msg_init = false;
180
181 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
182}
183
184BT_HIDDEN
4164020e
SM
185struct lttng_live_stream_iterator *
186lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
187 uint64_t stream_id, bt_self_message_iterator *self_msg_iter)
7cdc2bab 188{
4164020e 189 struct lttng_live_trace *trace;
dff1c223 190 std::stringstream nameSs;
4164020e
SM
191
192 BT_ASSERT(session);
193 BT_ASSERT(session->lttng_live_msg_iter);
194 BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp);
27a14e13
SM
195
196 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e 197
1908a0bc
SM
198 lttng_live_stream_iterator::UP stream_iter =
199 bt2_common::makeUnique<lttng_live_stream_iterator>(logCfg);
4164020e
SM
200 trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
201 if (!trace) {
27a14e13 202 BT_CLOGE_APPEND_CAUSE("Failed to borrow CTF trace.");
1908a0bc 203 return nullptr;
4164020e
SM
204 }
205
206 stream_iter->trace = trace;
207 stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
208 stream_iter->viewer_stream_id = stream_id;
209
210 stream_iter->ctf_stream_class_id.is_set = false;
211 stream_iter->ctf_stream_class_id.value = UINT64_MAX;
212
213 stream_iter->last_inactivity_ts.is_set = false;
214 stream_iter->last_inactivity_ts.value = 0;
215
dff1c223
SM
216 nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id;
217 stream_iter->name = nameSs.str();
1908a0bc
SM
218
219 lttng_live_stream_iterator *ret = stream_iter.get();
2dbef7b6 220 trace->stream_iterators.emplace_back(std::move(stream_iter));
4164020e
SM
221
222 /* Track the number of active stream iterator. */
223 session->lttng_live_msg_iter->active_stream_iter++;
224
1908a0bc 225 return ret;
7cdc2bab
MD
226}
227
373a30a3
SM
228void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
229 uint64_t ctfStreamClsId)
230{
231 if (streamIter->ctf_stream_class_id.is_set) {
232 BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId);
233 return;
234 } else {
235 streamIter->ctf_stream_class_id.value = ctfStreamClsId;
236 streamIter->ctf_stream_class_id.is_set = true;
237 }
238}
239
f58d2476 240lttng_live_stream_iterator::~lttng_live_stream_iterator()
7cdc2bab 241{
4164020e 242 /* Track the number of active stream iterator. */
f58d2476
SM
243 this->trace->session->lttng_live_msg_iter->active_stream_iter--;
244}
This page took 0.0715479999999999 seconds and 5 git commands to generate.