src.ctf.lttng-live: make lttng_live_stream_iterator::name an std::string
[deliverable/babeltrace.git] / src / plugins / ctf / lttng-live / data-stream.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Philippe Proulx <pproulx@efficios.com>
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7cdc2bab
MD
8 */
9
27a14e13
SM
10#define BT_CLOG_CFG logCfg
11#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/DS"
020bc26f 12
3c22a242 13#include <inttypes.h>
7cdc2bab 14#include <stdio.h>
7cdc2bab 15#include <stdlib.h>
dff1c223 16#include <sstream>
3c22a242 17
7cdc2bab 18#include <glib.h>
3c22a242 19
3fadfbc0 20#include <babeltrace2/babeltrace.h>
3c22a242 21
27a14e13 22#include "cpp-common/cfg-logging.hpp"
364f5320 23#include "../common/src/msg-iter/msg-iter.hpp"
578e048b 24#include "common/assert.h"
3c22a242 25#include "compat/mman.h"
087cd0f5 26#include "data-stream.hpp"
27a14e13 27#include "cpp-common/cfg-logging-error-reporting.hpp"
7cdc2bab 28
4164020e 29#define STREAM_NAME_PREFIX "stream-"
14f28187 30
4164020e
SM
31static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
32 size_t *buffer_sz, void *data)
7cdc2bab 33{
4164020e
SM
34 enum ctf_msg_iter_medium_status status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
35 lttng_live_stream_iterator *stream = (lttng_live_stream_iterator *) data;
36 struct lttng_live_trace *trace = stream->trace;
37 struct lttng_live_session *session = trace->session;
38 struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter;
39 uint64_t recv_len = 0;
40 uint64_t len_left;
41 uint64_t read_len;
42
43 BT_ASSERT(request_sz);
44
45 if (stream->has_stream_hung_up) {
46 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
47 goto end;
48 }
49
50 len_left = stream->base_offset + stream->len - stream->offset;
51 if (!len_left) {
52 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
53 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
54 goto end;
55 }
56
dba2a2af 57 read_len = MIN(request_sz, stream->buf.size());
4164020e 58 read_len = MIN(read_len, len_left);
dba2a2af 59 status = lttng_live_get_stream_bytes(live_msg_iter, stream, stream->buf.data(), stream->offset,
4164020e 60 read_len, &recv_len);
dba2a2af 61 *buffer_addr = stream->buf.data();
4164020e
SM
62 *buffer_sz = recv_len;
63 stream->offset += recv_len;
4a39caef 64end:
4164020e 65 return status;
7cdc2bab
MD
66}
67
4164020e 68static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id, void *data)
7cdc2bab 69{
4164020e 70 lttng_live_stream_iterator *lttng_live_stream = (lttng_live_stream_iterator *) data;
27a14e13 71 const bt2_common::LogCfg& logCfg = lttng_live_stream->logCfg;
4164020e
SM
72
73 if (!lttng_live_stream->stream) {
74 uint64_t stream_class_id = bt_stream_class_get_id(stream_class);
75
27a14e13
SM
76 BT_CLOGI("Creating stream %s (ID: %" PRIu64 ") out of stream "
77 "class %" PRId64,
dff1c223 78 lttng_live_stream->name.c_str(), stream_id, stream_class_id);
4164020e 79
5f16c381
SM
80 bt_stream *stream;
81
4164020e
SM
82 if (stream_id < 0) {
83 /*
84 * No stream instance ID in the stream. It's possible
85 * to encounter this situation with older version of
86 * LTTng. In these cases, use the viewer_stream_id that
87 * is unique for a live viewer session.
88 */
5f16c381
SM
89 stream = bt_stream_create_with_id(stream_class,
90 (*lttng_live_stream->trace->trace)->libObjPtr(),
91 lttng_live_stream->viewer_stream_id);
4164020e 92 } else {
5f16c381
SM
93 stream = bt_stream_create_with_id(stream_class,
94 (*lttng_live_stream->trace->trace)->libObjPtr(),
95 (uint64_t) stream_id);
4164020e
SM
96 }
97
5f16c381 98 if (!stream) {
27a14e13
SM
99 BT_CLOGE_APPEND_CAUSE("Cannot create stream %s (stream class ID "
100 "%" PRId64 ", stream ID %" PRIu64 ")",
dff1c223 101 lttng_live_stream->name.c_str(), stream_class_id, stream_id);
5f16c381 102 return nullptr;
4164020e
SM
103 }
104
5f16c381
SM
105 lttng_live_stream->stream = bt2::Stream::Shared::createWithoutRef(stream);
106
dff1c223 107 (*lttng_live_stream->stream)->name(lttng_live_stream->name);
4164020e 108 }
7cdc2bab 109
5f16c381 110 return (*lttng_live_stream->stream)->libObjPtr();
7cdc2bab
MD
111}
112
18a1979b 113static struct ctf_msg_iter_medium_ops medops = {
4164020e
SM
114 medop_request_bytes,
115 nullptr,
116 nullptr,
117 medop_borrow_stream,
7cdc2bab
MD
118};
119
120BT_HIDDEN
4164020e
SM
121enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
122 bt_self_message_iterator *self_msg_iter)
7cdc2bab 123{
4164020e
SM
124 struct lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
125 uint64_t trace_idx, stream_iter_idx;
27a14e13 126 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e
SM
127
128 if (!session->lazy_stream_msg_init) {
129 return LTTNG_LIVE_ITERATOR_STATUS_OK;
130 }
131
27a14e13
SM
132 BT_CLOGD("Lazily initializing self message iterator for live session: "
133 "session-id=%" PRIu64 ", self-msg-iter-addr=%p",
134 session->id, self_msg_iter);
4164020e
SM
135
136 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
137 struct lttng_live_trace *trace =
138 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
139
140 for (stream_iter_idx = 0; stream_iter_idx < trace->stream_iterators->len;
141 stream_iter_idx++) {
142 struct ctf_trace_class *ctf_tc;
143 struct lttng_live_stream_iterator *stream_iter =
144 (lttng_live_stream_iterator *) g_ptr_array_index(trace->stream_iterators,
145 stream_iter_idx);
146
147 if (stream_iter->msg_iter) {
148 continue;
149 }
65381cf3 150 ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
27a14e13
SM
151 BT_CLOGD("Creating CTF message iterator: "
152 "session-id=%" PRIu64 ", ctf-tc-addr=%p, "
153 "stream-iter-name=%s, self-msg-iter-addr=%p",
dff1c223 154 session->id, ctf_tc, stream_iter->name.c_str(), self_msg_iter);
27a14e13 155 stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops,
8469ed41 156 stream_iter, self_msg_iter, logCfg);
4164020e 157 if (!stream_iter->msg_iter) {
27a14e13 158 BT_CLOGE_APPEND_CAUSE("Failed to create CTF message iterator");
4164020e
SM
159 goto error;
160 }
161 }
162 }
163
164 session->lazy_stream_msg_init = false;
165
166 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
167
168error:
4164020e 169 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
7cdc2bab
MD
170}
171
172BT_HIDDEN
4164020e
SM
173struct lttng_live_stream_iterator *
174lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
175 uint64_t stream_id, bt_self_message_iterator *self_msg_iter)
7cdc2bab 176{
4164020e
SM
177 struct lttng_live_component *lttng_live;
178 struct lttng_live_trace *trace;
dff1c223 179 std::stringstream nameSs;
4164020e
SM
180
181 BT_ASSERT(session);
182 BT_ASSERT(session->lttng_live_msg_iter);
183 BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp);
27a14e13
SM
184
185 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e
SM
186
187 lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
188
27a14e13 189 lttng_live_stream_iterator *stream_iter = new lttng_live_stream_iterator {logCfg};
4164020e
SM
190 trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
191 if (!trace) {
27a14e13 192 BT_CLOGE_APPEND_CAUSE("Failed to borrow CTF trace.");
4164020e
SM
193 goto error;
194 }
195
196 stream_iter->trace = trace;
197 stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
198 stream_iter->viewer_stream_id = stream_id;
199
200 stream_iter->ctf_stream_class_id.is_set = false;
201 stream_iter->ctf_stream_class_id.value = UINT64_MAX;
202
203 stream_iter->last_inactivity_ts.is_set = false;
204 stream_iter->last_inactivity_ts.value = 0;
205
206 if (trace->trace) {
207 struct ctf_trace_class *ctf_tc =
65381cf3 208 ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
4164020e 209 BT_ASSERT(!stream_iter->msg_iter);
27a14e13 210 stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops,
8469ed41 211 stream_iter, self_msg_iter, logCfg);
4164020e 212 if (!stream_iter->msg_iter) {
27a14e13 213 BT_CLOGE_APPEND_CAUSE("Failed to create CTF message iterator");
4164020e
SM
214 goto error;
215 }
216 }
dba2a2af 217 stream_iter->buf.resize(lttng_live->max_query_size);
4164020e 218
dff1c223
SM
219 nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id;
220 stream_iter->name = nameSs.str();
4164020e
SM
221 g_ptr_array_add(trace->stream_iterators, stream_iter);
222
223 /* Track the number of active stream iterator. */
224 session->lttng_live_msg_iter->active_stream_iter++;
225
226 goto end;
7cdc2bab 227error:
4164020e
SM
228 lttng_live_stream_iterator_destroy(stream_iter);
229 stream_iter = NULL;
7cdc2bab 230end:
4164020e 231 return stream_iter;
7cdc2bab
MD
232}
233
234BT_HIDDEN
4164020e 235void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream_iter)
7cdc2bab 236{
4164020e
SM
237 if (!stream_iter) {
238 return;
239 }
7cdc2bab 240
4164020e 241 bt_message_put_ref(stream_iter->current_msg);
14f28187 242
4164020e
SM
243 /* Track the number of active stream iterator. */
244 stream_iter->trace->session->lttng_live_msg_iter->active_stream_iter--;
14f28187 245
6269f212 246 delete stream_iter;
7cdc2bab 247}
This page took 0.071554 seconds and 5 git commands to generate.