src.ctf.lttng-live: make lttng_live_session::traces an std::vector
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
c802cacb 11#include <glib.h>
14f28187
FD
12#include <unistd.h>
13
578e048b 14#include "common/assert.h"
0f5c5d5c 15#include "cpp-common/bt2c/fmt.hpp"
6838d8aa 16#include "cpp-common/bt2c/glib-up.hpp"
db00b877 17#include "cpp-common/bt2c/vector.hpp"
ef9e5f5d 18#include "cpp-common/bt2s/make-unique.hpp"
0f5c5d5c 19#include "cpp-common/vendor/fmt/format.h"
f3bc2010 20
376fc2bd 21#include "plugins/common/muxing/muxing.h"
80aff5ef 22#include "plugins/common/param-validation/param-validation.h"
376fc2bd 23
087cd0f5 24#include "data-stream.hpp"
087cd0f5 25#include "lttng-live.hpp"
c802cacb 26#include "metadata.hpp"
7cdc2bab 27
4164020e
SM
28#define MAX_QUERY_SIZE (256 * 1024)
29#define URL_PARAM "url"
30#define INPUTS_PARAM "inputs"
31#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
32#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
33#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
34#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 35
34533ae0 36void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 37 enum lttng_live_stream_state new_state)
34533ae0 38{
0f5c5d5c
SM
39 BT_CPPLOGD_SPEC(stream_iter->logger,
40 "Setting live stream iterator state: viewer-stream-id={}, "
41 "old-state={}, new-state={}",
42 stream_iter->viewer_stream_id, stream_iter->state, new_state);
34533ae0 43
4164020e 44 stream_iter->state = new_state;
34533ae0
FD
45}
46
4164020e
SM
47#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
48 do { \
0f5c5d5c
SM
49 BT_CPPLOGD_SPEC((live_stream_iter)->logger, \
50 "Live stream iterator state={}, " \
51 "last-inact-ts-is-set={}, last-inact-ts-value={}, " \
52 "curr-inact-ts={}", \
53 (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set, \
54 (live_stream_iter)->last_inactivity_ts.value, \
55 (live_stream_iter)->current_inactivity_ts); \
4164020e 56 } while (0);
6f79a7cf 57
9b4f9b42 58bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 59{
4164020e 60 bool ret;
6f79a7cf 61
4164020e
SM
62 if (!msg_iter) {
63 ret = false;
64 goto end;
65 }
7cdc2bab 66
4164020e 67 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
4bf0e537 68
14f28187 69end:
4164020e 70 return ret;
7cdc2bab
MD
71}
72
4164020e
SM
73static struct lttng_live_trace *
74lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 75{
6dcda123 76 for (lttng_live_trace::UP& trace : session->traces) {
4164020e 77 if (trace->id == trace_id) {
6dcda123 78 return trace.get();
4164020e
SM
79 }
80 }
14f28187 81
6dcda123 82 return nullptr;
7cdc2bab
MD
83}
84
4164020e
SM
85static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
86 uint64_t trace_id)
7cdc2bab 87{
0f5c5d5c
SM
88 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
89 trace_id);
4164020e 90
deb86334
SM
91 auto trace = bt2s::make_unique<lttng_live_trace>(session->logger);
92
4164020e
SM
93 trace->session = session;
94 trace->id = trace_id;
4164020e 95 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
4164020e 96
deb86334 97 const auto ret = trace.get();
6dcda123 98 session->traces.emplace_back(std::move(trace));
deb86334 99 return ret;
7cdc2bab
MD
100}
101
4164020e
SM
102struct lttng_live_trace *
103lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
104 uint64_t trace_id)
7cdc2bab 105{
4164020e 106 struct lttng_live_trace *trace;
7cdc2bab 107
4164020e
SM
108 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
109 if (trace) {
110 goto end;
111 }
7cdc2bab 112
4164020e
SM
113 /* The session is the owner of the newly created trace. */
114 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 115
14f28187 116end:
4164020e 117 return trace;
7cdc2bab
MD
118}
119
4164020e
SM
120int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
121 const char *hostname, const char *session_name)
7cdc2bab 122{
0f5c5d5c
SM
123 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
124 "Adding live session: "
125 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
126 session_id, hostname, session_name);
4164020e 127
0f5c5d5c 128 lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
4164020e
SM
129 session->self_comp = lttng_live_msg_iter->self_comp;
130 session->id = session_id;
4164020e
SM
131 session->lttng_live_msg_iter = lttng_live_msg_iter;
132 session->new_streams_needed = true;
133 session->hostname = g_string_new(hostname);
134 BT_ASSERT(session->hostname);
135
136 session->session_name = g_string_new(session_name);
137 BT_ASSERT(session->session_name);
138
139 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
afb0f12b
SM
140
141 return 0;
7cdc2bab
MD
142}
143
4164020e 144static void lttng_live_destroy_session(struct lttng_live_session *session)
7cdc2bab 145{
4164020e
SM
146 if (!session) {
147 goto end;
148 }
149
0f5c5d5c
SM
150 BT_CPPLOGD_SPEC(session->logger,
151 "Destroying live session: "
152 "session-id={}, session-name=\"{}\"",
153 session->id, session->session_name->str);
4164020e
SM
154 if (session->id != -1ULL) {
155 if (lttng_live_session_detach(session)) {
156 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
157 /* Old relayd cannot detach sessions. */
0f5c5d5c
SM
158 BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
159 session->id);
4164020e
SM
160 }
161 }
162 session->id = -1ULL;
163 }
164
4164020e
SM
165 if (session->hostname) {
166 g_string_free(session->hostname, TRUE);
167 }
afb0f12b 168
4164020e
SM
169 if (session->session_name) {
170 g_string_free(session->session_name, TRUE);
171 }
afb0f12b
SM
172
173 delete session;
14f28187
FD
174
175end:
4164020e 176 return;
7cdc2bab
MD
177}
178
4164020e 179static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 180{
4164020e
SM
181 if (!lttng_live_msg_iter) {
182 goto end;
183 }
7cdc2bab 184
4164020e
SM
185 if (lttng_live_msg_iter->sessions) {
186 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
187 }
14f28187 188
4164020e
SM
189 if (lttng_live_msg_iter->viewer_connection) {
190 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
191 }
192 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
193 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
14f28187 194
4164020e
SM
195 /* All stream iterators must be destroyed at this point. */
196 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
197 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
14f28187 198
afb0f12b 199 delete lttng_live_msg_iter;
14f28187
FD
200
201end:
4164020e 202 return;
14f28187
FD
203}
204
14f28187
FD
205void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
206{
4164020e 207 struct lttng_live_msg_iter *lttng_live_msg_iter;
14f28187 208
4164020e 209 BT_ASSERT(self_msg_iter);
14f28187 210
4164020e
SM
211 lttng_live_msg_iter =
212 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
213 BT_ASSERT(lttng_live_msg_iter);
214 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
215}
216
4164020e
SM
217static enum lttng_live_iterator_status
218lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 219{
4164020e
SM
220 switch (lttng_live_stream->state) {
221 case LTTNG_LIVE_STREAM_QUIESCENT:
222 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
223 break;
224 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
225 /* Invalid state. */
0f5c5d5c 226 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
4164020e
SM
227 bt_common_abort();
228 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
229 /* Invalid state. */
0f5c5d5c 230 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
4164020e
SM
231 bt_common_abort();
232 case LTTNG_LIVE_STREAM_EOF:
233 break;
234 }
235 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
236}
237
238/*
f93afbf9
FD
239 * For active no data stream, fetch next index. As a result of that it can
240 * become either:
241 * - quiescent: won't have events for a bit,
242 * - have data: need to get that data and produce the event,
243 * - have no data on this stream at this point: need to retry (AGAIN) or return
244 * EOF.
7cdc2bab 245 */
4164020e
SM
246static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
247 struct lttng_live_msg_iter *lttng_live_msg_iter,
248 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 249{
4164020e
SM
250 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
251 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
252 struct packet_index index;
253
254 if (lttng_live_stream->trace->metadata_stream_state ==
255 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
e28ca558
SM
256 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
257 "Need to get an update for the metadata stream before proceeding "
258 "further with this stream: stream-name=\"{}\"",
259 lttng_live_stream->name);
4164020e
SM
260 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
261 goto end;
262 }
263
264 if (lttng_live_stream->trace->session->new_streams_needed) {
e28ca558
SM
265 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
266 "Need to get an update of all streams before proceeding further "
267 "with this stream: stream-name=\"{}\"",
268 lttng_live_stream->name);
4164020e
SM
269 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
270 goto end;
271 }
272
273 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
274 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
275 goto end;
276 }
277 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
278 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
279 goto end;
280 }
281
282 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
283
284 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
285 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
286 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
287
288 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
289 /*
5cb27175
JG
290 * Because the stream is in the QUIESCENT_NO_DATA
291 * state, we can assert that the last_inactivity_ts was
292 * set and can be safely used in the `if` above.
293 */
4164020e
SM
294 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
295
296 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
297 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
298 } else {
299 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
300 }
301 goto end;
302 }
303
304 lttng_live_stream->base_offset = index.offset;
305 lttng_live_stream->offset = index.offset;
306 lttng_live_stream->len = index.packet_size / CHAR_BIT;
307
0f5c5d5c
SM
308 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
309 "Setting live stream reading info: stream-name=\"{}\", "
310 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
e28ca558 311 lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
0f5c5d5c
SM
312 lttng_live_stream->base_offset, lttng_live_stream->offset,
313 lttng_live_stream->len);
f93afbf9 314
7cdc2bab 315end:
4164020e
SM
316 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
317 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
318 }
319 return ret;
7cdc2bab
MD
320}
321
322/*
14f28187
FD
323 * Creation of the message requires the ctf trace class to be created
324 * beforehand, but the live protocol gives us all streams (including metadata)
325 * at once. So we split it in three steps: getting streams, getting metadata
326 * (which creates the ctf trace class), and then creating the per-stream
327 * messages.
7cdc2bab 328 */
4164020e
SM
329static enum lttng_live_iterator_status
330lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
331 struct lttng_live_session *session)
7cdc2bab 332{
4164020e 333 enum lttng_live_iterator_status status;
4164020e
SM
334
335 if (!session->attached) {
0f5c5d5c
SM
336 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
337 session->id);
4164020e
SM
338 enum lttng_live_viewer_status attach_status =
339 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
340 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
341 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
342 /*
343 * Clear any causes appended in
344 * `lttng_live_attach_session()` as we want to
345 * return gracefully since the graph was
346 * cancelled.
347 */
348 bt_current_thread_clear_error();
349 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
350 } else {
351 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
352 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
353 "Error attaching to LTTng live session");
4164020e
SM
354 }
355 goto end;
356 }
357 }
358
0f5c5d5c
SM
359 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
360 "Updating all data streams: "
361 "session-id={}, session-name=\"{}\"",
362 session->id, session->session_name->str);
4164020e
SM
363
364 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
a4f118a3
FD
365 switch (status) {
366 case LTTNG_LIVE_ITERATOR_STATUS_OK:
367 break;
368 case LTTNG_LIVE_ITERATOR_STATUS_END:
369 /*
f8a11f24
SM
370 * We received a `_END` from the `_get_new_streams()` function,
371 * which means no more data will ever be received from the data
372 * streams of this session. But it's possible that the metadata
373 * is incomplete.
374 * The live protocol guarantees that we receive all the
375 * metadata needed before we receive data streams needing it.
376 * But it's possible to receive metadata NOT needed by
377 * data streams after the session was closed. For example, this
378 * could happen if a new event is registered and the session is
379 * stopped before any tracepoint for that event is actually
380 * fired.
381 */
0f5c5d5c
SM
382 BT_CPPLOGD_SPEC(
383 lttng_live_msg_iter->logger,
a4f118a3 384 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
0f5c5d5c 385 "session-id={}, session-name=\"{}\"",
a4f118a3
FD
386 session->id, session->session_name->str);
387 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
388 break;
389 default:
4164020e
SM
390 goto end;
391 }
392
0f5c5d5c
SM
393 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
394 "Updating metadata stream for session: "
395 "session-id={}, session-name=\"{}\"",
396 session->id, session->session_name->str);
a4f118a3 397
6dcda123
SM
398 for (lttng_live_trace::UP& trace : session->traces) {
399 status = lttng_live_metadata_update(trace.get());
4164020e
SM
400 switch (status) {
401 case LTTNG_LIVE_ITERATOR_STATUS_END:
402 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e
SM
403 break;
404 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
405 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
406 goto end;
407 default:
0f5c5d5c
SM
408 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
409 "Error updating trace metadata: "
410 "stream-iter-status={}, trace-id={}",
411 status, trace->id);
4164020e
SM
412 goto end;
413 }
414 }
415
416 /*
417 * Now that we have the metadata we can initialize the downstream
418 * iterator.
419 */
420 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
14f28187
FD
421
422end:
4164020e 423 return status;
7cdc2bab
MD
424}
425
4164020e
SM
426static void
427lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 428{
6dcda123 429 uint64_t session_idx;
4164020e
SM
430
431 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
432 struct lttng_live_session *session =
433 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
0f5c5d5c
SM
434 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
435 "Force marking session as needing new streams: "
436 "session-id={}",
437 session->id);
4164020e 438 session->new_streams_needed = true;
6dcda123 439 for (lttng_live_trace::UP& trace : session->traces) {
0f5c5d5c
SM
440 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
441 "Force marking trace metadata state as needing an update: "
442 "session-id={}, trace-id={}",
443 session->id, trace->id);
4164020e
SM
444
445 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
446
447 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
448 }
449 }
7cdc2bab
MD
450}
451
4164020e
SM
452static enum lttng_live_iterator_status
453lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 454{
4164020e
SM
455 enum lttng_live_iterator_status status;
456 enum lttng_live_viewer_status viewer_status;
4164020e
SM
457 uint64_t session_idx = 0, nr_sessions_opened = 0;
458 struct lttng_live_session *session;
459 enum session_not_found_action sess_not_found_act =
460 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
461
0f5c5d5c
SM
462 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
463 "Update data and metadata of all sessions: "
464 "live-msg-iter-addr={}",
465 fmt::ptr(lttng_live_msg_iter));
4164020e
SM
466 /*
467 * In a remotely distant future, we could add a "new
468 * session" flag to the protocol, which would tell us that we
469 * need to query for new sessions even though we have sessions
470 * currently ongoing.
471 */
472 if (lttng_live_msg_iter->sessions->len == 0) {
473 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
0f5c5d5c
SM
474 BT_CPPLOGD_SPEC(
475 lttng_live_msg_iter->logger,
4164020e
SM
476 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
477 status = LTTNG_LIVE_ITERATOR_STATUS_END;
478 goto end;
479 } else {
0f5c5d5c
SM
480 BT_CPPLOGD_SPEC(
481 lttng_live_msg_iter->logger,
4164020e
SM
482 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
483 /*
484 * Retry to create a viewer session for the requested
485 * session name.
486 */
487 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
488 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
489 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
490 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
491 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
492 "Error creating LTTng live viewer session");
4164020e
SM
493 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
494 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
495 } else {
496 bt_common_abort();
497 }
498 goto end;
499 }
500 }
501 }
502
503 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
504 session =
505 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
506 status = lttng_live_get_session(lttng_live_msg_iter, session);
507 switch (status) {
508 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e 509 case LTTNG_LIVE_ITERATOR_STATUS_END:
a4f118a3
FD
510 /*
511 * A session returned `_END`. Other sessions may still
512 * be active so we override the status and continue
513 * looping if needed.
514 */
4164020e
SM
515 break;
516 default:
517 goto end;
518 }
519 if (!session->closed) {
520 nr_sessions_opened++;
521 }
522 }
523
524 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
525 status = LTTNG_LIVE_ITERATOR_STATUS_END;
526 } else {
527 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
528 }
f79c2d7a
FD
529
530end:
4164020e 531 return status;
7cdc2bab
MD
532}
533
4164020e
SM
534static enum lttng_live_iterator_status
535emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7
SM
536 struct lttng_live_stream_iterator *stream_iter,
537 bt2::ConstMessage::Shared& message, uint64_t timestamp)
7cdc2bab 538{
4164020e 539 BT_ASSERT(stream_iter->trace->clock_class);
0f5c5d5c
SM
540 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
541 "Emitting inactivity message for stream: ctf-stream-id={}, "
542 "viewer-stream-id={}, timestamp={}",
543 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
544 timestamp);
4164020e 545
35e432d7
SM
546 const auto msg = bt_message_message_iterator_inactivity_create(
547 lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
548
4164020e 549 if (!msg) {
0f5c5d5c
SM
550 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
551 "Error emitting message iterator inactivity message");
35e432d7 552 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
553 }
554
35e432d7
SM
555 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
556 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
557}
558
4164020e
SM
559static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
560 struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7 561 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
7cdc2bab 562{
4164020e
SM
563 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
564
565 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
566 return LTTNG_LIVE_ITERATOR_STATUS_OK;
567 }
568
569 /*
570 * Check if we already sent an inactivty message downstream for this
571 * `current_inactivity_ts` value.
572 */
573 if (lttng_live_stream->last_inactivity_ts.is_set &&
574 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
575 lttng_live_stream_iterator_set_state(lttng_live_stream,
576 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
577
578 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
579 goto end;
580 }
581
582 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
583 lttng_live_stream->current_inactivity_ts);
584
585 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
586 lttng_live_stream->last_inactivity_ts.is_set = true;
14f28187 587end:
4164020e 588 return ret;
14f28187
FD
589}
590
7d91f1ac 591static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
4164020e 592 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 593{
4164020e
SM
594 const bt_clock_snapshot *clock_snapshot = NULL;
595 int ret = 0;
4164020e
SM
596
597 BT_ASSERT_DBG(msg);
598 BT_ASSERT_DBG(ts_ns);
599
0f5c5d5c
SM
600 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
601 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
602 "last-msg-ts={}",
603 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
4164020e
SM
604
605 switch (bt_message_get_type(msg)) {
606 case BT_MESSAGE_TYPE_EVENT:
4164020e
SM
607 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
608 break;
609 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
4164020e
SM
610 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
611 break;
612 case BT_MESSAGE_TYPE_PACKET_END:
4164020e
SM
613 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
614 break;
615 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
4164020e
SM
616 clock_snapshot =
617 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
618 break;
619 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
4164020e
SM
620 clock_snapshot =
621 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
622 break;
623 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
624 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
625 break;
626 default:
627 /* All the other messages have a higher priority */
0f5c5d5c
SM
628 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
629 "Message has no timestamp: using the last message timestamp.");
4164020e
SM
630 *ts_ns = last_msg_ts_ns;
631 goto end;
632 }
633
4164020e
SM
634 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
635 if (ret) {
0f5c5d5c
SM
636 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
637 "Cannot get nanoseconds from Epoch of clock snapshot: "
638 "clock-snapshot-addr={}",
639 fmt::ptr(clock_snapshot));
4164020e
SM
640 goto error;
641 }
642
643 goto end;
14f28187 644
14f28187 645error:
4164020e 646 ret = -1;
7cdc2bab 647
7cdc2bab 648end:
4164020e 649 if (ret == 0) {
0f5c5d5c
SM
650 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
651 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
652 "last-msg-ts={}, ts={}",
653 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
4164020e
SM
654 }
655
656 return ret;
7cdc2bab
MD
657}
658
4164020e
SM
659static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
660 struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7 661 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
7cdc2bab 662{
4164020e 663 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e 664 enum ctf_msg_iter_status status;
6dcda123 665 uint64_t session_idx;
4164020e
SM
666
667 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
668 struct lttng_live_session *session =
669 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
670
671 if (session->new_streams_needed) {
0f5c5d5c
SM
672 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
673 "Need an update for streams: "
674 "session-id={}",
675 session->id);
4164020e
SM
676 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
677 goto end;
678 }
6dcda123 679 for (lttng_live_trace::UP& trace : session->traces) {
4164020e 680 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
0f5c5d5c
SM
681 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
682 "Need an update for metadata stream: "
683 "session-id={}, trace-id={}",
684 session->id, trace->id);
4164020e
SM
685 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
686 goto end;
687 }
688 }
689 }
690
691 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
692 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
693 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
694 "Invalid state of live stream iterator"
695 "stream-iter-status={}",
696 lttng_live_stream->state);
4164020e
SM
697 goto end;
698 }
699
35e432d7
SM
700 const bt_message *msg;
701 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
4164020e
SM
702 switch (status) {
703 case CTF_MSG_ITER_STATUS_EOF:
704 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
705 break;
706 case CTF_MSG_ITER_STATUS_OK:
35e432d7 707 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
4164020e
SM
708 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
709 break;
710 case CTF_MSG_ITER_STATUS_AGAIN:
711 /*
712 * Continue immediately (end of packet). The next
713 * get_index may return AGAIN to delay the following
714 * attempt.
715 */
716 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
717 break;
718 case CTF_MSG_ITER_STATUS_ERROR:
719 default:
720 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
721 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
722 "CTF message iterator failed to get next message: "
723 "msg-iter={}, msg-iter-status={}",
724 fmt::ptr(lttng_live_stream->msg_iter), status);
4164020e
SM
725 break;
726 }
14f28187
FD
727
728end:
4164020e 729 return ret;
7cdc2bab
MD
730}
731
4164020e
SM
732static enum lttng_live_iterator_status
733lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
734 struct lttng_live_stream_iterator *stream_iter,
35e432d7 735 bt2::ConstMessage::Shared& curr_msg)
4a39caef 736{
4164020e 737 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e 738
0f5c5d5c
SM
739 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
740 "Closing live stream iterator: stream-name=\"{}\", "
741 "viewer-stream-id={}",
e28ca558 742 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
743
744 /*
745 * The viewer has hung up on us so we are closing the stream. The
746 * `ctf_msg_iter` should simply realize that it needs to close the
747 * stream properly by emitting the necessary stream end message.
748 */
35e432d7 749 const bt_message *msg;
4164020e 750 enum ctf_msg_iter_status status =
35e432d7 751 ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
4164020e
SM
752
753 if (status == CTF_MSG_ITER_STATUS_ERROR) {
0f5c5d5c
SM
754 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
755 "Error getting the next message from CTF message iterator");
4164020e
SM
756 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
757 goto end;
758 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
0f5c5d5c
SM
759 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
760 "Reached the end of the live stream iterator.");
4164020e
SM
761 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
762 goto end;
763 }
764
765 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef 766
35e432d7
SM
767 curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
768
4a39caef 769end:
4164020e 770 return live_status;
4a39caef
FD
771}
772
7cdc2bab
MD
773/*
774 * helper function:
775 * handle_no_data_streams()
776 * retry:
777 * - for each ACTIVE_NO_DATA stream:
778 * - query relayd for stream data, or quiescence info.
779 * - if need metadata, get metadata, goto retry.
780 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
781 * - if quiescent, move to QUIESCENT streams
782 * - if fetched data, move to ACTIVE_DATA streams
783 * (at this point each stream either has data, or is quiescent)
784 *
785 *
786 * iterator_next:
787 * handle_new_streams_and_metadata()
788 * - query relayd for known streams, add them as ACTIVE_NO_DATA
789 * - query relayd for metadata
790 *
791 * call handle_active_no_data_streams()
792 *
793 * handle_quiescent_streams()
794 * - if at least one stream is ACTIVE_DATA:
795 * - peek stream event with lowest timestamp -> next_ts
796 * - for each quiescent stream
797 * - if next_ts >= quiescent end
798 * - set state to ACTIVE_NO_DATA
799 * - else
800 * - for each quiescent stream
801 * - set state to ACTIVE_NO_DATA
802 *
803 * call handle_active_no_data_streams()
804 *
805 * handle_active_data_streams()
806 * - if at least one stream is ACTIVE_DATA:
807 * - get stream event with lowest timestamp from heap
d6e69534 808 * - make that stream event the current message.
7cdc2bab
MD
809 * - move this stream heap position to its next event
810 * - if we need to fetch data from relayd, move
811 * stream to ACTIVE_NO_DATA.
812 * - return OK
813 * - return AGAIN
814 *
815 * end criterion: ctrl-c on client. If relayd exits or the session
816 * closes on the relay daemon side, we keep on waiting for streams.
817 * Eventually handle --end timestamp (also an end criterion).
818 *
819 * When disconnected from relayd: try to re-connect endlessly.
820 */
4164020e
SM
821static enum lttng_live_iterator_status
822lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
823 struct lttng_live_stream_iterator *stream_iter,
35e432d7 824 bt2::ConstMessage::Shared& curr_msg)
7cdc2bab 825{
4164020e
SM
826 enum lttng_live_iterator_status live_status;
827
0f5c5d5c
SM
828 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
829 "Advancing live stream iterator until next message if possible: "
830 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 831 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
832
833 if (stream_iter->has_stream_hung_up) {
834 /*
835 * The stream has hung up and the stream was properly closed
836 * during the last call to the current function. Return _END
837 * status now so that this stream iterator is removed for the
838 * stream iterator list.
839 */
840 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
841 goto end;
842 }
4a39caef 843
7cdc2bab 844retry:
4164020e
SM
845 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
846
847 /*
848 * Make sure we have the most recent metadata and possibly some new
849 * streams.
850 */
851 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
852 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
853 goto end;
854 }
855
856 live_status =
857 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
858 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
859 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
860 /*
861 * We overwrite `live_status` since `curr_msg` is
862 * likely set to a valid message in this function.
863 */
864 live_status =
865 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
866 }
867 goto end;
868 }
869
870 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
871 stream_iter, curr_msg);
872 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
35e432d7 873 BT_ASSERT(!curr_msg);
4164020e
SM
874 goto end;
875 }
35e432d7 876 if (curr_msg) {
4164020e
SM
877 goto end;
878 }
879 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
880 stream_iter, curr_msg);
881 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
35e432d7 882 BT_ASSERT(!curr_msg);
4164020e 883 }
7cdc2bab
MD
884
885end:
4164020e 886 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
0f5c5d5c
SM
887 BT_CPPLOGD_SPEC(
888 lttng_live_msg_iter->logger,
889 "Ask the relay daemon for an updated view of the data and metadata streams");
4164020e
SM
890 goto retry;
891 }
14f28187 892
0f5c5d5c
SM
893 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
894 "Returning from advancing live stream iterator: status={}, "
895 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 896 live_status, stream_iter->name, stream_iter->viewer_stream_id);
f93afbf9 897
4164020e 898 return live_status;
7cdc2bab
MD
899}
900
35e432d7 901static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
8ec4d5ff 902{
35e432d7
SM
903 return msg.type() == bt2::MessageType::DiscardedEvents ||
904 msg.type() == bt2::MessageType::DiscardedPackets;
8ec4d5ff
JG
905}
906
4164020e
SM
907static enum lttng_live_iterator_status
908adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
35e432d7 909 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
4164020e 910 uint64_t new_begin_ts)
8ec4d5ff 911{
4164020e
SM
912 enum bt_property_availability availability;
913 const bt_clock_snapshot *clock_snapshot;
914 uint64_t end_ts;
915 uint64_t count;
916
917 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
918 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
919
920 availability = bt_message_discarded_packets_get_count(msg_in, &count);
921 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
922
35e432d7 923 const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
4164020e 924 iter, stream, new_begin_ts, end_ts);
35e432d7
SM
925
926 if (!msg) {
927 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
4164020e
SM
928 }
929
35e432d7
SM
930 bt_message_discarded_packets_set_count(msg, count);
931 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
932 return LTTNG_LIVE_ITERATOR_STATUS_OK;
8ec4d5ff
JG
933}
934
4164020e
SM
935static enum lttng_live_iterator_status
936adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
35e432d7 937 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
4164020e 938 uint64_t new_begin_ts)
8ec4d5ff 939{
4164020e
SM
940 enum bt_property_availability availability;
941 const bt_clock_snapshot *clock_snapshot;
942 uint64_t end_ts;
943 uint64_t count;
944
945 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
946 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
947
948 availability = bt_message_discarded_events_get_count(msg_in, &count);
949 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
950
35e432d7 951 const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
4164020e 952 iter, stream, new_begin_ts, end_ts);
35e432d7
SM
953
954 if (!msg) {
955 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
4164020e
SM
956 }
957
35e432d7
SM
958 bt_message_discarded_events_set_count(msg, count);
959 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
960 return LTTNG_LIVE_ITERATOR_STATUS_OK;
8ec4d5ff
JG
961}
962
4164020e
SM
963static enum lttng_live_iterator_status
964handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
965 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
35e432d7 966 const bt2::ConstMessage& late_msg)
285951be 967{
4164020e
SM
968 const bt_clock_class *clock_class;
969 const bt_stream_class *stream_class;
970 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
971 int64_t last_inactivity_ts_ns;
972 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
973 enum lttng_live_iterator_status adjust_status;
35e432d7 974 bt2::ConstMessage::Shared adjusted_message;
4164020e
SM
975
976 /*
977 * The timestamp of the current message is before the last message sent
978 * by this component. We CANNOT send it as is.
979 *
980 * The only expected scenario in which that could happen is the
e7401568 981 * following, everything else is a bug in this component, relay daemon,
4164020e
SM
982 * or CTF parser.
983 *
984 * Expected scenario: The CTF message iterator emitted discarded
985 * packets and discarded events with synthesized beginning and end
986 * timestamps from the bounds of the last known packet and the newly
987 * decoded packet header. The CTF message iterator is not aware of
988 * stream inactivity beacons. Hence, we have to adjust the beginning
989 * timestamp of those types of messages if a stream signalled its
990 * inactivity up until _after_ the last known packet's beginning
991 * timestamp.
992 *
993 * Otherwise, the monotonicity guarantee of message timestamps would
994 * not be preserved.
995 *
996 * In short, the only scenario in which it's okay and fixable to
997 * received a late message is when:
998 * 1. the late message is a discarded packets or discarded events
f8a11f24 999 * message,
4164020e
SM
1000 * 2. this stream produced an inactivity message downstream, and
1001 * 3. the timestamp of the late message is within the inactivity
f8a11f24 1002 * timespan we sent downstream through the inactivity message.
4164020e
SM
1003 */
1004
0f5c5d5c
SM
1005 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1006 "Handling late message on live stream iterator: "
1007 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 1008 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
1009
1010 if (!stream_iter->last_inactivity_ts.is_set) {
0f5c5d5c
SM
1011 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1012 "Invalid live stream state: "
1013 "have a late message when no inactivity message "
1014 "was ever sent for that stream.");
4164020e
SM
1015 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1016 goto end;
1017 }
1018
1019 if (!is_discarded_packet_or_event_message(late_msg)) {
0f5c5d5c
SM
1020 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1021 "Invalid live stream state: "
1022 "have a late message that is not a packet discarded or "
1023 "event discarded message: late-msg-type={}",
35e432d7 1024 late_msg.type());
4164020e
SM
1025 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1026 goto end;
1027 }
1028
0b68f2bc 1029 stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
4164020e
SM
1030 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1031
1032 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1033 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1034 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
0f5c5d5c
SM
1035 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1036 "Error converting last "
1037 "inactivity message timestamp to nanoseconds");
4164020e
SM
1038 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1039 goto end;
1040 }
1041
1042 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
0f5c5d5c
SM
1043 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1044 "Invalid live stream state: "
1045 "have a late message that is none included in a stream "
1046 "inactivity timespan: last-inactivity-ts-ns={}, "
1047 "late-msg-ts-ns={}",
1048 last_inactivity_ts_ns, late_msg_ts_ns);
4164020e
SM
1049 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1050 goto end;
1051 }
1052
1053 /*
1054 * We now know that it's okay for this message to be late, we can now
1055 * adjust its timestamp to ensure monotonicity.
1056 */
0f5c5d5c
SM
1057 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1058 "Adjusting the timestamp of late message: late-msg-type={}, "
1059 "msg-new-ts-ns={}",
35e432d7
SM
1060 late_msg.type(), stream_iter->last_inactivity_ts.value);
1061 switch (late_msg.type()) {
1062 case bt2::MessageType::DiscardedEvents:
4164020e 1063 adjust_status = adjust_discarded_events_message(
35e432d7
SM
1064 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1065 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e 1066 break;
35e432d7 1067 case bt2::MessageType::DiscardedPackets:
4164020e 1068 adjust_status = adjust_discarded_packets_message(
35e432d7
SM
1069 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1070 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e
SM
1071 break;
1072 default:
1073 bt_common_abort();
1074 }
1075
1076 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1077 stream_iter_status = adjust_status;
1078 goto end;
1079 }
1080
1081 BT_ASSERT_DBG(adjusted_message);
1082 stream_iter->current_msg = adjusted_message;
1083 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
cefd84f8 1084
285951be 1085end:
4164020e 1086 return stream_iter_status;
285951be
FD
1087}
1088
4164020e
SM
1089static enum lttng_live_iterator_status
1090next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1091 struct lttng_live_trace *live_trace,
1092 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1093{
4164020e 1094 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
4164020e 1095 enum lttng_live_iterator_status stream_iter_status;
4164020e
SM
1096 int64_t youngest_candidate_msg_ts = INT64_MAX;
1097 uint64_t stream_iter_idx;
1098
1099 BT_ASSERT_DBG(live_trace);
4164020e 1100
0f5c5d5c
SM
1101 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1102 "Finding the next stream iterator for trace: "
1103 "trace-id={}",
1104 live_trace->id);
4164020e
SM
1105 /*
1106 * Update the current message of every stream iterators of this trace.
1107 * The current msg of every stream must have a timestamp equal or
1108 * larger than the last message returned by this iterator. We must
1109 * ensure monotonicity.
1110 */
1111 stream_iter_idx = 0;
db00b877 1112 while (stream_iter_idx < live_trace->stream_iterators.size()) {
4164020e 1113 bool stream_iter_is_ended = false;
db00b877
SM
1114 lttng_live_stream_iterator *stream_iter =
1115 live_trace->stream_iterators[stream_iter_idx].get();
4164020e
SM
1116
1117 /*
1118 * If there is no current message for this stream, go fetch
1119 * one.
1120 */
1121 while (!stream_iter->current_msg) {
35e432d7 1122 bt2::ConstMessage::Shared msg;
4164020e
SM
1123 int64_t curr_msg_ts_ns = INT64_MAX;
1124
1125 stream_iter_status =
35e432d7 1126 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
4164020e
SM
1127
1128 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1129 stream_iter_is_ended = true;
1130 break;
1131 }
1132
1133 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1134 goto end;
1135 }
1136
1137 BT_ASSERT_DBG(msg);
1138
0f5c5d5c
SM
1139 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1140 "Live stream iterator returned message: msg-type={}, "
1141 "stream-name=\"{}\", viewer-stream-id={}",
35e432d7 1142 msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
1143
1144 /*
1145 * Get the timestamp in nanoseconds from origin of this
e7401568 1146 * message.
4164020e 1147 */
35e432d7
SM
1148 live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
1149 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
4164020e
SM
1150
1151 /*
1152 * Check if the message of the current live stream
1153 * iterator occurred at the exact same time or after the
1154 * last message returned by this component's message
1155 * iterator. If not, we need to handle it with care.
1156 */
1157 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
35e432d7 1158 stream_iter->current_msg = std::move(msg);
4164020e
SM
1159 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1160 } else {
1161 /*
1162 * We received a message from the past. This
1163 * may be fixable but it can also be an error.
1164 */
1165 stream_iter_status =
35e432d7 1166 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
4164020e 1167 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
0f5c5d5c
SM
1168 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1169 "Late message could not be handled correctly: "
1170 "lttng-live-msg-iter-addr={}, "
1171 "stream-name=\"{}\", "
1172 "curr-msg-ts={}, last-msg-ts={}",
e28ca558
SM
1173 fmt::ptr(lttng_live_msg_iter), stream_iter->name,
1174 curr_msg_ts_ns,
0f5c5d5c 1175 lttng_live_msg_iter->last_msg_ts_ns);
4164020e
SM
1176 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1177 goto end;
1178 }
1179 }
1180 }
1181
1182 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1183
1184 if (!stream_iter_is_ended) {
1185 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1186 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1187 /*
1188 * Update the current best candidate message
1189 * for the stream iterator of this live trace
1190 * to be forwarded downstream.
1191 */
1192 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1193 youngest_candidate_stream_iter = stream_iter;
1194 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1195 /*
1196 * Order the messages in an arbitrary but
1197 * deterministic way.
1198 */
1199 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1200 int ret = common_muxing_compare_messages(
35e432d7
SM
1201 stream_iter->current_msg->libObjPtr(),
1202 youngest_candidate_stream_iter->current_msg->libObjPtr());
4164020e
SM
1203 if (ret < 0) {
1204 /*
1205 * The `youngest_candidate_stream_iter->current_msg`
1206 * should go first. Update the next
1207 * iterator and the current timestamp.
1208 */
1209 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1210 youngest_candidate_stream_iter = stream_iter;
1211 } else if (ret == 0) {
1212 /*
1213 * Unable to pick which one should go
1214 * first.
1215 */
0f5c5d5c
SM
1216 BT_CPPLOGW_SPEC(
1217 lttng_live_msg_iter->logger,
4164020e 1218 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1219 "stream-iter-addr={}"
1220 "stream-iter-addr={}",
1221 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1222 }
1223 }
1224
1225 stream_iter_idx++;
1226 } else {
1227 /*
1228 * The live stream iterator has ended. That
1229 * iterator is removed from the array, but
1230 * there is no need to increment
1231 * stream_iter_idx as
1232 * g_ptr_array_remove_index_fast replaces the
1233 * removed element with the array's last
1234 * element.
1235 */
db00b877 1236 bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
4164020e
SM
1237 }
1238 }
1239
1240 if (youngest_candidate_stream_iter) {
1241 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1242 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1243 } else {
1244 /*
1245 * The only case where we don't have a candidate for this trace
1246 * is if we reached the end of all the iterators.
1247 */
db00b877 1248 BT_ASSERT(live_trace->stream_iterators.empty());
4164020e
SM
1249 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1250 }
14f28187
FD
1251
1252end:
4164020e 1253 return stream_iter_status;
14f28187
FD
1254}
1255
4164020e
SM
1256static enum lttng_live_iterator_status
1257next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1258 struct lttng_live_session *session,
1259 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1260{
4164020e
SM
1261 enum lttng_live_iterator_status stream_iter_status;
1262 uint64_t trace_idx = 0;
1263 int64_t youngest_candidate_msg_ts = INT64_MAX;
1264 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1265
0f5c5d5c
SM
1266 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1267 "Finding the next stream iterator for session: "
1268 "session-id={}",
1269 session->id);
4164020e
SM
1270 /*
1271 * Make sure we are attached to the session and look for new streams
1272 * and metadata.
1273 */
1274 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1275 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1276 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1277 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1278 goto end;
1279 }
1280
6dcda123 1281 while (trace_idx < session->traces.size()) {
4164020e
SM
1282 bool trace_is_ended = false;
1283 struct lttng_live_stream_iterator *stream_iter;
6dcda123 1284 lttng_live_trace *trace = session->traces[trace_idx].get();
4164020e
SM
1285
1286 stream_iter_status =
1287 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1288 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1289 /*
1290 * All the live stream iterators for this trace are
1291 * ENDed. Remove the trace from this session.
1292 */
1293 trace_is_ended = true;
1294 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1295 goto end;
1296 }
1297
1298 if (!trace_is_ended) {
1299 BT_ASSERT_DBG(stream_iter);
1300
1301 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1302 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1303 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1304 youngest_candidate_stream_iter = stream_iter;
1305 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1306 /*
1307 * Order the messages in an arbitrary but
1308 * deterministic way.
1309 */
1310 int ret = common_muxing_compare_messages(
35e432d7
SM
1311 stream_iter->current_msg->libObjPtr(),
1312 youngest_candidate_stream_iter->current_msg->libObjPtr());
4164020e
SM
1313 if (ret < 0) {
1314 /*
1315 * The `youngest_candidate_stream_iter->current_msg`
1316 * should go first. Update the next iterator
1317 * and the current timestamp.
1318 */
1319 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1320 youngest_candidate_stream_iter = stream_iter;
1321 } else if (ret == 0) {
1322 /* Unable to pick which one should go first. */
0f5c5d5c
SM
1323 BT_CPPLOGW_SPEC(
1324 lttng_live_msg_iter->logger,
4164020e 1325 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1326 "stream-iter-addr={}"
1327 "youngest-candidate-stream-iter-addr={}",
1328 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1329 }
1330 }
1331 trace_idx++;
1332 } else {
1333 /*
1334 * trace_idx is not incremented since
6dcda123 1335 * vectorFastRemove replaces the
4164020e
SM
1336 * element at trace_idx with the array's last element.
1337 */
6dcda123 1338 bt2c::vectorFastRemove(session->traces, trace_idx);
4164020e
SM
1339 }
1340 }
1341 if (youngest_candidate_stream_iter) {
1342 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1343 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1344 } else {
1345 /*
1346 * The only cases where we don't have a candidate for this
1347 * trace is:
1348 * 1. if we reached the end of all the iterators of all the
1349 * traces of this session,
1350 * 2. if we never had live stream iterator in the first place.
1351 *
1352 * In either cases, we return END.
1353 */
6dcda123 1354 BT_ASSERT(session->traces.empty());
4164020e
SM
1355 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1356 }
7cdc2bab 1357end:
4164020e 1358 return stream_iter_status;
14f28187
FD
1359}
1360
4164020e 1361static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1362{
4164020e 1363 uint64_t i;
14f28187 1364
4164020e
SM
1365 for (i = 0; i < count; i++) {
1366 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1367 }
7cdc2bab
MD
1368}
1369
4164020e
SM
1370bt_message_iterator_class_next_method_status
1371lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1372 uint64_t capacity, uint64_t *count)
d3eb6e8f 1373{
1e690349
SM
1374 try {
1375 bt_message_iterator_class_next_method_status status;
1376 enum lttng_live_viewer_status viewer_status;
1377 struct lttng_live_msg_iter *lttng_live_msg_iter =
1378 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1379 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1380 enum lttng_live_iterator_status stream_iter_status;
1381 uint64_t session_idx;
4164020e 1382
1e690349 1383 *count = 0;
4164020e 1384
1e690349 1385 BT_ASSERT_DBG(lttng_live_msg_iter);
4164020e 1386
1e690349
SM
1387 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1388 /*
f8a11f24
SM
1389 * The iterator was interrupted in a previous call to the
1390 * `_next()` method. We currently do not support generating
1391 * messages after such event. The babeltrace2 CLI should never
1392 * be running the graph after being interrupted. So this check
1393 * is to prevent other graph users from using this live
1394 * iterator in an messed up internal state.
1395 */
1e690349
SM
1396 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1397 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1398 lttng_live_msg_iter->logger,
1399 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1400 goto end;
1401 }
4164020e 1402
1e690349 1403 /*
f8a11f24
SM
1404 * Clear all the invalid message reference that might be left over in
1405 * the output array.
1406 */
1e690349 1407 memset(msgs, 0, capacity * sizeof(*msgs));
4164020e 1408
1e690349 1409 /*
f8a11f24
SM
1410 * If no session are exposed on the relay found at the url provided by
1411 * the user, session count will be 0. In this case, we return status
1412 * end to return gracefully.
1413 */
1e690349
SM
1414 if (lttng_live_msg_iter->sessions->len == 0) {
1415 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1416 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1417 goto end;
1418 } else {
1419 /*
f8a11f24
SM
1420 * The are no more active session for this session
1421 * name. Retry to create a viewer session for the
1422 * requested session name.
1423 */
1e690349
SM
1424 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1425 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1426 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1427 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1428 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1429 "Error creating LTTng live viewer session");
1430 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1431 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1432 } else {
1433 bt_common_abort();
1434 }
1435 goto end;
4164020e 1436 }
4164020e
SM
1437 }
1438 }
4164020e 1439
1e690349
SM
1440 if (lttng_live_msg_iter->active_stream_iter == 0) {
1441 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1442 }
4164020e 1443
1e690349 1444 /*
f8a11f24
SM
1445 * Here the muxing of message is done.
1446 *
1447 * We need to iterate over all the streams of all the traces of all the
1448 * viewer sessions in order to get the message with the smallest
1449 * timestamp. In this case, a session is a viewer session and there is
1450 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1451 * kernel). Each viewer session can have multiple traces, for example,
1452 * 64bit UST viewer sessions could have multiple per-pid traces.
1453 *
1454 * We iterate over the streams of each traces to update and see what is
1455 * their next message's timestamp. From those timestamps, we select the
1456 * message with the smallest timestamp as the best candidate message
1457 * for that trace and do the same thing across all the sessions.
1458 *
1459 * We then compare the timestamp of best candidate message of all the
1460 * sessions to pick the message with the smallest timestamp and we
1461 * return it.
1462 */
1e690349
SM
1463 while (*count < capacity) {
1464 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1465 *candidate_stream_iter = NULL;
1466 int64_t youngest_msg_ts_ns = INT64_MAX;
1467
1468 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1469 session_idx = 0;
1470 while (session_idx < lttng_live_msg_iter->sessions->len) {
1471 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1472 lttng_live_msg_iter->sessions, session_idx);
1473
1474 /* Find the best candidate message to send downstream. */
1475 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1476 &candidate_stream_iter);
1477
1478 /* If we receive an END status, it means that either:
f8a11f24
SM
1479 * - Those traces never had active streams (UST with no
1480 * data produced yet),
1481 * - All live stream iterators have ENDed.
1482 */
1e690349 1483 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
6dcda123 1484 if (session->closed && session->traces.empty()) {
1e690349 1485 /*
f8a11f24
SM
1486 * Remove the session from the list.
1487 * session_idx is not modified since
1488 * g_ptr_array_remove_index_fast
1489 * replaces the the removed element with
1490 * the array's last element.
1491 */
1e690349
SM
1492 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1493 } else {
1494 session_idx++;
1495 }
1496 continue;
4164020e 1497 }
4164020e 1498
1e690349
SM
1499 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1500 goto return_status;
1501 }
4164020e 1502
1e690349
SM
1503 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1504 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1505 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1506 youngest_stream_iter = candidate_stream_iter;
1507 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1508 /*
f8a11f24
SM
1509 * The currently selected message to be sent
1510 * downstream next has the exact same timestamp
1511 * that of the current candidate message. We
1512 * must break the tie in a predictable manner.
1513 */
1e690349
SM
1514 BT_CPPLOGD_STR_SPEC(
1515 lttng_live_msg_iter->logger,
1516 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1517 /*
f8a11f24
SM
1518 * Order the messages in an arbitrary but
1519 * deterministic way.
1520 */
35e432d7
SM
1521 int ret = common_muxing_compare_messages(
1522 candidate_stream_iter->current_msg->libObjPtr(),
1523 youngest_stream_iter->current_msg->libObjPtr());
1e690349
SM
1524 if (ret < 0) {
1525 /*
f8a11f24
SM
1526 * The `candidate_stream_iter->current_msg`
1527 * should go first. Update the next
1528 * iterator and the current timestamp.
1529 */
1e690349
SM
1530 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1531 youngest_stream_iter = candidate_stream_iter;
1532 } else if (ret == 0) {
1533 /* Unable to pick which one should go first. */
1534 BT_CPPLOGW_SPEC(
1535 lttng_live_msg_iter->logger,
1536 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1537 "next-stream-iter-addr={}"
1538 "candidate-stream-iter-addr={}",
1539 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1540 }
4164020e 1541 }
4164020e 1542
1e690349
SM
1543 session_idx++;
1544 }
4164020e 1545
1e690349
SM
1546 if (!youngest_stream_iter) {
1547 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1548 goto return_status;
1549 }
4164020e 1550
1e690349
SM
1551 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1552 /* Ensure monotonicity. */
1553 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1554 youngest_stream_iter->current_msg_ts_ns);
4164020e 1555
1e690349 1556 /*
f8a11f24
SM
1557 * Insert the next message to the message batch. This will set
1558 * stream iterator current message to NULL so that next time
1559 * we fetch the next message of that stream iterator
1560 */
35e432d7 1561 msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
1e690349 1562 (*count)++;
4164020e 1563
1e690349
SM
1564 /* Update the last timestamp in nanoseconds sent downstream. */
1565 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1566 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
4164020e 1567
1e690349
SM
1568 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1569 }
f79c2d7a
FD
1570
1571return_status:
1e690349
SM
1572 switch (stream_iter_status) {
1573 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1574 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1575 /*
f8a11f24
SM
1576 * If we gathered messages, return _OK even if the graph was
1577 * interrupted. This allows for the components downstream to at
1578 * least get the those messages. If the graph was indeed
1579 * interrupted there should not be another _next() call as the
1580 * application will tear down the graph. This component class
1581 * doesn't support restarting after an interruption.
1582 */
1e690349
SM
1583 if (*count > 0) {
1584 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1585 } else {
1586 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1587 }
1588 break;
1589 case LTTNG_LIVE_ITERATOR_STATUS_END:
1590 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1591 break;
1592 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1593 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1594 "Memory error preparing the next batch of messages: "
1595 "live-iter-status={}",
1596 stream_iter_status);
1597 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1598 break;
1599 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1600 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1601 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1602 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1603 "Error preparing the next batch of messages: "
1604 "live-iter-status={}",
1605 stream_iter_status);
4164020e 1606
1e690349
SM
1607 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1608 /* Put all existing messages on error. */
1609 put_messages(msgs, *count);
1610 break;
1611 default:
1612 bt_common_abort();
1613 }
14f28187 1614
f79c2d7a 1615end:
1e690349
SM
1616 return status;
1617 } catch (const std::bad_alloc&) {
1618 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1619 } catch (const bt2::Error&) {
1620 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1621 }
7cdc2bab 1622}
41a2b7ae 1623
4164020e
SM
1624static struct lttng_live_msg_iter *
1625lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1626 bt_self_message_iterator *self_msg_it)
4f74db52 1627{
0f5c5d5c
SM
1628 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1629 msg_iter->self_comp = lttng_live_comp->self_comp;
1630 msg_iter->lttng_live_comp = lttng_live_comp;
1631 msg_iter->self_msg_iter = self_msg_it;
4164020e 1632
0f5c5d5c
SM
1633 msg_iter->active_stream_iter = 0;
1634 msg_iter->last_msg_ts_ns = INT64_MIN;
1635 msg_iter->was_interrupted = false;
4f74db52 1636
0f5c5d5c 1637 msg_iter->sessions =
4164020e 1638 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
0f5c5d5c 1639 BT_ASSERT(msg_iter->sessions);
4164020e 1640
0f5c5d5c 1641 return msg_iter;
4f74db52
FD
1642}
1643
4164020e
SM
1644bt_message_iterator_class_initialize_method_status
1645lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
7d91f1ac 1646 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
7cdc2bab 1647{
1e690349
SM
1648 try {
1649 bt_message_iterator_class_initialize_method_status status;
1650 struct lttng_live_component *lttng_live;
1651 struct lttng_live_msg_iter *lttng_live_msg_iter;
1652 enum lttng_live_viewer_status viewer_status;
1653 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1654
1655 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1656
1657 /* There can be only one downstream iterator at the same time. */
1658 BT_ASSERT(!lttng_live->has_msg_iter);
1659 lttng_live->has_msg_iter = true;
1660
1661 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1662 if (!lttng_live_msg_iter) {
1663 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1664 "Failed to create lttng_live_msg_iter");
1665 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1666 goto error;
1667 }
4164020e 1668
1e690349 1669 viewer_status = live_viewer_connection_create(
2780ec75 1670 lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1e690349
SM
1671 &lttng_live_msg_iter->viewer_connection);
1672 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1673 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1674 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1675 "Failed to create viewer connection");
1676 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1677 /*
f8a11f24
SM
1678 * Interruption in the _iter_init() method is not
1679 * supported. Return an error.
1680 */
1e690349
SM
1681 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1682 "Interrupted while creating viewer connection");
1683 }
10265ebe 1684
1e690349
SM
1685 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1686 goto error;
1687 }
4164020e 1688
1e690349
SM
1689 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1690 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1691 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1692 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1693 "Failed to create viewer session");
1694 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1695 /*
f8a11f24
SM
1696 * Interruption in the _iter_init() method is not
1697 * supported. Return an error.
1698 */
1e690349
SM
1699 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1700 "Interrupted when creating viewer session");
1701 }
4164020e 1702
10265ebe 1703 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e 1704 goto error;
4164020e 1705 }
4164020e 1706
1e690349
SM
1707 if (lttng_live_msg_iter->sessions->len == 0) {
1708 switch (lttng_live->params.sess_not_found_act) {
1709 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1710 BT_CPPLOGI_SPEC(
1711 lttng_live_msg_iter->logger,
1712 "Unable to connect to the requested live viewer session. "
1713 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1714 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2780ec75 1715 lttng_live->params.url);
1e690349
SM
1716 break;
1717 case SESSION_NOT_FOUND_ACTION_FAIL:
1718 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1719 lttng_live_msg_iter->logger,
1720 "Unable to connect to the requested live viewer session. "
1721 "Fail the message iterator initialization because of {}=\"{}\" "
1722 "component parameter: url =\"{}\"",
1723 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
2780ec75 1724 lttng_live->params.url);
1e690349
SM
1725 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1726 goto error;
1727 case SESSION_NOT_FOUND_ACTION_END:
1728 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1729 "Unable to connect to the requested live viewer session. "
1730 "End gracefully at the first _next() call because of {}=\"{}\""
1731 " component parameter: url=\"{}\"",
1732 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
2780ec75 1733 lttng_live->params.url);
1e690349
SM
1734 break;
1735 default:
1736 bt_common_abort();
1737 }
1738 }
1739
1740 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1741 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1742 goto end;
f79c2d7a 1743
14f28187 1744error:
1e690349 1745 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1746end:
1e690349
SM
1747 return status;
1748 } catch (const std::bad_alloc&) {
1749 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1750 } catch (const bt2::Error&) {
1751 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1752 }
7cdc2bab
MD
1753}
1754
80aff5ef 1755static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1756 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1757 bt_param_validation_value_descr::makeString()},
4164020e
SM
1758 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1759
1760static bt_component_class_query_method_status
1761lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
0f5c5d5c 1762 const bt2c::Logger& logger)
7cdc2bab 1763{
4164020e
SM
1764 bt_component_class_query_method_status status;
1765 const bt_value *url_value = NULL;
1766 const char *url;
1767 struct live_viewer_connection *viewer_connection = NULL;
1768 enum lttng_live_viewer_status viewer_status;
1769 enum bt_param_validation_status validation_status;
1770 gchar *validate_error = NULL;
1771
1772 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1773 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1774 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1775 goto error;
1776 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1777 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
0f5c5d5c 1778 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
4164020e
SM
1779 goto error;
1780 }
1781
1782 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1783 url = bt_value_string_get(url_value);
1784
0f5c5d5c 1785 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
4164020e
SM
1786 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1787 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c 1788 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
4164020e
SM
1789 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1790 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1791 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1792 } else {
1793 bt_common_abort();
1794 }
1795 goto error;
1796 }
1797
1798 status = live_viewer_connection_list_sessions(viewer_connection, result);
1799 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
0f5c5d5c 1800 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
4164020e
SM
1801 goto error;
1802 }
1803
1804 goto end;
c7eee084 1805
7cdc2bab 1806error:
4164020e 1807 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1808
4164020e
SM
1809 if (status >= 0) {
1810 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1811 }
c7eee084 1812
7cdc2bab 1813end:
4164020e
SM
1814 if (viewer_connection) {
1815 live_viewer_connection_destroy(viewer_connection);
1816 }
80aff5ef 1817
4164020e 1818 g_free(validate_error);
80aff5ef 1819
4164020e 1820 return status;
7cdc2bab
MD
1821}
1822
4164020e
SM
1823static bt_component_class_query_method_status
1824lttng_live_query_support_info(const bt_value *params, const bt_value **result,
0f5c5d5c 1825 const bt2c::Logger& logger)
312df793 1826{
4164020e
SM
1827 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1828 const bt_value *input_type_value;
1829 const bt_value *input_value;
1830 double weight = 0;
dd420a9b 1831 struct bt_common_lttng_live_url_parts parts = {};
4164020e
SM
1832
1833 /* Used by the logging macros */
1834 __attribute__((unused)) bt_self_component *self_comp = NULL;
1835
1836 *result = NULL;
1837 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1838 if (!input_type_value) {
0f5c5d5c 1839 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
4164020e
SM
1840 goto error;
1841 }
1842
1843 if (!bt_value_is_string(input_type_value)) {
0f5c5d5c 1844 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
4164020e
SM
1845 goto error;
1846 }
1847
1848 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1849 /* We don't handle file system paths */
1850 goto create_result;
1851 }
1852
1853 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1854 if (!input_value) {
0f5c5d5c 1855 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
4164020e
SM
1856 goto error;
1857 }
1858
1859 if (!bt_value_is_string(input_value)) {
0f5c5d5c 1860 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
4164020e
SM
1861 goto error;
1862 }
1863
1864 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1865 if (parts.session_name) {
1866 /*
1867 * Looks pretty much like an LTTng live URL: we got the
1868 * session name part, which forms a complete URL.
1869 */
1870 weight = .75;
1871 }
312df793
PP
1872
1873create_result:
4164020e
SM
1874 *result = bt_value_real_create_init(weight);
1875 if (!*result) {
1876 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1877 goto error;
1878 }
312df793 1879
4164020e 1880 goto end;
312df793
PP
1881
1882error:
4164020e
SM
1883 if (status >= 0) {
1884 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1885 }
312df793 1886
4164020e 1887 BT_ASSERT(!*result);
312df793
PP
1888
1889end:
4164020e
SM
1890 bt_common_destroy_lttng_live_url_parts(&parts);
1891 return status;
312df793
PP
1892}
1893
4164020e
SM
1894bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1895 bt_private_query_executor *priv_query_exec,
1896 const char *object, const bt_value *params,
1897 __attribute__((unused)) void *method_data,
1898 const bt_value **result)
7cdc2bab 1899{
1e690349
SM
1900 try {
1901 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1902 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1903 bt2::PrivateQueryExecutor {priv_query_exec},
1904 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1905
1906 if (strcmp(object, "sessions") == 0) {
1907 status = lttng_live_query_list_sessions(params, result, logger);
1908 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1909 status = lttng_live_query_support_info(params, result, logger);
1910 } else {
1911 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1912 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1913 goto end;
1914 }
14f28187
FD
1915
1916end:
1e690349
SM
1917 return status;
1918 } catch (const std::bad_alloc&) {
1919 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1920 } catch (const bt2::Error&) {
1921 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1922 }
7cdc2bab
MD
1923}
1924
14f28187 1925void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1926{
ef9e5f5d
SM
1927 lttng_live_component::UP {static_cast<lttng_live_component *>(
1928 bt_self_component_get_data(bt_self_component_source_as_self_component(component)))};
7cdc2bab
MD
1929}
1930
4164020e
SM
1931static enum session_not_found_action
1932parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 1933{
4164020e
SM
1934 enum session_not_found_action action;
1935 const char *no_session_act_str = bt_value_string_get(no_session_param);
1936
1937 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1938 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1939 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1940 action = SESSION_NOT_FOUND_ACTION_FAIL;
1941 } else {
1942 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1943 action = SESSION_NOT_FOUND_ACTION_END;
1944 }
1945
1946 return action;
14f28187
FD
1947}
1948
88730e42
SM
1949static bt_param_validation_value_descr inputs_elem_descr =
1950 bt_param_validation_value_descr::makeString();
80aff5ef
SM
1951
1952static const char *sess_not_found_action_choices[] = {
4164020e
SM
1953 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1954 SESS_NOT_FOUND_ACTION_FAIL_STR,
1955 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
1956};
1957
1958static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
1959 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1960 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
1961 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
1962 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
1963 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1964
1965static bt_component_class_initialize_method_status
0f5c5d5c 1966lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
ef9e5f5d 1967 lttng_live_component::UP& component)
7cdc2bab 1968{
4164020e
SM
1969 const bt_value *inputs_value;
1970 const bt_value *url_value;
1971 const bt_value *value;
4164020e
SM
1972 enum bt_param_validation_status validation_status;
1973 gchar *validation_error = NULL;
0f5c5d5c 1974 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
4164020e
SM
1975
1976 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
1977 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
ef9e5f5d 1978 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
4164020e 1979 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
6838d8aa 1980 bt2c::GCharUP errorFreer {validation_error};
0f5c5d5c 1981 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
ef9e5f5d 1982 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1983 }
1984
ef9e5f5d 1985 auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
0f5c5d5c 1986 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
4164020e
SM
1987 lttng_live->max_query_size = MAX_QUERY_SIZE;
1988 lttng_live->has_msg_iter = false;
1989
1990 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
1991 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2780ec75 1992 lttng_live->params.url = bt_value_string_get(url_value);
4164020e
SM
1993
1994 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
1995 if (value) {
1996 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
1997 } else {
0f5c5d5c
SM
1998 BT_CPPLOGI_SPEC(lttng_live->logger,
1999 "Optional `{}` parameter is missing: defaulting to `{}`.",
2000 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
4164020e
SM
2001 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2002 }
2003
ef9e5f5d
SM
2004 component = std::move(lttng_live);
2005 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
d3e4dcd8
PP
2006}
2007
4164020e
SM
2008bt_component_class_initialize_method_status
2009lttng_live_component_init(bt_self_component_source *self_comp_src,
7d91f1ac 2010 bt_self_component_source_configuration *, const bt_value *params, void *)
f3bc2010 2011{
1e690349 2012 try {
ef9e5f5d 2013 lttng_live_component::UP lttng_live;
1e690349
SM
2014 bt_component_class_initialize_method_status ret;
2015 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2016 bt_self_component_add_port_status add_port_status;
2017
ef9e5f5d 2018 ret = lttng_live_component_create(params, self_comp_src, lttng_live);
1e690349 2019 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
ef9e5f5d 2020 return ret;
1e690349 2021 }
4164020e 2022
1e690349
SM
2023 add_port_status =
2024 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2025 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2026 ret = (bt_component_class_initialize_method_status) add_port_status;
ef9e5f5d 2027 return ret;
1e690349 2028 }
4164020e 2029
ef9e5f5d 2030 bt_self_component_set_data(self_comp, lttng_live.release());
7cdc2bab 2031
ef9e5f5d 2032 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
1e690349
SM
2033 } catch (const std::bad_alloc&) {
2034 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2035 } catch (const bt2::Error&) {
2036 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2037 }
d85ef162 2038}
This page took 0.246998 seconds and 4 git commands to generate.