Sort includes in C++ files
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
c802cacb 11#include <glib.h>
14f28187 12#include <inttypes.h>
c4f23e30 13#include <stdbool.h>
14f28187
FD
14#include <unistd.h>
15
c802cacb
SM
16#include <babeltrace2/babeltrace.h>
17
18#define BT_COMP_LOG_SELF_COMP self_comp
19#define BT_LOG_OUTPUT_LEVEL log_level
20#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
21#include "logging/comp-logging.h"
3c22a242 22
578e048b 23#include "common/assert.h"
578e048b 24#include "compat/compiler.h"
f3bc2010 25
376fc2bd 26#include "plugins/common/muxing/muxing.h"
80aff5ef 27#include "plugins/common/param-validation/param-validation.h"
376fc2bd 28
087cd0f5 29#include "data-stream.hpp"
087cd0f5 30#include "lttng-live.hpp"
c802cacb 31#include "metadata.hpp"
7cdc2bab 32
/*
 * File-level constants.
 *
 * NOTE(review): the `*_PARAM` strings look like component parameter names and
 * the `SESS_NOT_FOUND_ACTION_*_STR` strings like the accepted values of the
 * `session-not-found-action` parameter; none of them is used in the part of
 * the file visible here, so confirm against the parameter handling code.
 * MAX_QUERY_SIZE evaluates to 262144; what it bounds is not visible here.
 */
