src.ctf.lttng-live: change lttng_live_component::params::url to std::string
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
c802cacb 11#include <glib.h>
14f28187
FD
12#include <unistd.h>
13
578e048b 14#include "common/assert.h"
0f5c5d5c
SM
15#include "cpp-common/bt2c/fmt.hpp"
16#include "cpp-common/vendor/fmt/format.h"
f3bc2010 17
376fc2bd 18#include "plugins/common/muxing/muxing.h"
80aff5ef 19#include "plugins/common/param-validation/param-validation.h"
376fc2bd 20
087cd0f5 21#include "data-stream.hpp"
087cd0f5 22#include "lttng-live.hpp"
c802cacb 23#include "metadata.hpp"
7cdc2bab 24
4164020e
SM
25#define MAX_QUERY_SIZE (256 * 1024)
26#define URL_PARAM "url"
27#define INPUTS_PARAM "inputs"
28#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
29#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
30#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
31#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 32
34533ae0 33void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 34 enum lttng_live_stream_state new_state)
34533ae0 35{
0f5c5d5c
SM
36 BT_CPPLOGD_SPEC(stream_iter->logger,
37 "Setting live stream iterator state: viewer-stream-id={}, "
38 "old-state={}, new-state={}",
39 stream_iter->viewer_stream_id, stream_iter->state, new_state);
34533ae0 40
4164020e 41 stream_iter->state = new_state;
34533ae0
FD
42}
43
4164020e
SM
44#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
45 do { \
0f5c5d5c
SM
46 BT_CPPLOGD_SPEC((live_stream_iter)->logger, \
47 "Live stream iterator state={}, " \
48 "last-inact-ts-is-set={}, last-inact-ts-value={}, " \
49 "curr-inact-ts={}", \
50 (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set, \
51 (live_stream_iter)->last_inactivity_ts.value, \
52 (live_stream_iter)->current_inactivity_ts); \
4164020e 53 } while (0);
6f79a7cf 54
9b4f9b42 55bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 56{
4164020e 57 bool ret;
6f79a7cf 58
4164020e
SM
59 if (!msg_iter) {
60 ret = false;
61 goto end;
62 }
7cdc2bab 63
4164020e 64 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
4bf0e537 65
14f28187 66end:
4164020e 67 return ret;
7cdc2bab
MD
68}
69
4164020e
SM
70static struct lttng_live_trace *
71lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 72{
4164020e
SM
73 uint64_t trace_idx;
74 struct lttng_live_trace *ret_trace = NULL;
75
76 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
77 struct lttng_live_trace *trace =
78 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
79 if (trace->id == trace_id) {
80 ret_trace = trace;
81 goto end;
82 }
83 }
14f28187
FD
84
85end:
4164020e 86 return ret_trace;
d3eb6e8f
PP
87}
88
4164020e 89static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 90{
0f5c5d5c 91 BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
7cdc2bab 92
4164020e
SM
93 BT_ASSERT(trace->stream_iterators);
94 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 95
4164020e 96 lttng_live_metadata_fini(trace);
afb0f12b 97 delete trace;
7cdc2bab
MD
98}
99
4164020e
SM
100static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
101 uint64_t trace_id)
7cdc2bab 102{
0f5c5d5c
SM
103 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
104 trace_id);
4164020e 105
0f5c5d5c 106 lttng_live_trace *trace = new lttng_live_trace {session->logger};
4164020e
SM
107 trace->session = session;
108 trace->id = trace_id;
4164020e
SM
109 trace->stream_iterators =
110 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
111 BT_ASSERT(trace->stream_iterators);
112 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
113 g_ptr_array_add(session->traces, trace);
114
4164020e 115 return trace;
7cdc2bab
MD
116}
117
4164020e
SM
118struct lttng_live_trace *
119lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
120 uint64_t trace_id)
7cdc2bab 121{
4164020e 122 struct lttng_live_trace *trace;
7cdc2bab 123
4164020e
SM
124 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
125 if (trace) {
126 goto end;
127 }
7cdc2bab 128
4164020e
SM
129 /* The session is the owner of the newly created trace. */
130 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 131
14f28187 132end:
4164020e 133 return trace;
7cdc2bab
MD
134}
135
4164020e
SM
136int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
137 const char *hostname, const char *session_name)
7cdc2bab 138{
0f5c5d5c
SM
139 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
140 "Adding live session: "
141 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
142 session_id, hostname, session_name);
4164020e 143
0f5c5d5c 144 lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
4164020e
SM
145 session->self_comp = lttng_live_msg_iter->self_comp;
146 session->id = session_id;
147 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
148 BT_ASSERT(session->traces);
149 session->lttng_live_msg_iter = lttng_live_msg_iter;
150 session->new_streams_needed = true;
151 session->hostname = g_string_new(hostname);
152 BT_ASSERT(session->hostname);
153
154 session->session_name = g_string_new(session_name);
155 BT_ASSERT(session->session_name);
156
157 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
afb0f12b
SM
158
159 return 0;
7cdc2bab
MD
160}
161
4164020e 162static void lttng_live_destroy_session(struct lttng_live_session *session)
7cdc2bab 163{
4164020e
SM
164 if (!session) {
165 goto end;
166 }
167
0f5c5d5c
SM
168 BT_CPPLOGD_SPEC(session->logger,
169 "Destroying live session: "
170 "session-id={}, session-name=\"{}\"",
171 session->id, session->session_name->str);
4164020e
SM
172 if (session->id != -1ULL) {
173 if (lttng_live_session_detach(session)) {
174 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
175 /* Old relayd cannot detach sessions. */
0f5c5d5c
SM
176 BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
177 session->id);
4164020e
SM
178 }
179 }
180 session->id = -1ULL;
181 }
182
183 if (session->traces) {
184 g_ptr_array_free(session->traces, TRUE);
185 }
186
187 if (session->hostname) {
188 g_string_free(session->hostname, TRUE);
189 }
afb0f12b 190
4164020e
SM
191 if (session->session_name) {
192 g_string_free(session->session_name, TRUE);
193 }
afb0f12b
SM
194
195 delete session;
14f28187
FD
196
197end:
4164020e 198 return;
7cdc2bab
MD
199}
200
4164020e 201static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 202{
4164020e
SM
203 if (!lttng_live_msg_iter) {
204 goto end;
205 }
7cdc2bab 206
4164020e
SM
207 if (lttng_live_msg_iter->sessions) {
208 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
209 }
14f28187 210
4164020e
SM
211 if (lttng_live_msg_iter->viewer_connection) {
212 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
213 }
214 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
215 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
14f28187 216
4164020e
SM
217 /* All stream iterators must be destroyed at this point. */
218 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
219 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
14f28187 220
afb0f12b 221 delete lttng_live_msg_iter;
14f28187
FD
222
223end:
4164020e 224 return;
14f28187
FD
225}
226
14f28187
FD
227void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
228{
4164020e 229 struct lttng_live_msg_iter *lttng_live_msg_iter;
14f28187 230
4164020e 231 BT_ASSERT(self_msg_iter);
14f28187 232
4164020e
SM
233 lttng_live_msg_iter =
234 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
235 BT_ASSERT(lttng_live_msg_iter);
236 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
237}
238
4164020e
SM
239static enum lttng_live_iterator_status
240lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 241{
4164020e
SM
242 switch (lttng_live_stream->state) {
243 case LTTNG_LIVE_STREAM_QUIESCENT:
244 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
245 break;
246 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
247 /* Invalid state. */
0f5c5d5c 248 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
4164020e
SM
249 bt_common_abort();
250 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
251 /* Invalid state. */
0f5c5d5c 252 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
4164020e
SM
253 bt_common_abort();
254 case LTTNG_LIVE_STREAM_EOF:
255 break;
256 }
257 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
258}
259
260/*
f93afbf9
FD
261 * For active no data stream, fetch next index. As a result of that it can
262 * become either:
263 * - quiescent: won't have events for a bit,
264 * - have data: need to get that data and produce the event,
265 * - have no data on this stream at this point: need to retry (AGAIN) or return
266 * EOF.
7cdc2bab 267 */
4164020e
SM
268static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
269 struct lttng_live_msg_iter *lttng_live_msg_iter,
270 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 271{
4164020e
SM
272 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
273 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
274 struct packet_index index;
275
276 if (lttng_live_stream->trace->metadata_stream_state ==
277 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
0f5c5d5c
SM
278 BT_CPPLOGD_SPEC(
279 lttng_live_msg_iter->logger,
4164020e 280 "Need to get an update for the metadata stream before proceeding further with this stream: "
0f5c5d5c 281 "stream-name=\"{}\"",
4164020e
SM
282 lttng_live_stream->name->str);
283 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
284 goto end;
285 }
286
287 if (lttng_live_stream->trace->session->new_streams_needed) {
0f5c5d5c
SM
288 BT_CPPLOGD_SPEC(
289 lttng_live_msg_iter->logger,
4164020e 290 "Need to get an update of all streams before proceeding further with this stream: "
0f5c5d5c 291 "stream-name=\"{}\"",
4164020e
SM
292 lttng_live_stream->name->str);
293 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
294 goto end;
295 }
296
297 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
298 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
299 goto end;
300 }
301 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
302 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
303 goto end;
304 }
305
306 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
307
308 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
309 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
310 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
311
312 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
313 /*
5cb27175
JG
314 * Because the stream is in the QUIESCENT_NO_DATA
315 * state, we can assert that the last_inactivity_ts was
316 * set and can be safely used in the `if` above.
317 */
4164020e
SM
318 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
319
320 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
321 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
322 } else {
323 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
324 }
325 goto end;
326 }
327
328 lttng_live_stream->base_offset = index.offset;
329 lttng_live_stream->offset = index.offset;
330 lttng_live_stream->len = index.packet_size / CHAR_BIT;
331
0f5c5d5c
SM
332 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
333 "Setting live stream reading info: stream-name=\"{}\", "
334 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
335 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
336 lttng_live_stream->base_offset, lttng_live_stream->offset,
337 lttng_live_stream->len);
f93afbf9 338
7cdc2bab 339end:
4164020e
SM
340 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
341 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
342 }
343 return ret;
7cdc2bab
MD
344}
345
346/*
14f28187
FD
347 * Creation of the message requires the ctf trace class to be created
348 * beforehand, but the live protocol gives us all streams (including metadata)
349 * at once. So we split it in three steps: getting streams, getting metadata
350 * (which creates the ctf trace class), and then creating the per-stream
351 * messages.
7cdc2bab 352 */
4164020e
SM
353static enum lttng_live_iterator_status
354lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
355 struct lttng_live_session *session)
7cdc2bab 356{
4164020e
SM
357 enum lttng_live_iterator_status status;
358 uint64_t trace_idx;
359
360 if (!session->attached) {
0f5c5d5c
SM
361 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
362 session->id);
4164020e
SM
363 enum lttng_live_viewer_status attach_status =
364 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
365 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
366 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
367 /*
368 * Clear any causes appended in
369 * `lttng_live_attach_session()` as we want to
370 * return gracefully since the graph was
371 * cancelled.
372 */
373 bt_current_thread_clear_error();
374 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
375 } else {
376 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
377 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
378 "Error attaching to LTTng live session");
4164020e
SM
379 }
380 goto end;
381 }
382 }
383
0f5c5d5c
SM
384 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
385 "Updating all data streams: "
386 "session-id={}, session-name=\"{}\"",
387 session->id, session->session_name->str);
4164020e
SM
388
389 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
a4f118a3
FD
390 switch (status) {
391 case LTTNG_LIVE_ITERATOR_STATUS_OK:
392 break;
393 case LTTNG_LIVE_ITERATOR_STATUS_END:
394 /*
395 * We received a `_END` from the `_get_new_streams()` function,
396 * which means no more data will ever be received from the data
397 * streams of this session. But it's possible that the metadata
398 * is incomplete.
399 * The live protocol guarantees that we receive all the
400 * metadata needed before we receive data streams needing it.
401 * But it's possible to receive metadata NOT needed by
402 * data streams after the session was closed. For example, this
403 * could happen if a new event is registered and the session is
404 * stopped before any tracepoint for that event is actually
405 * fired.
406 */
0f5c5d5c
SM
407 BT_CPPLOGD_SPEC(
408 lttng_live_msg_iter->logger,
a4f118a3 409 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
0f5c5d5c 410 "session-id={}, session-name=\"{}\"",
a4f118a3
FD
411 session->id, session->session_name->str);
412 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
413 break;
414 default:
4164020e
SM
415 goto end;
416 }
417
0f5c5d5c
SM
418 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
419 "Updating metadata stream for session: "
420 "session-id={}, session-name=\"{}\"",
421 session->id, session->session_name->str);
a4f118a3 422
4164020e
SM
423 trace_idx = 0;
424 while (trace_idx < session->traces->len) {
425 struct lttng_live_trace *trace =
426 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
427
428 status = lttng_live_metadata_update(trace);
429 switch (status) {
430 case LTTNG_LIVE_ITERATOR_STATUS_END:
431 case LTTNG_LIVE_ITERATOR_STATUS_OK:
432 trace_idx++;
433 break;
434 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
435 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
436 goto end;
437 default:
0f5c5d5c
SM
438 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
439 "Error updating trace metadata: "
440 "stream-iter-status={}, trace-id={}",
441 status, trace->id);
4164020e
SM
442 goto end;
443 }
444 }
445
446 /*
447 * Now that we have the metadata we can initialize the downstream
448 * iterator.
449 */
450 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
14f28187
FD
451
452end:
4164020e 453 return status;
7cdc2bab
MD
454}
455
4164020e
SM
456static void
457lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 458{
4164020e 459 uint64_t session_idx, trace_idx;
4164020e
SM
460
461 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
462 struct lttng_live_session *session =
463 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
0f5c5d5c
SM
464 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
465 "Force marking session as needing new streams: "
466 "session-id={}",
467 session->id);
4164020e
SM
468 session->new_streams_needed = true;
469 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
470 struct lttng_live_trace *trace =
471 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
0f5c5d5c
SM
472 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
473 "Force marking trace metadata state as needing an update: "
474 "session-id={}, trace-id={}",
475 session->id, trace->id);
4164020e
SM
476
477 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
478
479 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
480 }
481 }
7cdc2bab
MD
482}
483
4164020e
SM
484static enum lttng_live_iterator_status
485lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 486{
4164020e
SM
487 enum lttng_live_iterator_status status;
488 enum lttng_live_viewer_status viewer_status;
4164020e
SM
489 uint64_t session_idx = 0, nr_sessions_opened = 0;
490 struct lttng_live_session *session;
491 enum session_not_found_action sess_not_found_act =
492 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
493
0f5c5d5c
SM
494 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
495 "Update data and metadata of all sessions: "
496 "live-msg-iter-addr={}",
497 fmt::ptr(lttng_live_msg_iter));
4164020e
SM
498 /*
499 * In a remotely distant future, we could add a "new
500 * session" flag to the protocol, which would tell us that we
501 * need to query for new sessions even though we have sessions
502 * currently ongoing.
503 */
504 if (lttng_live_msg_iter->sessions->len == 0) {
505 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
0f5c5d5c
SM
506 BT_CPPLOGD_SPEC(
507 lttng_live_msg_iter->logger,
4164020e
SM
508 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
509 status = LTTNG_LIVE_ITERATOR_STATUS_END;
510 goto end;
511 } else {
0f5c5d5c
SM
512 BT_CPPLOGD_SPEC(
513 lttng_live_msg_iter->logger,
4164020e
SM
514 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
515 /*
516 * Retry to create a viewer session for the requested
517 * session name.
518 */
519 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
520 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
521 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
522 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
523 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
524 "Error creating LTTng live viewer session");
4164020e
SM
525 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
526 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
527 } else {
528 bt_common_abort();
529 }
530 goto end;
531 }
532 }
533 }
534
535 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
536 session =
537 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
538 status = lttng_live_get_session(lttng_live_msg_iter, session);
539 switch (status) {
540 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e 541 case LTTNG_LIVE_ITERATOR_STATUS_END:
a4f118a3
FD
542 /*
543 * A session returned `_END`. Other sessions may still
544 * be active so we override the status and continue
545 * looping if needed.
546 */
4164020e
SM
547 break;
548 default:
549 goto end;
550 }
551 if (!session->closed) {
552 nr_sessions_opened++;
553 }
554 }
555
556 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
557 status = LTTNG_LIVE_ITERATOR_STATUS_END;
558 } else {
559 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
560 }
f79c2d7a
FD
561
562end:
4164020e 563 return status;
7cdc2bab
MD
564}
565
4164020e
SM
566static enum lttng_live_iterator_status
567emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
568 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
569 uint64_t timestamp)
7cdc2bab 570{
4164020e 571 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e
SM
572 bt_message *msg = NULL;
573
574 BT_ASSERT(stream_iter->trace->clock_class);
575
0f5c5d5c
SM
576 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
577 "Emitting inactivity message for stream: ctf-stream-id={}, "
578 "viewer-stream-id={}, timestamp={}",
579 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
580 timestamp);
4164020e
SM
581
582 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
583 stream_iter->trace->clock_class, timestamp);
584 if (!msg) {
0f5c5d5c
SM
585 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
586 "Error emitting message iterator inactivity message");
4164020e
SM
587 goto error;
588 }
589
590 *message = msg;
7cdc2bab 591end:
4164020e 592 return ret;
7cdc2bab
MD
593
594error:
4164020e
SM
595 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
596 bt_message_put_ref(msg);
597 goto end;
7cdc2bab
MD
598}
599
4164020e
SM
600static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
601 struct lttng_live_msg_iter *lttng_live_msg_iter,
602 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 603{
4164020e
SM
604 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
605
606 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
607 return LTTNG_LIVE_ITERATOR_STATUS_OK;
608 }
609
610 /*
611 * Check if we already sent an inactivty message downstream for this
612 * `current_inactivity_ts` value.
613 */
614 if (lttng_live_stream->last_inactivity_ts.is_set &&
615 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
616 lttng_live_stream_iterator_set_state(lttng_live_stream,
617 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
618
619 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
620 goto end;
621 }
622
623 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
624 lttng_live_stream->current_inactivity_ts);
625
626 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
627 lttng_live_stream->last_inactivity_ts.is_set = true;
14f28187 628end:
4164020e 629 return ret;
14f28187
FD
630}
631
7d91f1ac 632static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
4164020e 633 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 634{
4164020e
SM
635 const bt_clock_snapshot *clock_snapshot = NULL;
636 int ret = 0;
4164020e
SM
637
638 BT_ASSERT_DBG(msg);
639 BT_ASSERT_DBG(ts_ns);
640
0f5c5d5c
SM
641 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
642 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
643 "last-msg-ts={}",
644 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
4164020e
SM
645
646 switch (bt_message_get_type(msg)) {
647 case BT_MESSAGE_TYPE_EVENT:
4164020e
SM
648 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
649 break;
650 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
4164020e
SM
651 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
652 break;
653 case BT_MESSAGE_TYPE_PACKET_END:
4164020e
SM
654 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
655 break;
656 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
4164020e
SM
657 clock_snapshot =
658 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
659 break;
660 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
4164020e
SM
661 clock_snapshot =
662 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
663 break;
664 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
665 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
666 break;
667 default:
668 /* All the other messages have a higher priority */
0f5c5d5c
SM
669 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
670 "Message has no timestamp: using the last message timestamp.");
4164020e
SM
671 *ts_ns = last_msg_ts_ns;
672 goto end;
673 }
674
4164020e
SM
675 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
676 if (ret) {
0f5c5d5c
SM
677 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
678 "Cannot get nanoseconds from Epoch of clock snapshot: "
679 "clock-snapshot-addr={}",
680 fmt::ptr(clock_snapshot));
4164020e
SM
681 goto error;
682 }
683
684 goto end;
14f28187 685
14f28187 686error:
4164020e 687 ret = -1;
7cdc2bab 688
7cdc2bab 689end:
4164020e 690 if (ret == 0) {
0f5c5d5c
SM
691 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
692 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
693 "last-msg-ts={}, ts={}",
694 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
4164020e
SM
695 }
696
697 return ret;
7cdc2bab
MD
698}
699
4164020e
SM
700static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
701 struct lttng_live_msg_iter *lttng_live_msg_iter,
702 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 703{
4164020e 704 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e
SM
705 enum ctf_msg_iter_status status;
706 uint64_t session_idx, trace_idx;
707
708 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
709 struct lttng_live_session *session =
710 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
711
712 if (session->new_streams_needed) {
0f5c5d5c
SM
713 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
714 "Need an update for streams: "
715 "session-id={}",
716 session->id);
4164020e
SM
717 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
718 goto end;
719 }
720 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
721 struct lttng_live_trace *trace =
722 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
723 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
0f5c5d5c
SM
724 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
725 "Need an update for metadata stream: "
726 "session-id={}, trace-id={}",
727 session->id, trace->id);
4164020e
SM
728 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
729 goto end;
730 }
731 }
732 }
733
734 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
735 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
736 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
737 "Invalid state of live stream iterator"
738 "stream-iter-status={}",
739 lttng_live_stream->state);
4164020e
SM
740 goto end;
741 }
742
743 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
744 switch (status) {
745 case CTF_MSG_ITER_STATUS_EOF:
746 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
747 break;
748 case CTF_MSG_ITER_STATUS_OK:
749 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
750 break;
751 case CTF_MSG_ITER_STATUS_AGAIN:
752 /*
753 * Continue immediately (end of packet). The next
754 * get_index may return AGAIN to delay the following
755 * attempt.
756 */
757 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
758 break;
759 case CTF_MSG_ITER_STATUS_ERROR:
760 default:
761 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
762 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
763 "CTF message iterator failed to get next message: "
764 "msg-iter={}, msg-iter-status={}",
765 fmt::ptr(lttng_live_stream->msg_iter), status);
4164020e
SM
766 break;
767 }
14f28187
FD
768
769end:
4164020e 770 return ret;
7cdc2bab
MD
771}
772
4164020e
SM
773static enum lttng_live_iterator_status
774lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
775 struct lttng_live_stream_iterator *stream_iter,
776 const bt_message **curr_msg)
4a39caef 777{
4164020e 778 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e 779
0f5c5d5c
SM
780 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
781 "Closing live stream iterator: stream-name=\"{}\", "
782 "viewer-stream-id={}",
783 stream_iter->name->str, stream_iter->viewer_stream_id);
4164020e
SM
784
785 /*
786 * The viewer has hung up on us so we are closing the stream. The
787 * `ctf_msg_iter` should simply realize that it needs to close the
788 * stream properly by emitting the necessary stream end message.
789 */
790 enum ctf_msg_iter_status status =
791 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
792
793 if (status == CTF_MSG_ITER_STATUS_ERROR) {
0f5c5d5c
SM
794 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
795 "Error getting the next message from CTF message iterator");
4164020e
SM
796 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
797 goto end;
798 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
0f5c5d5c
SM
799 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
800 "Reached the end of the live stream iterator.");
4164020e
SM
801 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
802 goto end;
803 }
804
805 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef
FD
806
807end:
4164020e 808 return live_status;
4a39caef
FD
809}
810
7cdc2bab
MD
811/*
812 * helper function:
813 * handle_no_data_streams()
814 * retry:
815 * - for each ACTIVE_NO_DATA stream:
816 * - query relayd for stream data, or quiescence info.
817 * - if need metadata, get metadata, goto retry.
818 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
819 * - if quiescent, move to QUIESCENT streams
820 * - if fetched data, move to ACTIVE_DATA streams
821 * (at this point each stream either has data, or is quiescent)
822 *
823 *
824 * iterator_next:
825 * handle_new_streams_and_metadata()
826 * - query relayd for known streams, add them as ACTIVE_NO_DATA
827 * - query relayd for metadata
828 *
829 * call handle_active_no_data_streams()
830 *
831 * handle_quiescent_streams()
832 * - if at least one stream is ACTIVE_DATA:
833 * - peek stream event with lowest timestamp -> next_ts
834 * - for each quiescent stream
835 * - if next_ts >= quiescent end
836 * - set state to ACTIVE_NO_DATA
837 * - else
838 * - for each quiescent stream
839 * - set state to ACTIVE_NO_DATA
840 *
841 * call handle_active_no_data_streams()
842 *
843 * handle_active_data_streams()
844 * - if at least one stream is ACTIVE_DATA:
845 * - get stream event with lowest timestamp from heap
d6e69534 846 * - make that stream event the current message.
7cdc2bab
MD
847 * - move this stream heap position to its next event
848 * - if we need to fetch data from relayd, move
849 * stream to ACTIVE_NO_DATA.
850 * - return OK
851 * - return AGAIN
852 *
853 * end criterion: ctrl-c on client. If relayd exits or the session
854 * closes on the relay daemon side, we keep on waiting for streams.
855 * Eventually handle --end timestamp (also an end criterion).
856 *
857 * When disconnected from relayd: try to re-connect endlessly.
858 */
4164020e
SM
859static enum lttng_live_iterator_status
860lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
861 struct lttng_live_stream_iterator *stream_iter,
862 const bt_message **curr_msg)
7cdc2bab 863{
4164020e
SM
864 enum lttng_live_iterator_status live_status;
865
0f5c5d5c
SM
866 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
867 "Advancing live stream iterator until next message if possible: "
868 "stream-name=\"{}\", viewer-stream-id={}",
869 stream_iter->name->str, stream_iter->viewer_stream_id);
4164020e
SM
870
871 if (stream_iter->has_stream_hung_up) {
872 /*
873 * The stream has hung up and the stream was properly closed
874 * during the last call to the current function. Return _END
875 * status now so that this stream iterator is removed for the
876 * stream iterator list.
877 */
878 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
879 goto end;
880 }
4a39caef 881
7cdc2bab 882retry:
4164020e
SM
883 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
884
885 /*
886 * Make sure we have the most recent metadata and possibly some new
887 * streams.
888 */
889 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
890 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
891 goto end;
892 }
893
894 live_status =
895 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
896 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
897 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
898 /*
899 * We overwrite `live_status` since `curr_msg` is
900 * likely set to a valid message in this function.
901 */
902 live_status =
903 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
904 }
905 goto end;
906 }
907
908 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
909 stream_iter, curr_msg);
910 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
911 BT_ASSERT(!*curr_msg);
912 goto end;
913 }
914 if (*curr_msg) {
915 goto end;
916 }
917 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
918 stream_iter, curr_msg);
919 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
920 BT_ASSERT(!*curr_msg);
921 }
7cdc2bab
MD
922
923end:
4164020e 924 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
0f5c5d5c
SM
925 BT_CPPLOGD_SPEC(
926 lttng_live_msg_iter->logger,
927 "Ask the relay daemon for an updated view of the data and metadata streams");
4164020e
SM
928 goto retry;
929 }
14f28187 930
0f5c5d5c
SM
931 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
932 "Returning from advancing live stream iterator: status={}, "
933 "stream-name=\"{}\", viewer-stream-id={}",
934 live_status, stream_iter->name->str, stream_iter->viewer_stream_id);
f93afbf9 935
4164020e 936 return live_status;
7cdc2bab
MD
937}
938
4164020e 939static bool is_discarded_packet_or_event_message(const bt_message *msg)
8ec4d5ff 940{
4164020e 941 const enum bt_message_type msg_type = bt_message_get_type(msg);
8ec4d5ff 942
4164020e
SM
943 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
944 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
8ec4d5ff
JG
945}
946
4164020e
SM
947static enum lttng_live_iterator_status
948adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
949 const bt_message *msg_in, bt_message **msg_out,
950 uint64_t new_begin_ts)
8ec4d5ff 951{
4164020e
SM
952 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
953 enum bt_property_availability availability;
954 const bt_clock_snapshot *clock_snapshot;
955 uint64_t end_ts;
956 uint64_t count;
957
958 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
959 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
960
961 availability = bt_message_discarded_packets_get_count(msg_in, &count);
962 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
963
964 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
965 iter, stream, new_begin_ts, end_ts);
966 if (!*msg_out) {
967 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
968 goto end;
969 }
970
971 bt_message_discarded_packets_set_count(*msg_out, count);
8ec4d5ff 972end:
4164020e 973 return status;
8ec4d5ff
JG
974}
975
4164020e
SM
976static enum lttng_live_iterator_status
977adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
978 const bt_message *msg_in, bt_message **msg_out,
979 uint64_t new_begin_ts)
8ec4d5ff 980{
4164020e
SM
981 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
982 enum bt_property_availability availability;
983 const bt_clock_snapshot *clock_snapshot;
984 uint64_t end_ts;
985 uint64_t count;
986
987 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
988 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
989
990 availability = bt_message_discarded_events_get_count(msg_in, &count);
991 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
992
993 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
994 iter, stream, new_begin_ts, end_ts);
995 if (!*msg_out) {
996 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
997 goto end;
998 }
999
1000 bt_message_discarded_events_set_count(*msg_out, count);
8ec4d5ff 1001end:
4164020e 1002 return status;
8ec4d5ff
JG
1003}
1004
4164020e
SM
1005static enum lttng_live_iterator_status
1006handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1007 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1008 const bt_message *late_msg)
285951be 1009{
4164020e
SM
1010 const bt_clock_class *clock_class;
1011 const bt_stream_class *stream_class;
1012 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1013 int64_t last_inactivity_ts_ns;
1014 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1015 enum lttng_live_iterator_status adjust_status;
1016 bt_message *adjusted_message;
1017
1018 /*
1019 * The timestamp of the current message is before the last message sent
1020 * by this component. We CANNOT send it as is.
1021 *
1022 * The only expected scenario in which that could happen is the
e7401568 1023 * following, everything else is a bug in this component, relay daemon,
4164020e
SM
1024 * or CTF parser.
1025 *
1026 * Expected scenario: The CTF message iterator emitted discarded
1027 * packets and discarded events with synthesized beginning and end
1028 * timestamps from the bounds of the last known packet and the newly
1029 * decoded packet header. The CTF message iterator is not aware of
1030 * stream inactivity beacons. Hence, we have to adjust the beginning
1031 * timestamp of those types of messages if a stream signalled its
1032 * inactivity up until _after_ the last known packet's beginning
1033 * timestamp.
1034 *
1035 * Otherwise, the monotonicity guarantee of message timestamps would
1036 * not be preserved.
1037 *
1038 * In short, the only scenario in which it's okay and fixable to
1039 * received a late message is when:
1040 * 1. the late message is a discarded packets or discarded events
1041 * message,
1042 * 2. this stream produced an inactivity message downstream, and
1043 * 3. the timestamp of the late message is within the inactivity
1044 * timespan we sent downstream through the inactivity message.
1045 */
1046
0f5c5d5c
SM
1047 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1048 "Handling late message on live stream iterator: "
1049 "stream-name=\"{}\", viewer-stream-id={}",
1050 stream_iter->name->str, stream_iter->viewer_stream_id);
4164020e
SM
1051
1052 if (!stream_iter->last_inactivity_ts.is_set) {
0f5c5d5c
SM
1053 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1054 "Invalid live stream state: "
1055 "have a late message when no inactivity message "
1056 "was ever sent for that stream.");
4164020e
SM
1057 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1058 goto end;
1059 }
1060
1061 if (!is_discarded_packet_or_event_message(late_msg)) {
0f5c5d5c
SM
1062 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1063 "Invalid live stream state: "
1064 "have a late message that is not a packet discarded or "
1065 "event discarded message: late-msg-type={}",
1066 static_cast<bt2::MessageType>(bt_message_get_type(late_msg)));
4164020e
SM
1067 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1068 goto end;
1069 }
1070
1071 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1072 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1073
1074 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1075 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1076 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
0f5c5d5c
SM
1077 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1078 "Error converting last "
1079 "inactivity message timestamp to nanoseconds");
4164020e
SM
1080 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1081 goto end;
1082 }
1083
1084 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
0f5c5d5c
SM
1085 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1086 "Invalid live stream state: "
1087 "have a late message that is none included in a stream "
1088 "inactivity timespan: last-inactivity-ts-ns={}, "
1089 "late-msg-ts-ns={}",
1090 last_inactivity_ts_ns, late_msg_ts_ns);
4164020e
SM
1091 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1092 goto end;
1093 }
1094
1095 /*
1096 * We now know that it's okay for this message to be late, we can now
1097 * adjust its timestamp to ensure monotonicity.
1098 */
0f5c5d5c
SM
1099 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1100 "Adjusting the timestamp of late message: late-msg-type={}, "
1101 "msg-new-ts-ns={}",
1102 static_cast<bt2::MessageType>(bt_message_get_type(late_msg)),
1103 stream_iter->last_inactivity_ts.value);
4164020e
SM
1104 switch (bt_message_get_type(late_msg)) {
1105 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1106 adjust_status = adjust_discarded_events_message(
1107 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1108 stream_iter->last_inactivity_ts.value);
1109 break;
1110 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1111 adjust_status = adjust_discarded_packets_message(
1112 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1113 stream_iter->last_inactivity_ts.value);
1114 break;
1115 default:
1116 bt_common_abort();
1117 }
1118
1119 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1120 stream_iter_status = adjust_status;
1121 goto end;
1122 }
1123
1124 BT_ASSERT_DBG(adjusted_message);
1125 stream_iter->current_msg = adjusted_message;
1126 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1127 bt_message_put_ref(late_msg);
cefd84f8 1128
285951be 1129end:
4164020e 1130 return stream_iter_status;
285951be
FD
1131}
1132
4164020e
SM
1133static enum lttng_live_iterator_status
1134next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1135 struct lttng_live_trace *live_trace,
1136 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1137{
4164020e 1138 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
4164020e 1139 enum lttng_live_iterator_status stream_iter_status;
4164020e
SM
1140 int64_t youngest_candidate_msg_ts = INT64_MAX;
1141 uint64_t stream_iter_idx;
1142
1143 BT_ASSERT_DBG(live_trace);
1144 BT_ASSERT_DBG(live_trace->stream_iterators);
1145
0f5c5d5c
SM
1146 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1147 "Finding the next stream iterator for trace: "
1148 "trace-id={}",
1149 live_trace->id);
4164020e
SM
1150 /*
1151 * Update the current message of every stream iterators of this trace.
1152 * The current msg of every stream must have a timestamp equal or
1153 * larger than the last message returned by this iterator. We must
1154 * ensure monotonicity.
1155 */
1156 stream_iter_idx = 0;
1157 while (stream_iter_idx < live_trace->stream_iterators->len) {
1158 bool stream_iter_is_ended = false;
1159 struct lttng_live_stream_iterator *stream_iter =
1160 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1161 stream_iter_idx);
1162
1163 /*
1164 * If there is no current message for this stream, go fetch
1165 * one.
1166 */
1167 while (!stream_iter->current_msg) {
1168 const bt_message *msg = NULL;
1169 int64_t curr_msg_ts_ns = INT64_MAX;
1170
1171 stream_iter_status =
1172 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1173
1174 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1175 stream_iter_is_ended = true;
1176 break;
1177 }
1178
1179 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1180 goto end;
1181 }
1182
1183 BT_ASSERT_DBG(msg);
1184
0f5c5d5c
SM
1185 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1186 "Live stream iterator returned message: msg-type={}, "
1187 "stream-name=\"{}\", viewer-stream-id={}",
1188 static_cast<bt2::MessageType>(bt_message_get_type(msg)),
1189 stream_iter->name->str, stream_iter->viewer_stream_id);
4164020e
SM
1190
1191 /*
1192 * Get the timestamp in nanoseconds from origin of this
e7401568 1193 * message.
4164020e 1194 */
7d91f1ac
SM
1195 live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
1196 &curr_msg_ts_ns);
4164020e
SM
1197
1198 /*
1199 * Check if the message of the current live stream
1200 * iterator occurred at the exact same time or after the
1201 * last message returned by this component's message
1202 * iterator. If not, we need to handle it with care.
1203 */
1204 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1205 stream_iter->current_msg = msg;
1206 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1207 } else {
1208 /*
1209 * We received a message from the past. This
1210 * may be fixable but it can also be an error.
1211 */
1212 stream_iter_status =
1213 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1214 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
0f5c5d5c
SM
1215 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1216 "Late message could not be handled correctly: "
1217 "lttng-live-msg-iter-addr={}, "
1218 "stream-name=\"{}\", "
1219 "curr-msg-ts={}, last-msg-ts={}",
1220 fmt::ptr(lttng_live_msg_iter),
1221 stream_iter->name->str, curr_msg_ts_ns,
1222 lttng_live_msg_iter->last_msg_ts_ns);
4164020e
SM
1223 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1224 goto end;
1225 }
1226 }
1227 }
1228
1229 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1230
1231 if (!stream_iter_is_ended) {
1232 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1233 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1234 /*
1235 * Update the current best candidate message
1236 * for the stream iterator of this live trace
1237 * to be forwarded downstream.
1238 */
1239 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1240 youngest_candidate_stream_iter = stream_iter;
1241 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1242 /*
1243 * Order the messages in an arbitrary but
1244 * deterministic way.
1245 */
1246 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1247 int ret = common_muxing_compare_messages(
1248 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1249 if (ret < 0) {
1250 /*
1251 * The `youngest_candidate_stream_iter->current_msg`
1252 * should go first. Update the next
1253 * iterator and the current timestamp.
1254 */
1255 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1256 youngest_candidate_stream_iter = stream_iter;
1257 } else if (ret == 0) {
1258 /*
1259 * Unable to pick which one should go
1260 * first.
1261 */
0f5c5d5c
SM
1262 BT_CPPLOGW_SPEC(
1263 lttng_live_msg_iter->logger,
4164020e 1264 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1265 "stream-iter-addr={}"
1266 "stream-iter-addr={}",
1267 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1268 }
1269 }
1270
1271 stream_iter_idx++;
1272 } else {
1273 /*
1274 * The live stream iterator has ended. That
1275 * iterator is removed from the array, but
1276 * there is no need to increment
1277 * stream_iter_idx as
1278 * g_ptr_array_remove_index_fast replaces the
1279 * removed element with the array's last
1280 * element.
1281 */
1282 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1283 }
1284 }
1285
1286 if (youngest_candidate_stream_iter) {
1287 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1288 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1289 } else {
1290 /*
1291 * The only case where we don't have a candidate for this trace
1292 * is if we reached the end of all the iterators.
1293 */
1294 BT_ASSERT(live_trace->stream_iterators->len == 0);
1295 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1296 }
14f28187
FD
1297
1298end:
4164020e 1299 return stream_iter_status;
14f28187
FD
1300}
1301
4164020e
SM
1302static enum lttng_live_iterator_status
1303next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1304 struct lttng_live_session *session,
1305 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1306{
4164020e
SM
1307 enum lttng_live_iterator_status stream_iter_status;
1308 uint64_t trace_idx = 0;
1309 int64_t youngest_candidate_msg_ts = INT64_MAX;
1310 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1311
0f5c5d5c
SM
1312 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1313 "Finding the next stream iterator for session: "
1314 "session-id={}",
1315 session->id);
4164020e
SM
1316 /*
1317 * Make sure we are attached to the session and look for new streams
1318 * and metadata.
1319 */
1320 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1321 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1322 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1323 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1324 goto end;
1325 }
1326
1327 BT_ASSERT_DBG(session->traces);
1328
1329 while (trace_idx < session->traces->len) {
1330 bool trace_is_ended = false;
1331 struct lttng_live_stream_iterator *stream_iter;
1332 struct lttng_live_trace *trace =
1333 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1334
1335 stream_iter_status =
1336 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1337 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1338 /*
1339 * All the live stream iterators for this trace are
1340 * ENDed. Remove the trace from this session.
1341 */
1342 trace_is_ended = true;
1343 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1344 goto end;
1345 }
1346
1347 if (!trace_is_ended) {
1348 BT_ASSERT_DBG(stream_iter);
1349
1350 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1351 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1352 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1353 youngest_candidate_stream_iter = stream_iter;
1354 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1355 /*
1356 * Order the messages in an arbitrary but
1357 * deterministic way.
1358 */
1359 int ret = common_muxing_compare_messages(
1360 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1361 if (ret < 0) {
1362 /*
1363 * The `youngest_candidate_stream_iter->current_msg`
1364 * should go first. Update the next iterator
1365 * and the current timestamp.
1366 */
1367 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1368 youngest_candidate_stream_iter = stream_iter;
1369 } else if (ret == 0) {
1370 /* Unable to pick which one should go first. */
0f5c5d5c
SM
1371 BT_CPPLOGW_SPEC(
1372 lttng_live_msg_iter->logger,
4164020e 1373 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1374 "stream-iter-addr={}"
1375 "youngest-candidate-stream-iter-addr={}",
1376 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1377 }
1378 }
1379 trace_idx++;
1380 } else {
1381 /*
1382 * trace_idx is not incremented since
1383 * g_ptr_array_remove_index_fast replaces the
1384 * element at trace_idx with the array's last element.
1385 */
1386 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1387 }
1388 }
1389 if (youngest_candidate_stream_iter) {
1390 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1391 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1392 } else {
1393 /*
1394 * The only cases where we don't have a candidate for this
1395 * trace is:
1396 * 1. if we reached the end of all the iterators of all the
1397 * traces of this session,
1398 * 2. if we never had live stream iterator in the first place.
1399 *
1400 * In either cases, we return END.
1401 */
1402 BT_ASSERT(session->traces->len == 0);
1403 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1404 }
7cdc2bab 1405end:
4164020e 1406 return stream_iter_status;
14f28187
FD
1407}
1408
4164020e 1409static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1410{
4164020e 1411 uint64_t i;
14f28187 1412
4164020e
SM
1413 for (i = 0; i < count; i++) {
1414 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1415 }
7cdc2bab
MD
1416}
1417
4164020e
SM
1418bt_message_iterator_class_next_method_status
1419lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1420 uint64_t capacity, uint64_t *count)
d3eb6e8f 1421{
1e690349
SM
1422 try {
1423 bt_message_iterator_class_next_method_status status;
1424 enum lttng_live_viewer_status viewer_status;
1425 struct lttng_live_msg_iter *lttng_live_msg_iter =
1426 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1427 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1428 enum lttng_live_iterator_status stream_iter_status;
1429 uint64_t session_idx;
4164020e 1430
1e690349 1431 *count = 0;
4164020e 1432
1e690349 1433 BT_ASSERT_DBG(lttng_live_msg_iter);
4164020e 1434
1e690349
SM
1435 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1436 /*
4164020e
SM
1437 * The iterator was interrupted in a previous call to the
1438 * `_next()` method. We currently do not support generating
1439 * messages after such event. The babeltrace2 CLI should never
1440 * be running the graph after being interrupted. So this check
1441 * is to prevent other graph users from using this live
1442 * iterator in an messed up internal state.
1443 */
1e690349
SM
1444 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1445 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1446 lttng_live_msg_iter->logger,
1447 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1448 goto end;
1449 }
4164020e 1450
1e690349 1451 /*
4164020e
SM
1452 * Clear all the invalid message reference that might be left over in
1453 * the output array.
1454 */
1e690349 1455 memset(msgs, 0, capacity * sizeof(*msgs));
4164020e 1456
1e690349 1457 /*
4164020e
SM
1458 * If no session are exposed on the relay found at the url provided by
1459 * the user, session count will be 0. In this case, we return status
1460 * end to return gracefully.
1461 */
1e690349
SM
1462 if (lttng_live_msg_iter->sessions->len == 0) {
1463 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1464 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1465 goto end;
1466 } else {
1467 /*
4164020e
SM
1468 * The are no more active session for this session
1469 * name. Retry to create a viewer session for the
1470 * requested session name.
1471 */
1e690349
SM
1472 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1473 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1474 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1475 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1476 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1477 "Error creating LTTng live viewer session");
1478 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1479 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1480 } else {
1481 bt_common_abort();
1482 }
1483 goto end;
4164020e 1484 }
4164020e
SM
1485 }
1486 }
4164020e 1487
1e690349
SM
1488 if (lttng_live_msg_iter->active_stream_iter == 0) {
1489 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1490 }
4164020e 1491
1e690349 1492 /*
4164020e
SM
1493 * Here the muxing of message is done.
1494 *
1495 * We need to iterate over all the streams of all the traces of all the
1496 * viewer sessions in order to get the message with the smallest
1497 * timestamp. In this case, a session is a viewer session and there is
1498 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1499 * kernel). Each viewer session can have multiple traces, for example,
1500 * 64bit UST viewer sessions could have multiple per-pid traces.
1501 *
1502 * We iterate over the streams of each traces to update and see what is
1503 * their next message's timestamp. From those timestamps, we select the
1504 * message with the smallest timestamp as the best candidate message
1505 * for that trace and do the same thing across all the sessions.
1506 *
1507 * We then compare the timestamp of best candidate message of all the
1508 * sessions to pick the message with the smallest timestamp and we
1509 * return it.
1510 */
1e690349
SM
1511 while (*count < capacity) {
1512 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1513 *candidate_stream_iter = NULL;
1514 int64_t youngest_msg_ts_ns = INT64_MAX;
1515
1516 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1517 session_idx = 0;
1518 while (session_idx < lttng_live_msg_iter->sessions->len) {
1519 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1520 lttng_live_msg_iter->sessions, session_idx);
1521
1522 /* Find the best candidate message to send downstream. */
1523 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1524 &candidate_stream_iter);
1525
1526 /* If we receive an END status, it means that either:
4164020e
SM
1527 * - Those traces never had active streams (UST with no
1528 * data produced yet),
1529 * - All live stream iterators have ENDed.*/
1e690349
SM
1530 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1531 if (session->closed && session->traces->len == 0) {
1532 /*
4164020e
SM
1533 * Remove the session from the list.
1534 * session_idx is not modified since
1535 * g_ptr_array_remove_index_fast
1536 * replaces the the removed element with
1537 * the array's last element.
1538 */
1e690349
SM
1539 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1540 } else {
1541 session_idx++;
1542 }
1543 continue;
4164020e 1544 }
4164020e 1545
1e690349
SM
1546 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1547 goto return_status;
1548 }
4164020e 1549
1e690349
SM
1550 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1551 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1552 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1553 youngest_stream_iter = candidate_stream_iter;
1554 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1555 /*
4164020e
SM
1556 * The currently selected message to be sent
1557 * downstream next has the exact same timestamp
1558 * that of the current candidate message. We
1559 * must break the tie in a predictable manner.
1560 */
1e690349
SM
1561 BT_CPPLOGD_STR_SPEC(
1562 lttng_live_msg_iter->logger,
1563 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1564 /*
4164020e
SM
1565 * Order the messages in an arbitrary but
1566 * deterministic way.
1567 */
1e690349
SM
1568 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1569 youngest_stream_iter->current_msg);
1570 if (ret < 0) {
1571 /*
4164020e
SM
1572 * The `candidate_stream_iter->current_msg`
1573 * should go first. Update the next
1574 * iterator and the current timestamp.
1575 */
1e690349
SM
1576 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1577 youngest_stream_iter = candidate_stream_iter;
1578 } else if (ret == 0) {
1579 /* Unable to pick which one should go first. */
1580 BT_CPPLOGW_SPEC(
1581 lttng_live_msg_iter->logger,
1582 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1583 "next-stream-iter-addr={}"
1584 "candidate-stream-iter-addr={}",
1585 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1586 }
4164020e 1587 }
4164020e 1588
1e690349
SM
1589 session_idx++;
1590 }
4164020e 1591
1e690349
SM
1592 if (!youngest_stream_iter) {
1593 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1594 goto return_status;
1595 }
4164020e 1596
1e690349
SM
1597 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1598 /* Ensure monotonicity. */
1599 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1600 youngest_stream_iter->current_msg_ts_ns);
4164020e 1601
1e690349 1602 /*
4164020e 1603 * Insert the next message to the message batch. This will set
e7401568 1604 * stream iterator current message to NULL so that next time
4164020e
SM
1605 * we fetch the next message of that stream iterator
1606 */
1e690349
SM
1607 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1608 (*count)++;
4164020e 1609
1e690349
SM
1610 /* Update the last timestamp in nanoseconds sent downstream. */
1611 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1612 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
4164020e 1613
1e690349
SM
1614 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1615 }
f79c2d7a
FD
1616
1617return_status:
1e690349
SM
1618 switch (stream_iter_status) {
1619 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1620 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1621 /*
4164020e
SM
1622 * If we gathered messages, return _OK even if the graph was
1623 * interrupted. This allows for the components downstream to at
e7401568 1624 * least get the those messages. If the graph was indeed
4164020e
SM
1625 * interrupted there should not be another _next() call as the
1626 * application will tear down the graph. This component class
1627 * doesn't support restarting after an interruption.
1628 */
1e690349
SM
1629 if (*count > 0) {
1630 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1631 } else {
1632 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1633 }
1634 break;
1635 case LTTNG_LIVE_ITERATOR_STATUS_END:
1636 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1637 break;
1638 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1639 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1640 "Memory error preparing the next batch of messages: "
1641 "live-iter-status={}",
1642 stream_iter_status);
1643 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1644 break;
1645 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1646 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1647 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1648 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1649 "Error preparing the next batch of messages: "
1650 "live-iter-status={}",
1651 stream_iter_status);
4164020e 1652
1e690349
SM
1653 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1654 /* Put all existing messages on error. */
1655 put_messages(msgs, *count);
1656 break;
1657 default:
1658 bt_common_abort();
1659 }
14f28187 1660
f79c2d7a 1661end:
1e690349
SM
1662 return status;
1663 } catch (const std::bad_alloc&) {
1664 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1665 } catch (const bt2::Error&) {
1666 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1667 }
7cdc2bab 1668}
41a2b7ae 1669
4164020e
SM
1670static struct lttng_live_msg_iter *
1671lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1672 bt_self_message_iterator *self_msg_it)
4f74db52 1673{
0f5c5d5c
SM
1674 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1675 msg_iter->self_comp = lttng_live_comp->self_comp;
1676 msg_iter->lttng_live_comp = lttng_live_comp;
1677 msg_iter->self_msg_iter = self_msg_it;
4164020e 1678
0f5c5d5c
SM
1679 msg_iter->active_stream_iter = 0;
1680 msg_iter->last_msg_ts_ns = INT64_MIN;
1681 msg_iter->was_interrupted = false;
4f74db52 1682
0f5c5d5c 1683 msg_iter->sessions =
4164020e 1684 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
0f5c5d5c 1685 BT_ASSERT(msg_iter->sessions);
4164020e 1686
0f5c5d5c 1687 return msg_iter;
4f74db52
FD
1688}
1689
4164020e
SM
1690bt_message_iterator_class_initialize_method_status
1691lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
7d91f1ac 1692 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
7cdc2bab 1693{
1e690349
SM
1694 try {
1695 bt_message_iterator_class_initialize_method_status status;
1696 struct lttng_live_component *lttng_live;
1697 struct lttng_live_msg_iter *lttng_live_msg_iter;
1698 enum lttng_live_viewer_status viewer_status;
1699 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1700
1701 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1702
1703 /* There can be only one downstream iterator at the same time. */
1704 BT_ASSERT(!lttng_live->has_msg_iter);
1705 lttng_live->has_msg_iter = true;
1706
1707 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1708 if (!lttng_live_msg_iter) {
1709 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1710 "Failed to create lttng_live_msg_iter");
1711 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1712 goto error;
1713 }
4164020e 1714
1e690349 1715 viewer_status = live_viewer_connection_create(
2780ec75 1716 lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1e690349
SM
1717 &lttng_live_msg_iter->viewer_connection);
1718 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1719 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1720 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1721 "Failed to create viewer connection");
1722 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1723 /*
4164020e
SM
1724 * Interruption in the _iter_init() method is not
1725 * supported. Return an error.
1726 */
1e690349
SM
1727 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1728 "Interrupted while creating viewer connection");
1729 }
10265ebe 1730
1e690349
SM
1731 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1732 goto error;
1733 }
4164020e 1734
1e690349
SM
1735 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1736 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1737 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1738 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1739 "Failed to create viewer session");
1740 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1741 /*
4164020e
SM
1742 * Interruption in the _iter_init() method is not
1743 * supported. Return an error.
1744 */
1e690349
SM
1745 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1746 "Interrupted when creating viewer session");
1747 }
4164020e 1748
10265ebe 1749 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e 1750 goto error;
4164020e 1751 }
4164020e 1752
1e690349
SM
1753 if (lttng_live_msg_iter->sessions->len == 0) {
1754 switch (lttng_live->params.sess_not_found_act) {
1755 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1756 BT_CPPLOGI_SPEC(
1757 lttng_live_msg_iter->logger,
1758 "Unable to connect to the requested live viewer session. "
1759 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1760 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2780ec75 1761 lttng_live->params.url);
1e690349
SM
1762 break;
1763 case SESSION_NOT_FOUND_ACTION_FAIL:
1764 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1765 lttng_live_msg_iter->logger,
1766 "Unable to connect to the requested live viewer session. "
1767 "Fail the message iterator initialization because of {}=\"{}\" "
1768 "component parameter: url =\"{}\"",
1769 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
2780ec75 1770 lttng_live->params.url);
1e690349
SM
1771 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1772 goto error;
1773 case SESSION_NOT_FOUND_ACTION_END:
1774 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1775 "Unable to connect to the requested live viewer session. "
1776 "End gracefully at the first _next() call because of {}=\"{}\""
1777 " component parameter: url=\"{}\"",
1778 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
2780ec75 1779 lttng_live->params.url);
1e690349
SM
1780 break;
1781 default:
1782 bt_common_abort();
1783 }
1784 }
1785
1786 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1787 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1788 goto end;
f79c2d7a 1789
14f28187 1790error:
1e690349 1791 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1792end:
1e690349
SM
1793 return status;
1794 } catch (const std::bad_alloc&) {
1795 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1796 } catch (const bt2::Error&) {
1797 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1798 }
7cdc2bab
MD
1799}
1800
80aff5ef 1801static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1802 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1803 bt_param_validation_value_descr::makeString()},
4164020e
SM
1804 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1805
1806static bt_component_class_query_method_status
1807lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
0f5c5d5c 1808 const bt2c::Logger& logger)
7cdc2bab 1809{
4164020e
SM
1810 bt_component_class_query_method_status status;
1811 const bt_value *url_value = NULL;
1812 const char *url;
1813 struct live_viewer_connection *viewer_connection = NULL;
1814 enum lttng_live_viewer_status viewer_status;
1815 enum bt_param_validation_status validation_status;
1816 gchar *validate_error = NULL;
1817
1818 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1819 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1820 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1821 goto error;
1822 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1823 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
0f5c5d5c 1824 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
4164020e
SM
1825 goto error;
1826 }
1827
1828 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1829 url = bt_value_string_get(url_value);
1830
0f5c5d5c 1831 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
4164020e
SM
1832 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1833 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c 1834 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
4164020e
SM
1835 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1836 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1837 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1838 } else {
1839 bt_common_abort();
1840 }
1841 goto error;
1842 }
1843
1844 status = live_viewer_connection_list_sessions(viewer_connection, result);
1845 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
0f5c5d5c 1846 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
4164020e
SM
1847 goto error;
1848 }
1849
1850 goto end;
c7eee084 1851
7cdc2bab 1852error:
4164020e 1853 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1854
4164020e
SM
1855 if (status >= 0) {
1856 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1857 }
c7eee084 1858
7cdc2bab 1859end:
4164020e
SM
1860 if (viewer_connection) {
1861 live_viewer_connection_destroy(viewer_connection);
1862 }
80aff5ef 1863
4164020e 1864 g_free(validate_error);
80aff5ef 1865
4164020e 1866 return status;
7cdc2bab
MD
1867}
1868
4164020e
SM
1869static bt_component_class_query_method_status
1870lttng_live_query_support_info(const bt_value *params, const bt_value **result,
0f5c5d5c 1871 const bt2c::Logger& logger)
312df793 1872{
4164020e
SM
1873 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1874 const bt_value *input_type_value;
1875 const bt_value *input_value;
1876 double weight = 0;
dd420a9b 1877 struct bt_common_lttng_live_url_parts parts = {};
4164020e
SM
1878
1879 /* Used by the logging macros */
1880 __attribute__((unused)) bt_self_component *self_comp = NULL;
1881
1882 *result = NULL;
1883 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1884 if (!input_type_value) {
0f5c5d5c 1885 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
4164020e
SM
1886 goto error;
1887 }
1888
1889 if (!bt_value_is_string(input_type_value)) {
0f5c5d5c 1890 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
4164020e
SM
1891 goto error;
1892 }
1893
1894 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1895 /* We don't handle file system paths */
1896 goto create_result;
1897 }
1898
1899 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1900 if (!input_value) {
0f5c5d5c 1901 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
4164020e
SM
1902 goto error;
1903 }
1904
1905 if (!bt_value_is_string(input_value)) {
0f5c5d5c 1906 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
4164020e
SM
1907 goto error;
1908 }
1909
1910 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1911 if (parts.session_name) {
1912 /*
1913 * Looks pretty much like an LTTng live URL: we got the
1914 * session name part, which forms a complete URL.
1915 */
1916 weight = .75;
1917 }
312df793
PP
1918
1919create_result:
4164020e
SM
1920 *result = bt_value_real_create_init(weight);
1921 if (!*result) {
1922 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1923 goto error;
1924 }
312df793 1925
4164020e 1926 goto end;
312df793
PP
1927
1928error:
4164020e
SM
1929 if (status >= 0) {
1930 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1931 }
312df793 1932
4164020e 1933 BT_ASSERT(!*result);
312df793
PP
1934
1935end:
4164020e
SM
1936 bt_common_destroy_lttng_live_url_parts(&parts);
1937 return status;
312df793
PP
1938}
1939
4164020e
SM
1940bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1941 bt_private_query_executor *priv_query_exec,
1942 const char *object, const bt_value *params,
1943 __attribute__((unused)) void *method_data,
1944 const bt_value **result)
7cdc2bab 1945{
1e690349
SM
1946 try {
1947 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1948 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1949 bt2::PrivateQueryExecutor {priv_query_exec},
1950 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1951
1952 if (strcmp(object, "sessions") == 0) {
1953 status = lttng_live_query_list_sessions(params, result, logger);
1954 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1955 status = lttng_live_query_support_info(params, result, logger);
1956 } else {
1957 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1958 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1959 goto end;
1960 }
14f28187
FD
1961
1962end:
1e690349
SM
1963 return status;
1964 } catch (const std::bad_alloc&) {
1965 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1966 } catch (const bt2::Error&) {
1967 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1968 }
7cdc2bab
MD
1969}
1970
4164020e 1971static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
7cdc2bab 1972{
afb0f12b 1973 delete lttng_live;
7cdc2bab
MD
1974}
1975
14f28187 1976void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1977{
4164020e
SM
1978 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
1979 bt_self_component_source_as_self_component(component));
1980
1981 if (!data) {
1982 return;
1983 }
1984 lttng_live_component_destroy_data(data);
7cdc2bab
MD
1985}
1986
4164020e
SM
1987static enum session_not_found_action
1988parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 1989{
4164020e
SM
1990 enum session_not_found_action action;
1991 const char *no_session_act_str = bt_value_string_get(no_session_param);
1992
1993 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1994 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1995 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1996 action = SESSION_NOT_FOUND_ACTION_FAIL;
1997 } else {
1998 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1999 action = SESSION_NOT_FOUND_ACTION_END;
2000 }
2001
2002 return action;
14f28187
FD
2003}
2004
88730e42
SM
2005static bt_param_validation_value_descr inputs_elem_descr =
2006 bt_param_validation_value_descr::makeString();
80aff5ef
SM
2007
2008static const char *sess_not_found_action_choices[] = {
4164020e
SM
2009 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2010 SESS_NOT_FOUND_ACTION_FAIL_STR,
2011 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
2012};
2013
2014static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
2015 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2016 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2017 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2018 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
2019 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2020
2021static bt_component_class_initialize_method_status
0f5c5d5c
SM
2022lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
2023 struct lttng_live_component **component)
7cdc2bab 2024{
4164020e
SM
2025 struct lttng_live_component *lttng_live = NULL;
2026 const bt_value *inputs_value;
2027 const bt_value *url_value;
2028 const bt_value *value;
4164020e
SM
2029 enum bt_param_validation_status validation_status;
2030 gchar *validation_error = NULL;
2031 bt_component_class_initialize_method_status status;
0f5c5d5c 2032 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
4164020e
SM
2033
2034 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2035 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2036 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2037 goto error;
2038 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
0f5c5d5c 2039 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
4164020e
SM
2040 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2041 goto error;
2042 }
2043
0f5c5d5c
SM
2044 lttng_live = new lttng_live_component {std::move(logger)};
2045 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
4164020e
SM
2046 lttng_live->max_query_size = MAX_QUERY_SIZE;
2047 lttng_live->has_msg_iter = false;
2048
2049 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2050 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2780ec75 2051 lttng_live->params.url = bt_value_string_get(url_value);
4164020e
SM
2052
2053 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2054 if (value) {
2055 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2056 } else {
0f5c5d5c
SM
2057 BT_CPPLOGI_SPEC(lttng_live->logger,
2058 "Optional `{}` parameter is missing: defaulting to `{}`.",
2059 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
4164020e
SM
2060 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2061 }
2062
2063 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2064 goto end;
7cdc2bab
MD
2065
2066error:
4164020e
SM
2067 lttng_live_component_destroy_data(lttng_live);
2068 lttng_live = NULL;
7cdc2bab 2069end:
4164020e 2070 g_free(validation_error);
80aff5ef 2071
4164020e
SM
2072 *component = lttng_live;
2073 return status;
d3e4dcd8
PP
2074}
2075
4164020e
SM
2076bt_component_class_initialize_method_status
2077lttng_live_component_init(bt_self_component_source *self_comp_src,
7d91f1ac 2078 bt_self_component_source_configuration *, const bt_value *params, void *)
f3bc2010 2079{
1e690349
SM
2080 try {
2081 struct lttng_live_component *lttng_live;
2082 bt_component_class_initialize_method_status ret;
2083 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2084 bt_self_component_add_port_status add_port_status;
2085
2086 ret = lttng_live_component_create(params, self_comp_src, &lttng_live);
2087 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2088 goto error;
2089 }
4164020e 2090
1e690349
SM
2091 add_port_status =
2092 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2093 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2094 ret = (bt_component_class_initialize_method_status) add_port_status;
2095 goto end;
2096 }
4164020e 2097
1e690349 2098 bt_self_component_set_data(self_comp, lttng_live);
4164020e 2099 goto end;
7cdc2bab 2100
19bdff20 2101error:
1e690349
SM
2102 lttng_live_component_destroy_data(lttng_live);
2103 lttng_live = NULL;
7cdc2bab 2104end:
1e690349
SM
2105 return ret;
2106 } catch (const std::bad_alloc&) {
2107 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2108 } catch (const bt2::Error&) {
2109 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2110 }
d85ef162 2111}
This page took 0.224873 seconds and 4 git commands to generate.