Clean-up: lttng-live: left-over tabs in comment
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
2ece7dd0 11#define BT_COMP_LOG_SELF_COMP self_comp
4164020e
SM
12#define BT_LOG_OUTPUT_LEVEL log_level
13#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
d9c39b0a 14#include "logging/comp-logging.h"
020bc26f 15
14f28187 16#include <inttypes.h>
c4f23e30 17#include <stdbool.h>
14f28187
FD
18#include <unistd.h>
19
3c22a242
FD
20#include <glib.h>
21
578e048b 22#include "common/assert.h"
3fadfbc0 23#include <babeltrace2/babeltrace.h>
578e048b 24#include "compat/compiler.h"
3fadfbc0 25#include <babeltrace2/types.h>
f3bc2010 26
376fc2bd 27#include "plugins/common/muxing/muxing.h"
80aff5ef 28#include "plugins/common/param-validation/param-validation.h"
376fc2bd 29
087cd0f5
SM
30#include "data-stream.hpp"
31#include "metadata.hpp"
32#include "lttng-live.hpp"
7cdc2bab 33
4164020e
SM
34#define MAX_QUERY_SIZE (256 * 1024)
35#define URL_PARAM "url"
36#define INPUTS_PARAM "inputs"
37#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
38#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
39#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
40#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 41
4164020e 42#define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ##__VA_ARGS__)
7cdc2bab 43
4164020e 44static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
14f28187 45{
4164020e
SM
46 switch (status) {
47 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
48 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
49 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
50 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
51 case LTTNG_LIVE_ITERATOR_STATUS_END:
52 return "LTTNG_LIVE_ITERATOR_STATUS_END";
53 case LTTNG_LIVE_ITERATOR_STATUS_OK:
54 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
55 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
56 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
57 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
58 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
59 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
60 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
61 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
62 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
63 default:
64 bt_common_abort();
65 }
14f28187
FD
66}
67
4164020e 68static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
7cdc2bab 69{
4164020e
SM
70 switch (state) {
71 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
72 return "ACTIVE_NO_DATA";
73 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
74 return "QUIESCENT_NO_DATA";
75 case LTTNG_LIVE_STREAM_QUIESCENT:
76 return "QUIESCENT";
77 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
78 return "ACTIVE_DATA";
79 case LTTNG_LIVE_STREAM_EOF:
80 return "EOF";
81 default:
82 return "ERROR";
83 }
7cdc2bab 84}
7cdc2bab 85
34533ae0 86void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 87 enum lttng_live_stream_state new_state)
34533ae0 88{
4164020e
SM
89 bt_self_component *self_comp = stream_iter->self_comp;
90 bt_logging_level log_level = stream_iter->log_level;
34533ae0 91
4164020e
SM
92 BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
93 ", old-state=%s, new-state=%s",
94 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
95 lttng_live_stream_state_string(new_state));
34533ae0 96
4164020e 97 stream_iter->state = new_state;
34533ae0
FD
98}
99
4164020e
SM
100#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
101 do { \
102 BT_COMP_LOGD("Live stream iterator state=%s, " \
103 "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", " \
104 "curr-inact-ts=%" PRId64, \
105 lttng_live_stream_state_string(live_stream_iter->state), \
106 live_stream_iter->last_inactivity_ts.is_set, \
107 live_stream_iter->last_inactivity_ts.value, \
108 live_stream_iter->current_inactivity_ts); \
109 } while (0);
6f79a7cf
MD
110
111BT_HIDDEN
9b4f9b42 112bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 113{
4164020e 114 bool ret;
6f79a7cf 115
4164020e
SM
116 if (!msg_iter) {
117 ret = false;
118 goto end;
119 }
7cdc2bab 120
4164020e 121 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
4bf0e537 122
14f28187 123end:
4164020e 124 return ret;
7cdc2bab
MD
125}
126
4164020e
SM
127static struct lttng_live_trace *
128lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 129{
4164020e
SM
130 uint64_t trace_idx;
131 struct lttng_live_trace *ret_trace = NULL;
132
133 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
134 struct lttng_live_trace *trace =
135 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
136 if (trace->id == trace_id) {
137 ret_trace = trace;
138 goto end;
139 }
140 }
14f28187
FD
141
142end:
4164020e 143 return ret_trace;
d3eb6e8f
PP
144}
145
4164020e 146static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 147{
4164020e
SM
148 bt_logging_level log_level = trace->log_level;
149 bt_self_component *self_comp = trace->self_comp;
c01594de 150
4164020e 151 BT_COMP_LOGD("Destroying live trace: trace-id=%" PRIu64, trace->id);
7cdc2bab 152
4164020e
SM
153 BT_ASSERT(trace->stream_iterators);
154 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 155
4164020e
SM
156 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
157 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
5bd230f4 158
4164020e
SM
159 lttng_live_metadata_fini(trace);
160 g_free(trace);
7cdc2bab
MD
161}
162
4164020e
SM
163static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
164 uint64_t trace_id)
7cdc2bab 165{
4164020e
SM
166 struct lttng_live_trace *trace = NULL;
167 bt_logging_level log_level = session->log_level;
168 bt_self_component *self_comp = session->self_comp;
169
170 BT_COMP_LOGD("Creating live trace: "
171 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
172 session->id, trace_id);
173 trace = g_new0(struct lttng_live_trace, 1);
174 if (!trace) {
175 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace");
176 goto error;
177 }
178 trace->log_level = session->log_level;
179 trace->self_comp = session->self_comp;
180 trace->session = session;
181 trace->id = trace_id;
182 trace->trace_class = NULL;
183 trace->trace = NULL;
184 trace->stream_iterators =
185 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
186 BT_ASSERT(trace->stream_iterators);
187 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
188 g_ptr_array_add(session->traces, trace);
189
190 goto end;
7cdc2bab 191error:
4164020e
SM
192 g_free(trace);
193 trace = NULL;
7cdc2bab 194end:
4164020e 195 return trace;
7cdc2bab
MD
196}
197
198BT_HIDDEN
4164020e
SM
199struct lttng_live_trace *
200lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
201 uint64_t trace_id)
7cdc2bab 202{
4164020e 203 struct lttng_live_trace *trace;
7cdc2bab 204
4164020e
SM
205 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
206 if (trace) {
207 goto end;
208 }
7cdc2bab 209
4164020e
SM
210 /* The session is the owner of the newly created trace. */
211 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 212
14f28187 213end:
4164020e 214 return trace;
7cdc2bab
MD
215}
216
217BT_HIDDEN
4164020e
SM
218int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
219 const char *hostname, const char *session_name)
7cdc2bab 220{
4164020e
SM
221 int ret = 0;
222 struct lttng_live_session *session;
223 bt_logging_level log_level = lttng_live_msg_iter->log_level;
224 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
225
226 BT_COMP_LOGD("Adding live session: "
227 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
228 session_id, hostname, session_name);
229
230 session = g_new0(struct lttng_live_session, 1);
231 if (!session) {
232 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session");
233 goto error;
234 }
235
236 session->log_level = lttng_live_msg_iter->log_level;
237 session->self_comp = lttng_live_msg_iter->self_comp;
238 session->id = session_id;
239 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
240 BT_ASSERT(session->traces);
241 session->lttng_live_msg_iter = lttng_live_msg_iter;
242 session->new_streams_needed = true;
243 session->hostname = g_string_new(hostname);
244 BT_ASSERT(session->hostname);
245
246 session->session_name = g_string_new(session_name);
247 BT_ASSERT(session->session_name);
248
249 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
250 goto end;
7cdc2bab 251error:
4164020e
SM
252 g_free(session);
253 ret = -1;
7cdc2bab 254end:
4164020e 255 return ret;
7cdc2bab
MD
256}
257
4164020e 258static void lttng_live_destroy_session(struct lttng_live_session *session)
7cdc2bab 259{
4164020e
SM
260 bt_logging_level log_level;
261 bt_self_component *self_comp;
262
263 if (!session) {
264 goto end;
265 }
266
267 log_level = session->log_level;
268 self_comp = session->self_comp;
269 BT_COMP_LOGD("Destroying live session: "
270 "session-id=%" PRIu64 ", session-name=\"%s\"",
271 session->id, session->session_name->str);
272 if (session->id != -1ULL) {
273 if (lttng_live_session_detach(session)) {
274 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
275 /* Old relayd cannot detach sessions. */
276 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id);
277 }
278 }
279 session->id = -1ULL;
280 }
281
282 if (session->traces) {
283 g_ptr_array_free(session->traces, TRUE);
284 }
285
286 if (session->hostname) {
287 g_string_free(session->hostname, TRUE);
288 }
289 if (session->session_name) {
290 g_string_free(session->session_name, TRUE);
291 }
292 g_free(session);
14f28187
FD
293
294end:
4164020e 295 return;
7cdc2bab
MD
296}
297
4164020e 298static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 299{
4164020e
SM
300 if (!lttng_live_msg_iter) {
301 goto end;
302 }
7cdc2bab 303
4164020e
SM
304 if (lttng_live_msg_iter->sessions) {
305 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
306 }
14f28187 307
4164020e
SM
308 if (lttng_live_msg_iter->viewer_connection) {
309 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
310 }
311 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
312 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
14f28187 313
4164020e
SM
314 /* All stream iterators must be destroyed at this point. */
315 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
316 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
14f28187 317
4164020e 318 g_free(lttng_live_msg_iter);
14f28187
FD
319
320end:
4164020e 321 return;
14f28187
FD
322}
323
324BT_HIDDEN
325void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
326{
4164020e 327 struct lttng_live_msg_iter *lttng_live_msg_iter;
14f28187 328
4164020e 329 BT_ASSERT(self_msg_iter);
14f28187 330
4164020e
SM
331 lttng_live_msg_iter =
332 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
333 BT_ASSERT(lttng_live_msg_iter);
334 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
335}
336
4164020e
SM
337static enum lttng_live_iterator_status
338lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 339{
4164020e
SM
340 bt_logging_level log_level = lttng_live_stream->log_level;
341 bt_self_component *self_comp = lttng_live_stream->self_comp;
342
343 switch (lttng_live_stream->state) {
344 case LTTNG_LIVE_STREAM_QUIESCENT:
345 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
346 break;
347 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
348 /* Invalid state. */
349 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
350 bt_common_abort();
351 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
352 /* Invalid state. */
353 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
354 bt_common_abort();
355 case LTTNG_LIVE_STREAM_EOF:
356 break;
357 }
358 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
359}
360
361/*
f93afbf9
FD
362 * For active no data stream, fetch next index. As a result of that it can
363 * become either:
364 * - quiescent: won't have events for a bit,
365 * - have data: need to get that data and produce the event,
366 * - have no data on this stream at this point: need to retry (AGAIN) or return
367 * EOF.
7cdc2bab 368 */
4164020e
SM
369static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
370 struct lttng_live_msg_iter *lttng_live_msg_iter,
371 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 372{
4164020e
SM
373 bt_logging_level log_level = lttng_live_msg_iter->log_level;
374 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
375 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
376 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
377 struct packet_index index;
378
379 if (lttng_live_stream->trace->metadata_stream_state ==
380 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
381 BT_COMP_LOGD(
382 "Need to get an update for the metadata stream before proceeding further with this stream: "
383 "stream-name=\"%s\"",
384 lttng_live_stream->name->str);
385 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
386 goto end;
387 }
388
389 if (lttng_live_stream->trace->session->new_streams_needed) {
390 BT_COMP_LOGD(
391 "Need to get an update of all streams before proceeding further with this stream: "
392 "stream-name=\"%s\"",
393 lttng_live_stream->name->str);
394 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
395 goto end;
396 }
397
398 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
399 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
400 goto end;
401 }
402 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
403 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
404 goto end;
405 }
406
407 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
408
409 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
410 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
411 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
412
413 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
414 /*
5cb27175
JG
415 * Because the stream is in the QUIESCENT_NO_DATA
416 * state, we can assert that the last_inactivity_ts was
417 * set and can be safely used in the `if` above.
418 */
4164020e
SM
419 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
420
421 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
422 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
423 } else {
424 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
425 }
426 goto end;
427 }
428
429 lttng_live_stream->base_offset = index.offset;
430 lttng_live_stream->offset = index.offset;
431 lttng_live_stream->len = index.packet_size / CHAR_BIT;
432
433 BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
434 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
435 ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
436 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
437 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
f93afbf9 438
7cdc2bab 439end:
4164020e
SM
440 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
441 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
442 }
443 return ret;
7cdc2bab
MD
444}
445
446/*
14f28187
FD
447 * Creation of the message requires the ctf trace class to be created
448 * beforehand, but the live protocol gives us all streams (including metadata)
449 * at once. So we split it in three steps: getting streams, getting metadata
450 * (which creates the ctf trace class), and then creating the per-stream
451 * messages.
7cdc2bab 452 */
4164020e
SM
453static enum lttng_live_iterator_status
454lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
455 struct lttng_live_session *session)
7cdc2bab 456{
4164020e
SM
457 bt_logging_level log_level = lttng_live_msg_iter->log_level;
458 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
459 enum lttng_live_iterator_status status;
460 uint64_t trace_idx;
461
462 if (!session->attached) {
463 BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, session->id);
464 enum lttng_live_viewer_status attach_status =
465 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
466 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
467 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
468 /*
469 * Clear any causes appended in
470 * `lttng_live_attach_session()` as we want to
471 * return gracefully since the graph was
472 * cancelled.
473 */
474 bt_current_thread_clear_error();
475 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
476 } else {
477 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
478 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error attaching to LTTng live session");
479 }
480 goto end;
481 }
482 }
483
a4f118a3 484 BT_COMP_LOGD("Updating all data streams: "
4164020e
SM
485 "session-id=%" PRIu64 ", session-name=\"%s\"",
486 session->id, session->session_name->str);
487
488 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
a4f118a3
FD
489 switch (status) {
490 case LTTNG_LIVE_ITERATOR_STATUS_OK:
491 break;
492 case LTTNG_LIVE_ITERATOR_STATUS_END:
493 /*
494 * We received a `_END` from the `_get_new_streams()` function,
495 * which means no more data will ever be received from the data
496 * streams of this session. But it's possible that the metadata
497 * is incomplete.
498 * The live protocol guarantees that we receive all the
499 * metadata needed before we receive data streams needing it.
500 * But it's possible to receive metadata NOT needed by
501 * data streams after the session was closed. For example, this
502 * could happen if a new event is registered and the session is
503 * stopped before any tracepoint for that event is actually
504 * fired.
505 */
506 BT_COMP_LOGD(
507 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
508 "session-id=%" PRIu64 ", session-name=\"%s\"",
509 session->id, session->session_name->str);
510 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
511 break;
512 default:
4164020e
SM
513 goto end;
514 }
515
a4f118a3
FD
516 BT_COMP_LOGD("Updating metadata stream for session: "
517 "session-id=%" PRIu64 ", session-name=\"%s\"",
518 session->id, session->session_name->str);
519
4164020e
SM
520 trace_idx = 0;
521 while (trace_idx < session->traces->len) {
522 struct lttng_live_trace *trace =
523 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
524
525 status = lttng_live_metadata_update(trace);
526 switch (status) {
527 case LTTNG_LIVE_ITERATOR_STATUS_END:
528 case LTTNG_LIVE_ITERATOR_STATUS_OK:
529 trace_idx++;
530 break;
531 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
532 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
533 goto end;
534 default:
535 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
536 "Error updating trace metadata: "
537 "stream-iter-status=%s, trace-id=%" PRIu64,
538 lttng_live_iterator_status_string(status), trace->id);
539 goto end;
540 }
541 }
542
543 /*
544 * Now that we have the metadata we can initialize the downstream
545 * iterator.
546 */
547 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
14f28187
FD
548
549end:
4164020e 550 return status;
7cdc2bab
MD
551}
552
4164020e
SM
553static void
554lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 555{
4164020e
SM
556 uint64_t session_idx, trace_idx;
557 bt_logging_level log_level = lttng_live_msg_iter->log_level;
558 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
559
560 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
561 struct lttng_live_session *session =
562 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
563 BT_COMP_LOGD("Force marking session as needing new streams: "
564 "session-id=%" PRIu64,
565 session->id);
566 session->new_streams_needed = true;
567 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
568 struct lttng_live_trace *trace =
569 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
570 BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
571 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
572 session->id, trace->id);
573
574 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
575
576 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
577 }
578 }
7cdc2bab
MD
579}
580
4164020e
SM
581static enum lttng_live_iterator_status
582lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 583{
4164020e
SM
584 enum lttng_live_iterator_status status;
585 enum lttng_live_viewer_status viewer_status;
586 bt_logging_level log_level = lttng_live_msg_iter->log_level;
587 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
588 uint64_t session_idx = 0, nr_sessions_opened = 0;
589 struct lttng_live_session *session;
590 enum session_not_found_action sess_not_found_act =
591 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
592
593 BT_COMP_LOGD("Update data and metadata of all sessions: "
594 "live-msg-iter-addr=%p",
595 lttng_live_msg_iter);
596 /*
597 * In a remotely distant future, we could add a "new
598 * session" flag to the protocol, which would tell us that we
599 * need to query for new sessions even though we have sessions
600 * currently ongoing.
601 */
602 if (lttng_live_msg_iter->sessions->len == 0) {
603 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
604 BT_COMP_LOGD(
605 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
606 status = LTTNG_LIVE_ITERATOR_STATUS_END;
607 goto end;
608 } else {
609 BT_COMP_LOGD(
610 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
611 /*
612 * Retry to create a viewer session for the requested
613 * session name.
614 */
615 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
616 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
617 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
618 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
619 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
620 "Error creating LTTng live viewer session");
621 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
622 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
623 } else {
624 bt_common_abort();
625 }
626 goto end;
627 }
628 }
629 }
630
631 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
632 session =
633 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
634 status = lttng_live_get_session(lttng_live_msg_iter, session);
635 switch (status) {
636 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e 637 case LTTNG_LIVE_ITERATOR_STATUS_END:
a4f118a3
FD
638 /*
639 * A session returned `_END`. Other sessions may still
640 * be active so we override the status and continue
641 * looping if needed.
642 */
4164020e
SM
643 break;
644 default:
645 goto end;
646 }
647 if (!session->closed) {
648 nr_sessions_opened++;
649 }
650 }
651
652 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
653 status = LTTNG_LIVE_ITERATOR_STATUS_END;
654 } else {
655 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
656 }
f79c2d7a
FD
657
658end:
4164020e 659 return status;
7cdc2bab
MD
660}
661
4164020e
SM
662static enum lttng_live_iterator_status
663emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
664 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
665 uint64_t timestamp)
7cdc2bab 666{
4164020e
SM
667 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
668 bt_logging_level log_level = lttng_live_msg_iter->log_level;
669 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
670 bt_message *msg = NULL;
671
672 BT_ASSERT(stream_iter->trace->clock_class);
673
674 BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
675 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
676 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
677
678 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
679 stream_iter->trace->clock_class, timestamp);
680 if (!msg) {
681 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error emitting message iterator inactivity message");
682 goto error;
683 }
684
685 *message = msg;
7cdc2bab 686end:
4164020e 687 return ret;
7cdc2bab
MD
688
689error:
4164020e
SM
690 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
691 bt_message_put_ref(msg);
692 goto end;
7cdc2bab
MD
693}
694
4164020e
SM
695static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
696 struct lttng_live_msg_iter *lttng_live_msg_iter,
697 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 698{
4164020e
SM
699 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
700
701 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
702 return LTTNG_LIVE_ITERATOR_STATUS_OK;
703 }
704
705 /*
706 * Check if we already sent an inactivty message downstream for this
707 * `current_inactivity_ts` value.
708 */
709 if (lttng_live_stream->last_inactivity_ts.is_set &&
710 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
711 lttng_live_stream_iterator_set_state(lttng_live_stream,
712 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
713
714 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
715 goto end;
716 }
717
718 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
719 lttng_live_stream->current_inactivity_ts);
720
721 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
722 lttng_live_stream->last_inactivity_ts.is_set = true;
14f28187 723end:
4164020e 724 return ret;
14f28187
FD
725}
726
4164020e
SM
727static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
728 struct lttng_live_msg_iter *lttng_live_msg_iter,
729 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 730{
4164020e
SM
731 const bt_clock_class *clock_class = NULL;
732 const bt_clock_snapshot *clock_snapshot = NULL;
733 int ret = 0;
734 bt_logging_level log_level = lttng_live_msg_iter->log_level;
735 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
736
737 BT_ASSERT_DBG(msg);
738 BT_ASSERT_DBG(ts_ns);
739
740 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
741 "last-msg-ts=%" PRId64,
742 lttng_live_msg_iter, msg, last_msg_ts_ns);
743
744 switch (bt_message_get_type(msg)) {
745 case BT_MESSAGE_TYPE_EVENT:
746 clock_class = bt_message_event_borrow_stream_class_default_clock_class_const(msg);
747 BT_ASSERT_DBG(clock_class);
748
749 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
750 break;
751 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
752 clock_class =
753 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(msg);
754 BT_ASSERT(clock_class);
755
756 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
757 break;
758 case BT_MESSAGE_TYPE_PACKET_END:
759 clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const(msg);
760 BT_ASSERT(clock_class);
761
762 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
763 break;
764 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
765 clock_class =
766 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(msg);
767 BT_ASSERT(clock_class);
768
769 clock_snapshot =
770 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
771 break;
772 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
773 clock_class =
774 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(msg);
775 BT_ASSERT(clock_class);
776
777 clock_snapshot =
778 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
779 break;
780 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
781 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
782 break;
783 default:
784 /* All the other messages have a higher priority */
785 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
786 *ts_ns = last_msg_ts_ns;
787 goto end;
788 }
789
790 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
791 BT_ASSERT_DBG(clock_class);
792
793 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
794 if (ret) {
795 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
796 "Cannot get nanoseconds from Epoch of clock snapshot: "
797 "clock-snapshot-addr=%p",
798 clock_snapshot);
799 goto error;
800 }
801
802 goto end;
14f28187 803
14f28187 804error:
4164020e 805 ret = -1;
7cdc2bab 806
7cdc2bab 807end:
4164020e
SM
808 if (ret == 0) {
809 BT_COMP_LOGD("Found message's timestamp: "
810 "iter-data-addr=%p, msg-addr=%p, "
811 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
812 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
813 }
814
815 return ret;
7cdc2bab
MD
816}
817
4164020e
SM
818static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
819 struct lttng_live_msg_iter *lttng_live_msg_iter,
820 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 821{
4164020e
SM
822 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
823 bt_logging_level log_level = lttng_live_msg_iter->log_level;
824 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
825 enum ctf_msg_iter_status status;
826 uint64_t session_idx, trace_idx;
827
828 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
829 struct lttng_live_session *session =
830 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
831
832 if (session->new_streams_needed) {
833 BT_COMP_LOGD("Need an update for streams: "
834 "session-id=%" PRIu64,
835 session->id);
836 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
837 goto end;
838 }
839 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
840 struct lttng_live_trace *trace =
841 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
842 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
843 BT_COMP_LOGD("Need an update for metadata stream: "
844 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
845 session->id, trace->id);
846 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
847 goto end;
848 }
849 }
850 }
851
852 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
853 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
854 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
855 "Invalid state of live stream iterator"
856 "stream-iter-status=%s",
857 lttng_live_stream_state_string(lttng_live_stream->state));
858 goto end;
859 }
860
861 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
862 switch (status) {
863 case CTF_MSG_ITER_STATUS_EOF:
864 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
865 break;
866 case CTF_MSG_ITER_STATUS_OK:
867 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
868 break;
869 case CTF_MSG_ITER_STATUS_AGAIN:
870 /*
871 * Continue immediately (end of packet). The next
872 * get_index may return AGAIN to delay the following
873 * attempt.
874 */
875 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
876 break;
877 case CTF_MSG_ITER_STATUS_ERROR:
878 default:
879 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
880 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
881 "CTF message iterator failed to get next message: "
882 "msg-iter=%p, msg-iter-status=%s",
883 lttng_live_stream->msg_iter, ctf_msg_iter_status_string(status));
884 break;
885 }
14f28187
FD
886
887end:
4164020e 888 return ret;
7cdc2bab
MD
889}
890
4164020e
SM
891static enum lttng_live_iterator_status
892lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
893 struct lttng_live_stream_iterator *stream_iter,
894 const bt_message **curr_msg)
4a39caef 895{
4164020e
SM
896 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
897 bt_logging_level log_level = lttng_live_msg_iter->log_level;
898 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
899
900 BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
901 "viewer-stream-id=%" PRIu64,
902 stream_iter->name->str, stream_iter->viewer_stream_id);
903
904 /*
905 * The viewer has hung up on us so we are closing the stream. The
906 * `ctf_msg_iter` should simply realize that it needs to close the
907 * stream properly by emitting the necessary stream end message.
908 */
909 enum ctf_msg_iter_status status =
910 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
911
912 if (status == CTF_MSG_ITER_STATUS_ERROR) {
913 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
914 "Error getting the next message from CTF message iterator");
915 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
916 goto end;
917 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
918 BT_COMP_LOGI("Reached the end of the live stream iterator.");
919 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
920 goto end;
921 }
922
923 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef
FD
924
925end:
4164020e 926 return live_status;
4a39caef
FD
927}
928
7cdc2bab
MD
929/*
930 * helper function:
931 * handle_no_data_streams()
932 * retry:
933 * - for each ACTIVE_NO_DATA stream:
934 * - query relayd for stream data, or quiescence info.
935 * - if need metadata, get metadata, goto retry.
936 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
937 * - if quiescent, move to QUIESCENT streams
938 * - if fetched data, move to ACTIVE_DATA streams
939 * (at this point each stream either has data, or is quiescent)
940 *
941 *
942 * iterator_next:
943 * handle_new_streams_and_metadata()
944 * - query relayd for known streams, add them as ACTIVE_NO_DATA
945 * - query relayd for metadata
946 *
947 * call handle_active_no_data_streams()
948 *
949 * handle_quiescent_streams()
950 * - if at least one stream is ACTIVE_DATA:
951 * - peek stream event with lowest timestamp -> next_ts
952 * - for each quiescent stream
953 * - if next_ts >= quiescent end
954 * - set state to ACTIVE_NO_DATA
955 * - else
956 * - for each quiescent stream
957 * - set state to ACTIVE_NO_DATA
958 *
959 * call handle_active_no_data_streams()
960 *
961 * handle_active_data_streams()
962 * - if at least one stream is ACTIVE_DATA:
963 * - get stream event with lowest timestamp from heap
d6e69534 964 * - make that stream event the current message.
7cdc2bab
MD
965 * - move this stream heap position to its next event
966 * - if we need to fetch data from relayd, move
967 * stream to ACTIVE_NO_DATA.
968 * - return OK
969 * - return AGAIN
970 *
971 * end criterion: ctrl-c on client. If relayd exits or the session
972 * closes on the relay daemon side, we keep on waiting for streams.
973 * Eventually handle --end timestamp (also an end criterion).
974 *
975 * When disconnected from relayd: try to re-connect endlessly.
976 */
4164020e
SM
977static enum lttng_live_iterator_status
978lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
979 struct lttng_live_stream_iterator *stream_iter,
980 const bt_message **curr_msg)
7cdc2bab 981{
4164020e
SM
982 bt_logging_level log_level = lttng_live_msg_iter->log_level;
983 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
984 enum lttng_live_iterator_status live_status;
985
986 BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
987 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
988 stream_iter->name->str, stream_iter->viewer_stream_id);
989
990 if (stream_iter->has_stream_hung_up) {
991 /*
992 * The stream has hung up and the stream was properly closed
993 * during the last call to the current function. Return _END
994 * status now so that this stream iterator is removed for the
995 * stream iterator list.
996 */
997 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
998 goto end;
999 }
4a39caef 1000
7cdc2bab 1001retry:
4164020e
SM
1002 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
1003
1004 /*
1005 * Make sure we have the most recent metadata and possibly some new
1006 * streams.
1007 */
1008 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
1009 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1010 goto end;
1011 }
1012
1013 live_status =
1014 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
1015 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1016 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1017 /*
1018 * We overwrite `live_status` since `curr_msg` is
1019 * likely set to a valid message in this function.
1020 */
1021 live_status =
1022 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
1023 }
1024 goto end;
1025 }
1026
1027 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
1028 stream_iter, curr_msg);
1029 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1030 BT_ASSERT(!*curr_msg);
1031 goto end;
1032 }
1033 if (*curr_msg) {
1034 goto end;
1035 }
1036 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
1037 stream_iter, curr_msg);
1038 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1039 BT_ASSERT(!*curr_msg);
1040 }
7cdc2bab
MD
1041
1042end:
4164020e
SM
1043 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
1044 BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
1045 goto retry;
1046 }
14f28187 1047
4164020e
SM
1048 BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, "
1049 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1050 lttng_live_iterator_status_string(live_status), stream_iter->name->str,
1051 stream_iter->viewer_stream_id);
f93afbf9 1052
4164020e 1053 return live_status;
7cdc2bab
MD
1054}
1055
4164020e 1056static bool is_discarded_packet_or_event_message(const bt_message *msg)
8ec4d5ff 1057{
4164020e 1058 const enum bt_message_type msg_type = bt_message_get_type(msg);
8ec4d5ff 1059
4164020e
SM
1060 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
1061 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
8ec4d5ff
JG
1062}
1063
4164020e
SM
1064static enum lttng_live_iterator_status
1065adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
1066 const bt_message *msg_in, bt_message **msg_out,
1067 uint64_t new_begin_ts)
8ec4d5ff 1068{
4164020e
SM
1069 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1070 enum bt_property_availability availability;
1071 const bt_clock_snapshot *clock_snapshot;
1072 uint64_t end_ts;
1073 uint64_t count;
1074
1075 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
1076 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1077
1078 availability = bt_message_discarded_packets_get_count(msg_in, &count);
1079 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1080
1081 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
1082 iter, stream, new_begin_ts, end_ts);
1083 if (!*msg_out) {
1084 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1085 goto end;
1086 }
1087
1088 bt_message_discarded_packets_set_count(*msg_out, count);
8ec4d5ff 1089end:
4164020e 1090 return status;
8ec4d5ff
JG
1091}
1092
4164020e
SM
1093static enum lttng_live_iterator_status
1094adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
1095 const bt_message *msg_in, bt_message **msg_out,
1096 uint64_t new_begin_ts)
8ec4d5ff 1097{
4164020e
SM
1098 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1099 enum bt_property_availability availability;
1100 const bt_clock_snapshot *clock_snapshot;
1101 uint64_t end_ts;
1102 uint64_t count;
1103
1104 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
1105 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1106
1107 availability = bt_message_discarded_events_get_count(msg_in, &count);
1108 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1109
1110 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
1111 iter, stream, new_begin_ts, end_ts);
1112 if (!*msg_out) {
1113 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1114 goto end;
1115 }
1116
1117 bt_message_discarded_events_set_count(*msg_out, count);
8ec4d5ff 1118end:
4164020e 1119 return status;
8ec4d5ff
JG
1120}
1121
4164020e
SM
1122static enum lttng_live_iterator_status
1123handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1124 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1125 const bt_message *late_msg)
285951be 1126{
4164020e
SM
1127 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1128 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1129 const bt_clock_class *clock_class;
1130 const bt_stream_class *stream_class;
1131 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1132 int64_t last_inactivity_ts_ns;
1133 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1134 enum lttng_live_iterator_status adjust_status;
1135 bt_message *adjusted_message;
1136
1137 /*
1138 * The timestamp of the current message is before the last message sent
1139 * by this component. We CANNOT send it as is.
1140 *
1141 * The only expected scenario in which that could happen is the
1142 * following, everything else is a bug in this component, relay deamon,
1143 * or CTF parser.
1144 *
1145 * Expected scenario: The CTF message iterator emitted discarded
1146 * packets and discarded events with synthesized beginning and end
1147 * timestamps from the bounds of the last known packet and the newly
1148 * decoded packet header. The CTF message iterator is not aware of
1149 * stream inactivity beacons. Hence, we have to adjust the beginning
1150 * timestamp of those types of messages if a stream signalled its
1151 * inactivity up until _after_ the last known packet's beginning
1152 * timestamp.
1153 *
1154 * Otherwise, the monotonicity guarantee of message timestamps would
1155 * not be preserved.
1156 *
1157 * In short, the only scenario in which it's okay and fixable to
1158 * received a late message is when:
1159 * 1. the late message is a discarded packets or discarded events
1160 * message,
1161 * 2. this stream produced an inactivity message downstream, and
1162 * 3. the timestamp of the late message is within the inactivity
1163 * timespan we sent downstream through the inactivity message.
1164 */
1165
1166 BT_COMP_LOGD("Handling late message on live stream iterator: "
1167 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1168 stream_iter->name->str, stream_iter->viewer_stream_id);
1169
1170 if (!stream_iter->last_inactivity_ts.is_set) {
1171 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: "
1172 "have a late message when no inactivity message "
1173 "was ever sent for that stream.");
1174 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1175 goto end;
1176 }
1177
1178 if (!is_discarded_packet_or_event_message(late_msg)) {
1179 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1180 "Invalid live stream state: "
1181 "have a late message that is not a packet discarded or "
1182 "event discarded message: late-msg-type=%s",
1183 bt_common_message_type_string(bt_message_get_type(late_msg)));
1184 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1185 goto end;
1186 }
1187
1188 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1189 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1190
1191 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1192 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1193 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1194 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error converting last "
1195 "inactivity message timestamp to nanoseconds");
1196 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1197 goto end;
1198 }
1199
1200 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1201 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1202 "Invalid live stream state: "
1203 "have a late message that is none included in a stream "
1204 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
1205 "late-msg-ts-ns=%" PRIu64,
1206 last_inactivity_ts_ns, late_msg_ts_ns);
1207 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1208 goto end;
1209 }
1210
1211 /*
1212 * We now know that it's okay for this message to be late, we can now
1213 * adjust its timestamp to ensure monotonicity.
1214 */
1215 BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1216 "msg-new-ts-ns=%" PRIu64,
1217 bt_common_message_type_string(bt_message_get_type(late_msg)),
1218 stream_iter->last_inactivity_ts.value);
1219 switch (bt_message_get_type(late_msg)) {
1220 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1221 adjust_status = adjust_discarded_events_message(
1222 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1223 stream_iter->last_inactivity_ts.value);
1224 break;
1225 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1226 adjust_status = adjust_discarded_packets_message(
1227 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1228 stream_iter->last_inactivity_ts.value);
1229 break;
1230 default:
1231 bt_common_abort();
1232 }
1233
1234 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1235 stream_iter_status = adjust_status;
1236 goto end;
1237 }
1238
1239 BT_ASSERT_DBG(adjusted_message);
1240 stream_iter->current_msg = adjusted_message;
1241 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1242 bt_message_put_ref(late_msg);
cefd84f8 1243
285951be 1244end:
4164020e 1245 return stream_iter_status;
285951be
FD
1246}
1247
4164020e
SM
1248static enum lttng_live_iterator_status
1249next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1250 struct lttng_live_trace *live_trace,
1251 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1252{
4164020e
SM
1253 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1254 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1255 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1256 enum lttng_live_iterator_status stream_iter_status;
1257 ;
1258 int64_t youngest_candidate_msg_ts = INT64_MAX;
1259 uint64_t stream_iter_idx;
1260
1261 BT_ASSERT_DBG(live_trace);
1262 BT_ASSERT_DBG(live_trace->stream_iterators);
1263
1264 BT_COMP_LOGD("Finding the next stream iterator for trace: "
1265 "trace-id=%" PRIu64,
1266 live_trace->id);
1267 /*
1268 * Update the current message of every stream iterators of this trace.
1269 * The current msg of every stream must have a timestamp equal or
1270 * larger than the last message returned by this iterator. We must
1271 * ensure monotonicity.
1272 */
1273 stream_iter_idx = 0;
1274 while (stream_iter_idx < live_trace->stream_iterators->len) {
1275 bool stream_iter_is_ended = false;
1276 struct lttng_live_stream_iterator *stream_iter =
1277 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1278 stream_iter_idx);
1279
1280 /*
1281 * If there is no current message for this stream, go fetch
1282 * one.
1283 */
1284 while (!stream_iter->current_msg) {
1285 const bt_message *msg = NULL;
1286 int64_t curr_msg_ts_ns = INT64_MAX;
1287
1288 stream_iter_status =
1289 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1290
1291 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1292 stream_iter_is_ended = true;
1293 break;
1294 }
1295
1296 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1297 goto end;
1298 }
1299
1300 BT_ASSERT_DBG(msg);
1301
1302 BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, "
1303 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1304 bt_common_message_type_string(bt_message_get_type(msg)),
1305 stream_iter->name->str, stream_iter->viewer_stream_id);
1306
1307 /*
1308 * Get the timestamp in nanoseconds from origin of this
1309 * messsage.
1310 */
1311 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, msg,
1312 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
1313
1314 /*
1315 * Check if the message of the current live stream
1316 * iterator occurred at the exact same time or after the
1317 * last message returned by this component's message
1318 * iterator. If not, we need to handle it with care.
1319 */
1320 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1321 stream_iter->current_msg = msg;
1322 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1323 } else {
1324 /*
1325 * We received a message from the past. This
1326 * may be fixable but it can also be an error.
1327 */
1328 stream_iter_status =
1329 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1330 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1331 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1332 "Late message could not be handled correctly: "
1333 "lttng-live-msg-iter-addr=%p, "
1334 "stream-name=\"%s\", "
1335 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
1336 lttng_live_msg_iter, stream_iter->name->str,
1337 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
1338 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1339 goto end;
1340 }
1341 }
1342 }
1343
1344 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1345
1346 if (!stream_iter_is_ended) {
1347 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1348 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1349 /*
1350 * Update the current best candidate message
1351 * for the stream iterator of this live trace
1352 * to be forwarded downstream.
1353 */
1354 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1355 youngest_candidate_stream_iter = stream_iter;
1356 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1357 /*
1358 * Order the messages in an arbitrary but
1359 * deterministic way.
1360 */
1361 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1362 int ret = common_muxing_compare_messages(
1363 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1364 if (ret < 0) {
1365 /*
1366 * The `youngest_candidate_stream_iter->current_msg`
1367 * should go first. Update the next
1368 * iterator and the current timestamp.
1369 */
1370 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1371 youngest_candidate_stream_iter = stream_iter;
1372 } else if (ret == 0) {
1373 /*
1374 * Unable to pick which one should go
1375 * first.
1376 */
1377 BT_COMP_LOGW(
1378 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1379 "stream-iter-addr=%p"
1380 "stream-iter-addr=%p",
1381 stream_iter, youngest_candidate_stream_iter);
1382 }
1383 }
1384
1385 stream_iter_idx++;
1386 } else {
1387 /*
1388 * The live stream iterator has ended. That
1389 * iterator is removed from the array, but
1390 * there is no need to increment
1391 * stream_iter_idx as
1392 * g_ptr_array_remove_index_fast replaces the
1393 * removed element with the array's last
1394 * element.
1395 */
1396 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1397 }
1398 }
1399
1400 if (youngest_candidate_stream_iter) {
1401 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1402 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1403 } else {
1404 /*
1405 * The only case where we don't have a candidate for this trace
1406 * is if we reached the end of all the iterators.
1407 */
1408 BT_ASSERT(live_trace->stream_iterators->len == 0);
1409 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1410 }
14f28187
FD
1411
1412end:
4164020e 1413 return stream_iter_status;
14f28187
FD
1414}
1415
4164020e
SM
1416static enum lttng_live_iterator_status
1417next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1418 struct lttng_live_session *session,
1419 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1420{
4164020e
SM
1421 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1422 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1423 enum lttng_live_iterator_status stream_iter_status;
1424 uint64_t trace_idx = 0;
1425 int64_t youngest_candidate_msg_ts = INT64_MAX;
1426 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1427
1428 BT_COMP_LOGD("Finding the next stream iterator for session: "
1429 "session-id=%" PRIu64,
1430 session->id);
1431 /*
1432 * Make sure we are attached to the session and look for new streams
1433 * and metadata.
1434 */
1435 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1436 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1437 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1438 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1439 goto end;
1440 }
1441
1442 BT_ASSERT_DBG(session->traces);
1443
1444 while (trace_idx < session->traces->len) {
1445 bool trace_is_ended = false;
1446 struct lttng_live_stream_iterator *stream_iter;
1447 struct lttng_live_trace *trace =
1448 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1449
1450 stream_iter_status =
1451 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1452 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1453 /*
1454 * All the live stream iterators for this trace are
1455 * ENDed. Remove the trace from this session.
1456 */
1457 trace_is_ended = true;
1458 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1459 goto end;
1460 }
1461
1462 if (!trace_is_ended) {
1463 BT_ASSERT_DBG(stream_iter);
1464
1465 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1466 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1467 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1468 youngest_candidate_stream_iter = stream_iter;
1469 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1470 /*
1471 * Order the messages in an arbitrary but
1472 * deterministic way.
1473 */
1474 int ret = common_muxing_compare_messages(
1475 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1476 if (ret < 0) {
1477 /*
1478 * The `youngest_candidate_stream_iter->current_msg`
1479 * should go first. Update the next iterator
1480 * and the current timestamp.
1481 */
1482 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1483 youngest_candidate_stream_iter = stream_iter;
1484 } else if (ret == 0) {
1485 /* Unable to pick which one should go first. */
1486 BT_COMP_LOGW(
1487 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1488 "stream-iter-addr=%p"
1489 "stream-iter-addr=%p",
1490 stream_iter, youngest_candidate_stream_iter);
1491 }
1492 }
1493 trace_idx++;
1494 } else {
1495 /*
1496 * trace_idx is not incremented since
1497 * g_ptr_array_remove_index_fast replaces the
1498 * element at trace_idx with the array's last element.
1499 */
1500 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1501 }
1502 }
1503 if (youngest_candidate_stream_iter) {
1504 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1505 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1506 } else {
1507 /*
1508 * The only cases where we don't have a candidate for this
1509 * trace is:
1510 * 1. if we reached the end of all the iterators of all the
1511 * traces of this session,
1512 * 2. if we never had live stream iterator in the first place.
1513 *
1514 * In either cases, we return END.
1515 */
1516 BT_ASSERT(session->traces->len == 0);
1517 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1518 }
7cdc2bab 1519end:
4164020e 1520 return stream_iter_status;
14f28187
FD
1521}
1522
4164020e 1523static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1524{
4164020e 1525 uint64_t i;
14f28187 1526
4164020e
SM
1527 for (i = 0; i < count; i++) {
1528 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1529 }
7cdc2bab
MD
1530}
1531
d3eb6e8f 1532BT_HIDDEN
4164020e
SM
1533bt_message_iterator_class_next_method_status
1534lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1535 uint64_t capacity, uint64_t *count)
d3eb6e8f 1536{
4164020e
SM
1537 bt_message_iterator_class_next_method_status status;
1538 enum lttng_live_viewer_status viewer_status;
1539 struct lttng_live_msg_iter *lttng_live_msg_iter =
1540 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1541 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1542 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1543 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1544 enum lttng_live_iterator_status stream_iter_status;
1545 uint64_t session_idx;
1546
1547 *count = 0;
1548
1549 BT_ASSERT_DBG(lttng_live_msg_iter);
1550
1551 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1552 /*
1553 * The iterator was interrupted in a previous call to the
1554 * `_next()` method. We currently do not support generating
1555 * messages after such event. The babeltrace2 CLI should never
1556 * be running the graph after being interrupted. So this check
1557 * is to prevent other graph users from using this live
1558 * iterator in an messed up internal state.
1559 */
1560 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1561 BT_COMP_LOGE_APPEND_CAUSE(
1562 self_comp,
1563 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1564 goto end;
1565 }
1566
1567 /*
1568 * Clear all the invalid message reference that might be left over in
1569 * the output array.
1570 */
1571 memset(msgs, 0, capacity * sizeof(*msgs));
1572
1573 /*
1574 * If no session are exposed on the relay found at the url provided by
1575 * the user, session count will be 0. In this case, we return status
1576 * end to return gracefully.
1577 */
1578 if (lttng_live_msg_iter->sessions->len == 0) {
1579 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1580 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1581 goto end;
1582 } else {
1583 /*
1584 * The are no more active session for this session
1585 * name. Retry to create a viewer session for the
1586 * requested session name.
1587 */
1588 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1589 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1590 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1591 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1592 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1593 "Error creating LTTng live viewer session");
1594 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1595 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1596 } else {
1597 bt_common_abort();
1598 }
1599 goto end;
1600 }
1601 }
1602 }
1603
1604 if (lttng_live_msg_iter->active_stream_iter == 0) {
1605 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1606 }
1607
1608 /*
1609 * Here the muxing of message is done.
1610 *
1611 * We need to iterate over all the streams of all the traces of all the
1612 * viewer sessions in order to get the message with the smallest
1613 * timestamp. In this case, a session is a viewer session and there is
1614 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1615 * kernel). Each viewer session can have multiple traces, for example,
1616 * 64bit UST viewer sessions could have multiple per-pid traces.
1617 *
1618 * We iterate over the streams of each traces to update and see what is
1619 * their next message's timestamp. From those timestamps, we select the
1620 * message with the smallest timestamp as the best candidate message
1621 * for that trace and do the same thing across all the sessions.
1622 *
1623 * We then compare the timestamp of best candidate message of all the
1624 * sessions to pick the message with the smallest timestamp and we
1625 * return it.
1626 */
1627 while (*count < capacity) {
1628 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1629 *candidate_stream_iter = NULL;
1630 int64_t youngest_msg_ts_ns = INT64_MAX;
1631
1632 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1633 session_idx = 0;
1634 while (session_idx < lttng_live_msg_iter->sessions->len) {
1635 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1636 lttng_live_msg_iter->sessions, session_idx);
1637
1638 /* Find the best candidate message to send downstream. */
1639 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1640 &candidate_stream_iter);
1641
1642 /* If we receive an END status, it means that either:
1643 * - Those traces never had active streams (UST with no
1644 * data produced yet),
1645 * - All live stream iterators have ENDed.*/
1646 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1647 if (session->closed && session->traces->len == 0) {
1648 /*
1649 * Remove the session from the list.
1650 * session_idx is not modified since
1651 * g_ptr_array_remove_index_fast
1652 * replaces the the removed element with
1653 * the array's last element.
1654 */
1655 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1656 } else {
1657 session_idx++;
1658 }
1659 continue;
1660 }
1661
1662 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1663 goto return_status;
1664 }
1665
1666 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1667 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1668 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1669 youngest_stream_iter = candidate_stream_iter;
1670 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1671 /*
1672 * The currently selected message to be sent
1673 * downstream next has the exact same timestamp
1674 * that of the current candidate message. We
1675 * must break the tie in a predictable manner.
1676 */
1677 BT_COMP_LOGD_STR(
1678 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1679 /*
1680 * Order the messages in an arbitrary but
1681 * deterministic way.
1682 */
1683 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1684 youngest_stream_iter->current_msg);
1685 if (ret < 0) {
1686 /*
1687 * The `candidate_stream_iter->current_msg`
1688 * should go first. Update the next
1689 * iterator and the current timestamp.
1690 */
1691 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1692 youngest_stream_iter = candidate_stream_iter;
1693 } else if (ret == 0) {
1694 /* Unable to pick which one should go first. */
1695 BT_COMP_LOGW(
1696 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1697 "next-stream-iter-addr=%p"
1698 "candidate-stream-iter-addr=%p",
1699 youngest_stream_iter, candidate_stream_iter);
1700 }
1701 }
1702
1703 session_idx++;
1704 }
1705
1706 if (!youngest_stream_iter) {
1707 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1708 goto return_status;
1709 }
1710
1711 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1712 /* Ensure monotonicity. */
1713 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1714 youngest_stream_iter->current_msg_ts_ns);
1715
1716 /*
1717 * Insert the next message to the message batch. This will set
1718 * stream iterator current messsage to NULL so that next time
1719 * we fetch the next message of that stream iterator
1720 */
1721 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1722 (*count)++;
1723
1724 /* Update the last timestamp in nanoseconds sent downstream. */
1725 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1726 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1727
1728 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1729 }
f79c2d7a
FD
1730
1731return_status:
4164020e
SM
1732 switch (stream_iter_status) {
1733 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1734 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1735 /*
1736 * If we gathered messages, return _OK even if the graph was
1737 * interrupted. This allows for the components downstream to at
1738 * least get the thoses messages. If the graph was indeed
1739 * interrupted there should not be another _next() call as the
1740 * application will tear down the graph. This component class
1741 * doesn't support restarting after an interruption.
1742 */
1743 if (*count > 0) {
1744 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1745 } else {
1746 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1747 }
1748 break;
1749 case LTTNG_LIVE_ITERATOR_STATUS_END:
1750 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1751 break;
1752 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1753 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1754 "Memory error preparing the next batch of messages: "
1755 "live-iter-status=%s",
1756 lttng_live_iterator_status_string(stream_iter_status));
1757 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1758 break;
1759 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1760 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1761 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1762 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1763 "Error preparing the next batch of messages: "
1764 "live-iter-status=%s",
1765 lttng_live_iterator_status_string(stream_iter_status));
1766
1767 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1768 /* Put all existing messages on error. */
1769 put_messages(msgs, *count);
1770 break;
1771 default:
1772 bt_common_abort();
1773 }
14f28187 1774
f79c2d7a 1775end:
4164020e 1776 return status;
7cdc2bab 1777}
41a2b7ae 1778
4164020e
SM
1779static struct lttng_live_msg_iter *
1780lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1781 bt_self_message_iterator *self_msg_it)
4f74db52 1782{
4164020e
SM
1783 bt_self_component *self_comp = lttng_live_comp->self_comp;
1784 bt_logging_level log_level = lttng_live_comp->log_level;
4f74db52 1785
4164020e
SM
1786 struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1787 if (!lttng_live_msg_iter) {
1788 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter");
1789 goto end;
1790 }
1791
1792 lttng_live_msg_iter->log_level = lttng_live_comp->log_level;
1793 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1794 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1795 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1796
1797 lttng_live_msg_iter->active_stream_iter = 0;
1798 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1799 lttng_live_msg_iter->was_interrupted = false;
4f74db52 1800
4164020e
SM
1801 lttng_live_msg_iter->sessions =
1802 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1803 BT_ASSERT(lttng_live_msg_iter->sessions);
1804
1805end:
1806 return lttng_live_msg_iter;
4f74db52
FD
1807}
1808
7cdc2bab 1809BT_HIDDEN
4164020e
SM
1810bt_message_iterator_class_initialize_method_status
1811lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1812 bt_self_message_iterator_configuration *config,
1813 bt_self_component_port_output *self_port)
7cdc2bab 1814{
4164020e
SM
1815 bt_message_iterator_class_initialize_method_status status;
1816 struct lttng_live_component *lttng_live;
1817 struct lttng_live_msg_iter *lttng_live_msg_iter;
1818 enum lttng_live_viewer_status viewer_status;
1819 bt_logging_level log_level;
1820 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1821
1822 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1823 log_level = lttng_live->log_level;
1824 self_comp = lttng_live->self_comp;
1825
1826 /* There can be only one downstream iterator at the same time. */
1827 BT_ASSERT(!lttng_live->has_msg_iter);
1828 lttng_live->has_msg_iter = true;
1829
1830 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1831 if (!lttng_live_msg_iter) {
4164020e 1832 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter");
10265ebe 1833 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
4164020e
SM
1834 goto error;
1835 }
1836
1837 viewer_status = live_viewer_connection_create(
1838 self_comp, NULL, log_level, lttng_live->params.url->str, false, lttng_live_msg_iter,
1839 &lttng_live_msg_iter->viewer_connection);
1840 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1841 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1842 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer connection");
1843 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1844 /*
1845 * Interruption in the _iter_init() method is not
1846 * supported. Return an error.
1847 */
1848 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted while creating viewer connection");
1849 }
10265ebe
SM
1850
1851 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1852 goto error;
1853 }
1854
1855 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1856 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1857 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1858 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer session");
1859 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1860 /*
1861 * Interruption in the _iter_init() method is not
1862 * supported. Return an error.
1863 */
1864 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted when creating viewer session");
1865 }
10265ebe
SM
1866
1867 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1868 goto error;
1869 }
1870
1871 if (lttng_live_msg_iter->sessions->len == 0) {
1872 switch (lttng_live->params.sess_not_found_act) {
1873 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1874 BT_COMP_LOGI(
1875 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1876 "%s=\"%s\" component parameter: url=\"%s\"",
1877 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1878 lttng_live->params.url->str);
1879 break;
1880 case SESSION_NOT_FOUND_ACTION_FAIL:
1881 BT_COMP_LOGE_APPEND_CAUSE(
1882 self_comp,
1883 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1884 "component parameter: url =\"%s\"",
1885 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1886 lttng_live->params.url->str);
10265ebe 1887 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1888 goto error;
1889 case SESSION_NOT_FOUND_ACTION_END:
1890 BT_COMP_LOGI(
1891 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1892 "call because of %s=\"%s\" component parameter: "
1893 "url=\"%s\"",
1894 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1895 lttng_live->params.url->str);
1896 break;
1897 default:
1898 bt_common_abort();
1899 }
1900 }
1901
1902 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1903 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1904 goto end;
f79c2d7a 1905
14f28187 1906error:
4164020e 1907 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1908end:
4164020e 1909 return status;
7cdc2bab
MD
1910}
1911
80aff5ef 1912static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1913 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1914 bt_param_validation_value_descr::makeString()},
4164020e
SM
1915 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1916
1917static bt_component_class_query_method_status
1918lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1919 bt_self_component_class *self_comp_class, bt_logging_level log_level)
7cdc2bab 1920{
4164020e
SM
1921 bt_component_class_query_method_status status;
1922 const bt_value *url_value = NULL;
1923 const char *url;
1924 struct live_viewer_connection *viewer_connection = NULL;
1925 enum lttng_live_viewer_status viewer_status;
1926 enum bt_param_validation_status validation_status;
1927 gchar *validate_error = NULL;
1928
1929 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1930 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1931 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1932 goto error;
1933 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1934 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1935 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error);
1936 goto error;
1937 }
1938
1939 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1940 url = bt_value_string_get(url_value);
1941
1942 viewer_status = live_viewer_connection_create(NULL, self_comp_class, log_level, url, true, NULL,
1943 &viewer_connection);
1944 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1945 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1946 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to create viewer connection");
1947 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1948 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1949 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1950 } else {
1951 bt_common_abort();
1952 }
1953 goto error;
1954 }
1955
1956 status = live_viewer_connection_list_sessions(viewer_connection, result);
1957 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1958 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to list viewer sessions");
1959 goto error;
1960 }
1961
1962 goto end;
c7eee084 1963
7cdc2bab 1964error:
4164020e 1965 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1966
4164020e
SM
1967 if (status >= 0) {
1968 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1969 }
c7eee084 1970
7cdc2bab 1971end:
4164020e
SM
1972 if (viewer_connection) {
1973 live_viewer_connection_destroy(viewer_connection);
1974 }
80aff5ef 1975
4164020e 1976 g_free(validate_error);
80aff5ef 1977
4164020e 1978 return status;
7cdc2bab
MD
1979}
1980
4164020e
SM
1981static bt_component_class_query_method_status
1982lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1983 bt_self_component_class *self_comp_class, bt_logging_level log_level)
312df793 1984{
4164020e
SM
1985 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1986 const bt_value *input_type_value;
1987 const bt_value *input_value;
1988 double weight = 0;
1989 struct bt_common_lttng_live_url_parts parts = {0};
1990
1991 /* Used by the logging macros */
1992 __attribute__((unused)) bt_self_component *self_comp = NULL;
1993
1994 *result = NULL;
1995 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1996 if (!input_type_value) {
1997 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `type` parameter.");
1998 goto error;
1999 }
2000
2001 if (!bt_value_is_string(input_type_value)) {
2002 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "`type` parameter is not a string value.");
2003 goto error;
2004 }
2005
2006 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
2007 /* We don't handle file system paths */
2008 goto create_result;
2009 }
2010
2011 input_value = bt_value_map_borrow_entry_value_const(params, "input");
2012 if (!input_value) {
2013 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `input` parameter.");
2014 goto error;
2015 }
2016
2017 if (!bt_value_is_string(input_value)) {
2018 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
2019 "`input` parameter is not a string value.");
2020 goto error;
2021 }
2022
2023 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
2024 if (parts.session_name) {
2025 /*
2026 * Looks pretty much like an LTTng live URL: we got the
2027 * session name part, which forms a complete URL.
2028 */
2029 weight = .75;
2030 }
312df793
PP
2031
2032create_result:
4164020e
SM
2033 *result = bt_value_real_create_init(weight);
2034 if (!*result) {
2035 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
2036 goto error;
2037 }
312df793 2038
4164020e 2039 goto end;
312df793
PP
2040
2041error:
4164020e
SM
2042 if (status >= 0) {
2043 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
2044 }
312df793 2045
4164020e 2046 BT_ASSERT(!*result);
312df793
PP
2047
2048end:
4164020e
SM
2049 bt_common_destroy_lttng_live_url_parts(&parts);
2050 return status;
312df793
PP
2051}
2052
7cdc2bab 2053BT_HIDDEN
4164020e
SM
2054bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
2055 bt_private_query_executor *priv_query_exec,
2056 const char *object, const bt_value *params,
2057 __attribute__((unused)) void *method_data,
2058 const bt_value **result)
7cdc2bab 2059{
4164020e
SM
2060 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
2061 bt_self_component *self_comp = NULL;
2062 bt_self_component_class *self_comp_class =
2063 bt_self_component_class_source_as_self_component_class(comp_class);
2064 bt_logging_level log_level = bt_query_executor_get_logging_level(
2065 bt_private_query_executor_as_query_executor_const(priv_query_exec));
2066
2067 if (strcmp(object, "sessions") == 0) {
2068 status = lttng_live_query_list_sessions(params, result, self_comp_class, log_level);
2069 } else if (strcmp(object, "babeltrace.support-info") == 0) {
2070 status = lttng_live_query_support_info(params, result, self_comp_class, log_level);
2071 } else {
2072 BT_COMP_LOGI("Unknown query object `%s`", object);
2073 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
2074 goto end;
2075 }
14f28187
FD
2076
2077end:
4164020e 2078 return status;
7cdc2bab
MD
2079}
2080
4164020e 2081static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
7cdc2bab 2082{
4164020e
SM
2083 if (!lttng_live) {
2084 return;
2085 }
2086 if (lttng_live->params.url) {
2087 g_string_free(lttng_live->params.url, TRUE);
2088 }
2089 g_free(lttng_live);
7cdc2bab
MD
2090}
2091
2092BT_HIDDEN
14f28187 2093void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 2094{
4164020e
SM
2095 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
2096 bt_self_component_source_as_self_component(component));
2097
2098 if (!data) {
2099 return;
2100 }
2101 lttng_live_component_destroy_data(data);
7cdc2bab
MD
2102}
2103
4164020e
SM
2104static enum session_not_found_action
2105parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 2106{
4164020e
SM
2107 enum session_not_found_action action;
2108 const char *no_session_act_str = bt_value_string_get(no_session_param);
2109
2110 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2111 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2112 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2113 action = SESSION_NOT_FOUND_ACTION_FAIL;
2114 } else {
2115 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2116 action = SESSION_NOT_FOUND_ACTION_END;
2117 }
2118
2119 return action;
14f28187
FD
2120}
2121
88730e42
SM
2122static bt_param_validation_value_descr inputs_elem_descr =
2123 bt_param_validation_value_descr::makeString();
80aff5ef
SM
2124
2125static const char *sess_not_found_action_choices[] = {
4164020e
SM
2126 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2127 SESS_NOT_FOUND_ACTION_FAIL_STR,
2128 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
2129};
2130
2131static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
2132 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2133 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2134 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2135 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
2136 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2137
2138static bt_component_class_initialize_method_status
2139lttng_live_component_create(const bt_value *params, bt_logging_level log_level,
2140 bt_self_component *self_comp, struct lttng_live_component **component)
7cdc2bab 2141{
4164020e
SM
2142 struct lttng_live_component *lttng_live = NULL;
2143 const bt_value *inputs_value;
2144 const bt_value *url_value;
2145 const bt_value *value;
2146 const char *url;
2147 enum bt_param_validation_status validation_status;
2148 gchar *validation_error = NULL;
2149 bt_component_class_initialize_method_status status;
2150
2151 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2152 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2153 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2154 goto error;
2155 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2156 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
2157 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2158 goto error;
2159 }
2160
2161 lttng_live = g_new0(struct lttng_live_component, 1);
2162 if (!lttng_live) {
2163 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2164 goto end;
2165 }
2166 lttng_live->log_level = log_level;
2167 lttng_live->self_comp = self_comp;
2168 lttng_live->max_query_size = MAX_QUERY_SIZE;
2169 lttng_live->has_msg_iter = false;
2170
2171 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2172 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2173 url = bt_value_string_get(url_value);
2174
2175 lttng_live->params.url = g_string_new(url);
2176 if (!lttng_live->params.url) {
2177 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2178 goto error;
2179 }
2180
2181 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2182 if (value) {
2183 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2184 } else {
2185 BT_COMP_LOGI("Optional `%s` parameter is missing: "
2186 "defaulting to `%s`.",
2187 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2188 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2189 }
2190
2191 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2192 goto end;
7cdc2bab
MD
2193
2194error:
4164020e
SM
2195 lttng_live_component_destroy_data(lttng_live);
2196 lttng_live = NULL;
7cdc2bab 2197end:
4164020e 2198 g_free(validation_error);
80aff5ef 2199
4164020e
SM
2200 *component = lttng_live;
2201 return status;
d3e4dcd8
PP
2202}
2203
f3bc2010 2204BT_HIDDEN
4164020e
SM
2205bt_component_class_initialize_method_status
2206lttng_live_component_init(bt_self_component_source *self_comp_src,
2207 bt_self_component_source_configuration *config, const bt_value *params,
2208 __attribute__((unused)) void *init_method_data)
f3bc2010 2209{
4164020e
SM
2210 struct lttng_live_component *lttng_live;
2211 bt_component_class_initialize_method_status ret;
2212 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2213 bt_logging_level log_level =
2214 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
2215 bt_self_component_add_port_status add_port_status;
2216
2217 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
2218 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2219 goto error;
2220 }
2221
2222 add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2223 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2224 ret = (bt_component_class_initialize_method_status) add_port_status;
2225 goto end;
2226 }
2227
2228 bt_self_component_set_data(self_comp, lttng_live);
2229 goto end;
7cdc2bab 2230
19bdff20 2231error:
4164020e
SM
2232 lttng_live_component_destroy_data(lttng_live);
2233 lttng_live = NULL;
7cdc2bab 2234end:
4164020e 2235 return ret;
d85ef162 2236}
This page took 0.195628 seconds and 4 git commands to generate.