4164020e
SM
33#define MAX_QUERY_SIZE (256 * 1024)
34#define URL_PARAM "url"
35#define INPUTS_PARAM "inputs"
36#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
37#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
38#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
39#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 40
/* Debug logging shorthand: forwards its arguments to BT_COMP_LOGD(). */
4164020e 41#define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ##__VA_ARGS__)
7cdc2bab 42
4164020e 43static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
14f28187 44{
4164020e
SM
45 switch (status) {
46 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
47 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
48 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
49 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
50 case LTTNG_LIVE_ITERATOR_STATUS_END:
51 return "LTTNG_LIVE_ITERATOR_STATUS_END";
52 case LTTNG_LIVE_ITERATOR_STATUS_OK:
53 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
54 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
55 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
56 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
57 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
58 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
59 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
60 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
61 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
62 default:
63 bt_common_abort();
64 }
14f28187
FD
65}
66
4164020e 67static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
7cdc2bab 68{
4164020e
SM
69 switch (state) {
70 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
71 return "ACTIVE_NO_DATA";
72 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
73 return "QUIESCENT_NO_DATA";
74 case LTTNG_LIVE_STREAM_QUIESCENT:
75 return "QUIESCENT";
76 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
77 return "ACTIVE_DATA";
78 case LTTNG_LIVE_STREAM_EOF:
79 return "EOF";
80 default:
81 return "ERROR";
82 }
7cdc2bab 83}
7cdc2bab 84
34533ae0 85void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 86 enum lttng_live_stream_state new_state)
34533ae0 87{
4164020e
SM
88 bt_self_component *self_comp = stream_iter->self_comp;
89 bt_logging_level log_level = stream_iter->log_level;
34533ae0 90
4164020e
SM
91 BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
92 ", old-state=%s, new-state=%s",
93 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
94 lttng_live_stream_state_string(new_state));
34533ae0 95
4164020e 96 stream_iter->state = new_state;
34533ae0
FD
97}
98
/*
 * Logs, at the debug level, the state and the inactivity timestamps of the
 * live stream iterator `live_stream_iter` (a pointer expression).
 *
 * `self_comp` and `log_level` must be in scope where this is expanded:
 * BT_COMP_LOGD() relies on them (see the BT_COMP_LOG_SELF_COMP and
 * BT_LOG_OUTPUT_LEVEL definitions at the top of this file).
 *
 * The expansion deliberately does NOT end with a semicolon so that
 * `LTTNG_LIVE_LOGD_STREAM_ITER(x);` is exactly one statement and can be used
 * as the unbraced body of an `if`/`else`. The argument is parenthesized so
 * that any pointer expression can be passed.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_COMP_LOGD("Live stream iterator state=%s, "                                             \
                     "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", "                  \
                     "curr-inact-ts=%" PRId64,                                                     \
                     lttng_live_stream_state_string((live_stream_iter)->state),                    \
                     (live_stream_iter)->last_inactivity_ts.is_set,                                \
                     (live_stream_iter)->last_inactivity_ts.value,                                 \
                     (live_stream_iter)->current_inactivity_ts);                                   \
    } while (0)
6f79a7cf 109
9b4f9b42 110bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 111{
4164020e 112 bool ret;
6f79a7cf 113
4164020e
SM
114 if (!msg_iter) {
115 ret = false;
116 goto end;
117 }
7cdc2bab 118
4164020e 119 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
4bf0e537 120
14f28187 121end:
4164020e 122 return ret;
7cdc2bab
MD
123}
124
4164020e
SM
125static struct lttng_live_trace *
126lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 127{
4164020e
SM
128 uint64_t trace_idx;
129 struct lttng_live_trace *ret_trace = NULL;
130
131 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
132 struct lttng_live_trace *trace =
133 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
134 if (trace->id == trace_id) {
135 ret_trace = trace;
136 goto end;
137 }
138 }
14f28187
FD
139
140end:
4164020e 141 return ret_trace;
d3eb6e8f
PP
142}
143
4164020e 144static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 145{
4164020e
SM
146 bt_logging_level log_level = trace->log_level;
147 bt_self_component *self_comp = trace->self_comp;
c01594de 148
4164020e 149 BT_COMP_LOGD("Destroying live trace: trace-id=%" PRIu64, trace->id);
7cdc2bab 150
4164020e
SM
151 BT_ASSERT(trace->stream_iterators);
152 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 153
4164020e
SM
154 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
155 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
5bd230f4 156
4164020e
SM
157 lttng_live_metadata_fini(trace);
158 g_free(trace);
7cdc2bab
MD
159}
160
4164020e
SM
161static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
162 uint64_t trace_id)
7cdc2bab 163{
4164020e
SM
164 struct lttng_live_trace *trace = NULL;
165 bt_logging_level log_level = session->log_level;
166 bt_self_component *self_comp = session->self_comp;
167
168 BT_COMP_LOGD("Creating live trace: "
169 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
170 session->id, trace_id);
171 trace = g_new0(struct lttng_live_trace, 1);
172 if (!trace) {
173 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace");
174 goto error;
175 }
176 trace->log_level = session->log_level;
177 trace->self_comp = session->self_comp;
178 trace->session = session;
179 trace->id = trace_id;
180 trace->trace_class = NULL;
181 trace->trace = NULL;
182 trace->stream_iterators =
183 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
184 BT_ASSERT(trace->stream_iterators);
185 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
186 g_ptr_array_add(session->traces, trace);
187
188 goto end;
7cdc2bab 189error:
4164020e
SM
190 g_free(trace);
191 trace = NULL;
7cdc2bab 192end:
4164020e 193 return trace;
7cdc2bab
MD
194}
195
4164020e
SM
196struct lttng_live_trace *
197lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
198 uint64_t trace_id)
7cdc2bab 199{
4164020e 200 struct lttng_live_trace *trace;
7cdc2bab 201
4164020e
SM
202 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
203 if (trace) {
204 goto end;
205 }
7cdc2bab 206
4164020e
SM
207 /* The session is the owner of the newly created trace. */
208 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 209
14f28187 210end:
4164020e 211 return trace;
7cdc2bab
MD
212}
213
4164020e
SM
214int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
215 const char *hostname, const char *session_name)
7cdc2bab 216{
4164020e
SM
217 int ret = 0;
218 struct lttng_live_session *session;
219 bt_logging_level log_level = lttng_live_msg_iter->log_level;
220 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
221
222 BT_COMP_LOGD("Adding live session: "
223 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
224 session_id, hostname, session_name);
225
226 session = g_new0(struct lttng_live_session, 1);
227 if (!session) {
228 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session");
229 goto error;
230 }
231
232 session->log_level = lttng_live_msg_iter->log_level;
233 session->self_comp = lttng_live_msg_iter->self_comp;
234 session->id = session_id;
235 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
236 BT_ASSERT(session->traces);
237 session->lttng_live_msg_iter = lttng_live_msg_iter;
238 session->new_streams_needed = true;
239 session->hostname = g_string_new(hostname);
240 BT_ASSERT(session->hostname);
241
242 session->session_name = g_string_new(session_name);
243 BT_ASSERT(session->session_name);
244
245 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
246 goto end;
7cdc2bab 247error:
4164020e
SM
248 g_free(session);
249 ret = -1;
7cdc2bab 250end:
4164020e 251 return ret;
7cdc2bab
MD
252}
253
4164020e 254static void lttng_live_destroy_session(struct lttng_live_session *session)
7cdc2bab 255{
4164020e
SM
256 bt_logging_level log_level;
257 bt_self_component *self_comp;
258
259 if (!session) {
260 goto end;
261 }
262
263 log_level = session->log_level;
264 self_comp = session->self_comp;
265 BT_COMP_LOGD("Destroying live session: "
266 "session-id=%" PRIu64 ", session-name=\"%s\"",
267 session->id, session->session_name->str);
268 if (session->id != -1ULL) {
269 if (lttng_live_session_detach(session)) {
270 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
271 /* Old relayd cannot detach sessions. */
272 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id);
273 }
274 }
275 session->id = -1ULL;
276 }
277
278 if (session->traces) {
279 g_ptr_array_free(session->traces, TRUE);
280 }
281
282 if (session->hostname) {
283 g_string_free(session->hostname, TRUE);
284 }
285 if (session->session_name) {
286 g_string_free(session->session_name, TRUE);
287 }
288 g_free(session);
14f28187
FD
289
290end:
4164020e 291 return;
7cdc2bab
MD
292}
293
4164020e 294static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 295{
4164020e
SM
296 if (!lttng_live_msg_iter) {
297 goto end;
298 }
7cdc2bab 299
4164020e
SM
300 if (lttng_live_msg_iter->sessions) {
301 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
302 }
14f28187 303
4164020e
SM
304 if (lttng_live_msg_iter->viewer_connection) {
305 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
306 }
307 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
308 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
14f28187 309
4164020e
SM
310 /* All stream iterators must be destroyed at this point. */
311 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
312 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
14f28187 313
4164020e 314 g_free(lttng_live_msg_iter);
14f28187
FD
315
316end:
4164020e 317 return;
14f28187
FD
318}
319
14f28187
FD
320void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
321{
4164020e 322 struct lttng_live_msg_iter *lttng_live_msg_iter;
14f28187 323
4164020e 324 BT_ASSERT(self_msg_iter);
14f28187 325
4164020e
SM
326 lttng_live_msg_iter =
327 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
328 BT_ASSERT(lttng_live_msg_iter);
329 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
330}
331
4164020e
SM
332static enum lttng_live_iterator_status
333lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 334{
4164020e
SM
335 bt_logging_level log_level = lttng_live_stream->log_level;
336 bt_self_component *self_comp = lttng_live_stream->self_comp;
337
338 switch (lttng_live_stream->state) {
339 case LTTNG_LIVE_STREAM_QUIESCENT:
340 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
341 break;
342 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
343 /* Invalid state. */
344 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
345 bt_common_abort();
346 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
347 /* Invalid state. */
348 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
349 bt_common_abort();
350 case LTTNG_LIVE_STREAM_EOF:
351 break;
352 }
353 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
354}
355
356/*
f93afbf9
FD
357 * For active no data stream, fetch next index. As a result of that it can
358 * become either:
359 * - quiescent: won't have events for a bit,
360 * - have data: need to get that data and produce the event,
361 * - have no data on this stream at this point: need to retry (AGAIN) or return
362 * EOF.
7cdc2bab 363 */
4164020e
SM
364static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
365 struct lttng_live_msg_iter *lttng_live_msg_iter,
366 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 367{
4164020e
SM
368 bt_logging_level log_level = lttng_live_msg_iter->log_level;
369 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
370 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
371 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
372 struct packet_index index;
373
374 if (lttng_live_stream->trace->metadata_stream_state ==
375 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
376 BT_COMP_LOGD(
377 "Need to get an update for the metadata stream before proceeding further with this stream: "
378 "stream-name=\"%s\"",
379 lttng_live_stream->name->str);
380 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
381 goto end;
382 }
383
384 if (lttng_live_stream->trace->session->new_streams_needed) {
385 BT_COMP_LOGD(
386 "Need to get an update of all streams before proceeding further with this stream: "
387 "stream-name=\"%s\"",
388 lttng_live_stream->name->str);
389 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
390 goto end;
391 }
392
393 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
394 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
395 goto end;
396 }
397 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
398 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
399 goto end;
400 }
401
402 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
403
404 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
405 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
406 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
407
408 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
409 /*
5cb27175
JG
410 * Because the stream is in the QUIESCENT_NO_DATA
411 * state, we can assert that the last_inactivity_ts was
412 * set and can be safely used in the `if` above.
413 */
4164020e
SM
414 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
415
416 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
417 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
418 } else {
419 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
420 }
421 goto end;
422 }
423
424 lttng_live_stream->base_offset = index.offset;
425 lttng_live_stream->offset = index.offset;
426 lttng_live_stream->len = index.packet_size / CHAR_BIT;
427
428 BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
429 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
430 ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
431 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
432 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
f93afbf9 433
7cdc2bab 434end:
4164020e
SM
435 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
436 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
437 }
438 return ret;
7cdc2bab
MD
439}
440
441/*
14f28187
FD
442 * Creation of the message requires the ctf trace class to be created
443 * beforehand, but the live protocol gives us all streams (including metadata)
444 * at once. So we split it in three steps: getting streams, getting metadata
445 * (which creates the ctf trace class), and then creating the per-stream
446 * messages.
7cdc2bab 447 */
4164020e
SM
448static enum lttng_live_iterator_status
449lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
450 struct lttng_live_session *session)
7cdc2bab 451{
4164020e
SM
452 bt_logging_level log_level = lttng_live_msg_iter->log_level;
453 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
454 enum lttng_live_iterator_status status;
455 uint64_t trace_idx;
456
457 if (!session->attached) {
458 BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, session->id);
459 enum lttng_live_viewer_status attach_status =
460 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
461 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
462 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
463 /*
464 * Clear any causes appended in
465 * `lttng_live_attach_session()` as we want to
466 * return gracefully since the graph was
467 * cancelled.
468 */
469 bt_current_thread_clear_error();
470 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
471 } else {
472 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
473 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error attaching to LTTng live session");
474 }
475 goto end;
476 }
477 }
478
a4f118a3 479 BT_COMP_LOGD("Updating all data streams: "
4164020e
SM
480 "session-id=%" PRIu64 ", session-name=\"%s\"",
481 session->id, session->session_name->str);
482
483 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
a4f118a3
FD
484 switch (status) {
485 case LTTNG_LIVE_ITERATOR_STATUS_OK:
486 break;
487 case LTTNG_LIVE_ITERATOR_STATUS_END:
488 /*
489 * We received a `_END` from the `_get_new_streams()` function,
490 * which means no more data will ever be received from the data
491 * streams of this session. But it's possible that the metadata
492 * is incomplete.
493 * The live protocol guarantees that we receive all the
494 * metadata needed before we receive data streams needing it.
495 * But it's possible to receive metadata NOT needed by
496 * data streams after the session was closed. For example, this
497 * could happen if a new event is registered and the session is
498 * stopped before any tracepoint for that event is actually
499 * fired.
500 */
501 BT_COMP_LOGD(
502 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
503 "session-id=%" PRIu64 ", session-name=\"%s\"",
504 session->id, session->session_name->str);
505 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
506 break;
507 default:
4164020e
SM
508 goto end;
509 }
510
a4f118a3
FD
511 BT_COMP_LOGD("Updating metadata stream for session: "
512 "session-id=%" PRIu64 ", session-name=\"%s\"",
513 session->id, session->session_name->str);
514
4164020e
SM
515 trace_idx = 0;
516 while (trace_idx < session->traces->len) {
517 struct lttng_live_trace *trace =
518 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
519
520 status = lttng_live_metadata_update(trace);
521 switch (status) {
522 case LTTNG_LIVE_ITERATOR_STATUS_END:
523 case LTTNG_LIVE_ITERATOR_STATUS_OK:
524 trace_idx++;
525 break;
526 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
527 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
528 goto end;
529 default:
530 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
531 "Error updating trace metadata: "
532 "stream-iter-status=%s, trace-id=%" PRIu64,
533 lttng_live_iterator_status_string(status), trace->id);
534 goto end;
535 }
536 }
537
538 /*
539 * Now that we have the metadata we can initialize the downstream
540 * iterator.
541 */
542 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
14f28187
FD
543
544end:
4164020e 545 return status;
7cdc2bab
MD
546}
547
4164020e
SM
548static void
549lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 550{
4164020e
SM
551 uint64_t session_idx, trace_idx;
552 bt_logging_level log_level = lttng_live_msg_iter->log_level;
553 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
554
555 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
556 struct lttng_live_session *session =
557 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
558 BT_COMP_LOGD("Force marking session as needing new streams: "
559 "session-id=%" PRIu64,
560 session->id);
561 session->new_streams_needed = true;
562 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
563 struct lttng_live_trace *trace =
564 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
565 BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
566 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
567 session->id, trace->id);
568
569 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
570
571 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
572 }
573 }
7cdc2bab
MD
574}
575
4164020e
SM
576static enum lttng_live_iterator_status
577lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 578{
4164020e
SM
579 enum lttng_live_iterator_status status;
580 enum lttng_live_viewer_status viewer_status;
581 bt_logging_level log_level = lttng_live_msg_iter->log_level;
582 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
583 uint64_t session_idx = 0, nr_sessions_opened = 0;
584 struct lttng_live_session *session;
585 enum session_not_found_action sess_not_found_act =
586 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
587
588 BT_COMP_LOGD("Update data and metadata of all sessions: "
589 "live-msg-iter-addr=%p",
590 lttng_live_msg_iter);
591 /*
592 * In a remotely distant future, we could add a "new
593 * session" flag to the protocol, which would tell us that we
594 * need to query for new sessions even though we have sessions
595 * currently ongoing.
596 */
597 if (lttng_live_msg_iter->sessions->len == 0) {
598 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
599 BT_COMP_LOGD(
600 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
601 status = LTTNG_LIVE_ITERATOR_STATUS_END;
602 goto end;
603 } else {
604 BT_COMP_LOGD(
605 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
606 /*
607 * Retry to create a viewer session for the requested
608 * session name.
609 */
610 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
611 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
612 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
613 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
614 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
615 "Error creating LTTng live viewer session");
616 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
617 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
618 } else {
619 bt_common_abort();
620 }
621 goto end;
622 }
623 }
624 }
625
626 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
627 session =
628 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
629 status = lttng_live_get_session(lttng_live_msg_iter, session);
630 switch (status) {
631 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e 632 case LTTNG_LIVE_ITERATOR_STATUS_END:
a4f118a3
FD
633 /*
634 * A session returned `_END`. Other sessions may still
635 * be active so we override the status and continue
636 * looping if needed.
637 */
4164020e
SM
638 break;
639 default:
640 goto end;
641 }
642 if (!session->closed) {
643 nr_sessions_opened++;
644 }
645 }
646
647 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
648 status = LTTNG_LIVE_ITERATOR_STATUS_END;
649 } else {
650 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
651 }
f79c2d7a
FD
652
653end:
4164020e 654 return status;
7cdc2bab
MD
655}
656
4164020e
SM
657static enum lttng_live_iterator_status
658emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
659 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
660 uint64_t timestamp)
7cdc2bab 661{
4164020e
SM
662 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
663 bt_logging_level log_level = lttng_live_msg_iter->log_level;
664 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
665 bt_message *msg = NULL;
666
667 BT_ASSERT(stream_iter->trace->clock_class);
668
669 BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
670 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
671 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
672
673 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
674 stream_iter->trace->clock_class, timestamp);
675 if (!msg) {
676 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error emitting message iterator inactivity message");
677 goto error;
678 }
679
680 *message = msg;
7cdc2bab 681end:
4164020e 682 return ret;
7cdc2bab
MD
683
684error:
4164020e
SM
685 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
686 bt_message_put_ref(msg);
687 goto end;
7cdc2bab
MD
688}
689
4164020e
SM
690static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
691 struct lttng_live_msg_iter *lttng_live_msg_iter,
692 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 693{
4164020e
SM
694 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
695
696 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
697 return LTTNG_LIVE_ITERATOR_STATUS_OK;
698 }
699
700 /*
701 * Check if we already sent an inactivty message downstream for this
702 * `current_inactivity_ts` value.
703 */
704 if (lttng_live_stream->last_inactivity_ts.is_set &&
705 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
706 lttng_live_stream_iterator_set_state(lttng_live_stream,
707 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
708
709 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
710 goto end;
711 }
712
713 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
714 lttng_live_stream->current_inactivity_ts);
715
716 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
717 lttng_live_stream->last_inactivity_ts.is_set = true;
14f28187 718end:
4164020e 719 return ret;
14f28187
FD
720}
721
7d91f1ac 722static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
4164020e 723 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 724{
4164020e
SM
725 const bt_clock_snapshot *clock_snapshot = NULL;
726 int ret = 0;
727 bt_logging_level log_level = lttng_live_msg_iter->log_level;
728 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
729
730 BT_ASSERT_DBG(msg);
731 BT_ASSERT_DBG(ts_ns);
732
733 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
734 "last-msg-ts=%" PRId64,
735 lttng_live_msg_iter, msg, last_msg_ts_ns);
736
737 switch (bt_message_get_type(msg)) {
738 case BT_MESSAGE_TYPE_EVENT:
4164020e
SM
739 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
740 break;
741 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
4164020e
SM
742 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
743 break;
744 case BT_MESSAGE_TYPE_PACKET_END:
4164020e
SM
745 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
746 break;
747 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
4164020e
SM
748 clock_snapshot =
749 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
750 break;
751 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
4164020e
SM
752 clock_snapshot =
753 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
754 break;
755 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
756 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
757 break;
758 default:
759 /* All the other messages have a higher priority */
760 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
761 *ts_ns = last_msg_ts_ns;
762 goto end;
763 }
764
4164020e
SM
765 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
766 if (ret) {
767 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
768 "Cannot get nanoseconds from Epoch of clock snapshot: "
769 "clock-snapshot-addr=%p",
770 clock_snapshot);
771 goto error;
772 }
773
774 goto end;
14f28187 775
14f28187 776error:
4164020e 777 ret = -1;
7cdc2bab 778
7cdc2bab 779end:
4164020e
SM
780 if (ret == 0) {
781 BT_COMP_LOGD("Found message's timestamp: "
782 "iter-data-addr=%p, msg-addr=%p, "
783 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
784 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
785 }
786
787 return ret;
7cdc2bab
MD
788}
789
4164020e
SM
790static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
791 struct lttng_live_msg_iter *lttng_live_msg_iter,
792 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
7cdc2bab 793{
4164020e
SM
794 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
795 bt_logging_level log_level = lttng_live_msg_iter->log_level;
796 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
797 enum ctf_msg_iter_status status;
798 uint64_t session_idx, trace_idx;
799
800 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
801 struct lttng_live_session *session =
802 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
803
804 if (session->new_streams_needed) {
805 BT_COMP_LOGD("Need an update for streams: "
806 "session-id=%" PRIu64,
807 session->id);
808 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
809 goto end;
810 }
811 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
812 struct lttng_live_trace *trace =
813 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
814 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
815 BT_COMP_LOGD("Need an update for metadata stream: "
816 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
817 session->id, trace->id);
818 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
819 goto end;
820 }
821 }
822 }
823
824 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
825 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
826 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
827 "Invalid state of live stream iterator"
828 "stream-iter-status=%s",
829 lttng_live_stream_state_string(lttng_live_stream->state));
830 goto end;
831 }
832
833 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
834 switch (status) {
835 case CTF_MSG_ITER_STATUS_EOF:
836 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
837 break;
838 case CTF_MSG_ITER_STATUS_OK:
839 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
840 break;
841 case CTF_MSG_ITER_STATUS_AGAIN:
842 /*
843 * Continue immediately (end of packet). The next
844 * get_index may return AGAIN to delay the following
845 * attempt.
846 */
847 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
848 break;
849 case CTF_MSG_ITER_STATUS_ERROR:
850 default:
851 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
852 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
853 "CTF message iterator failed to get next message: "
854 "msg-iter=%p, msg-iter-status=%s",
855 lttng_live_stream->msg_iter, ctf_msg_iter_status_string(status));
856 break;
857 }
14f28187
FD
858
859end:
4164020e 860 return ret;
7cdc2bab
MD
861}
862
4164020e
SM
863static enum lttng_live_iterator_status
864lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
865 struct lttng_live_stream_iterator *stream_iter,
866 const bt_message **curr_msg)
4a39caef 867{
4164020e
SM
868 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
869 bt_logging_level log_level = lttng_live_msg_iter->log_level;
870 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
871
872 BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
873 "viewer-stream-id=%" PRIu64,
874 stream_iter->name->str, stream_iter->viewer_stream_id);
875
876 /*
877 * The viewer has hung up on us so we are closing the stream. The
878 * `ctf_msg_iter` should simply realize that it needs to close the
879 * stream properly by emitting the necessary stream end message.
880 */
881 enum ctf_msg_iter_status status =
882 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
883
884 if (status == CTF_MSG_ITER_STATUS_ERROR) {
885 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
886 "Error getting the next message from CTF message iterator");
887 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
888 goto end;
889 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
890 BT_COMP_LOGI("Reached the end of the live stream iterator.");
891 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
892 goto end;
893 }
894
895 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef
FD
896
897end:
4164020e 898 return live_status;
4a39caef
FD
899}
900
7cdc2bab
MD
901/*
902 * helper function:
903 * handle_no_data_streams()
904 * retry:
905 * - for each ACTIVE_NO_DATA stream:
906 * - query relayd for stream data, or quiescence info.
907 * - if need metadata, get metadata, goto retry.
908 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
909 * - if quiescent, move to QUIESCENT streams
910 * - if fetched data, move to ACTIVE_DATA streams
911 * (at this point each stream either has data, or is quiescent)
912 *
913 *
914 * iterator_next:
915 * handle_new_streams_and_metadata()
916 * - query relayd for known streams, add them as ACTIVE_NO_DATA
917 * - query relayd for metadata
918 *
919 * call handle_active_no_data_streams()
920 *
921 * handle_quiescent_streams()
922 * - if at least one stream is ACTIVE_DATA:
923 * - peek stream event with lowest timestamp -> next_ts
924 * - for each quiescent stream
925 * - if next_ts >= quiescent end
926 * - set state to ACTIVE_NO_DATA
927 * - else
928 * - for each quiescent stream
929 * - set state to ACTIVE_NO_DATA
930 *
931 * call handle_active_no_data_streams()
932 *
933 * handle_active_data_streams()
934 * - if at least one stream is ACTIVE_DATA:
935 * - get stream event with lowest timestamp from heap
d6e69534 936 * - make that stream event the current message.
7cdc2bab
MD
937 * - move this stream heap position to its next event
938 * - if we need to fetch data from relayd, move
939 * stream to ACTIVE_NO_DATA.
940 * - return OK
941 * - return AGAIN
942 *
943 * end criterion: ctrl-c on client. If relayd exits or the session
944 * closes on the relay daemon side, we keep on waiting for streams.
945 * Eventually handle --end timestamp (also an end criterion).
946 *
947 * When disconnected from relayd: try to re-connect endlessly.
948 */
4164020e
SM
949static enum lttng_live_iterator_status
950lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
951 struct lttng_live_stream_iterator *stream_iter,
952 const bt_message **curr_msg)
7cdc2bab 953{
4164020e
SM
954 bt_logging_level log_level = lttng_live_msg_iter->log_level;
955 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
956 enum lttng_live_iterator_status live_status;
957
958 BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
959 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
960 stream_iter->name->str, stream_iter->viewer_stream_id);
961
962 if (stream_iter->has_stream_hung_up) {
963 /*
964 * The stream has hung up and the stream was properly closed
965 * during the last call to the current function. Return _END
966 * status now so that this stream iterator is removed for the
967 * stream iterator list.
968 */
969 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
970 goto end;
971 }
4a39caef 972
7cdc2bab 973retry:
4164020e
SM
974 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
975
976 /*
977 * Make sure we have the most recent metadata and possibly some new
978 * streams.
979 */
980 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
981 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
982 goto end;
983 }
984
985 live_status =
986 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
987 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
988 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
989 /*
990 * We overwrite `live_status` since `curr_msg` is
991 * likely set to a valid message in this function.
992 */
993 live_status =
994 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
995 }
996 goto end;
997 }
998
999 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
1000 stream_iter, curr_msg);
1001 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1002 BT_ASSERT(!*curr_msg);
1003 goto end;
1004 }
1005 if (*curr_msg) {
1006 goto end;
1007 }
1008 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
1009 stream_iter, curr_msg);
1010 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1011 BT_ASSERT(!*curr_msg);
1012 }
7cdc2bab
MD
1013
1014end:
4164020e
SM
1015 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
1016 BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
1017 goto retry;
1018 }
14f28187 1019
4164020e
SM
1020 BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, "
1021 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1022 lttng_live_iterator_status_string(live_status), stream_iter->name->str,
1023 stream_iter->viewer_stream_id);
f93afbf9 1024
4164020e 1025 return live_status;
7cdc2bab
MD
1026}
1027
4164020e 1028static bool is_discarded_packet_or_event_message(const bt_message *msg)
8ec4d5ff 1029{
4164020e 1030 const enum bt_message_type msg_type = bt_message_get_type(msg);
8ec4d5ff 1031
4164020e
SM
1032 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
1033 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
8ec4d5ff
JG
1034}
1035
4164020e
SM
1036static enum lttng_live_iterator_status
1037adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
1038 const bt_message *msg_in, bt_message **msg_out,
1039 uint64_t new_begin_ts)
8ec4d5ff 1040{
4164020e
SM
1041 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1042 enum bt_property_availability availability;
1043 const bt_clock_snapshot *clock_snapshot;
1044 uint64_t end_ts;
1045 uint64_t count;
1046
1047 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
1048 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1049
1050 availability = bt_message_discarded_packets_get_count(msg_in, &count);
1051 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1052
1053 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
1054 iter, stream, new_begin_ts, end_ts);
1055 if (!*msg_out) {
1056 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1057 goto end;
1058 }
1059
1060 bt_message_discarded_packets_set_count(*msg_out, count);
8ec4d5ff 1061end:
4164020e 1062 return status;
8ec4d5ff
JG
1063}
1064
4164020e
SM
1065static enum lttng_live_iterator_status
1066adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
1067 const bt_message *msg_in, bt_message **msg_out,
1068 uint64_t new_begin_ts)
8ec4d5ff 1069{
4164020e
SM
1070 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1071 enum bt_property_availability availability;
1072 const bt_clock_snapshot *clock_snapshot;
1073 uint64_t end_ts;
1074 uint64_t count;
1075
1076 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
1077 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1078
1079 availability = bt_message_discarded_events_get_count(msg_in, &count);
1080 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1081
1082 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
1083 iter, stream, new_begin_ts, end_ts);
1084 if (!*msg_out) {
1085 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1086 goto end;
1087 }
1088
1089 bt_message_discarded_events_set_count(*msg_out, count);
8ec4d5ff 1090end:
4164020e 1091 return status;
8ec4d5ff
JG
1092}
1093
4164020e
SM
1094static enum lttng_live_iterator_status
1095handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1096 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1097 const bt_message *late_msg)
285951be 1098{
4164020e
SM
1099 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1100 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1101 const bt_clock_class *clock_class;
1102 const bt_stream_class *stream_class;
1103 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1104 int64_t last_inactivity_ts_ns;
1105 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1106 enum lttng_live_iterator_status adjust_status;
1107 bt_message *adjusted_message;
1108
1109 /*
1110 * The timestamp of the current message is before the last message sent
1111 * by this component. We CANNOT send it as is.
1112 *
1113 * The only expected scenario in which that could happen is the
1114 * following, everything else is a bug in this component, relay deamon,
1115 * or CTF parser.
1116 *
1117 * Expected scenario: The CTF message iterator emitted discarded
1118 * packets and discarded events with synthesized beginning and end
1119 * timestamps from the bounds of the last known packet and the newly
1120 * decoded packet header. The CTF message iterator is not aware of
1121 * stream inactivity beacons. Hence, we have to adjust the beginning
1122 * timestamp of those types of messages if a stream signalled its
1123 * inactivity up until _after_ the last known packet's beginning
1124 * timestamp.
1125 *
1126 * Otherwise, the monotonicity guarantee of message timestamps would
1127 * not be preserved.
1128 *
1129 * In short, the only scenario in which it's okay and fixable to
1130 * received a late message is when:
1131 * 1. the late message is a discarded packets or discarded events
1132 * message,
1133 * 2. this stream produced an inactivity message downstream, and
1134 * 3. the timestamp of the late message is within the inactivity
1135 * timespan we sent downstream through the inactivity message.
1136 */
1137
1138 BT_COMP_LOGD("Handling late message on live stream iterator: "
1139 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1140 stream_iter->name->str, stream_iter->viewer_stream_id);
1141
1142 if (!stream_iter->last_inactivity_ts.is_set) {
1143 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: "
1144 "have a late message when no inactivity message "
1145 "was ever sent for that stream.");
1146 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1147 goto end;
1148 }
1149
1150 if (!is_discarded_packet_or_event_message(late_msg)) {
1151 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1152 "Invalid live stream state: "
1153 "have a late message that is not a packet discarded or "
1154 "event discarded message: late-msg-type=%s",
1155 bt_common_message_type_string(bt_message_get_type(late_msg)));
1156 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1157 goto end;
1158 }
1159
1160 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1161 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1162
1163 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1164 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1165 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1166 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error converting last "
1167 "inactivity message timestamp to nanoseconds");
1168 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1169 goto end;
1170 }
1171
1172 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1173 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1174 "Invalid live stream state: "
1175 "have a late message that is none included in a stream "
1176 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
1177 "late-msg-ts-ns=%" PRIu64,
1178 last_inactivity_ts_ns, late_msg_ts_ns);
1179 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1180 goto end;
1181 }
1182
1183 /*
1184 * We now know that it's okay for this message to be late, we can now
1185 * adjust its timestamp to ensure monotonicity.
1186 */
1187 BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1188 "msg-new-ts-ns=%" PRIu64,
1189 bt_common_message_type_string(bt_message_get_type(late_msg)),
1190 stream_iter->last_inactivity_ts.value);
1191 switch (bt_message_get_type(late_msg)) {
1192 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1193 adjust_status = adjust_discarded_events_message(
1194 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1195 stream_iter->last_inactivity_ts.value);
1196 break;
1197 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1198 adjust_status = adjust_discarded_packets_message(
1199 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1200 stream_iter->last_inactivity_ts.value);
1201 break;
1202 default:
1203 bt_common_abort();
1204 }
1205
1206 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1207 stream_iter_status = adjust_status;
1208 goto end;
1209 }
1210
1211 BT_ASSERT_DBG(adjusted_message);
1212 stream_iter->current_msg = adjusted_message;
1213 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1214 bt_message_put_ref(late_msg);
cefd84f8 1215
285951be 1216end:
4164020e 1217 return stream_iter_status;
285951be
FD
1218}
1219
4164020e
SM
1220static enum lttng_live_iterator_status
1221next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1222 struct lttng_live_trace *live_trace,
1223 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1224{
4164020e
SM
1225 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1226 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1227 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1228 enum lttng_live_iterator_status stream_iter_status;
1229 ;
1230 int64_t youngest_candidate_msg_ts = INT64_MAX;
1231 uint64_t stream_iter_idx;
1232
1233 BT_ASSERT_DBG(live_trace);
1234 BT_ASSERT_DBG(live_trace->stream_iterators);
1235
1236 BT_COMP_LOGD("Finding the next stream iterator for trace: "
1237 "trace-id=%" PRIu64,
1238 live_trace->id);
1239 /*
1240 * Update the current message of every stream iterators of this trace.
1241 * The current msg of every stream must have a timestamp equal or
1242 * larger than the last message returned by this iterator. We must
1243 * ensure monotonicity.
1244 */
1245 stream_iter_idx = 0;
1246 while (stream_iter_idx < live_trace->stream_iterators->len) {
1247 bool stream_iter_is_ended = false;
1248 struct lttng_live_stream_iterator *stream_iter =
1249 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1250 stream_iter_idx);
1251
1252 /*
1253 * If there is no current message for this stream, go fetch
1254 * one.
1255 */
1256 while (!stream_iter->current_msg) {
1257 const bt_message *msg = NULL;
1258 int64_t curr_msg_ts_ns = INT64_MAX;
1259
1260 stream_iter_status =
1261 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1262
1263 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1264 stream_iter_is_ended = true;
1265 break;
1266 }
1267
1268 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1269 goto end;
1270 }
1271
1272 BT_ASSERT_DBG(msg);
1273
1274 BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, "
1275 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1276 bt_common_message_type_string(bt_message_get_type(msg)),
1277 stream_iter->name->str, stream_iter->viewer_stream_id);
1278
1279 /*
1280 * Get the timestamp in nanoseconds from origin of this
1281 * messsage.
1282 */
7d91f1ac
SM
1283 live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
1284 &curr_msg_ts_ns);
4164020e
SM
1285
1286 /*
1287 * Check if the message of the current live stream
1288 * iterator occurred at the exact same time or after the
1289 * last message returned by this component's message
1290 * iterator. If not, we need to handle it with care.
1291 */
1292 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1293 stream_iter->current_msg = msg;
1294 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1295 } else {
1296 /*
1297 * We received a message from the past. This
1298 * may be fixable but it can also be an error.
1299 */
1300 stream_iter_status =
1301 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1302 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1303 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1304 "Late message could not be handled correctly: "
1305 "lttng-live-msg-iter-addr=%p, "
1306 "stream-name=\"%s\", "
1307 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
1308 lttng_live_msg_iter, stream_iter->name->str,
1309 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
1310 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1311 goto end;
1312 }
1313 }
1314 }
1315
1316 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1317
1318 if (!stream_iter_is_ended) {
1319 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1320 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1321 /*
1322 * Update the current best candidate message
1323 * for the stream iterator of this live trace
1324 * to be forwarded downstream.
1325 */
1326 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1327 youngest_candidate_stream_iter = stream_iter;
1328 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1329 /*
1330 * Order the messages in an arbitrary but
1331 * deterministic way.
1332 */
1333 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1334 int ret = common_muxing_compare_messages(
1335 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1336 if (ret < 0) {
1337 /*
1338 * The `youngest_candidate_stream_iter->current_msg`
1339 * should go first. Update the next
1340 * iterator and the current timestamp.
1341 */
1342 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1343 youngest_candidate_stream_iter = stream_iter;
1344 } else if (ret == 0) {
1345 /*
1346 * Unable to pick which one should go
1347 * first.
1348 */
1349 BT_COMP_LOGW(
1350 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1351 "stream-iter-addr=%p"
1352 "stream-iter-addr=%p",
1353 stream_iter, youngest_candidate_stream_iter);
1354 }
1355 }
1356
1357 stream_iter_idx++;
1358 } else {
1359 /*
1360 * The live stream iterator has ended. That
1361 * iterator is removed from the array, but
1362 * there is no need to increment
1363 * stream_iter_idx as
1364 * g_ptr_array_remove_index_fast replaces the
1365 * removed element with the array's last
1366 * element.
1367 */
1368 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1369 }
1370 }
1371
1372 if (youngest_candidate_stream_iter) {
1373 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1374 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1375 } else {
1376 /*
1377 * The only case where we don't have a candidate for this trace
1378 * is if we reached the end of all the iterators.
1379 */
1380 BT_ASSERT(live_trace->stream_iterators->len == 0);
1381 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1382 }
14f28187
FD
1383
1384end:
4164020e 1385 return stream_iter_status;
14f28187
FD
1386}
1387
4164020e
SM
1388static enum lttng_live_iterator_status
1389next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1390 struct lttng_live_session *session,
1391 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1392{
4164020e
SM
1393 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1394 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1395 enum lttng_live_iterator_status stream_iter_status;
1396 uint64_t trace_idx = 0;
1397 int64_t youngest_candidate_msg_ts = INT64_MAX;
1398 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1399
1400 BT_COMP_LOGD("Finding the next stream iterator for session: "
1401 "session-id=%" PRIu64,
1402 session->id);
1403 /*
1404 * Make sure we are attached to the session and look for new streams
1405 * and metadata.
1406 */
1407 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1408 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1409 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1410 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1411 goto end;
1412 }
1413
1414 BT_ASSERT_DBG(session->traces);
1415
1416 while (trace_idx < session->traces->len) {
1417 bool trace_is_ended = false;
1418 struct lttng_live_stream_iterator *stream_iter;
1419 struct lttng_live_trace *trace =
1420 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1421
1422 stream_iter_status =
1423 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1424 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1425 /*
1426 * All the live stream iterators for this trace are
1427 * ENDed. Remove the trace from this session.
1428 */
1429 trace_is_ended = true;
1430 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1431 goto end;
1432 }
1433
1434 if (!trace_is_ended) {
1435 BT_ASSERT_DBG(stream_iter);
1436
1437 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1438 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1439 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1440 youngest_candidate_stream_iter = stream_iter;
1441 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1442 /*
1443 * Order the messages in an arbitrary but
1444 * deterministic way.
1445 */
1446 int ret = common_muxing_compare_messages(
1447 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1448 if (ret < 0) {
1449 /*
1450 * The `youngest_candidate_stream_iter->current_msg`
1451 * should go first. Update the next iterator
1452 * and the current timestamp.
1453 */
1454 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1455 youngest_candidate_stream_iter = stream_iter;
1456 } else if (ret == 0) {
1457 /* Unable to pick which one should go first. */
1458 BT_COMP_LOGW(
1459 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1460 "stream-iter-addr=%p"
1461 "stream-iter-addr=%p",
1462 stream_iter, youngest_candidate_stream_iter);
1463 }
1464 }
1465 trace_idx++;
1466 } else {
1467 /*
1468 * trace_idx is not incremented since
1469 * g_ptr_array_remove_index_fast replaces the
1470 * element at trace_idx with the array's last element.
1471 */
1472 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1473 }
1474 }
1475 if (youngest_candidate_stream_iter) {
1476 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1477 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1478 } else {
1479 /*
1480 * The only cases where we don't have a candidate for this
1481 * trace is:
1482 * 1. if we reached the end of all the iterators of all the
1483 * traces of this session,
1484 * 2. if we never had live stream iterator in the first place.
1485 *
1486 * In either cases, we return END.
1487 */
1488 BT_ASSERT(session->traces->len == 0);
1489 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1490 }
7cdc2bab 1491end:
4164020e 1492 return stream_iter_status;
14f28187
FD
1493}
1494
4164020e 1495static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1496{
4164020e 1497 uint64_t i;
14f28187 1498
4164020e
SM
1499 for (i = 0; i < count; i++) {
1500 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1501 }
7cdc2bab
MD
1502}
1503
4164020e
SM
1504bt_message_iterator_class_next_method_status
1505lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1506 uint64_t capacity, uint64_t *count)
d3eb6e8f 1507{
4164020e
SM
1508 bt_message_iterator_class_next_method_status status;
1509 enum lttng_live_viewer_status viewer_status;
1510 struct lttng_live_msg_iter *lttng_live_msg_iter =
1511 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1512 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1513 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1514 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1515 enum lttng_live_iterator_status stream_iter_status;
1516 uint64_t session_idx;
1517
1518 *count = 0;
1519
1520 BT_ASSERT_DBG(lttng_live_msg_iter);
1521
1522 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1523 /*
1524 * The iterator was interrupted in a previous call to the
1525 * `_next()` method. We currently do not support generating
1526 * messages after such event. The babeltrace2 CLI should never
1527 * be running the graph after being interrupted. So this check
1528 * is to prevent other graph users from using this live
1529 * iterator in an messed up internal state.
1530 */
1531 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1532 BT_COMP_LOGE_APPEND_CAUSE(
1533 self_comp,
1534 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1535 goto end;
1536 }
1537
1538 /*
1539 * Clear all the invalid message reference that might be left over in
1540 * the output array.
1541 */
1542 memset(msgs, 0, capacity * sizeof(*msgs));
1543
1544 /*
1545 * If no session are exposed on the relay found at the url provided by
1546 * the user, session count will be 0. In this case, we return status
1547 * end to return gracefully.
1548 */
1549 if (lttng_live_msg_iter->sessions->len == 0) {
1550 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1551 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1552 goto end;
1553 } else {
1554 /*
1555 * The are no more active session for this session
1556 * name. Retry to create a viewer session for the
1557 * requested session name.
1558 */
1559 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1560 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1561 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1562 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1563 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1564 "Error creating LTTng live viewer session");
1565 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1566 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1567 } else {
1568 bt_common_abort();
1569 }
1570 goto end;
1571 }
1572 }
1573 }
1574
1575 if (lttng_live_msg_iter->active_stream_iter == 0) {
1576 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1577 }
1578
1579 /*
1580 * Here the muxing of message is done.
1581 *
1582 * We need to iterate over all the streams of all the traces of all the
1583 * viewer sessions in order to get the message with the smallest
1584 * timestamp. In this case, a session is a viewer session and there is
1585 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1586 * kernel). Each viewer session can have multiple traces, for example,
1587 * 64bit UST viewer sessions could have multiple per-pid traces.
1588 *
1589 * We iterate over the streams of each traces to update and see what is
1590 * their next message's timestamp. From those timestamps, we select the
1591 * message with the smallest timestamp as the best candidate message
1592 * for that trace and do the same thing across all the sessions.
1593 *
1594 * We then compare the timestamp of best candidate message of all the
1595 * sessions to pick the message with the smallest timestamp and we
1596 * return it.
1597 */
1598 while (*count < capacity) {
1599 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1600 *candidate_stream_iter = NULL;
1601 int64_t youngest_msg_ts_ns = INT64_MAX;
1602
1603 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1604 session_idx = 0;
1605 while (session_idx < lttng_live_msg_iter->sessions->len) {
1606 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1607 lttng_live_msg_iter->sessions, session_idx);
1608
1609 /* Find the best candidate message to send downstream. */
1610 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1611 &candidate_stream_iter);
1612
1613 /* If we receive an END status, it means that either:
1614 * - Those traces never had active streams (UST with no
1615 * data produced yet),
1616 * - All live stream iterators have ENDed.*/
1617 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1618 if (session->closed && session->traces->len == 0) {
1619 /*
1620 * Remove the session from the list.
1621 * session_idx is not modified since
1622 * g_ptr_array_remove_index_fast
1623 * replaces the the removed element with
1624 * the array's last element.
1625 */
1626 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1627 } else {
1628 session_idx++;
1629 }
1630 continue;
1631 }
1632
1633 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1634 goto return_status;
1635 }
1636
1637 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1638 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1639 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1640 youngest_stream_iter = candidate_stream_iter;
1641 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1642 /*
1643 * The currently selected message to be sent
1644 * downstream next has the exact same timestamp
1645 * that of the current candidate message. We
1646 * must break the tie in a predictable manner.
1647 */
1648 BT_COMP_LOGD_STR(
1649 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1650 /*
1651 * Order the messages in an arbitrary but
1652 * deterministic way.
1653 */
1654 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1655 youngest_stream_iter->current_msg);
1656 if (ret < 0) {
1657 /*
1658 * The `candidate_stream_iter->current_msg`
1659 * should go first. Update the next
1660 * iterator and the current timestamp.
1661 */
1662 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1663 youngest_stream_iter = candidate_stream_iter;
1664 } else if (ret == 0) {
1665 /* Unable to pick which one should go first. */
1666 BT_COMP_LOGW(
1667 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1668 "next-stream-iter-addr=%p"
1669 "candidate-stream-iter-addr=%p",
1670 youngest_stream_iter, candidate_stream_iter);
1671 }
1672 }
1673
1674 session_idx++;
1675 }
1676
1677 if (!youngest_stream_iter) {
1678 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1679 goto return_status;
1680 }
1681
1682 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1683 /* Ensure monotonicity. */
1684 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1685 youngest_stream_iter->current_msg_ts_ns);
1686
1687 /*
1688 * Insert the next message to the message batch. This will set
1689 * stream iterator current messsage to NULL so that next time
1690 * we fetch the next message of that stream iterator
1691 */
1692 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1693 (*count)++;
1694
1695 /* Update the last timestamp in nanoseconds sent downstream. */
1696 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1697 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1698
1699 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1700 }
f79c2d7a
FD
1701
1702return_status:
4164020e
SM
1703 switch (stream_iter_status) {
1704 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1705 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1706 /*
1707 * If we gathered messages, return _OK even if the graph was
1708 * interrupted. This allows for the components downstream to at
1709 * least get those messages. If the graph was indeed
1710 * interrupted there should not be another _next() call as the
1711 * application will tear down the graph. This component class
1712 * doesn't support restarting after an interruption.
1713 */
1714 if (*count > 0) {
1715 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1716 } else {
1717 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1718 }
1719 break;
1720 case LTTNG_LIVE_ITERATOR_STATUS_END:
1721 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1722 break;
1723 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1724 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1725 "Memory error preparing the next batch of messages: "
1726 "live-iter-status=%s",
1727 lttng_live_iterator_status_string(stream_iter_status));
1728 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1729 break;
1730 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1731 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1732 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1733 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1734 "Error preparing the next batch of messages: "
1735 "live-iter-status=%s",
1736 lttng_live_iterator_status_string(stream_iter_status));
1737
1738 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1739 /* Put all existing messages on error. */
1740 put_messages(msgs, *count);
1741 break;
1742 default:
1743 bt_common_abort();
1744 }
14f28187 1745
f79c2d7a 1746end:
4164020e 1747 return status;
7cdc2bab 1748}
41a2b7ae 1749
4164020e
SM
1750static struct lttng_live_msg_iter *
1751lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1752 bt_self_message_iterator *self_msg_it)
4f74db52 1753{
4164020e
SM
1754 bt_self_component *self_comp = lttng_live_comp->self_comp;
1755 bt_logging_level log_level = lttng_live_comp->log_level;
4f74db52 1756
4164020e
SM
1757 struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1758 if (!lttng_live_msg_iter) {
1759 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter");
1760 goto end;
1761 }
1762
1763 lttng_live_msg_iter->log_level = lttng_live_comp->log_level;
1764 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1765 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1766 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1767
1768 lttng_live_msg_iter->active_stream_iter = 0;
1769 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1770 lttng_live_msg_iter->was_interrupted = false;
4f74db52 1771
4164020e
SM
1772 lttng_live_msg_iter->sessions =
1773 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1774 BT_ASSERT(lttng_live_msg_iter->sessions);
1775
1776end:
1777 return lttng_live_msg_iter;
4f74db52
FD
1778}
1779
4164020e
SM
1780bt_message_iterator_class_initialize_method_status
1781lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
7d91f1ac 1782 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
7cdc2bab 1783{
4164020e
SM
1784 bt_message_iterator_class_initialize_method_status status;
1785 struct lttng_live_component *lttng_live;
1786 struct lttng_live_msg_iter *lttng_live_msg_iter;
1787 enum lttng_live_viewer_status viewer_status;
1788 bt_logging_level log_level;
1789 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1790
1791 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1792 log_level = lttng_live->log_level;
1793 self_comp = lttng_live->self_comp;
1794
1795 /* There can be only one downstream iterator at the same time. */
1796 BT_ASSERT(!lttng_live->has_msg_iter);
1797 lttng_live->has_msg_iter = true;
1798
1799 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1800 if (!lttng_live_msg_iter) {
4164020e 1801 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter");
10265ebe 1802 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
4164020e
SM
1803 goto error;
1804 }
1805
1806 viewer_status = live_viewer_connection_create(
1807 self_comp, NULL, log_level, lttng_live->params.url->str, false, lttng_live_msg_iter,
1808 &lttng_live_msg_iter->viewer_connection);
1809 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1810 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1811 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer connection");
1812 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1813 /*
1814 * Interruption in the _iter_init() method is not
1815 * supported. Return an error.
1816 */
1817 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted while creating viewer connection");
1818 }
10265ebe
SM
1819
1820 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1821 goto error;
1822 }
1823
1824 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1825 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1826 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1827 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer session");
1828 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1829 /*
1830 * Interruption in the _iter_init() method is not
1831 * supported. Return an error.
1832 */
1833 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted when creating viewer session");
1834 }
10265ebe
SM
1835
1836 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1837 goto error;
1838 }
1839
1840 if (lttng_live_msg_iter->sessions->len == 0) {
1841 switch (lttng_live->params.sess_not_found_act) {
1842 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1843 BT_COMP_LOGI(
1844 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1845 "%s=\"%s\" component parameter: url=\"%s\"",
1846 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1847 lttng_live->params.url->str);
1848 break;
1849 case SESSION_NOT_FOUND_ACTION_FAIL:
1850 BT_COMP_LOGE_APPEND_CAUSE(
1851 self_comp,
1852 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1853 "component parameter: url =\"%s\"",
1854 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1855 lttng_live->params.url->str);
10265ebe 1856 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
1857 goto error;
1858 case SESSION_NOT_FOUND_ACTION_END:
1859 BT_COMP_LOGI(
1860 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1861 "call because of %s=\"%s\" component parameter: "
1862 "url=\"%s\"",
1863 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1864 lttng_live->params.url->str);
1865 break;
1866 default:
1867 bt_common_abort();
1868 }
1869 }
1870
1871 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1872 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1873 goto end;
f79c2d7a 1874
14f28187 1875error:
4164020e 1876 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1877end:
4164020e 1878 return status;
7cdc2bab
MD
1879}
1880
80aff5ef 1881static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1882 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1883 bt_param_validation_value_descr::makeString()},
4164020e
SM
1884 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1885
1886static bt_component_class_query_method_status
1887lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1888 bt_self_component_class *self_comp_class, bt_logging_level log_level)
7cdc2bab 1889{
4164020e
SM
1890 bt_component_class_query_method_status status;
1891 const bt_value *url_value = NULL;
1892 const char *url;
1893 struct live_viewer_connection *viewer_connection = NULL;
1894 enum lttng_live_viewer_status viewer_status;
1895 enum bt_param_validation_status validation_status;
1896 gchar *validate_error = NULL;
1897
1898 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1899 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1900 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1901 goto error;
1902 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1903 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1904 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error);
1905 goto error;
1906 }
1907
1908 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1909 url = bt_value_string_get(url_value);
1910
1911 viewer_status = live_viewer_connection_create(NULL, self_comp_class, log_level, url, true, NULL,
1912 &viewer_connection);
1913 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1914 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1915 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to create viewer connection");
1916 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1917 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1918 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1919 } else {
1920 bt_common_abort();
1921 }
1922 goto error;
1923 }
1924
1925 status = live_viewer_connection_list_sessions(viewer_connection, result);
1926 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1927 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to list viewer sessions");
1928 goto error;
1929 }
1930
1931 goto end;
c7eee084 1932
7cdc2bab 1933error:
4164020e 1934 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1935
4164020e
SM
1936 if (status >= 0) {
1937 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1938 }
c7eee084 1939
7cdc2bab 1940end:
4164020e
SM
1941 if (viewer_connection) {
1942 live_viewer_connection_destroy(viewer_connection);
1943 }
80aff5ef 1944
4164020e 1945 g_free(validate_error);
80aff5ef 1946
4164020e 1947 return status;
7cdc2bab
MD
1948}
1949
4164020e
SM
1950static bt_component_class_query_method_status
1951lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1952 bt_self_component_class *self_comp_class, bt_logging_level log_level)
312df793 1953{
4164020e
SM
1954 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1955 const bt_value *input_type_value;
1956 const bt_value *input_value;
1957 double weight = 0;
dd420a9b 1958 struct bt_common_lttng_live_url_parts parts = {};
4164020e
SM
1959
1960 /* Used by the logging macros */
1961 __attribute__((unused)) bt_self_component *self_comp = NULL;
1962
1963 *result = NULL;
1964 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1965 if (!input_type_value) {
1966 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `type` parameter.");
1967 goto error;
1968 }
1969
1970 if (!bt_value_is_string(input_type_value)) {
1971 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "`type` parameter is not a string value.");
1972 goto error;
1973 }
1974
1975 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1976 /* We don't handle file system paths */
1977 goto create_result;
1978 }
1979
1980 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1981 if (!input_value) {
1982 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `input` parameter.");
1983 goto error;
1984 }
1985
1986 if (!bt_value_is_string(input_value)) {
1987 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1988 "`input` parameter is not a string value.");
1989 goto error;
1990 }
1991
1992 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1993 if (parts.session_name) {
1994 /*
1995 * Looks pretty much like an LTTng live URL: we got the
1996 * session name part, which forms a complete URL.
1997 */
1998 weight = .75;
1999 }
312df793
PP
2000
2001create_result:
4164020e
SM
2002 *result = bt_value_real_create_init(weight);
2003 if (!*result) {
2004 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
2005 goto error;
2006 }
312df793 2007
4164020e 2008 goto end;
312df793
PP
2009
2010error:
4164020e
SM
2011 if (status >= 0) {
2012 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
2013 }
312df793 2014
4164020e 2015 BT_ASSERT(!*result);
312df793
PP
2016
2017end:
4164020e
SM
2018 bt_common_destroy_lttng_live_url_parts(&parts);
2019 return status;
312df793
PP
2020}
2021
4164020e
SM
2022bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
2023 bt_private_query_executor *priv_query_exec,
2024 const char *object, const bt_value *params,
2025 __attribute__((unused)) void *method_data,
2026 const bt_value **result)
7cdc2bab 2027{
4164020e
SM
2028 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
2029 bt_self_component *self_comp = NULL;
2030 bt_self_component_class *self_comp_class =
2031 bt_self_component_class_source_as_self_component_class(comp_class);
2032 bt_logging_level log_level = bt_query_executor_get_logging_level(
2033 bt_private_query_executor_as_query_executor_const(priv_query_exec));
2034
2035 if (strcmp(object, "sessions") == 0) {
2036 status = lttng_live_query_list_sessions(params, result, self_comp_class, log_level);
2037 } else if (strcmp(object, "babeltrace.support-info") == 0) {
2038 status = lttng_live_query_support_info(params, result, self_comp_class, log_level);
2039 } else {
2040 BT_COMP_LOGI("Unknown query object `%s`", object);
2041 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
2042 goto end;
2043 }
14f28187
FD
2044
2045end:
4164020e 2046 return status;
7cdc2bab
MD
2047}
2048
4164020e 2049static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
7cdc2bab 2050{
4164020e
SM
2051 if (!lttng_live) {
2052 return;
2053 }
2054 if (lttng_live->params.url) {
2055 g_string_free(lttng_live->params.url, TRUE);
2056 }
2057 g_free(lttng_live);
7cdc2bab
MD
2058}
2059
14f28187 2060void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 2061{
4164020e
SM
2062 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
2063 bt_self_component_source_as_self_component(component));
2064
2065 if (!data) {
2066 return;
2067 }
2068 lttng_live_component_destroy_data(data);
7cdc2bab
MD
2069}
2070
4164020e
SM
2071static enum session_not_found_action
2072parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 2073{
4164020e
SM
2074 enum session_not_found_action action;
2075 const char *no_session_act_str = bt_value_string_get(no_session_param);
2076
2077 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2078 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2079 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2080 action = SESSION_NOT_FOUND_ACTION_FAIL;
2081 } else {
2082 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2083 action = SESSION_NOT_FOUND_ACTION_END;
2084 }
2085
2086 return action;
14f28187
FD
2087}
2088
88730e42
SM
2089static bt_param_validation_value_descr inputs_elem_descr =
2090 bt_param_validation_value_descr::makeString();
80aff5ef
SM
2091
2092static const char *sess_not_found_action_choices[] = {
4164020e
SM
2093 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2094 SESS_NOT_FOUND_ACTION_FAIL_STR,
2095 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
2096};
2097
2098static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
2099 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2100 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2101 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2102 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
2103 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2104
2105static bt_component_class_initialize_method_status
2106lttng_live_component_create(const bt_value *params, bt_logging_level log_level,
2107 bt_self_component *self_comp, struct lttng_live_component **component)
7cdc2bab 2108{
4164020e
SM
2109 struct lttng_live_component *lttng_live = NULL;
2110 const bt_value *inputs_value;
2111 const bt_value *url_value;
2112 const bt_value *value;
2113 const char *url;
2114 enum bt_param_validation_status validation_status;
2115 gchar *validation_error = NULL;
2116 bt_component_class_initialize_method_status status;
2117
2118 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2119 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2120 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2121 goto error;
2122 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2123 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
2124 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2125 goto error;
2126 }
2127
2128 lttng_live = g_new0(struct lttng_live_component, 1);
2129 if (!lttng_live) {
2130 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2131 goto end;
2132 }
2133 lttng_live->log_level = log_level;
2134 lttng_live->self_comp = self_comp;
2135 lttng_live->max_query_size = MAX_QUERY_SIZE;
2136 lttng_live->has_msg_iter = false;
2137
2138 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2139 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2140 url = bt_value_string_get(url_value);
2141
2142 lttng_live->params.url = g_string_new(url);
2143 if (!lttng_live->params.url) {
2144 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2145 goto error;
2146 }
2147
2148 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2149 if (value) {
2150 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2151 } else {
2152 BT_COMP_LOGI("Optional `%s` parameter is missing: "
2153 "defaulting to `%s`.",
2154 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2155 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2156 }
2157
2158 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2159 goto end;
7cdc2bab
MD
2160
2161error:
4164020e
SM
2162 lttng_live_component_destroy_data(lttng_live);
2163 lttng_live = NULL;
7cdc2bab 2164end:
4164020e 2165 g_free(validation_error);
80aff5ef 2166
4164020e
SM
2167 *component = lttng_live;
2168 return status;
d3e4dcd8
PP
2169}
2170
4164020e
SM
2171bt_component_class_initialize_method_status
2172lttng_live_component_init(bt_self_component_source *self_comp_src,
7d91f1ac 2173 bt_self_component_source_configuration *, const bt_value *params, void *)
f3bc2010 2174{
4164020e
SM
2175 struct lttng_live_component *lttng_live;
2176 bt_component_class_initialize_method_status ret;
2177 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2178 bt_logging_level log_level =
2179 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
2180 bt_self_component_add_port_status add_port_status;
2181
2182 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
2183 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2184 goto error;
2185 }
2186
2187 add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2188 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2189 ret = (bt_component_class_initialize_method_status) add_port_status;
2190 goto end;
2191 }
2192
2193 bt_self_component_set_data(self_comp, lttng_live);
2194 goto end;
7cdc2bab 2195
19bdff20 2196error:
4164020e
SM
2197 lttng_live_component_destroy_data(lttng_live);
2198 lttng_live = NULL;
7cdc2bab 2199end:
4164020e 2200 return ret;
d85ef162 2201}
This page took 0.22192 seconds and 4 git commands to generate.