param-validation: add static creation methods to bt_param_validation_value_descr
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
2ece7dd0 11#define BT_COMP_LOG_SELF_COMP self_comp
4164020e
SM
12#define BT_LOG_OUTPUT_LEVEL log_level
13#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
d9c39b0a 14#include "logging/comp-logging.h"
020bc26f 15
14f28187 16#include <inttypes.h>
c4f23e30 17#include <stdbool.h>
14f28187
FD
18#include <unistd.h>
19
3c22a242
FD
20#include <glib.h>
21
578e048b 22#include "common/assert.h"
3fadfbc0 23#include <babeltrace2/babeltrace.h>
578e048b 24#include "compat/compiler.h"
3fadfbc0 25#include <babeltrace2/types.h>
f3bc2010 26
376fc2bd 27#include "plugins/common/muxing/muxing.h"
80aff5ef 28#include "plugins/common/param-validation/param-validation.h"
376fc2bd 29
087cd0f5
SM
30#include "data-stream.hpp"
31#include "metadata.hpp"
32#include "lttng-live.hpp"
7cdc2bab 33
4164020e
SM
34#define MAX_QUERY_SIZE (256 * 1024)
35#define URL_PARAM "url"
36#define INPUTS_PARAM "inputs"
37#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
38#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
39#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
40#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 41
4164020e 42#define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ##__VA_ARGS__)
7cdc2bab 43
4164020e 44static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
14f28187 45{
4164020e
SM
46 switch (status) {
47 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
48 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
49 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
50 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
51 case LTTNG_LIVE_ITERATOR_STATUS_END:
52 return "LTTNG_LIVE_ITERATOR_STATUS_END";
53 case LTTNG_LIVE_ITERATOR_STATUS_OK:
54 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
55 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
56 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
57 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
58 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
59 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
60 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
61 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
62 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
63 default:
64 bt_common_abort();
65 }
14f28187
FD
66}
67
4164020e 68static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
7cdc2bab 69{
4164020e
SM
70 switch (state) {
71 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
72 return "ACTIVE_NO_DATA";
73 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
74 return "QUIESCENT_NO_DATA";
75 case LTTNG_LIVE_STREAM_QUIESCENT:
76 return "QUIESCENT";
77 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
78 return "ACTIVE_DATA";
79 case LTTNG_LIVE_STREAM_EOF:
80 return "EOF";
81 default:
82 return "ERROR";
83 }
7cdc2bab 84}
7cdc2bab 85
34533ae0 86void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 87 enum lttng_live_stream_state new_state)
34533ae0 88{
4164020e
SM
89 bt_self_component *self_comp = stream_iter->self_comp;
90 bt_logging_level log_level = stream_iter->log_level;
34533ae0 91
4164020e
SM
92 BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
93 ", old-state=%s, new-state=%s",
94 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
95 lttng_live_stream_state_string(new_state));
34533ae0 96
4164020e 97 stream_iter->state = new_state;
34533ae0
FD
98}
99
4164020e
SM
/*
 * Logs, at the debug level, the state and the inactivity timestamps of
 * the live stream iterator `live_stream_iter` (a pointer).
 *
 * Expands to a single `do { ... } while (0)` statement WITHOUT a
 * trailing semicolon: the caller writes the semicolon, which keeps
 * `if (x) LTTNG_LIVE_LOGD_STREAM_ITER(y); else ...` well-formed.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_COMP_LOGD("Live stream iterator state=%s, "                                             \
                     "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", "                  \
                     "curr-inact-ts=%" PRId64,                                                     \
                     lttng_live_stream_state_string((live_stream_iter)->state),                    \
                     (live_stream_iter)->last_inactivity_ts.is_set,                                \
                     (live_stream_iter)->last_inactivity_ts.value,                                 \
                     (live_stream_iter)->current_inactivity_ts);                                   \
    } while (0)
6f79a7cf
MD
110
111BT_HIDDEN
9b4f9b42 112bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 113{
4164020e 114 bool ret;
6f79a7cf 115
4164020e
SM
116 if (!msg_iter) {
117 ret = false;
118 goto end;
119 }
7cdc2bab 120
4164020e 121 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
4bf0e537 122
14f28187 123end:
4164020e 124 return ret;
7cdc2bab
MD
125}
126
4164020e
SM
127static struct lttng_live_trace *
128lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 129{
4164020e
SM
130 uint64_t trace_idx;
131 struct lttng_live_trace *ret_trace = NULL;
132
133 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
134 struct lttng_live_trace *trace =
135 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
136 if (trace->id == trace_id) {
137 ret_trace = trace;
138 goto end;
139 }
140 }
14f28187
FD
141
142end:
4164020e 143 return ret_trace;
d3eb6e8f
PP
144}
145
4164020e 146static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 147{
4164020e
SM
148 bt_logging_level log_level = trace->log_level;
149 bt_self_component *self_comp = trace->self_comp;
c01594de 150
4164020e 151 BT_COMP_LOGD("Destroying live trace: trace-id=%" PRIu64, trace->id);
7cdc2bab 152
4164020e
SM
153 BT_ASSERT(trace->stream_iterators);
154 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 155
4164020e
SM
156 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
157 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
5bd230f4 158
4164020e
SM
159 lttng_live_metadata_fini(trace);
160 g_free(trace);
7cdc2bab
MD
161}
162
4164020e
SM
163static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
164 uint64_t trace_id)
7cdc2bab 165{
4164020e
SM
166 struct lttng_live_trace *trace = NULL;
167 bt_logging_level log_level = session->log_level;
168 bt_self_component *self_comp = session->self_comp;
169
170 BT_COMP_LOGD("Creating live trace: "
171 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
172 session->id, trace_id);
173 trace = g_new0(struct lttng_live_trace, 1);
174 if (!trace) {
175 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace");
176 goto error;
177 }
178 trace->log_level = session->log_level;
179 trace->self_comp = session->self_comp;
180 trace->session = session;
181 trace->id = trace_id;
182 trace->trace_class = NULL;
183 trace->trace = NULL;
184 trace->stream_iterators =
185 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
186 BT_ASSERT(trace->stream_iterators);
187 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
188 g_ptr_array_add(session->traces, trace);
189
190 goto end;
7cdc2bab 191error:
4164020e
SM
192 g_free(trace);
193 trace = NULL;
7cdc2bab 194end:
4164020e 195 return trace;
7cdc2bab
MD
196}
197
198BT_HIDDEN
4164020e
SM
199struct lttng_live_trace *
200lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
201 uint64_t trace_id)
7cdc2bab 202{
4164020e 203 struct lttng_live_trace *trace;
7cdc2bab 204
4164020e
SM
205 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
206 if (trace) {
207 goto end;
208 }
7cdc2bab 209
4164020e
SM
210 /* The session is the owner of the newly created trace. */
211 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 212
14f28187 213end:
4164020e 214 return trace;
7cdc2bab
MD
215}
216
217BT_HIDDEN
4164020e
SM
218int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
219 const char *hostname, const char *session_name)
7cdc2bab 220{
4164020e
SM
221 int ret = 0;
222 struct lttng_live_session *session;
223 bt_logging_level log_level = lttng_live_msg_iter->log_level;
224 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
225
226 BT_COMP_LOGD("Adding live session: "
227 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
228 session_id, hostname, session_name);
229
230 session = g_new0(struct lttng_live_session, 1);
231 if (!session) {
232 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session");
233 goto error;
234 }
235
236 session->log_level = lttng_live_msg_iter->log_level;
237 session->self_comp = lttng_live_msg_iter->self_comp;
238 session->id = session_id;
239 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
240 BT_ASSERT(session->traces);
241 session->lttng_live_msg_iter = lttng_live_msg_iter;
242 session->new_streams_needed = true;
243 session->hostname = g_string_new(hostname);
244 BT_ASSERT(session->hostname);
245
246 session->session_name = g_string_new(session_name);
247 BT_ASSERT(session->session_name);
248
249 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
250 goto end;
7cdc2bab 251error:
4164020e
SM
252 g_free(session);
253 ret = -1;
7cdc2bab 254end:
4164020e 255 return ret;
7cdc2bab
MD
256}
257
4164020e 258static void lttng_live_destroy_session(struct lttng_live_session *session)
7cdc2bab 259{
4164020e
SM
260 bt_logging_level log_level;
261 bt_self_component *self_comp;
262
263 if (!session) {
264 goto end;
265 }
266
267 log_level = session->log_level;
268 self_comp = session->self_comp;
269 BT_COMP_LOGD("Destroying live session: "
270 "session-id=%" PRIu64 ", session-name=\"%s\"",
271 session->id, session->session_name->str);
272 if (session->id != -1ULL) {
273 if (lttng_live_session_detach(session)) {
274 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
275 /* Old relayd cannot detach sessions. */
276 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id);
277 }
278 }
279 session->id = -1ULL;
280 }
281
282 if (session->traces) {
283 g_ptr_array_free(session->traces, TRUE);
284 }
285
286 if (session->hostname) {
287 g_string_free(session->hostname, TRUE);
288 }
289 if (session->session_name) {
290 g_string_free(session->session_name, TRUE);
291 }
292 g_free(session);
14f28187
FD
293
294end:
4164020e 295 return;
7cdc2bab
MD
296}
297
4164020e 298static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 299{
4164020e
SM
300 if (!lttng_live_msg_iter) {
301 goto end;
302 }
7cdc2bab 303
4164020e
SM
304 if (lttng_live_msg_iter->sessions) {
305 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
306 }
14f28187 307
4164020e
SM
308 if (lttng_live_msg_iter->viewer_connection) {
309 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
310 }
311 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
312 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
14f28187 313
4164020e
SM
314 /* All stream iterators must be destroyed at this point. */
315 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
316 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
14f28187 317
4164020e 318 g_free(lttng_live_msg_iter);
14f28187
FD
319
320end:
4164020e 321 return;
14f28187
FD
322}
323
324BT_HIDDEN
325void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
326{
4164020e 327 struct lttng_live_msg_iter *lttng_live_msg_iter;
14f28187 328
4164020e 329 BT_ASSERT(self_msg_iter);
14f28187 330
4164020e
SM
331 lttng_live_msg_iter =
332 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
333 BT_ASSERT(lttng_live_msg_iter);
334 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
335}
336
4164020e
SM
337static enum lttng_live_iterator_status
338lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 339{
4164020e
SM
340 bt_logging_level log_level = lttng_live_stream->log_level;
341 bt_self_component *self_comp = lttng_live_stream->self_comp;
342
343 switch (lttng_live_stream->state) {
344 case LTTNG_LIVE_STREAM_QUIESCENT:
345 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
346 break;
347 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
348 /* Invalid state. */
349 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
350 bt_common_abort();
351 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
352 /* Invalid state. */
353 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
354 bt_common_abort();
355 case LTTNG_LIVE_STREAM_EOF:
356 break;
357 }
358 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
359}
360
361/*
f93afbf9
FD
362 * For active no data stream, fetch next index. As a result of that it can
363 * become either:
364 * - quiescent: won't have events for a bit,
365 * - have data: need to get that data and produce the event,
366 * - have no data on this stream at this point: need to retry (AGAIN) or return
367 * EOF.
7cdc2bab 368 */
4164020e
SM
369static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
370 struct lttng_live_msg_iter *lttng_live_msg_iter,
371 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 372{
4164020e
SM
373 bt_logging_level log_level = lttng_live_msg_iter->log_level;
374 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
375 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
376 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
377 struct packet_index index;
378
379 if (lttng_live_stream->trace->metadata_stream_state ==
380 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
381 BT_COMP_LOGD(
382 "Need to get an update for the metadata stream before proceeding further with this stream: "
383 "stream-name=\"%s\"",
384 lttng_live_stream->name->str);
385 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
386 goto end;
387 }
388
389 if (lttng_live_stream->trace->session->new_streams_needed) {
390 BT_COMP_LOGD(
391 "Need to get an update of all streams before proceeding further with this stream: "
392 "stream-name=\"%s\"",
393 lttng_live_stream->name->str);
394 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
395 goto end;
396 }
397
398 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
399 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
400 goto end;
401 }
402 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
403 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
404 goto end;
405 }
406
407 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
408
409 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
410 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
411 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
412
413 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
414 /*
a043c362
FD
415 * Because the stream is in the QUIESCENT_NO_DATA
416 * state, we can assert that the last_inactivity_ts was
417 * set and can be safely used in the `if` above.
418 */
4164020e
SM
419 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
420
421 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
422 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
423 } else {
424 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
425 }
426 goto end;
427 }
428
429 lttng_live_stream->base_offset = index.offset;
430 lttng_live_stream->offset = index.offset;
431 lttng_live_stream->len = index.packet_size / CHAR_BIT;
432
433 BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
434 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
435 ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
436 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
437 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
f93afbf9 438
7cdc2bab 439end:
4164020e
SM
440 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
441 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
442 }
443 return ret;
7cdc2bab
MD
444}
445
446/*
14f28187
FD
447 * Creation of the message requires the ctf trace class to be created
448 * beforehand, but the live protocol gives us all streams (including metadata)
449 * at once. So we split it in three steps: getting streams, getting metadata
450 * (which creates the ctf trace class), and then creating the per-stream
451 * messages.
7cdc2bab 452 */
4164020e
SM
453static enum lttng_live_iterator_status
454lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
455 struct lttng_live_session *session)
7cdc2bab 456{
4164020e
SM
457 bt_logging_level log_level = lttng_live_msg_iter->log_level;
458 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
459 enum lttng_live_iterator_status status;
460 uint64_t trace_idx;
461
462 if (!session->attached) {
463 BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, session->id);
464 enum lttng_live_viewer_status attach_status =
465 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
466 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
467 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
468 /*
469 * Clear any causes appended in
470 * `lttng_live_attach_session()` as we want to
471 * return gracefully since the graph was
472 * cancelled.
473 */
474 bt_current_thread_clear_error();
475 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
476 } else {
477 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
478 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error attaching to LTTng live session");
479 }
480 goto end;
481 }
482 }
483
484 BT_COMP_LOGD("Updating all streams and metadata for session: "
485 "session-id=%" PRIu64 ", session-name=\"%s\"",
486 session->id, session->session_name->str);
487
488 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
489 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && status != LTTNG_LIVE_ITERATOR_STATUS_END) {
490 goto end;
491 }
492
493 trace_idx = 0;
494 while (trace_idx < session->traces->len) {
495 struct lttng_live_trace *trace =
496 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
497
498 status = lttng_live_metadata_update(trace);
499 switch (status) {
500 case LTTNG_LIVE_ITERATOR_STATUS_END:
501 case LTTNG_LIVE_ITERATOR_STATUS_OK:
502 trace_idx++;
503 break;
504 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
505 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
506 goto end;
507 default:
508 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
509 "Error updating trace metadata: "
510 "stream-iter-status=%s, trace-id=%" PRIu64,
511 lttng_live_iterator_status_string(status), trace->id);
512 goto end;
513 }
514 }
515
516 /*
517 * Now that we have the metadata we can initialize the downstream
518 * iterator.
519 */
520 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
14f28187
FD
521
522end:
4164020e 523 return status;
7cdc2bab
MD
524}
525
4164020e
SM
526static void
527lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 528{
4164020e
SM
529 uint64_t session_idx, trace_idx;
530 bt_logging_level log_level = lttng_live_msg_iter->log_level;
531 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
532
533 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
534 struct lttng_live_session *session =
535 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
536 BT_COMP_LOGD("Force marking session as needing new streams: "
537 "session-id=%" PRIu64,
538 session->id);
539 session->new_streams_needed = true;
540 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
541 struct lttng_live_trace *trace =
542 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
543 BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
544 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
545 session->id, trace->id);
546
547 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
548
549 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
550 }
551 }
7cdc2bab
MD
552}
553
4164020e
SM
554static enum lttng_live_iterator_status
555lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 556{
4164020e
SM
557 enum lttng_live_iterator_status status;
558 enum lttng_live_viewer_status viewer_status;
559 bt_logging_level log_level = lttng_live_msg_iter->log_level;
560 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
561 uint64_t session_idx = 0, nr_sessions_opened = 0;
562 struct lttng_live_session *session;
563 enum session_not_found_action sess_not_found_act =
564 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
565
566 BT_COMP_LOGD("Update data and metadata of all sessions: "
567 "live-msg-iter-addr=%p",
568 lttng_live_msg_iter);
569 /*
570 * In a remotely distant future, we could add a "new
571 * session" flag to the protocol, which would tell us that we
572 * need to query for new sessions even though we have sessions
573 * currently ongoing.
574 */
575 if (lttng_live_msg_iter->sessions->len == 0) {
576 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
577 BT_COMP_LOGD(
578 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
579 status = LTTNG_LIVE_ITERATOR_STATUS_END;
580 goto end;
581 } else {
582 BT_COMP_LOGD(
583 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
584 /*
585 * Retry to create a viewer session for the requested
586 * session name.
587 */
588 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
589 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
590 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
591 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
592 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
593 "Error creating LTTng live viewer session");
594 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
595 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
596 } else {
597 bt_common_abort();
598 }
599 goto end;
600 }
601 }
602 }
603
604 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
605 session =
606 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
607 status = lttng_live_get_session(lttng_live_msg_iter, session);
608 switch (status) {
609 case LTTNG_LIVE_ITERATOR_STATUS_OK:
610 break;
611 case LTTNG_LIVE_ITERATOR_STATUS_END:
612 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
613 break;
614 default:
615 goto end;
616 }
617 if (!session->closed) {
618 nr_sessions_opened++;
619 }
620 }
621
622 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
623 status = LTTNG_LIVE_ITERATOR_STATUS_END;
624 } else {
625 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
626 }
f79c2d7a
FD
627
628end:
4164020e 629 return status;
7cdc2bab
MD
630}
631
4164020e
SM
632static enum lttng_live_iterator_status
633emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
634 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
635 uint64_t timestamp)
7cdc2bab 636{
4164020e
SM
637 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
638 bt_logging_level log_level = lttng_live_msg_iter->log_level;
639 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
640 bt_message *msg = NULL;
641
642 BT_ASSERT(stream_iter->trace->clock_class);
643
644 BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
645 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
646 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
647
648 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
649 stream_iter->trace->clock_class, timestamp);
650 if (!msg) {
651 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error emitting message iterator inactivity message");
652 goto error;
653 }
654
655 *message = msg;
7cdc2bab 656end:
4164020e 657 return ret;
7cdc2bab
MD
658
659error:
4164020e
SM
660 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
661 bt_message_put_ref(msg);
662 goto end;
7cdc2bab
MD
663}
664
4164020e
SM
665static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
666 struct lttng_live_msg_iter *lttng_live_msg_iter,
667 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 668{
4164020e
SM
669 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
670
671 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
672 return LTTNG_LIVE_ITERATOR_STATUS_OK;
673 }
674
675 /*
676 * Check if we already sent an inactivty message downstream for this
677 * `current_inactivity_ts` value.
678 */
679 if (lttng_live_stream->last_inactivity_ts.is_set &&
680 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
681 lttng_live_stream_iterator_set_state(lttng_live_stream,
682 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
683
684 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
685 goto end;
686 }
687
688 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
689 lttng_live_stream->current_inactivity_ts);
690
691 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
692 lttng_live_stream->last_inactivity_ts.is_set = true;
14f28187 693end:
4164020e 694 return ret;
14f28187
FD
695}
696
4164020e
SM
697static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
698 struct lttng_live_msg_iter *lttng_live_msg_iter,
699 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 700{
4164020e
SM
701 const bt_clock_class *clock_class = NULL;
702 const bt_clock_snapshot *clock_snapshot = NULL;
703 int ret = 0;
704 bt_logging_level log_level = lttng_live_msg_iter->log_level;
705 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
706
707 BT_ASSERT_DBG(msg);
708 BT_ASSERT_DBG(ts_ns);
709
710 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
711 "last-msg-ts=%" PRId64,
712 lttng_live_msg_iter, msg, last_msg_ts_ns);
713
714 switch (bt_message_get_type(msg)) {
715 case BT_MESSAGE_TYPE_EVENT:
716 clock_class = bt_message_event_borrow_stream_class_default_clock_class_const(msg);
717 BT_ASSERT_DBG(clock_class);
718
719 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
720 break;
721 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
722 clock_class =
723 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(msg);
724 BT_ASSERT(clock_class);
725
726 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
727 break;
728 case BT_MESSAGE_TYPE_PACKET_END:
729 clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const(msg);
730 BT_ASSERT(clock_class);
731
732 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
733 break;
734 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
735 clock_class =
736 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(msg);
737 BT_ASSERT(clock_class);
738
739 clock_snapshot =
740 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
741 break;
742 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
743 clock_class =
744 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(msg);
745 BT_ASSERT(clock_class);
746
747 clock_snapshot =
748 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
749 break;
750 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
751 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
752 break;
753 default:
754 /* All the other messages have a higher priority */
755 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
756 *ts_ns = last_msg_ts_ns;
757 goto end;
758 }
759
760 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
761 BT_ASSERT_DBG(clock_class);
762
763 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
764 if (ret) {
765 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
766 "Cannot get nanoseconds from Epoch of clock snapshot: "
767 "clock-snapshot-addr=%p",
768 clock_snapshot);
769 goto error;
770 }
771
772 goto end;
14f28187 773
14f28187 774error:
4164020e 775 ret = -1;
7cdc2bab 776
7cdc2bab 777end:
4164020e
SM
778 if (ret == 0) {
779 BT_COMP_LOGD("Found message's timestamp: "
780 "iter-data-addr=%p, msg-addr=%p, "
781 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
782 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
783 }
784
785 return ret;
7cdc2bab
MD
786}
787
4164020e
SM
788static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
789 struct lttng_live_msg_iter *lttng_live_msg_iter,
790 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 791{
4164020e
SM
792 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
793 bt_logging_level log_level = lttng_live_msg_iter->log_level;
794 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
795 enum ctf_msg_iter_status status;
796 uint64_t session_idx, trace_idx;
797
798 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
799 struct lttng_live_session *session =
800 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
801
802 if (session->new_streams_needed) {
803 BT_COMP_LOGD("Need an update for streams: "
804 "session-id=%" PRIu64,
805 session->id);
806 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
807 goto end;
808 }
809 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
810 struct lttng_live_trace *trace =
811 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
812 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
813 BT_COMP_LOGD("Need an update for metadata stream: "
814 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
815 session->id, trace->id);
816 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
817 goto end;
818 }
819 }
820 }
821
822 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
823 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
824 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
825 "Invalid state of live stream iterator"
826 "stream-iter-status=%s",
827 lttng_live_stream_state_string(lttng_live_stream->state));
828 goto end;
829 }
830
831 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
832 switch (status) {
833 case CTF_MSG_ITER_STATUS_EOF:
834 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
835 break;
836 case CTF_MSG_ITER_STATUS_OK:
837 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
838 break;
839 case CTF_MSG_ITER_STATUS_AGAIN:
840 /*
841 * Continue immediately (end of packet). The next
842 * get_index may return AGAIN to delay the following
843 * attempt.
844 */
845 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
846 break;
847 case CTF_MSG_ITER_STATUS_ERROR:
848 default:
849 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
850 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
851 "CTF message iterator failed to get next message: "
852 "msg-iter=%p, msg-iter-status=%s",
853 lttng_live_stream->msg_iter, ctf_msg_iter_status_string(status));
854 break;
855 }
14f28187
FD
856
857end:
4164020e 858 return ret;
7cdc2bab
MD
859}
860
4164020e
SM
861static enum lttng_live_iterator_status
862lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
863 struct lttng_live_stream_iterator *stream_iter,
864 const bt_message **curr_msg)
4a39caef 865{
4164020e
SM
866 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
867 bt_logging_level log_level = lttng_live_msg_iter->log_level;
868 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
869
870 BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
871 "viewer-stream-id=%" PRIu64,
872 stream_iter->name->str, stream_iter->viewer_stream_id);
873
874 /*
875 * The viewer has hung up on us so we are closing the stream. The
876 * `ctf_msg_iter` should simply realize that it needs to close the
877 * stream properly by emitting the necessary stream end message.
878 */
879 enum ctf_msg_iter_status status =
880 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
881
882 if (status == CTF_MSG_ITER_STATUS_ERROR) {
883 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
884 "Error getting the next message from CTF message iterator");
885 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
886 goto end;
887 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
888 BT_COMP_LOGI("Reached the end of the live stream iterator.");
889 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
890 goto end;
891 }
892
893 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef
FD
894
895end:
4164020e 896 return live_status;
4a39caef
FD
897}
898
7cdc2bab
MD
899/*
900 * helper function:
901 * handle_no_data_streams()
902 * retry:
903 * - for each ACTIVE_NO_DATA stream:
904 * - query relayd for stream data, or quiescence info.
905 * - if need metadata, get metadata, goto retry.
906 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
907 * - if quiescent, move to QUIESCENT streams
908 * - if fetched data, move to ACTIVE_DATA streams
909 * (at this point each stream either has data, or is quiescent)
910 *
911 *
912 * iterator_next:
913 * handle_new_streams_and_metadata()
914 * - query relayd for known streams, add them as ACTIVE_NO_DATA
915 * - query relayd for metadata
916 *
917 * call handle_active_no_data_streams()
918 *
919 * handle_quiescent_streams()
920 * - if at least one stream is ACTIVE_DATA:
921 * - peek stream event with lowest timestamp -> next_ts
922 * - for each quiescent stream
923 * - if next_ts >= quiescent end
924 * - set state to ACTIVE_NO_DATA
925 * - else
926 * - for each quiescent stream
927 * - set state to ACTIVE_NO_DATA
928 *
929 * call handle_active_no_data_streams()
930 *
931 * handle_active_data_streams()
932 * - if at least one stream is ACTIVE_DATA:
933 * - get stream event with lowest timestamp from heap
d6e69534 934 * - make that stream event the current message.
7cdc2bab
MD
935 * - move this stream heap position to its next event
936 * - if we need to fetch data from relayd, move
937 * stream to ACTIVE_NO_DATA.
938 * - return OK
939 * - return AGAIN
940 *
941 * end criterion: ctrl-c on client. If relayd exits or the session
942 * closes on the relay daemon side, we keep on waiting for streams.
943 * Eventually handle --end timestamp (also an end criterion).
944 *
945 * When disconnected from relayd: try to re-connect endlessly.
946 */
4164020e
SM
947static enum lttng_live_iterator_status
948lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
949 struct lttng_live_stream_iterator *stream_iter,
950 const bt_message **curr_msg)
7cdc2bab 951{
4164020e
SM
952 bt_logging_level log_level = lttng_live_msg_iter->log_level;
953 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
954 enum lttng_live_iterator_status live_status;
955
956 BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
957 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
958 stream_iter->name->str, stream_iter->viewer_stream_id);
959
960 if (stream_iter->has_stream_hung_up) {
961 /*
962 * The stream has hung up and the stream was properly closed
963 * during the last call to the current function. Return _END
964 * status now so that this stream iterator is removed for the
965 * stream iterator list.
966 */
967 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
968 goto end;
969 }
4a39caef 970
7cdc2bab 971retry:
4164020e
SM
972 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
973
974 /*
975 * Make sure we have the most recent metadata and possibly some new
976 * streams.
977 */
978 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
979 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
980 goto end;
981 }
982
983 live_status =
984 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
985 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
986 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
987 /*
988 * We overwrite `live_status` since `curr_msg` is
989 * likely set to a valid message in this function.
990 */
991 live_status =
992 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
993 }
994 goto end;
995 }
996
997 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
998 stream_iter, curr_msg);
999 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1000 BT_ASSERT(!*curr_msg);
1001 goto end;
1002 }
1003 if (*curr_msg) {
1004 goto end;
1005 }
1006 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
1007 stream_iter, curr_msg);
1008 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1009 BT_ASSERT(!*curr_msg);
1010 }
7cdc2bab
MD
1011
1012end:
4164020e
SM
1013 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
1014 BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
1015 goto retry;
1016 }
14f28187 1017
4164020e
SM
1018 BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, "
1019 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1020 lttng_live_iterator_status_string(live_status), stream_iter->name->str,
1021 stream_iter->viewer_stream_id);
f93afbf9 1022
4164020e 1023 return live_status;
7cdc2bab
MD
1024}
1025
4164020e 1026static bool is_discarded_packet_or_event_message(const bt_message *msg)
8ec4d5ff 1027{
4164020e 1028 const enum bt_message_type msg_type = bt_message_get_type(msg);
8ec4d5ff 1029
4164020e
SM
1030 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
1031 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
8ec4d5ff
JG
1032}
1033
4164020e
SM
1034static enum lttng_live_iterator_status
1035adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
1036 const bt_message *msg_in, bt_message **msg_out,
1037 uint64_t new_begin_ts)
8ec4d5ff 1038{
4164020e
SM
1039 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1040 enum bt_property_availability availability;
1041 const bt_clock_snapshot *clock_snapshot;
1042 uint64_t end_ts;
1043 uint64_t count;
1044
1045 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
1046 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1047
1048 availability = bt_message_discarded_packets_get_count(msg_in, &count);
1049 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1050
1051 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
1052 iter, stream, new_begin_ts, end_ts);
1053 if (!*msg_out) {
1054 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1055 goto end;
1056 }
1057
1058 bt_message_discarded_packets_set_count(*msg_out, count);
8ec4d5ff 1059end:
4164020e 1060 return status;
8ec4d5ff
JG
1061}
1062
4164020e
SM
1063static enum lttng_live_iterator_status
1064adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
1065 const bt_message *msg_in, bt_message **msg_out,
1066 uint64_t new_begin_ts)
8ec4d5ff 1067{
4164020e
SM
1068 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1069 enum bt_property_availability availability;
1070 const bt_clock_snapshot *clock_snapshot;
1071 uint64_t end_ts;
1072 uint64_t count;
1073
1074 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
1075 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1076
1077 availability = bt_message_discarded_events_get_count(msg_in, &count);
1078 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1079
1080 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
1081 iter, stream, new_begin_ts, end_ts);
1082 if (!*msg_out) {
1083 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1084 goto end;
1085 }
1086
1087 bt_message_discarded_events_set_count(*msg_out, count);
8ec4d5ff 1088end:
4164020e 1089 return status;
8ec4d5ff
JG
1090}
1091
4164020e
SM
1092static enum lttng_live_iterator_status
1093handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1094 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1095 const bt_message *late_msg)
285951be 1096{
4164020e
SM
1097 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1098 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1099 const bt_clock_class *clock_class;
1100 const bt_stream_class *stream_class;
1101 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1102 int64_t last_inactivity_ts_ns;
1103 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1104 enum lttng_live_iterator_status adjust_status;
1105 bt_message *adjusted_message;
1106
1107 /*
1108 * The timestamp of the current message is before the last message sent
1109 * by this component. We CANNOT send it as is.
1110 *
1111 * The only expected scenario in which that could happen is the
1112 * following, everything else is a bug in this component, relay deamon,
1113 * or CTF parser.
1114 *
1115 * Expected scenario: The CTF message iterator emitted discarded
1116 * packets and discarded events with synthesized beginning and end
1117 * timestamps from the bounds of the last known packet and the newly
1118 * decoded packet header. The CTF message iterator is not aware of
1119 * stream inactivity beacons. Hence, we have to adjust the beginning
1120 * timestamp of those types of messages if a stream signalled its
1121 * inactivity up until _after_ the last known packet's beginning
1122 * timestamp.
1123 *
1124 * Otherwise, the monotonicity guarantee of message timestamps would
1125 * not be preserved.
1126 *
1127 * In short, the only scenario in which it's okay and fixable to
1128 * received a late message is when:
1129 * 1. the late message is a discarded packets or discarded events
1130 * message,
1131 * 2. this stream produced an inactivity message downstream, and
1132 * 3. the timestamp of the late message is within the inactivity
1133 * timespan we sent downstream through the inactivity message.
1134 */
1135
1136 BT_COMP_LOGD("Handling late message on live stream iterator: "
1137 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1138 stream_iter->name->str, stream_iter->viewer_stream_id);
1139
1140 if (!stream_iter->last_inactivity_ts.is_set) {
1141 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: "
1142 "have a late message when no inactivity message "
1143 "was ever sent for that stream.");
1144 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1145 goto end;
1146 }
1147
1148 if (!is_discarded_packet_or_event_message(late_msg)) {
1149 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1150 "Invalid live stream state: "
1151 "have a late message that is not a packet discarded or "
1152 "event discarded message: late-msg-type=%s",
1153 bt_common_message_type_string(bt_message_get_type(late_msg)));
1154 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1155 goto end;
1156 }
1157
1158 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1159 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1160
1161 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1162 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1163 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1164 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error converting last "
1165 "inactivity message timestamp to nanoseconds");
1166 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1167 goto end;
1168 }
1169
1170 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1171 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1172 "Invalid live stream state: "
1173 "have a late message that is none included in a stream "
1174 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
1175 "late-msg-ts-ns=%" PRIu64,
1176 last_inactivity_ts_ns, late_msg_ts_ns);
1177 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1178 goto end;
1179 }
1180
1181 /*
1182 * We now know that it's okay for this message to be late, we can now
1183 * adjust its timestamp to ensure monotonicity.
1184 */
1185 BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1186 "msg-new-ts-ns=%" PRIu64,
1187 bt_common_message_type_string(bt_message_get_type(late_msg)),
1188 stream_iter->last_inactivity_ts.value);
1189 switch (bt_message_get_type(late_msg)) {
1190 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1191 adjust_status = adjust_discarded_events_message(
1192 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1193 stream_iter->last_inactivity_ts.value);
1194 break;
1195 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1196 adjust_status = adjust_discarded_packets_message(
1197 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1198 stream_iter->last_inactivity_ts.value);
1199 break;
1200 default:
1201 bt_common_abort();
1202 }
1203
1204 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1205 stream_iter_status = adjust_status;
1206 goto end;
1207 }
1208
1209 BT_ASSERT_DBG(adjusted_message);
1210 stream_iter->current_msg = adjusted_message;
1211 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1212 bt_message_put_ref(late_msg);
cefd84f8 1213
285951be 1214end:
4164020e 1215 return stream_iter_status;
285951be
FD
1216}
1217
4164020e
SM
1218static enum lttng_live_iterator_status
1219next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1220 struct lttng_live_trace *live_trace,
1221 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1222{
4164020e
SM
1223 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1224 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1225 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1226 enum lttng_live_iterator_status stream_iter_status;
1227 ;
1228 int64_t youngest_candidate_msg_ts = INT64_MAX;
1229 uint64_t stream_iter_idx;
1230
1231 BT_ASSERT_DBG(live_trace);
1232 BT_ASSERT_DBG(live_trace->stream_iterators);
1233
1234 BT_COMP_LOGD("Finding the next stream iterator for trace: "
1235 "trace-id=%" PRIu64,
1236 live_trace->id);
1237 /*
1238 * Update the current message of every stream iterators of this trace.
1239 * The current msg of every stream must have a timestamp equal or
1240 * larger than the last message returned by this iterator. We must
1241 * ensure monotonicity.
1242 */
1243 stream_iter_idx = 0;
1244 while (stream_iter_idx < live_trace->stream_iterators->len) {
1245 bool stream_iter_is_ended = false;
1246 struct lttng_live_stream_iterator *stream_iter =
1247 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1248 stream_iter_idx);
1249
1250 /*
1251 * If there is no current message for this stream, go fetch
1252 * one.
1253 */
1254 while (!stream_iter->current_msg) {
1255 const bt_message *msg = NULL;
1256 int64_t curr_msg_ts_ns = INT64_MAX;
1257
1258 stream_iter_status =
1259 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1260
1261 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1262 stream_iter_is_ended = true;
1263 break;
1264 }
1265
1266 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1267 goto end;
1268 }
1269
1270 BT_ASSERT_DBG(msg);
1271
1272 BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, "
1273 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1274 bt_common_message_type_string(bt_message_get_type(msg)),
1275 stream_iter->name->str, stream_iter->viewer_stream_id);
1276
1277 /*
1278 * Get the timestamp in nanoseconds from origin of this
1279 * messsage.
1280 */
1281 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, msg,
1282 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
1283
1284 /*
1285 * Check if the message of the current live stream
1286 * iterator occurred at the exact same time or after the
1287 * last message returned by this component's message
1288 * iterator. If not, we need to handle it with care.
1289 */
1290 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1291 stream_iter->current_msg = msg;
1292 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1293 } else {
1294 /*
1295 * We received a message from the past. This
1296 * may be fixable but it can also be an error.
1297 */
1298 stream_iter_status =
1299 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1300 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1301 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1302 "Late message could not be handled correctly: "
1303 "lttng-live-msg-iter-addr=%p, "
1304 "stream-name=\"%s\", "
1305 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
1306 lttng_live_msg_iter, stream_iter->name->str,
1307 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
1308 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1309 goto end;
1310 }
1311 }
1312 }
1313
1314 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1315
1316 if (!stream_iter_is_ended) {
1317 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1318 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1319 /*
1320 * Update the current best candidate message
1321 * for the stream iterator of this live trace
1322 * to be forwarded downstream.
1323 */
1324 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1325 youngest_candidate_stream_iter = stream_iter;
1326 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1327 /*
1328 * Order the messages in an arbitrary but
1329 * deterministic way.
1330 */
1331 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1332 int ret = common_muxing_compare_messages(
1333 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1334 if (ret < 0) {
1335 /*
1336 * The `youngest_candidate_stream_iter->current_msg`
1337 * should go first. Update the next
1338 * iterator and the current timestamp.
1339 */
1340 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1341 youngest_candidate_stream_iter = stream_iter;
1342 } else if (ret == 0) {
1343 /*
1344 * Unable to pick which one should go
1345 * first.
1346 */
1347 BT_COMP_LOGW(
1348 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1349 "stream-iter-addr=%p"
1350 "stream-iter-addr=%p",
1351 stream_iter, youngest_candidate_stream_iter);
1352 }
1353 }
1354
1355 stream_iter_idx++;
1356 } else {
1357 /*
1358 * The live stream iterator has ended. That
1359 * iterator is removed from the array, but
1360 * there is no need to increment
1361 * stream_iter_idx as
1362 * g_ptr_array_remove_index_fast replaces the
1363 * removed element with the array's last
1364 * element.
1365 */
1366 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1367 }
1368 }
1369
1370 if (youngest_candidate_stream_iter) {
1371 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1372 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1373 } else {
1374 /*
1375 * The only case where we don't have a candidate for this trace
1376 * is if we reached the end of all the iterators.
1377 */
1378 BT_ASSERT(live_trace->stream_iterators->len == 0);
1379 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1380 }
14f28187
FD
1381
1382end:
4164020e 1383 return stream_iter_status;
14f28187
FD
1384}
1385
4164020e
SM
1386static enum lttng_live_iterator_status
1387next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1388 struct lttng_live_session *session,
1389 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1390{
4164020e
SM
1391 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1392 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1393 enum lttng_live_iterator_status stream_iter_status;
1394 uint64_t trace_idx = 0;
1395 int64_t youngest_candidate_msg_ts = INT64_MAX;
1396 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1397
1398 BT_COMP_LOGD("Finding the next stream iterator for session: "
1399 "session-id=%" PRIu64,
1400 session->id);
1401 /*
1402 * Make sure we are attached to the session and look for new streams
1403 * and metadata.
1404 */
1405 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1406 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1407 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1408 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1409 goto end;
1410 }
1411
1412 BT_ASSERT_DBG(session->traces);
1413
1414 while (trace_idx < session->traces->len) {
1415 bool trace_is_ended = false;
1416 struct lttng_live_stream_iterator *stream_iter;
1417 struct lttng_live_trace *trace =
1418 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1419
1420 stream_iter_status =
1421 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1422 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1423 /*
1424 * All the live stream iterators for this trace are
1425 * ENDed. Remove the trace from this session.
1426 */
1427 trace_is_ended = true;
1428 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1429 goto end;
1430 }
1431
1432 if (!trace_is_ended) {
1433 BT_ASSERT_DBG(stream_iter);
1434
1435 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1436 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1437 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1438 youngest_candidate_stream_iter = stream_iter;
1439 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1440 /*
1441 * Order the messages in an arbitrary but
1442 * deterministic way.
1443 */
1444 int ret = common_muxing_compare_messages(
1445 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1446 if (ret < 0) {
1447 /*
1448 * The `youngest_candidate_stream_iter->current_msg`
1449 * should go first. Update the next iterator
1450 * and the current timestamp.
1451 */
1452 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1453 youngest_candidate_stream_iter = stream_iter;
1454 } else if (ret == 0) {
1455 /* Unable to pick which one should go first. */
1456 BT_COMP_LOGW(
1457 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1458 "stream-iter-addr=%p"
1459 "stream-iter-addr=%p",
1460 stream_iter, youngest_candidate_stream_iter);
1461 }
1462 }
1463 trace_idx++;
1464 } else {
1465 /*
1466 * trace_idx is not incremented since
1467 * g_ptr_array_remove_index_fast replaces the
1468 * element at trace_idx with the array's last element.
1469 */
1470 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1471 }
1472 }
1473 if (youngest_candidate_stream_iter) {
1474 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1475 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1476 } else {
1477 /*
1478 * The only cases where we don't have a candidate for this
1479 * trace is:
1480 * 1. if we reached the end of all the iterators of all the
1481 * traces of this session,
1482 * 2. if we never had live stream iterator in the first place.
1483 *
1484 * In either cases, we return END.
1485 */
1486 BT_ASSERT(session->traces->len == 0);
1487 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1488 }
7cdc2bab 1489end:
4164020e 1490 return stream_iter_status;
14f28187
FD
1491}
1492
4164020e 1493static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1494{
4164020e 1495 uint64_t i;
14f28187 1496
4164020e
SM
1497 for (i = 0; i < count; i++) {
1498 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1499 }
7cdc2bab
MD
1500}
1501
d3eb6e8f 1502BT_HIDDEN
4164020e
SM
1503bt_message_iterator_class_next_method_status
1504lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1505 uint64_t capacity, uint64_t *count)
d3eb6e8f 1506{
4164020e
SM
1507 bt_message_iterator_class_next_method_status status;
1508 enum lttng_live_viewer_status viewer_status;
1509 struct lttng_live_msg_iter *lttng_live_msg_iter =
1510 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1511 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1512 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1513 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1514 enum lttng_live_iterator_status stream_iter_status;
1515 uint64_t session_idx;
1516
1517 *count = 0;
1518
1519 BT_ASSERT_DBG(lttng_live_msg_iter);
1520
1521 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1522 /*
1523 * The iterator was interrupted in a previous call to the
1524 * `_next()` method. We currently do not support generating
1525 * messages after such event. The babeltrace2 CLI should never
1526 * be running the graph after being interrupted. So this check
1527 * is to prevent other graph users from using this live
1528 * iterator in an messed up internal state.
1529 */
1530 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1531 BT_COMP_LOGE_APPEND_CAUSE(
1532 self_comp,
1533 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1534 goto end;
1535 }
1536
1537 /*
1538 * Clear all the invalid message reference that might be left over in
1539 * the output array.
1540 */
1541 memset(msgs, 0, capacity * sizeof(*msgs));
1542
1543 /*
1544 * If no session are exposed on the relay found at the url provided by
1545 * the user, session count will be 0. In this case, we return status
1546 * end to return gracefully.
1547 */
1548 if (lttng_live_msg_iter->sessions->len == 0) {
1549 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1550 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1551 goto end;
1552 } else {
1553 /*
1554 * The are no more active session for this session
1555 * name. Retry to create a viewer session for the
1556 * requested session name.
1557 */
1558 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1559 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1560 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1561 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1562 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1563 "Error creating LTTng live viewer session");
1564 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1565 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1566 } else {
1567 bt_common_abort();
1568 }
1569 goto end;
1570 }
1571 }
1572 }
1573
1574 if (lttng_live_msg_iter->active_stream_iter == 0) {
1575 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1576 }
1577
1578 /*
1579 * Here the muxing of message is done.
1580 *
1581 * We need to iterate over all the streams of all the traces of all the
1582 * viewer sessions in order to get the message with the smallest
1583 * timestamp. In this case, a session is a viewer session and there is
1584 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1585 * kernel). Each viewer session can have multiple traces, for example,
1586 * 64bit UST viewer sessions could have multiple per-pid traces.
1587 *
1588 * We iterate over the streams of each traces to update and see what is
1589 * their next message's timestamp. From those timestamps, we select the
1590 * message with the smallest timestamp as the best candidate message
1591 * for that trace and do the same thing across all the sessions.
1592 *
1593 * We then compare the timestamp of best candidate message of all the
1594 * sessions to pick the message with the smallest timestamp and we
1595 * return it.
1596 */
1597 while (*count < capacity) {
1598 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1599 *candidate_stream_iter = NULL;
1600 int64_t youngest_msg_ts_ns = INT64_MAX;
1601
1602 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1603 session_idx = 0;
1604 while (session_idx < lttng_live_msg_iter->sessions->len) {
1605 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1606 lttng_live_msg_iter->sessions, session_idx);
1607
1608 /* Find the best candidate message to send downstream. */
1609 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1610 &candidate_stream_iter);
1611
1612 /* If we receive an END status, it means that either:
1613 * - Those traces never had active streams (UST with no
1614 * data produced yet),
1615 * - All live stream iterators have ENDed.*/
1616 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1617 if (session->closed && session->traces->len == 0) {
1618 /*
1619 * Remove the session from the list.
1620 * session_idx is not modified since
1621 * g_ptr_array_remove_index_fast
1622 * replaces the the removed element with
1623 * the array's last element.
1624 */
1625 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1626 } else {
1627 session_idx++;
1628 }
1629 continue;
1630 }
1631
1632 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1633 goto return_status;
1634 }
1635
1636 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1637 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1638 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1639 youngest_stream_iter = candidate_stream_iter;
1640 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1641 /*
1642 * The currently selected message to be sent
1643 * downstream next has the exact same timestamp
1644 * that of the current candidate message. We
1645 * must break the tie in a predictable manner.
1646 */
1647 BT_COMP_LOGD_STR(
1648 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1649 /*
1650 * Order the messages in an arbitrary but
1651 * deterministic way.
1652 */
1653 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1654 youngest_stream_iter->current_msg);
1655 if (ret < 0) {
1656 /*
1657 * The `candidate_stream_iter->current_msg`
1658 * should go first. Update the next
1659 * iterator and the current timestamp.
1660 */
1661 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1662 youngest_stream_iter = candidate_stream_iter;
1663 } else if (ret == 0) {
1664 /* Unable to pick which one should go first. */
1665 BT_COMP_LOGW(
1666 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1667 "next-stream-iter-addr=%p"
1668 "candidate-stream-iter-addr=%p",
1669 youngest_stream_iter, candidate_stream_iter);
1670 }
1671 }
1672
1673 session_idx++;
1674 }
1675
1676 if (!youngest_stream_iter) {
1677 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1678 goto return_status;
1679 }
1680
1681 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1682 /* Ensure monotonicity. */
1683 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1684 youngest_stream_iter->current_msg_ts_ns);
1685
1686 /*
1687 * Insert the next message to the message batch. This will set
1688 * stream iterator current messsage to NULL so that next time
1689 * we fetch the next message of that stream iterator
1690 */
1691 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1692 (*count)++;
1693
1694 /* Update the last timestamp in nanoseconds sent downstream. */
1695 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1696 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1697
1698 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1699 }
f79c2d7a
FD
1700
1701return_status:
4164020e
SM
1702 switch (stream_iter_status) {
1703 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1704 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1705 /*
1706 * If we gathered messages, return _OK even if the graph was
1707 * interrupted. This allows for the components downstream to at
1708 * least get those messages. If the graph was indeed
1709 * interrupted there should not be another _next() call as the
1710 * application will tear down the graph. This component class
1711 * doesn't support restarting after an interruption.
1712 */
1713 if (*count > 0) {
1714 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1715 } else {
1716 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1717 }
1718 break;
1719 case LTTNG_LIVE_ITERATOR_STATUS_END:
1720 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1721 break;
1722 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1723 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1724 "Memory error preparing the next batch of messages: "
1725 "live-iter-status=%s",
1726 lttng_live_iterator_status_string(stream_iter_status));
1727 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1728 break;
1729 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1730 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1731 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1732 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1733 "Error preparing the next batch of messages: "
1734 "live-iter-status=%s",
1735 lttng_live_iterator_status_string(stream_iter_status));
1736
1737 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1738 /* Put all existing messages on error. */
1739 put_messages(msgs, *count);
1740 break;
1741 default:
1742 bt_common_abort();
1743 }
14f28187 1744
f79c2d7a 1745end:
4164020e 1746 return status;
7cdc2bab 1747}
41a2b7ae 1748
4164020e
SM
1749static struct lttng_live_msg_iter *
1750lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1751 bt_self_message_iterator *self_msg_it)
4f74db52 1752{
4164020e
SM
1753 bt_self_component *self_comp = lttng_live_comp->self_comp;
1754 bt_logging_level log_level = lttng_live_comp->log_level;
4f74db52 1755
4164020e
SM
1756 struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1757 if (!lttng_live_msg_iter) {
1758 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter");
1759 goto end;
1760 }
1761
1762 lttng_live_msg_iter->log_level = lttng_live_comp->log_level;
1763 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1764 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1765 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1766
1767 lttng_live_msg_iter->active_stream_iter = 0;
1768 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1769 lttng_live_msg_iter->was_interrupted = false;
4f74db52 1770
4164020e
SM
1771 lttng_live_msg_iter->sessions =
1772 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1773 BT_ASSERT(lttng_live_msg_iter->sessions);
1774
1775end:
1776 return lttng_live_msg_iter;
4f74db52
FD
1777}
1778
7cdc2bab 1779BT_HIDDEN
4164020e
SM
1780bt_message_iterator_class_initialize_method_status
1781lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1782 bt_self_message_iterator_configuration *config,
1783 bt_self_component_port_output *self_port)
7cdc2bab 1784{
4164020e
SM
1785 bt_message_iterator_class_initialize_method_status status;
1786 struct lttng_live_component *lttng_live;
1787 struct lttng_live_msg_iter *lttng_live_msg_iter;
1788 enum lttng_live_viewer_status viewer_status;
1789 bt_logging_level log_level;
1790 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1791
1792 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1793 log_level = lttng_live->log_level;
1794 self_comp = lttng_live->self_comp;
1795
1796 /* There can be only one downstream iterator at the same time. */
1797 BT_ASSERT(!lttng_live->has_msg_iter);
1798 lttng_live->has_msg_iter = true;
1799
1800 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1801 if (!lttng_live_msg_iter) {
1802 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1803 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter");
1804 goto error;
1805 }
1806
1807 viewer_status = live_viewer_connection_create(
1808 self_comp, NULL, log_level, lttng_live->params.url->str, false, lttng_live_msg_iter,
1809 &lttng_live_msg_iter->viewer_connection);
1810 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1811 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1812 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer connection");
1813 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1814 /*
1815 * Interruption in the _iter_init() method is not
1816 * supported. Return an error.
1817 */
1818 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted while creating viewer connection");
1819 }
1820 goto error;
1821 }
1822
1823 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1824 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1825 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1826 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer session");
1827 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1828 /*
1829 * Interruption in the _iter_init() method is not
1830 * supported. Return an error.
1831 */
1832 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted when creating viewer session");
1833 }
1834 goto error;
1835 }
1836
1837 if (lttng_live_msg_iter->sessions->len == 0) {
1838 switch (lttng_live->params.sess_not_found_act) {
1839 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1840 BT_COMP_LOGI(
1841 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1842 "%s=\"%s\" component parameter: url=\"%s\"",
1843 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1844 lttng_live->params.url->str);
1845 break;
1846 case SESSION_NOT_FOUND_ACTION_FAIL:
1847 BT_COMP_LOGE_APPEND_CAUSE(
1848 self_comp,
1849 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1850 "component parameter: url =\"%s\"",
1851 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1852 lttng_live->params.url->str);
1853 goto error;
1854 case SESSION_NOT_FOUND_ACTION_END:
1855 BT_COMP_LOGI(
1856 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1857 "call because of %s=\"%s\" component parameter: "
1858 "url=\"%s\"",
1859 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1860 lttng_live->params.url->str);
1861 break;
1862 default:
1863 bt_common_abort();
1864 }
1865 }
1866
1867 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1868 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1869 goto end;
f79c2d7a 1870
14f28187 1871error:
4164020e
SM
1872 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1873 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1874end:
4164020e 1875 return status;
7cdc2bab
MD
1876}
1877
80aff5ef 1878static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1879 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1880 bt_param_validation_value_descr::makeString()},
4164020e
SM
1881 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1882
1883static bt_component_class_query_method_status
1884lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1885 bt_self_component_class *self_comp_class, bt_logging_level log_level)
7cdc2bab 1886{
4164020e
SM
1887 bt_component_class_query_method_status status;
1888 const bt_value *url_value = NULL;
1889 const char *url;
1890 struct live_viewer_connection *viewer_connection = NULL;
1891 enum lttng_live_viewer_status viewer_status;
1892 enum bt_param_validation_status validation_status;
1893 gchar *validate_error = NULL;
1894
1895 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1896 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1897 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1898 goto error;
1899 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1900 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1901 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error);
1902 goto error;
1903 }
1904
1905 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1906 url = bt_value_string_get(url_value);
1907
1908 viewer_status = live_viewer_connection_create(NULL, self_comp_class, log_level, url, true, NULL,
1909 &viewer_connection);
1910 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1911 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1912 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to create viewer connection");
1913 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1914 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1915 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1916 } else {
1917 bt_common_abort();
1918 }
1919 goto error;
1920 }
1921
1922 status = live_viewer_connection_list_sessions(viewer_connection, result);
1923 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1924 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to list viewer sessions");
1925 goto error;
1926 }
1927
1928 goto end;
c7eee084 1929
7cdc2bab 1930error:
4164020e 1931 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1932
4164020e
SM
1933 if (status >= 0) {
1934 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1935 }
c7eee084 1936
7cdc2bab 1937end:
4164020e
SM
1938 if (viewer_connection) {
1939 live_viewer_connection_destroy(viewer_connection);
1940 }
80aff5ef 1941
4164020e 1942 g_free(validate_error);
80aff5ef 1943
4164020e 1944 return status;
7cdc2bab
MD
1945}
1946
4164020e
SM
1947static bt_component_class_query_method_status
1948lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1949 bt_self_component_class *self_comp_class, bt_logging_level log_level)
312df793 1950{
4164020e
SM
1951 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1952 const bt_value *input_type_value;
1953 const bt_value *input_value;
1954 double weight = 0;
1955 struct bt_common_lttng_live_url_parts parts = {0};
1956
1957 /* Used by the logging macros */
1958 __attribute__((unused)) bt_self_component *self_comp = NULL;
1959
1960 *result = NULL;
1961 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1962 if (!input_type_value) {
1963 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `type` parameter.");
1964 goto error;
1965 }
1966
1967 if (!bt_value_is_string(input_type_value)) {
1968 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "`type` parameter is not a string value.");
1969 goto error;
1970 }
1971
1972 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1973 /* We don't handle file system paths */
1974 goto create_result;
1975 }
1976
1977 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1978 if (!input_value) {
1979 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `input` parameter.");
1980 goto error;
1981 }
1982
1983 if (!bt_value_is_string(input_value)) {
1984 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1985 "`input` parameter is not a string value.");
1986 goto error;
1987 }
1988
1989 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1990 if (parts.session_name) {
1991 /*
1992 * Looks pretty much like an LTTng live URL: we got the
1993 * session name part, which forms a complete URL.
1994 */
1995 weight = .75;
1996 }
312df793
PP
1997
1998create_result:
4164020e
SM
1999 *result = bt_value_real_create_init(weight);
2000 if (!*result) {
2001 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
2002 goto error;
2003 }
312df793 2004
4164020e 2005 goto end;
312df793
PP
2006
2007error:
4164020e
SM
2008 if (status >= 0) {
2009 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
2010 }
312df793 2011
4164020e 2012 BT_ASSERT(!*result);
312df793
PP
2013
2014end:
4164020e
SM
2015 bt_common_destroy_lttng_live_url_parts(&parts);
2016 return status;
312df793
PP
2017}
2018
7cdc2bab 2019BT_HIDDEN
4164020e
SM
2020bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
2021 bt_private_query_executor *priv_query_exec,
2022 const char *object, const bt_value *params,
2023 __attribute__((unused)) void *method_data,
2024 const bt_value **result)
7cdc2bab 2025{
4164020e
SM
2026 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
2027 bt_self_component *self_comp = NULL;
2028 bt_self_component_class *self_comp_class =
2029 bt_self_component_class_source_as_self_component_class(comp_class);
2030 bt_logging_level log_level = bt_query_executor_get_logging_level(
2031 bt_private_query_executor_as_query_executor_const(priv_query_exec));
2032
2033 if (strcmp(object, "sessions") == 0) {
2034 status = lttng_live_query_list_sessions(params, result, self_comp_class, log_level);
2035 } else if (strcmp(object, "babeltrace.support-info") == 0) {
2036 status = lttng_live_query_support_info(params, result, self_comp_class, log_level);
2037 } else {
2038 BT_COMP_LOGI("Unknown query object `%s`", object);
2039 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
2040 goto end;
2041 }
14f28187
FD
2042
2043end:
4164020e 2044 return status;
7cdc2bab
MD
2045}
2046
4164020e 2047static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
7cdc2bab 2048{
4164020e
SM
2049 if (!lttng_live) {
2050 return;
2051 }
2052 if (lttng_live->params.url) {
2053 g_string_free(lttng_live->params.url, TRUE);
2054 }
2055 g_free(lttng_live);
7cdc2bab
MD
2056}
2057
2058BT_HIDDEN
14f28187 2059void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 2060{
4164020e
SM
2061 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
2062 bt_self_component_source_as_self_component(component));
2063
2064 if (!data) {
2065 return;
2066 }
2067 lttng_live_component_destroy_data(data);
7cdc2bab
MD
2068}
2069
4164020e
SM
2070static enum session_not_found_action
2071parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 2072{
4164020e
SM
2073 enum session_not_found_action action;
2074 const char *no_session_act_str = bt_value_string_get(no_session_param);
2075
2076 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2077 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2078 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2079 action = SESSION_NOT_FOUND_ACTION_FAIL;
2080 } else {
2081 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2082 action = SESSION_NOT_FOUND_ACTION_END;
2083 }
2084
2085 return action;
14f28187
FD
2086}
2087
88730e42
SM
2088static bt_param_validation_value_descr inputs_elem_descr =
2089 bt_param_validation_value_descr::makeString();
80aff5ef
SM
2090
2091static const char *sess_not_found_action_choices[] = {
4164020e
SM
2092 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2093 SESS_NOT_FOUND_ACTION_FAIL_STR,
2094 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
2095};
2096
2097static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
2098 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2099 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2100 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2101 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
2102 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2103
2104static bt_component_class_initialize_method_status
2105lttng_live_component_create(const bt_value *params, bt_logging_level log_level,
2106 bt_self_component *self_comp, struct lttng_live_component **component)
7cdc2bab 2107{
4164020e
SM
2108 struct lttng_live_component *lttng_live = NULL;
2109 const bt_value *inputs_value;
2110 const bt_value *url_value;
2111 const bt_value *value;
2112 const char *url;
2113 enum bt_param_validation_status validation_status;
2114 gchar *validation_error = NULL;
2115 bt_component_class_initialize_method_status status;
2116
2117 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2118 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2119 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2120 goto error;
2121 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2122 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
2123 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2124 goto error;
2125 }
2126
2127 lttng_live = g_new0(struct lttng_live_component, 1);
2128 if (!lttng_live) {
2129 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2130 goto end;
2131 }
2132 lttng_live->log_level = log_level;
2133 lttng_live->self_comp = self_comp;
2134 lttng_live->max_query_size = MAX_QUERY_SIZE;
2135 lttng_live->has_msg_iter = false;
2136
2137 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2138 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2139 url = bt_value_string_get(url_value);
2140
2141 lttng_live->params.url = g_string_new(url);
2142 if (!lttng_live->params.url) {
2143 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2144 goto error;
2145 }
2146
2147 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2148 if (value) {
2149 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2150 } else {
2151 BT_COMP_LOGI("Optional `%s` parameter is missing: "
2152 "defaulting to `%s`.",
2153 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2154 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2155 }
2156
2157 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2158 goto end;
7cdc2bab
MD
2159
2160error:
4164020e
SM
2161 lttng_live_component_destroy_data(lttng_live);
2162 lttng_live = NULL;
7cdc2bab 2163end:
4164020e 2164 g_free(validation_error);
80aff5ef 2165
4164020e
SM
2166 *component = lttng_live;
2167 return status;
d3e4dcd8
PP
2168}
2169
f3bc2010 2170BT_HIDDEN
4164020e
SM
2171bt_component_class_initialize_method_status
2172lttng_live_component_init(bt_self_component_source *self_comp_src,
2173 bt_self_component_source_configuration *config, const bt_value *params,
2174 __attribute__((unused)) void *init_method_data)
f3bc2010 2175{
4164020e
SM
2176 struct lttng_live_component *lttng_live;
2177 bt_component_class_initialize_method_status ret;
2178 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2179 bt_logging_level log_level =
2180 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
2181 bt_self_component_add_port_status add_port_status;
2182
2183 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
2184 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2185 goto error;
2186 }
2187
2188 add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2189 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2190 ret = (bt_component_class_initialize_method_status) add_port_status;
2191 goto end;
2192 }
2193
2194 bt_self_component_set_data(self_comp, lttng_live);
2195 goto end;
7cdc2bab 2196
19bdff20 2197error:
4164020e
SM
2198 lttng_live_component_destroy_data(lttng_live);
2199 lttng_live = NULL;
7cdc2bab 2200end:
4164020e 2201 return ret;
d85ef162 2202}
This page took 0.191264 seconds and 4 git commands to generate.