src.ctf.lttng-live: make lttng_live_trace::stream_iterators an std::vector
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
CommitLineData
f3bc2010 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
f3bc2010 3 *
14f28187 4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010 7 *
0235b0db 8 * Babeltrace CTF LTTng-live Client Component
f3bc2010
JG
9 */
10
c802cacb 11#include <glib.h>
14f28187
FD
12#include <unistd.h>
13
578e048b 14#include "common/assert.h"
0f5c5d5c 15#include "cpp-common/bt2c/fmt.hpp"
6838d8aa 16#include "cpp-common/bt2c/glib-up.hpp"
db00b877 17#include "cpp-common/bt2c/vector.hpp"
ef9e5f5d 18#include "cpp-common/bt2s/make-unique.hpp"
0f5c5d5c 19#include "cpp-common/vendor/fmt/format.h"
f3bc2010 20
376fc2bd 21#include "plugins/common/muxing/muxing.h"
80aff5ef 22#include "plugins/common/param-validation/param-validation.h"
376fc2bd 23
087cd0f5 24#include "data-stream.hpp"
087cd0f5 25#include "lttng-live.hpp"
c802cacb 26#include "metadata.hpp"
7cdc2bab 27
4164020e
SM
28#define MAX_QUERY_SIZE (256 * 1024)
29#define URL_PARAM "url"
30#define INPUTS_PARAM "inputs"
31#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
32#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
33#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
34#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 35
34533ae0 36void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
4164020e 37 enum lttng_live_stream_state new_state)
34533ae0 38{
0f5c5d5c
SM
39 BT_CPPLOGD_SPEC(stream_iter->logger,
40 "Setting live stream iterator state: viewer-stream-id={}, "
41 "old-state={}, new-state={}",
42 stream_iter->viewer_stream_id, stream_iter->state, new_state);
34533ae0 43
4164020e 44 stream_iter->state = new_state;
34533ae0
FD
45}
46
4164020e
SM
47#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
48 do { \
0f5c5d5c
SM
49 BT_CPPLOGD_SPEC((live_stream_iter)->logger, \
50 "Live stream iterator state={}, " \
51 "last-inact-ts-is-set={}, last-inact-ts-value={}, " \
52 "curr-inact-ts={}", \
53 (live_stream_iter)->state, (live_stream_iter)->last_inactivity_ts.is_set, \
54 (live_stream_iter)->last_inactivity_ts.value, \
55 (live_stream_iter)->current_inactivity_ts); \
4164020e 56 } while (0);
6f79a7cf 57
9b4f9b42 58bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
6f79a7cf 59{
4164020e 60 bool ret;
6f79a7cf 61
4164020e
SM
62 if (!msg_iter) {
63 ret = false;
64 goto end;
65 }
7cdc2bab 66
4164020e 67 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
4bf0e537 68
14f28187 69end:
4164020e 70 return ret;
7cdc2bab
MD
71}
72
4164020e
SM
73static struct lttng_live_trace *
74lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
d3e4dcd8 75{
4164020e
SM
76 uint64_t trace_idx;
77 struct lttng_live_trace *ret_trace = NULL;
78
79 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
80 struct lttng_live_trace *trace =
81 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
82 if (trace->id == trace_id) {
83 ret_trace = trace;
84 goto end;
85 }
86 }
14f28187
FD
87
88end:
4164020e 89 return ret_trace;
d3eb6e8f
PP
90}
91
4164020e 92static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 93{
0f5c5d5c 94 BT_CPPLOGD_SPEC(trace->logger, "Destroying live trace: trace-id={}", trace->id);
7cdc2bab 95
afb0f12b 96 delete trace;
7cdc2bab
MD
97}
98
4164020e
SM
99static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
100 uint64_t trace_id)
7cdc2bab 101{
0f5c5d5c
SM
102 BT_CPPLOGD_SPEC(session->logger, "Creating live trace: session-id={}, trace-id={}", session->id,
103 trace_id);
4164020e 104
0f5c5d5c 105 lttng_live_trace *trace = new lttng_live_trace {session->logger};
4164020e
SM
106 trace->session = session;
107 trace->id = trace_id;
4164020e
SM
108 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
109 g_ptr_array_add(session->traces, trace);
110
4164020e 111 return trace;
7cdc2bab
MD
112}
113
4164020e
SM
114struct lttng_live_trace *
115lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
116 uint64_t trace_id)
7cdc2bab 117{
4164020e 118 struct lttng_live_trace *trace;
7cdc2bab 119
4164020e
SM
120 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
121 if (trace) {
122 goto end;
123 }
7cdc2bab 124
4164020e
SM
125 /* The session is the owner of the newly created trace. */
126 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 127
14f28187 128end:
4164020e 129 return trace;
7cdc2bab
MD
130}
131
4164020e
SM
132int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
133 const char *hostname, const char *session_name)
7cdc2bab 134{
0f5c5d5c
SM
135 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
136 "Adding live session: "
137 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
138 session_id, hostname, session_name);
4164020e 139
0f5c5d5c 140 lttng_live_session *session = new lttng_live_session {lttng_live_msg_iter->logger};
4164020e
SM
141 session->self_comp = lttng_live_msg_iter->self_comp;
142 session->id = session_id;
143 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
144 BT_ASSERT(session->traces);
145 session->lttng_live_msg_iter = lttng_live_msg_iter;
146 session->new_streams_needed = true;
147 session->hostname = g_string_new(hostname);
148 BT_ASSERT(session->hostname);
149
150 session->session_name = g_string_new(session_name);
151 BT_ASSERT(session->session_name);
152
153 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
afb0f12b
SM
154
155 return 0;
7cdc2bab
MD
156}
157
4164020e 158static void lttng_live_destroy_session(struct lttng_live_session *session)
7cdc2bab 159{
4164020e
SM
160 if (!session) {
161 goto end;
162 }
163
0f5c5d5c
SM
164 BT_CPPLOGD_SPEC(session->logger,
165 "Destroying live session: "
166 "session-id={}, session-name=\"{}\"",
167 session->id, session->session_name->str);
4164020e
SM
168 if (session->id != -1ULL) {
169 if (lttng_live_session_detach(session)) {
170 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
171 /* Old relayd cannot detach sessions. */
0f5c5d5c
SM
172 BT_CPPLOGD_SPEC(session->logger, "Unable to detach lttng live session {}",
173 session->id);
4164020e
SM
174 }
175 }
176 session->id = -1ULL;
177 }
178
179 if (session->traces) {
180 g_ptr_array_free(session->traces, TRUE);
181 }
182
183 if (session->hostname) {
184 g_string_free(session->hostname, TRUE);
185 }
afb0f12b 186
4164020e
SM
187 if (session->session_name) {
188 g_string_free(session->session_name, TRUE);
189 }
afb0f12b
SM
190
191 delete session;
14f28187
FD
192
193end:
4164020e 194 return;
7cdc2bab
MD
195}
196
4164020e 197static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 198{
4164020e
SM
199 if (!lttng_live_msg_iter) {
200 goto end;
201 }
7cdc2bab 202
4164020e
SM
203 if (lttng_live_msg_iter->sessions) {
204 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
205 }
14f28187 206
4164020e
SM
207 if (lttng_live_msg_iter->viewer_connection) {
208 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
209 }
210 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
211 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
14f28187 212
4164020e
SM
213 /* All stream iterators must be destroyed at this point. */
214 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
215 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
14f28187 216
afb0f12b 217 delete lttng_live_msg_iter;
14f28187
FD
218
219end:
4164020e 220 return;
14f28187
FD
221}
222
14f28187
FD
223void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
224{
4164020e 225 struct lttng_live_msg_iter *lttng_live_msg_iter;
14f28187 226
4164020e 227 BT_ASSERT(self_msg_iter);
14f28187 228
4164020e
SM
229 lttng_live_msg_iter =
230 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
231 BT_ASSERT(lttng_live_msg_iter);
232 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
233}
234
4164020e
SM
235static enum lttng_live_iterator_status
236lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 237{
4164020e
SM
238 switch (lttng_live_stream->state) {
239 case LTTNG_LIVE_STREAM_QUIESCENT:
240 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
241 break;
242 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
243 /* Invalid state. */
0f5c5d5c 244 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"ACTIVE_NO_DATA\"");
4164020e
SM
245 bt_common_abort();
246 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
247 /* Invalid state. */
0f5c5d5c 248 BT_CPPLOGF_SPEC(lttng_live_stream->logger, "Unexpected stream state \"QUIESCENT_NO_DATA\"");
4164020e
SM
249 bt_common_abort();
250 case LTTNG_LIVE_STREAM_EOF:
251 break;
252 }
253 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
254}
255
256/*
f93afbf9
FD
257 * For active no data stream, fetch next index. As a result of that it can
258 * become either:
259 * - quiescent: won't have events for a bit,
260 * - have data: need to get that data and produce the event,
261 * - have no data on this stream at this point: need to retry (AGAIN) or return
262 * EOF.
7cdc2bab 263 */
4164020e
SM
264static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
265 struct lttng_live_msg_iter *lttng_live_msg_iter,
266 struct lttng_live_stream_iterator *lttng_live_stream)
7cdc2bab 267{
4164020e
SM
268 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
269 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
270 struct packet_index index;
271
272 if (lttng_live_stream->trace->metadata_stream_state ==
273 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
e28ca558
SM
274 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
275 "Need to get an update for the metadata stream before proceeding "
276 "further with this stream: stream-name=\"{}\"",
277 lttng_live_stream->name);
4164020e
SM
278 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
279 goto end;
280 }
281
282 if (lttng_live_stream->trace->session->new_streams_needed) {
e28ca558
SM
283 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
284 "Need to get an update of all streams before proceeding further "
285 "with this stream: stream-name=\"{}\"",
286 lttng_live_stream->name);
4164020e
SM
287 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
288 goto end;
289 }
290
291 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
292 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
293 goto end;
294 }
295 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
296 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
297 goto end;
298 }
299
300 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
301
302 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
303 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
304 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
305
306 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
307 /*
5cb27175
JG
308 * Because the stream is in the QUIESCENT_NO_DATA
309 * state, we can assert that the last_inactivity_ts was
310 * set and can be safely used in the `if` above.
311 */
4164020e
SM
312 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
313
314 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
315 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
316 } else {
317 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
318 }
319 goto end;
320 }
321
322 lttng_live_stream->base_offset = index.offset;
323 lttng_live_stream->offset = index.offset;
324 lttng_live_stream->len = index.packet_size / CHAR_BIT;
325
0f5c5d5c
SM
326 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
327 "Setting live stream reading info: stream-name=\"{}\", "
328 "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
e28ca558 329 lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
0f5c5d5c
SM
330 lttng_live_stream->base_offset, lttng_live_stream->offset,
331 lttng_live_stream->len);
f93afbf9 332
7cdc2bab 333end:
4164020e
SM
334 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
335 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
336 }
337 return ret;
7cdc2bab
MD
338}
339
340/*
14f28187
FD
341 * Creation of the message requires the ctf trace class to be created
342 * beforehand, but the live protocol gives us all streams (including metadata)
343 * at once. So we split it in three steps: getting streams, getting metadata
344 * (which creates the ctf trace class), and then creating the per-stream
345 * messages.
7cdc2bab 346 */
4164020e
SM
347static enum lttng_live_iterator_status
348lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
349 struct lttng_live_session *session)
7cdc2bab 350{
4164020e
SM
351 enum lttng_live_iterator_status status;
352 uint64_t trace_idx;
353
354 if (!session->attached) {
0f5c5d5c
SM
355 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
356 session->id);
4164020e
SM
357 enum lttng_live_viewer_status attach_status =
358 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
359 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
360 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
361 /*
362 * Clear any causes appended in
363 * `lttng_live_attach_session()` as we want to
364 * return gracefully since the graph was
365 * cancelled.
366 */
367 bt_current_thread_clear_error();
368 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
369 } else {
370 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
371 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
372 "Error attaching to LTTng live session");
4164020e
SM
373 }
374 goto end;
375 }
376 }
377
0f5c5d5c
SM
378 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
379 "Updating all data streams: "
380 "session-id={}, session-name=\"{}\"",
381 session->id, session->session_name->str);
4164020e
SM
382
383 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
a4f118a3
FD
384 switch (status) {
385 case LTTNG_LIVE_ITERATOR_STATUS_OK:
386 break;
387 case LTTNG_LIVE_ITERATOR_STATUS_END:
388 /*
f8a11f24
SM
389 * We received a `_END` from the `_get_new_streams()` function,
390 * which means no more data will ever be received from the data
391 * streams of this session. But it's possible that the metadata
392 * is incomplete.
393 * The live protocol guarantees that we receive all the
394 * metadata needed before we receive data streams needing it.
395 * But it's possible to receive metadata NOT needed by
396 * data streams after the session was closed. For example, this
397 * could happen if a new event is registered and the session is
398 * stopped before any tracepoint for that event is actually
399 * fired.
400 */
0f5c5d5c
SM
401 BT_CPPLOGD_SPEC(
402 lttng_live_msg_iter->logger,
a4f118a3 403 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
0f5c5d5c 404 "session-id={}, session-name=\"{}\"",
a4f118a3
FD
405 session->id, session->session_name->str);
406 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
407 break;
408 default:
4164020e
SM
409 goto end;
410 }
411
0f5c5d5c
SM
412 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
413 "Updating metadata stream for session: "
414 "session-id={}, session-name=\"{}\"",
415 session->id, session->session_name->str);
a4f118a3 416
4164020e
SM
417 trace_idx = 0;
418 while (trace_idx < session->traces->len) {
419 struct lttng_live_trace *trace =
420 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
421
422 status = lttng_live_metadata_update(trace);
423 switch (status) {
424 case LTTNG_LIVE_ITERATOR_STATUS_END:
425 case LTTNG_LIVE_ITERATOR_STATUS_OK:
426 trace_idx++;
427 break;
428 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
429 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
430 goto end;
431 default:
0f5c5d5c
SM
432 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
433 "Error updating trace metadata: "
434 "stream-iter-status={}, trace-id={}",
435 status, trace->id);
4164020e
SM
436 goto end;
437 }
438 }
439
440 /*
441 * Now that we have the metadata we can initialize the downstream
442 * iterator.
443 */
444 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
14f28187
FD
445
446end:
4164020e 447 return status;
7cdc2bab
MD
448}
449
4164020e
SM
450static void
451lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 452{
4164020e 453 uint64_t session_idx, trace_idx;
4164020e
SM
454
455 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
456 struct lttng_live_session *session =
457 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
0f5c5d5c
SM
458 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
459 "Force marking session as needing new streams: "
460 "session-id={}",
461 session->id);
4164020e
SM
462 session->new_streams_needed = true;
463 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
464 struct lttng_live_trace *trace =
465 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
0f5c5d5c
SM
466 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
467 "Force marking trace metadata state as needing an update: "
468 "session-id={}, trace-id={}",
469 session->id, trace->id);
4164020e
SM
470
471 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
472
473 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
474 }
475 }
7cdc2bab
MD
476}
477
4164020e
SM
478static enum lttng_live_iterator_status
479lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 480{
4164020e
SM
481 enum lttng_live_iterator_status status;
482 enum lttng_live_viewer_status viewer_status;
4164020e
SM
483 uint64_t session_idx = 0, nr_sessions_opened = 0;
484 struct lttng_live_session *session;
485 enum session_not_found_action sess_not_found_act =
486 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
487
0f5c5d5c
SM
488 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
489 "Update data and metadata of all sessions: "
490 "live-msg-iter-addr={}",
491 fmt::ptr(lttng_live_msg_iter));
4164020e
SM
492 /*
493 * In a remotely distant future, we could add a "new
494 * session" flag to the protocol, which would tell us that we
495 * need to query for new sessions even though we have sessions
496 * currently ongoing.
497 */
498 if (lttng_live_msg_iter->sessions->len == 0) {
499 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
0f5c5d5c
SM
500 BT_CPPLOGD_SPEC(
501 lttng_live_msg_iter->logger,
4164020e
SM
502 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
503 status = LTTNG_LIVE_ITERATOR_STATUS_END;
504 goto end;
505 } else {
0f5c5d5c
SM
506 BT_CPPLOGD_SPEC(
507 lttng_live_msg_iter->logger,
4164020e
SM
508 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
509 /*
510 * Retry to create a viewer session for the requested
511 * session name.
512 */
513 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
514 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
515 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
516 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
517 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
518 "Error creating LTTng live viewer session");
4164020e
SM
519 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
520 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
521 } else {
522 bt_common_abort();
523 }
524 goto end;
525 }
526 }
527 }
528
529 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
530 session =
531 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
532 status = lttng_live_get_session(lttng_live_msg_iter, session);
533 switch (status) {
534 case LTTNG_LIVE_ITERATOR_STATUS_OK:
4164020e 535 case LTTNG_LIVE_ITERATOR_STATUS_END:
a4f118a3
FD
536 /*
537 * A session returned `_END`. Other sessions may still
538 * be active so we override the status and continue
539 * looping if needed.
540 */
4164020e
SM
541 break;
542 default:
543 goto end;
544 }
545 if (!session->closed) {
546 nr_sessions_opened++;
547 }
548 }
549
550 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
551 status = LTTNG_LIVE_ITERATOR_STATUS_END;
552 } else {
553 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
554 }
f79c2d7a
FD
555
556end:
4164020e 557 return status;
7cdc2bab
MD
558}
559
4164020e
SM
560static enum lttng_live_iterator_status
561emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7
SM
562 struct lttng_live_stream_iterator *stream_iter,
563 bt2::ConstMessage::Shared& message, uint64_t timestamp)
7cdc2bab 564{
4164020e 565 BT_ASSERT(stream_iter->trace->clock_class);
0f5c5d5c
SM
566 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
567 "Emitting inactivity message for stream: ctf-stream-id={}, "
568 "viewer-stream-id={}, timestamp={}",
569 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id,
570 timestamp);
4164020e 571
35e432d7
SM
572 const auto msg = bt_message_message_iterator_inactivity_create(
573 lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
574
4164020e 575 if (!msg) {
0f5c5d5c
SM
576 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
577 "Error emitting message iterator inactivity message");
35e432d7 578 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4164020e
SM
579 }
580
35e432d7
SM
581 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
582 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
583}
584
4164020e
SM
585static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
586 struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7 587 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
7cdc2bab 588{
4164020e
SM
589 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
590
591 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
592 return LTTNG_LIVE_ITERATOR_STATUS_OK;
593 }
594
595 /*
596 * Check if we already sent an inactivty message downstream for this
597 * `current_inactivity_ts` value.
598 */
599 if (lttng_live_stream->last_inactivity_ts.is_set &&
600 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
601 lttng_live_stream_iterator_set_state(lttng_live_stream,
602 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
603
604 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
605 goto end;
606 }
607
608 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
609 lttng_live_stream->current_inactivity_ts);
610
611 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
612 lttng_live_stream->last_inactivity_ts.is_set = true;
14f28187 613end:
4164020e 614 return ret;
14f28187
FD
615}
616
7d91f1ac 617static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
4164020e 618 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
14f28187 619{
4164020e
SM
620 const bt_clock_snapshot *clock_snapshot = NULL;
621 int ret = 0;
4164020e
SM
622
623 BT_ASSERT_DBG(msg);
624 BT_ASSERT_DBG(ts_ns);
625
0f5c5d5c
SM
626 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
627 "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
628 "last-msg-ts={}",
629 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
4164020e
SM
630
631 switch (bt_message_get_type(msg)) {
632 case BT_MESSAGE_TYPE_EVENT:
4164020e
SM
633 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
634 break;
635 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
4164020e
SM
636 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
637 break;
638 case BT_MESSAGE_TYPE_PACKET_END:
4164020e
SM
639 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
640 break;
641 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
4164020e
SM
642 clock_snapshot =
643 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
644 break;
645 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
4164020e
SM
646 clock_snapshot =
647 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
648 break;
649 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
650 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
651 break;
652 default:
653 /* All the other messages have a higher priority */
0f5c5d5c
SM
654 BT_CPPLOGD_STR_SPEC(lttng_live_msg_iter->logger,
655 "Message has no timestamp: using the last message timestamp.");
4164020e
SM
656 *ts_ns = last_msg_ts_ns;
657 goto end;
658 }
659
4164020e
SM
660 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
661 if (ret) {
0f5c5d5c
SM
662 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
663 "Cannot get nanoseconds from Epoch of clock snapshot: "
664 "clock-snapshot-addr={}",
665 fmt::ptr(clock_snapshot));
4164020e
SM
666 goto error;
667 }
668
669 goto end;
14f28187 670
14f28187 671error:
4164020e 672 ret = -1;
7cdc2bab 673
7cdc2bab 674end:
4164020e 675 if (ret == 0) {
0f5c5d5c
SM
676 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
677 "Found message's timestamp: iter-data-addr={}, msg-addr={}, "
678 "last-msg-ts={}, ts={}",
679 fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
4164020e
SM
680 }
681
682 return ret;
7cdc2bab
MD
683}
684
4164020e
SM
685static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
686 struct lttng_live_msg_iter *lttng_live_msg_iter,
35e432d7 687 struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
7cdc2bab 688{
4164020e 689 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e
SM
690 enum ctf_msg_iter_status status;
691 uint64_t session_idx, trace_idx;
692
693 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
694 struct lttng_live_session *session =
695 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
696
697 if (session->new_streams_needed) {
0f5c5d5c
SM
698 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
699 "Need an update for streams: "
700 "session-id={}",
701 session->id);
4164020e
SM
702 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
703 goto end;
704 }
705 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
706 struct lttng_live_trace *trace =
707 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
708 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
0f5c5d5c
SM
709 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
710 "Need an update for metadata stream: "
711 "session-id={}, trace-id={}",
712 session->id, trace->id);
4164020e
SM
713 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
714 goto end;
715 }
716 }
717 }
718
719 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
720 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
721 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
722 "Invalid state of live stream iterator"
723 "stream-iter-status={}",
724 lttng_live_stream->state);
4164020e
SM
725 goto end;
726 }
727
35e432d7
SM
728 const bt_message *msg;
729 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
4164020e
SM
730 switch (status) {
731 case CTF_MSG_ITER_STATUS_EOF:
732 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
733 break;
734 case CTF_MSG_ITER_STATUS_OK:
35e432d7 735 message = bt2::ConstMessage::Shared::createWithoutRef(msg);
4164020e
SM
736 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
737 break;
738 case CTF_MSG_ITER_STATUS_AGAIN:
739 /*
740 * Continue immediately (end of packet). The next
741 * get_index may return AGAIN to delay the following
742 * attempt.
743 */
744 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
745 break;
746 case CTF_MSG_ITER_STATUS_ERROR:
747 default:
748 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
0f5c5d5c
SM
749 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
750 "CTF message iterator failed to get next message: "
751 "msg-iter={}, msg-iter-status={}",
752 fmt::ptr(lttng_live_stream->msg_iter), status);
4164020e
SM
753 break;
754 }
14f28187
FD
755
756end:
4164020e 757 return ret;
7cdc2bab
MD
758}
759
4164020e
SM
760static enum lttng_live_iterator_status
761lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
762 struct lttng_live_stream_iterator *stream_iter,
35e432d7 763 bt2::ConstMessage::Shared& curr_msg)
4a39caef 764{
4164020e 765 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
4164020e 766
0f5c5d5c
SM
767 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
768 "Closing live stream iterator: stream-name=\"{}\", "
769 "viewer-stream-id={}",
e28ca558 770 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
771
772 /*
773 * The viewer has hung up on us so we are closing the stream. The
774 * `ctf_msg_iter` should simply realize that it needs to close the
775 * stream properly by emitting the necessary stream end message.
776 */
35e432d7 777 const bt_message *msg;
4164020e 778 enum ctf_msg_iter_status status =
35e432d7 779 ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
4164020e
SM
780
781 if (status == CTF_MSG_ITER_STATUS_ERROR) {
0f5c5d5c
SM
782 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
783 "Error getting the next message from CTF message iterator");
4164020e
SM
784 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
785 goto end;
786 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
0f5c5d5c
SM
787 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
788 "Reached the end of the live stream iterator.");
4164020e
SM
789 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
790 goto end;
791 }
792
793 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
4a39caef 794
35e432d7
SM
795 curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
796
4a39caef 797end:
4164020e 798 return live_status;
4a39caef
FD
799}
800
7cdc2bab
MD
801/*
802 * helper function:
803 * handle_no_data_streams()
804 * retry:
805 * - for each ACTIVE_NO_DATA stream:
806 * - query relayd for stream data, or quiescence info.
807 * - if need metadata, get metadata, goto retry.
808 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
809 * - if quiescent, move to QUIESCENT streams
810 * - if fetched data, move to ACTIVE_DATA streams
811 * (at this point each stream either has data, or is quiescent)
812 *
813 *
814 * iterator_next:
815 * handle_new_streams_and_metadata()
816 * - query relayd for known streams, add them as ACTIVE_NO_DATA
817 * - query relayd for metadata
818 *
819 * call handle_active_no_data_streams()
820 *
821 * handle_quiescent_streams()
822 * - if at least one stream is ACTIVE_DATA:
823 * - peek stream event with lowest timestamp -> next_ts
824 * - for each quiescent stream
825 * - if next_ts >= quiescent end
826 * - set state to ACTIVE_NO_DATA
827 * - else
828 * - for each quiescent stream
829 * - set state to ACTIVE_NO_DATA
830 *
831 * call handle_active_no_data_streams()
832 *
833 * handle_active_data_streams()
834 * - if at least one stream is ACTIVE_DATA:
835 * - get stream event with lowest timestamp from heap
d6e69534 836 * - make that stream event the current message.
7cdc2bab
MD
837 * - move this stream heap position to its next event
838 * - if we need to fetch data from relayd, move
839 * stream to ACTIVE_NO_DATA.
840 * - return OK
841 * - return AGAIN
842 *
843 * end criterion: ctrl-c on client. If relayd exits or the session
844 * closes on the relay daemon side, we keep on waiting for streams.
845 * Eventually handle --end timestamp (also an end criterion).
846 *
847 * When disconnected from relayd: try to re-connect endlessly.
848 */
4164020e
SM
849static enum lttng_live_iterator_status
850lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
851 struct lttng_live_stream_iterator *stream_iter,
35e432d7 852 bt2::ConstMessage::Shared& curr_msg)
7cdc2bab 853{
4164020e
SM
854 enum lttng_live_iterator_status live_status;
855
0f5c5d5c
SM
856 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
857 "Advancing live stream iterator until next message if possible: "
858 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 859 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
860
861 if (stream_iter->has_stream_hung_up) {
862 /*
863 * The stream has hung up and the stream was properly closed
864 * during the last call to the current function. Return _END
865 * status now so that this stream iterator is removed for the
866 * stream iterator list.
867 */
868 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
869 goto end;
870 }
4a39caef 871
7cdc2bab 872retry:
4164020e
SM
873 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
874
875 /*
876 * Make sure we have the most recent metadata and possibly some new
877 * streams.
878 */
879 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
880 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
881 goto end;
882 }
883
884 live_status =
885 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
886 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
887 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
888 /*
889 * We overwrite `live_status` since `curr_msg` is
890 * likely set to a valid message in this function.
891 */
892 live_status =
893 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
894 }
895 goto end;
896 }
897
898 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
899 stream_iter, curr_msg);
900 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
35e432d7 901 BT_ASSERT(!curr_msg);
4164020e
SM
902 goto end;
903 }
35e432d7 904 if (curr_msg) {
4164020e
SM
905 goto end;
906 }
907 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
908 stream_iter, curr_msg);
909 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
35e432d7 910 BT_ASSERT(!curr_msg);
4164020e 911 }
7cdc2bab
MD
912
913end:
4164020e 914 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
0f5c5d5c
SM
915 BT_CPPLOGD_SPEC(
916 lttng_live_msg_iter->logger,
917 "Ask the relay daemon for an updated view of the data and metadata streams");
4164020e
SM
918 goto retry;
919 }
14f28187 920
0f5c5d5c
SM
921 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
922 "Returning from advancing live stream iterator: status={}, "
923 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 924 live_status, stream_iter->name, stream_iter->viewer_stream_id);
f93afbf9 925
4164020e 926 return live_status;
7cdc2bab
MD
927}
928
35e432d7 929static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
8ec4d5ff 930{
35e432d7
SM
931 return msg.type() == bt2::MessageType::DiscardedEvents ||
932 msg.type() == bt2::MessageType::DiscardedPackets;
8ec4d5ff
JG
933}
934
4164020e
SM
935static enum lttng_live_iterator_status
936adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
35e432d7 937 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
4164020e 938 uint64_t new_begin_ts)
8ec4d5ff 939{
4164020e
SM
940 enum bt_property_availability availability;
941 const bt_clock_snapshot *clock_snapshot;
942 uint64_t end_ts;
943 uint64_t count;
944
945 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
946 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
947
948 availability = bt_message_discarded_packets_get_count(msg_in, &count);
949 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
950
35e432d7 951 const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
4164020e 952 iter, stream, new_begin_ts, end_ts);
35e432d7
SM
953
954 if (!msg) {
955 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
4164020e
SM
956 }
957
35e432d7
SM
958 bt_message_discarded_packets_set_count(msg, count);
959 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
960 return LTTNG_LIVE_ITERATOR_STATUS_OK;
8ec4d5ff
JG
961}
962
4164020e
SM
963static enum lttng_live_iterator_status
964adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
35e432d7 965 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
4164020e 966 uint64_t new_begin_ts)
8ec4d5ff 967{
4164020e
SM
968 enum bt_property_availability availability;
969 const bt_clock_snapshot *clock_snapshot;
970 uint64_t end_ts;
971 uint64_t count;
972
973 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
974 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
975
976 availability = bt_message_discarded_events_get_count(msg_in, &count);
977 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
978
35e432d7 979 const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
4164020e 980 iter, stream, new_begin_ts, end_ts);
35e432d7
SM
981
982 if (!msg) {
983 return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
4164020e
SM
984 }
985
35e432d7
SM
986 bt_message_discarded_events_set_count(msg, count);
987 msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
988 return LTTNG_LIVE_ITERATOR_STATUS_OK;
8ec4d5ff
JG
989}
990
4164020e
SM
991static enum lttng_live_iterator_status
992handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
993 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
35e432d7 994 const bt2::ConstMessage& late_msg)
285951be 995{
4164020e
SM
996 const bt_clock_class *clock_class;
997 const bt_stream_class *stream_class;
998 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
999 int64_t last_inactivity_ts_ns;
1000 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1001 enum lttng_live_iterator_status adjust_status;
35e432d7 1002 bt2::ConstMessage::Shared adjusted_message;
4164020e
SM
1003
1004 /*
1005 * The timestamp of the current message is before the last message sent
1006 * by this component. We CANNOT send it as is.
1007 *
1008 * The only expected scenario in which that could happen is the
e7401568 1009 * following, everything else is a bug in this component, relay daemon,
4164020e
SM
1010 * or CTF parser.
1011 *
1012 * Expected scenario: The CTF message iterator emitted discarded
1013 * packets and discarded events with synthesized beginning and end
1014 * timestamps from the bounds of the last known packet and the newly
1015 * decoded packet header. The CTF message iterator is not aware of
1016 * stream inactivity beacons. Hence, we have to adjust the beginning
1017 * timestamp of those types of messages if a stream signalled its
1018 * inactivity up until _after_ the last known packet's beginning
1019 * timestamp.
1020 *
1021 * Otherwise, the monotonicity guarantee of message timestamps would
1022 * not be preserved.
1023 *
1024 * In short, the only scenario in which it's okay and fixable to
1025 * received a late message is when:
1026 * 1. the late message is a discarded packets or discarded events
f8a11f24 1027 * message,
4164020e
SM
1028 * 2. this stream produced an inactivity message downstream, and
1029 * 3. the timestamp of the late message is within the inactivity
f8a11f24 1030 * timespan we sent downstream through the inactivity message.
4164020e
SM
1031 */
1032
0f5c5d5c
SM
1033 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1034 "Handling late message on live stream iterator: "
1035 "stream-name=\"{}\", viewer-stream-id={}",
e28ca558 1036 stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
1037
1038 if (!stream_iter->last_inactivity_ts.is_set) {
0f5c5d5c
SM
1039 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1040 "Invalid live stream state: "
1041 "have a late message when no inactivity message "
1042 "was ever sent for that stream.");
4164020e
SM
1043 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1044 goto end;
1045 }
1046
1047 if (!is_discarded_packet_or_event_message(late_msg)) {
0f5c5d5c
SM
1048 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1049 "Invalid live stream state: "
1050 "have a late message that is not a packet discarded or "
1051 "event discarded message: late-msg-type={}",
35e432d7 1052 late_msg.type());
4164020e
SM
1053 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1054 goto end;
1055 }
1056
0b68f2bc 1057 stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
4164020e
SM
1058 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1059
1060 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1061 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1062 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
0f5c5d5c
SM
1063 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1064 "Error converting last "
1065 "inactivity message timestamp to nanoseconds");
4164020e
SM
1066 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1067 goto end;
1068 }
1069
1070 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
0f5c5d5c
SM
1071 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1072 "Invalid live stream state: "
1073 "have a late message that is none included in a stream "
1074 "inactivity timespan: last-inactivity-ts-ns={}, "
1075 "late-msg-ts-ns={}",
1076 last_inactivity_ts_ns, late_msg_ts_ns);
4164020e
SM
1077 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1078 goto end;
1079 }
1080
1081 /*
1082 * We now know that it's okay for this message to be late, we can now
1083 * adjust its timestamp to ensure monotonicity.
1084 */
0f5c5d5c
SM
1085 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1086 "Adjusting the timestamp of late message: late-msg-type={}, "
1087 "msg-new-ts-ns={}",
35e432d7
SM
1088 late_msg.type(), stream_iter->last_inactivity_ts.value);
1089 switch (late_msg.type()) {
1090 case bt2::MessageType::DiscardedEvents:
4164020e 1091 adjust_status = adjust_discarded_events_message(
35e432d7
SM
1092 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1093 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e 1094 break;
35e432d7 1095 case bt2::MessageType::DiscardedPackets:
4164020e 1096 adjust_status = adjust_discarded_packets_message(
35e432d7
SM
1097 lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
1098 late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
4164020e
SM
1099 break;
1100 default:
1101 bt_common_abort();
1102 }
1103
1104 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1105 stream_iter_status = adjust_status;
1106 goto end;
1107 }
1108
1109 BT_ASSERT_DBG(adjusted_message);
1110 stream_iter->current_msg = adjusted_message;
1111 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
cefd84f8 1112
285951be 1113end:
4164020e 1114 return stream_iter_status;
285951be
FD
1115}
1116
4164020e
SM
1117static enum lttng_live_iterator_status
1118next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1119 struct lttng_live_trace *live_trace,
1120 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
7cdc2bab 1121{
4164020e 1122 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
4164020e 1123 enum lttng_live_iterator_status stream_iter_status;
4164020e
SM
1124 int64_t youngest_candidate_msg_ts = INT64_MAX;
1125 uint64_t stream_iter_idx;
1126
1127 BT_ASSERT_DBG(live_trace);
4164020e 1128
0f5c5d5c
SM
1129 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1130 "Finding the next stream iterator for trace: "
1131 "trace-id={}",
1132 live_trace->id);
4164020e
SM
1133 /*
1134 * Update the current message of every stream iterators of this trace.
1135 * The current msg of every stream must have a timestamp equal or
1136 * larger than the last message returned by this iterator. We must
1137 * ensure monotonicity.
1138 */
1139 stream_iter_idx = 0;
db00b877 1140 while (stream_iter_idx < live_trace->stream_iterators.size()) {
4164020e 1141 bool stream_iter_is_ended = false;
db00b877
SM
1142 lttng_live_stream_iterator *stream_iter =
1143 live_trace->stream_iterators[stream_iter_idx].get();
4164020e
SM
1144
1145 /*
1146 * If there is no current message for this stream, go fetch
1147 * one.
1148 */
1149 while (!stream_iter->current_msg) {
35e432d7 1150 bt2::ConstMessage::Shared msg;
4164020e
SM
1151 int64_t curr_msg_ts_ns = INT64_MAX;
1152
1153 stream_iter_status =
35e432d7 1154 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, msg);
4164020e
SM
1155
1156 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1157 stream_iter_is_ended = true;
1158 break;
1159 }
1160
1161 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1162 goto end;
1163 }
1164
1165 BT_ASSERT_DBG(msg);
1166
0f5c5d5c
SM
1167 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1168 "Live stream iterator returned message: msg-type={}, "
1169 "stream-name=\"{}\", viewer-stream-id={}",
35e432d7 1170 msg->type(), stream_iter->name, stream_iter->viewer_stream_id);
4164020e
SM
1171
1172 /*
1173 * Get the timestamp in nanoseconds from origin of this
e7401568 1174 * message.
4164020e 1175 */
35e432d7
SM
1176 live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
1177 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
4164020e
SM
1178
1179 /*
1180 * Check if the message of the current live stream
1181 * iterator occurred at the exact same time or after the
1182 * last message returned by this component's message
1183 * iterator. If not, we need to handle it with care.
1184 */
1185 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
35e432d7 1186 stream_iter->current_msg = std::move(msg);
4164020e
SM
1187 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1188 } else {
1189 /*
1190 * We received a message from the past. This
1191 * may be fixable but it can also be an error.
1192 */
1193 stream_iter_status =
35e432d7 1194 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, *msg);
4164020e 1195 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
0f5c5d5c
SM
1196 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1197 "Late message could not be handled correctly: "
1198 "lttng-live-msg-iter-addr={}, "
1199 "stream-name=\"{}\", "
1200 "curr-msg-ts={}, last-msg-ts={}",
e28ca558
SM
1201 fmt::ptr(lttng_live_msg_iter), stream_iter->name,
1202 curr_msg_ts_ns,
0f5c5d5c 1203 lttng_live_msg_iter->last_msg_ts_ns);
4164020e
SM
1204 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1205 goto end;
1206 }
1207 }
1208 }
1209
1210 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1211
1212 if (!stream_iter_is_ended) {
1213 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1214 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1215 /*
1216 * Update the current best candidate message
1217 * for the stream iterator of this live trace
1218 * to be forwarded downstream.
1219 */
1220 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1221 youngest_candidate_stream_iter = stream_iter;
1222 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1223 /*
1224 * Order the messages in an arbitrary but
1225 * deterministic way.
1226 */
1227 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1228 int ret = common_muxing_compare_messages(
35e432d7
SM
1229 stream_iter->current_msg->libObjPtr(),
1230 youngest_candidate_stream_iter->current_msg->libObjPtr());
4164020e
SM
1231 if (ret < 0) {
1232 /*
1233 * The `youngest_candidate_stream_iter->current_msg`
1234 * should go first. Update the next
1235 * iterator and the current timestamp.
1236 */
1237 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1238 youngest_candidate_stream_iter = stream_iter;
1239 } else if (ret == 0) {
1240 /*
1241 * Unable to pick which one should go
1242 * first.
1243 */
0f5c5d5c
SM
1244 BT_CPPLOGW_SPEC(
1245 lttng_live_msg_iter->logger,
4164020e 1246 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1247 "stream-iter-addr={}"
1248 "stream-iter-addr={}",
1249 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1250 }
1251 }
1252
1253 stream_iter_idx++;
1254 } else {
1255 /*
1256 * The live stream iterator has ended. That
1257 * iterator is removed from the array, but
1258 * there is no need to increment
1259 * stream_iter_idx as
1260 * g_ptr_array_remove_index_fast replaces the
1261 * removed element with the array's last
1262 * element.
1263 */
db00b877 1264 bt2c::vectorFastRemove(live_trace->stream_iterators, stream_iter_idx);
4164020e
SM
1265 }
1266 }
1267
1268 if (youngest_candidate_stream_iter) {
1269 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1270 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1271 } else {
1272 /*
1273 * The only case where we don't have a candidate for this trace
1274 * is if we reached the end of all the iterators.
1275 */
db00b877 1276 BT_ASSERT(live_trace->stream_iterators.empty());
4164020e
SM
1277 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1278 }
14f28187
FD
1279
1280end:
4164020e 1281 return stream_iter_status;
14f28187
FD
1282}
1283
4164020e
SM
1284static enum lttng_live_iterator_status
1285next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1286 struct lttng_live_session *session,
1287 struct lttng_live_stream_iterator **youngest_session_stream_iter)
14f28187 1288{
4164020e
SM
1289 enum lttng_live_iterator_status stream_iter_status;
1290 uint64_t trace_idx = 0;
1291 int64_t youngest_candidate_msg_ts = INT64_MAX;
1292 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1293
0f5c5d5c
SM
1294 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1295 "Finding the next stream iterator for session: "
1296 "session-id={}",
1297 session->id);
4164020e
SM
1298 /*
1299 * Make sure we are attached to the session and look for new streams
1300 * and metadata.
1301 */
1302 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1303 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1304 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1305 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1306 goto end;
1307 }
1308
1309 BT_ASSERT_DBG(session->traces);
1310
1311 while (trace_idx < session->traces->len) {
1312 bool trace_is_ended = false;
1313 struct lttng_live_stream_iterator *stream_iter;
1314 struct lttng_live_trace *trace =
1315 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1316
1317 stream_iter_status =
1318 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1319 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1320 /*
1321 * All the live stream iterators for this trace are
1322 * ENDed. Remove the trace from this session.
1323 */
1324 trace_is_ended = true;
1325 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1326 goto end;
1327 }
1328
1329 if (!trace_is_ended) {
1330 BT_ASSERT_DBG(stream_iter);
1331
1332 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1333 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1334 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1335 youngest_candidate_stream_iter = stream_iter;
1336 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1337 /*
1338 * Order the messages in an arbitrary but
1339 * deterministic way.
1340 */
1341 int ret = common_muxing_compare_messages(
35e432d7
SM
1342 stream_iter->current_msg->libObjPtr(),
1343 youngest_candidate_stream_iter->current_msg->libObjPtr());
4164020e
SM
1344 if (ret < 0) {
1345 /*
1346 * The `youngest_candidate_stream_iter->current_msg`
1347 * should go first. Update the next iterator
1348 * and the current timestamp.
1349 */
1350 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1351 youngest_candidate_stream_iter = stream_iter;
1352 } else if (ret == 0) {
1353 /* Unable to pick which one should go first. */
0f5c5d5c
SM
1354 BT_CPPLOGW_SPEC(
1355 lttng_live_msg_iter->logger,
4164020e 1356 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
0f5c5d5c
SM
1357 "stream-iter-addr={}"
1358 "youngest-candidate-stream-iter-addr={}",
1359 fmt::ptr(stream_iter), fmt::ptr(youngest_candidate_stream_iter));
4164020e
SM
1360 }
1361 }
1362 trace_idx++;
1363 } else {
1364 /*
1365 * trace_idx is not incremented since
1366 * g_ptr_array_remove_index_fast replaces the
1367 * element at trace_idx with the array's last element.
1368 */
1369 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1370 }
1371 }
1372 if (youngest_candidate_stream_iter) {
1373 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1374 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1375 } else {
1376 /*
1377 * The only cases where we don't have a candidate for this
1378 * trace is:
1379 * 1. if we reached the end of all the iterators of all the
1380 * traces of this session,
1381 * 2. if we never had live stream iterator in the first place.
1382 *
1383 * In either cases, we return END.
1384 */
1385 BT_ASSERT(session->traces->len == 0);
1386 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1387 }
7cdc2bab 1388end:
4164020e 1389 return stream_iter_status;
14f28187
FD
1390}
1391
4164020e 1392static inline void put_messages(bt_message_array_const msgs, uint64_t count)
14f28187 1393{
4164020e 1394 uint64_t i;
14f28187 1395
4164020e
SM
1396 for (i = 0; i < count; i++) {
1397 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1398 }
7cdc2bab
MD
1399}
1400
4164020e
SM
1401bt_message_iterator_class_next_method_status
1402lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1403 uint64_t capacity, uint64_t *count)
d3eb6e8f 1404{
1e690349
SM
1405 try {
1406 bt_message_iterator_class_next_method_status status;
1407 enum lttng_live_viewer_status viewer_status;
1408 struct lttng_live_msg_iter *lttng_live_msg_iter =
1409 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1410 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1411 enum lttng_live_iterator_status stream_iter_status;
1412 uint64_t session_idx;
4164020e 1413
1e690349 1414 *count = 0;
4164020e 1415
1e690349 1416 BT_ASSERT_DBG(lttng_live_msg_iter);
4164020e 1417
1e690349
SM
1418 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1419 /*
f8a11f24
SM
1420 * The iterator was interrupted in a previous call to the
1421 * `_next()` method. We currently do not support generating
1422 * messages after such event. The babeltrace2 CLI should never
1423 * be running the graph after being interrupted. So this check
1424 * is to prevent other graph users from using this live
1425 * iterator in an messed up internal state.
1426 */
1e690349
SM
1427 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1428 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1429 lttng_live_msg_iter->logger,
1430 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1431 goto end;
1432 }
4164020e 1433
1e690349 1434 /*
f8a11f24
SM
1435 * Clear all the invalid message reference that might be left over in
1436 * the output array.
1437 */
1e690349 1438 memset(msgs, 0, capacity * sizeof(*msgs));
4164020e 1439
1e690349 1440 /*
f8a11f24
SM
1441 * If no session are exposed on the relay found at the url provided by
1442 * the user, session count will be 0. In this case, we return status
1443 * end to return gracefully.
1444 */
1e690349
SM
1445 if (lttng_live_msg_iter->sessions->len == 0) {
1446 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1447 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1448 goto end;
1449 } else {
1450 /*
f8a11f24
SM
1451 * The are no more active session for this session
1452 * name. Retry to create a viewer session for the
1453 * requested session name.
1454 */
1e690349
SM
1455 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1456 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1457 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1458 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1459 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1460 "Error creating LTTng live viewer session");
1461 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1462 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1463 } else {
1464 bt_common_abort();
1465 }
1466 goto end;
4164020e 1467 }
4164020e
SM
1468 }
1469 }
4164020e 1470
1e690349
SM
1471 if (lttng_live_msg_iter->active_stream_iter == 0) {
1472 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1473 }
4164020e 1474
1e690349 1475 /*
f8a11f24
SM
1476 * Here the muxing of message is done.
1477 *
1478 * We need to iterate over all the streams of all the traces of all the
1479 * viewer sessions in order to get the message with the smallest
1480 * timestamp. In this case, a session is a viewer session and there is
1481 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1482 * kernel). Each viewer session can have multiple traces, for example,
1483 * 64bit UST viewer sessions could have multiple per-pid traces.
1484 *
1485 * We iterate over the streams of each traces to update and see what is
1486 * their next message's timestamp. From those timestamps, we select the
1487 * message with the smallest timestamp as the best candidate message
1488 * for that trace and do the same thing across all the sessions.
1489 *
1490 * We then compare the timestamp of best candidate message of all the
1491 * sessions to pick the message with the smallest timestamp and we
1492 * return it.
1493 */
1e690349
SM
1494 while (*count < capacity) {
1495 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1496 *candidate_stream_iter = NULL;
1497 int64_t youngest_msg_ts_ns = INT64_MAX;
1498
1499 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1500 session_idx = 0;
1501 while (session_idx < lttng_live_msg_iter->sessions->len) {
1502 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1503 lttng_live_msg_iter->sessions, session_idx);
1504
1505 /* Find the best candidate message to send downstream. */
1506 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1507 &candidate_stream_iter);
1508
1509 /* If we receive an END status, it means that either:
f8a11f24
SM
1510 * - Those traces never had active streams (UST with no
1511 * data produced yet),
1512 * - All live stream iterators have ENDed.
1513 */
1e690349
SM
1514 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1515 if (session->closed && session->traces->len == 0) {
1516 /*
f8a11f24
SM
1517 * Remove the session from the list.
1518 * session_idx is not modified since
1519 * g_ptr_array_remove_index_fast
1520 * replaces the the removed element with
1521 * the array's last element.
1522 */
1e690349
SM
1523 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1524 } else {
1525 session_idx++;
1526 }
1527 continue;
4164020e 1528 }
4164020e 1529
1e690349
SM
1530 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1531 goto return_status;
1532 }
4164020e 1533
1e690349
SM
1534 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1535 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1536 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1537 youngest_stream_iter = candidate_stream_iter;
1538 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1539 /*
f8a11f24
SM
1540 * The currently selected message to be sent
1541 * downstream next has the exact same timestamp
1542 * that of the current candidate message. We
1543 * must break the tie in a predictable manner.
1544 */
1e690349
SM
1545 BT_CPPLOGD_STR_SPEC(
1546 lttng_live_msg_iter->logger,
1547 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1548 /*
f8a11f24
SM
1549 * Order the messages in an arbitrary but
1550 * deterministic way.
1551 */
35e432d7
SM
1552 int ret = common_muxing_compare_messages(
1553 candidate_stream_iter->current_msg->libObjPtr(),
1554 youngest_stream_iter->current_msg->libObjPtr());
1e690349
SM
1555 if (ret < 0) {
1556 /*
f8a11f24
SM
1557 * The `candidate_stream_iter->current_msg`
1558 * should go first. Update the next
1559 * iterator and the current timestamp.
1560 */
1e690349
SM
1561 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1562 youngest_stream_iter = candidate_stream_iter;
1563 } else if (ret == 0) {
1564 /* Unable to pick which one should go first. */
1565 BT_CPPLOGW_SPEC(
1566 lttng_live_msg_iter->logger,
1567 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1568 "next-stream-iter-addr={}"
1569 "candidate-stream-iter-addr={}",
1570 fmt::ptr(youngest_stream_iter), fmt::ptr(candidate_stream_iter));
1571 }
4164020e 1572 }
4164020e 1573
1e690349
SM
1574 session_idx++;
1575 }
4164020e 1576
1e690349
SM
1577 if (!youngest_stream_iter) {
1578 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1579 goto return_status;
1580 }
4164020e 1581
1e690349
SM
1582 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1583 /* Ensure monotonicity. */
1584 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1585 youngest_stream_iter->current_msg_ts_ns);
4164020e 1586
1e690349 1587 /*
f8a11f24
SM
1588 * Insert the next message to the message batch. This will set
1589 * stream iterator current message to NULL so that next time
1590 * we fetch the next message of that stream iterator
1591 */
35e432d7 1592 msgs[*count] = youngest_stream_iter->current_msg.release().libObjPtr();
1e690349 1593 (*count)++;
4164020e 1594
1e690349
SM
1595 /* Update the last timestamp in nanoseconds sent downstream. */
1596 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1597 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
4164020e 1598
1e690349
SM
1599 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1600 }
f79c2d7a
FD
1601
1602return_status:
1e690349
SM
1603 switch (stream_iter_status) {
1604 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1605 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1606 /*
f8a11f24
SM
1607 * If we gathered messages, return _OK even if the graph was
1608 * interrupted. This allows for the components downstream to at
1609 * least get the those messages. If the graph was indeed
1610 * interrupted there should not be another _next() call as the
1611 * application will tear down the graph. This component class
1612 * doesn't support restarting after an interruption.
1613 */
1e690349
SM
1614 if (*count > 0) {
1615 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1616 } else {
1617 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1618 }
1619 break;
1620 case LTTNG_LIVE_ITERATOR_STATUS_END:
1621 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1622 break;
1623 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1624 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1625 "Memory error preparing the next batch of messages: "
1626 "live-iter-status={}",
1627 stream_iter_status);
1628 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1629 break;
1630 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1631 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1632 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1633 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1634 "Error preparing the next batch of messages: "
1635 "live-iter-status={}",
1636 stream_iter_status);
4164020e 1637
1e690349
SM
1638 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1639 /* Put all existing messages on error. */
1640 put_messages(msgs, *count);
1641 break;
1642 default:
1643 bt_common_abort();
1644 }
14f28187 1645
f79c2d7a 1646end:
1e690349
SM
1647 return status;
1648 } catch (const std::bad_alloc&) {
1649 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1650 } catch (const bt2::Error&) {
1651 return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1652 }
7cdc2bab 1653}
41a2b7ae 1654
4164020e
SM
1655static struct lttng_live_msg_iter *
1656lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1657 bt_self_message_iterator *self_msg_it)
4f74db52 1658{
0f5c5d5c
SM
1659 lttng_live_msg_iter *msg_iter = new lttng_live_msg_iter {lttng_live_comp->logger};
1660 msg_iter->self_comp = lttng_live_comp->self_comp;
1661 msg_iter->lttng_live_comp = lttng_live_comp;
1662 msg_iter->self_msg_iter = self_msg_it;
4164020e 1663
0f5c5d5c
SM
1664 msg_iter->active_stream_iter = 0;
1665 msg_iter->last_msg_ts_ns = INT64_MIN;
1666 msg_iter->was_interrupted = false;
4f74db52 1667
0f5c5d5c 1668 msg_iter->sessions =
4164020e 1669 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
0f5c5d5c 1670 BT_ASSERT(msg_iter->sessions);
4164020e 1671
0f5c5d5c 1672 return msg_iter;
4f74db52
FD
1673}
1674
4164020e
SM
1675bt_message_iterator_class_initialize_method_status
1676lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
7d91f1ac 1677 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
7cdc2bab 1678{
1e690349
SM
1679 try {
1680 bt_message_iterator_class_initialize_method_status status;
1681 struct lttng_live_component *lttng_live;
1682 struct lttng_live_msg_iter *lttng_live_msg_iter;
1683 enum lttng_live_viewer_status viewer_status;
1684 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1685
1686 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1687
1688 /* There can be only one downstream iterator at the same time. */
1689 BT_ASSERT(!lttng_live->has_msg_iter);
1690 lttng_live->has_msg_iter = true;
1691
1692 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1693 if (!lttng_live_msg_iter) {
1694 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live->logger,
1695 "Failed to create lttng_live_msg_iter");
1696 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1697 goto error;
1698 }
4164020e 1699
1e690349 1700 viewer_status = live_viewer_connection_create(
2780ec75 1701 lttng_live->params.url.c_str(), false, lttng_live_msg_iter, lttng_live_msg_iter->logger,
1e690349
SM
1702 &lttng_live_msg_iter->viewer_connection);
1703 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1704 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1705 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1706 "Failed to create viewer connection");
1707 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1708 /*
f8a11f24
SM
1709 * Interruption in the _iter_init() method is not
1710 * supported. Return an error.
1711 */
1e690349
SM
1712 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1713 "Interrupted while creating viewer connection");
1714 }
10265ebe 1715
1e690349
SM
1716 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1717 goto error;
1718 }
4164020e 1719
1e690349
SM
1720 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1721 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1722 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1723 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1724 "Failed to create viewer session");
1725 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1726 /*
f8a11f24
SM
1727 * Interruption in the _iter_init() method is not
1728 * supported. Return an error.
1729 */
1e690349
SM
1730 BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
1731 "Interrupted when creating viewer session");
1732 }
4164020e 1733
10265ebe 1734 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e 1735 goto error;
4164020e 1736 }
4164020e 1737
1e690349
SM
1738 if (lttng_live_msg_iter->sessions->len == 0) {
1739 switch (lttng_live->params.sess_not_found_act) {
1740 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1741 BT_CPPLOGI_SPEC(
1742 lttng_live_msg_iter->logger,
1743 "Unable to connect to the requested live viewer session. "
1744 "Keep trying to connect because of {}=\"{}\" component parameter: url=\"{}\"",
1745 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2780ec75 1746 lttng_live->params.url);
1e690349
SM
1747 break;
1748 case SESSION_NOT_FOUND_ACTION_FAIL:
1749 BT_CPPLOGE_APPEND_CAUSE_SPEC(
1750 lttng_live_msg_iter->logger,
1751 "Unable to connect to the requested live viewer session. "
1752 "Fail the message iterator initialization because of {}=\"{}\" "
1753 "component parameter: url =\"{}\"",
1754 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
2780ec75 1755 lttng_live->params.url);
1e690349
SM
1756 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1757 goto error;
1758 case SESSION_NOT_FOUND_ACTION_END:
1759 BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
1760 "Unable to connect to the requested live viewer session. "
1761 "End gracefully at the first _next() call because of {}=\"{}\""
1762 " component parameter: url=\"{}\"",
1763 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
2780ec75 1764 lttng_live->params.url);
1e690349
SM
1765 break;
1766 default:
1767 bt_common_abort();
1768 }
1769 }
1770
1771 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1772 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1773 goto end;
f79c2d7a 1774
14f28187 1775error:
1e690349 1776 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1777end:
1e690349
SM
1778 return status;
1779 } catch (const std::bad_alloc&) {
1780 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1781 } catch (const bt2::Error&) {
1782 return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1783 }
7cdc2bab
MD
1784}
1785
80aff5ef 1786static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
88730e42
SM
1787 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1788 bt_param_validation_value_descr::makeString()},
4164020e
SM
1789 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1790
1791static bt_component_class_query_method_status
1792lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
0f5c5d5c 1793 const bt2c::Logger& logger)
7cdc2bab 1794{
4164020e
SM
1795 bt_component_class_query_method_status status;
1796 const bt_value *url_value = NULL;
1797 const char *url;
1798 struct live_viewer_connection *viewer_connection = NULL;
1799 enum lttng_live_viewer_status viewer_status;
1800 enum bt_param_validation_status validation_status;
1801 gchar *validate_error = NULL;
1802
1803 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1804 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1805 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1806 goto error;
1807 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1808 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
0f5c5d5c 1809 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validate_error);
4164020e
SM
1810 goto error;
1811 }
1812
1813 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1814 url = bt_value_string_get(url_value);
1815
0f5c5d5c 1816 viewer_status = live_viewer_connection_create(url, true, NULL, logger, &viewer_connection);
4164020e
SM
1817 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1818 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c 1819 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to create viewer connection");
4164020e
SM
1820 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1821 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1822 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1823 } else {
1824 bt_common_abort();
1825 }
1826 goto error;
1827 }
1828
1829 status = live_viewer_connection_list_sessions(viewer_connection, result);
1830 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
0f5c5d5c 1831 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Failed to list viewer sessions");
4164020e
SM
1832 goto error;
1833 }
1834
1835 goto end;
c7eee084 1836
7cdc2bab 1837error:
4164020e 1838 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1839
4164020e
SM
1840 if (status >= 0) {
1841 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1842 }
c7eee084 1843
7cdc2bab 1844end:
4164020e
SM
1845 if (viewer_connection) {
1846 live_viewer_connection_destroy(viewer_connection);
1847 }
80aff5ef 1848
4164020e 1849 g_free(validate_error);
80aff5ef 1850
4164020e 1851 return status;
7cdc2bab
MD
1852}
1853
4164020e
SM
1854static bt_component_class_query_method_status
1855lttng_live_query_support_info(const bt_value *params, const bt_value **result,
0f5c5d5c 1856 const bt2c::Logger& logger)
312df793 1857{
4164020e
SM
1858 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1859 const bt_value *input_type_value;
1860 const bt_value *input_value;
1861 double weight = 0;
dd420a9b 1862 struct bt_common_lttng_live_url_parts parts = {};
4164020e
SM
1863
1864 /* Used by the logging macros */
1865 __attribute__((unused)) bt_self_component *self_comp = NULL;
1866
1867 *result = NULL;
1868 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1869 if (!input_type_value) {
0f5c5d5c 1870 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `type` parameter.");
4164020e
SM
1871 goto error;
1872 }
1873
1874 if (!bt_value_is_string(input_type_value)) {
0f5c5d5c 1875 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`type` parameter is not a string value.");
4164020e
SM
1876 goto error;
1877 }
1878
1879 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1880 /* We don't handle file system paths */
1881 goto create_result;
1882 }
1883
1884 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1885 if (!input_value) {
0f5c5d5c 1886 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Missing expected `input` parameter.");
4164020e
SM
1887 goto error;
1888 }
1889
1890 if (!bt_value_is_string(input_value)) {
0f5c5d5c 1891 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "`input` parameter is not a string value.");
4164020e
SM
1892 goto error;
1893 }
1894
1895 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1896 if (parts.session_name) {
1897 /*
1898 * Looks pretty much like an LTTng live URL: we got the
1899 * session name part, which forms a complete URL.
1900 */
1901 weight = .75;
1902 }
312df793
PP
1903
1904create_result:
4164020e
SM
1905 *result = bt_value_real_create_init(weight);
1906 if (!*result) {
1907 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1908 goto error;
1909 }
312df793 1910
4164020e 1911 goto end;
312df793
PP
1912
1913error:
4164020e
SM
1914 if (status >= 0) {
1915 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1916 }
312df793 1917
4164020e 1918 BT_ASSERT(!*result);
312df793
PP
1919
1920end:
4164020e
SM
1921 bt_common_destroy_lttng_live_url_parts(&parts);
1922 return status;
312df793
PP
1923}
1924
4164020e
SM
1925bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
1926 bt_private_query_executor *priv_query_exec,
1927 const char *object, const bt_value *params,
1928 __attribute__((unused)) void *method_data,
1929 const bt_value **result)
7cdc2bab 1930{
1e690349
SM
1931 try {
1932 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1933 bt2c::Logger logger {bt2::SelfComponentClass {comp_class},
1934 bt2::PrivateQueryExecutor {priv_query_exec},
1935 "PLUGIN/SRC.CTF.LTTNG-LIVE/QUERY"};
1936
1937 if (strcmp(object, "sessions") == 0) {
1938 status = lttng_live_query_list_sessions(params, result, logger);
1939 } else if (strcmp(object, "babeltrace.support-info") == 0) {
1940 status = lttng_live_query_support_info(params, result, logger);
1941 } else {
1942 BT_CPPLOGI_SPEC(logger, "Unknown query object `{}`", object);
1943 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
1944 goto end;
1945 }
14f28187
FD
1946
1947end:
1e690349
SM
1948 return status;
1949 } catch (const std::bad_alloc&) {
1950 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1951 } catch (const bt2::Error&) {
1952 return BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1953 }
7cdc2bab
MD
1954}
1955
14f28187 1956void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1957{
ef9e5f5d
SM
1958 lttng_live_component::UP {static_cast<lttng_live_component *>(
1959 bt_self_component_get_data(bt_self_component_source_as_self_component(component)))};
7cdc2bab
MD
1960}
1961
4164020e
SM
1962static enum session_not_found_action
1963parse_session_not_found_action_param(const bt_value *no_session_param)
14f28187 1964{
4164020e
SM
1965 enum session_not_found_action action;
1966 const char *no_session_act_str = bt_value_string_get(no_session_param);
1967
1968 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1969 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1970 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1971 action = SESSION_NOT_FOUND_ACTION_FAIL;
1972 } else {
1973 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
1974 action = SESSION_NOT_FOUND_ACTION_END;
1975 }
1976
1977 return action;
14f28187
FD
1978}
1979
88730e42
SM
1980static bt_param_validation_value_descr inputs_elem_descr =
1981 bt_param_validation_value_descr::makeString();
80aff5ef
SM
1982
1983static const char *sess_not_found_action_choices[] = {
4164020e
SM
1984 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1985 SESS_NOT_FOUND_ACTION_FAIL_STR,
1986 SESS_NOT_FOUND_ACTION_END_STR,
80aff5ef
SM
1987};
1988
1989static struct bt_param_validation_map_value_entry_descr params_descr[] = {
88730e42
SM
1990 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1991 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
1992 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
1993 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
4164020e
SM
1994 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1995
1996static bt_component_class_initialize_method_status
0f5c5d5c 1997lttng_live_component_create(const bt_value *params, bt_self_component_source *self_comp,
ef9e5f5d 1998 lttng_live_component::UP& component)
7cdc2bab 1999{
4164020e
SM
2000 const bt_value *inputs_value;
2001 const bt_value *url_value;
2002 const bt_value *value;
4164020e
SM
2003 enum bt_param_validation_status validation_status;
2004 gchar *validation_error = NULL;
0f5c5d5c 2005 bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
4164020e
SM
2006
2007 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2008 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
ef9e5f5d 2009 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
4164020e 2010 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
6838d8aa 2011 bt2c::GCharUP errorFreer {validation_error};
0f5c5d5c 2012 BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "{}", validation_error);
ef9e5f5d 2013 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
4164020e
SM
2014 }
2015
ef9e5f5d 2016 auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
0f5c5d5c 2017 lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
4164020e
SM
2018 lttng_live->max_query_size = MAX_QUERY_SIZE;
2019 lttng_live->has_msg_iter = false;
2020
2021 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2022 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2780ec75 2023 lttng_live->params.url = bt_value_string_get(url_value);
4164020e
SM
2024
2025 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2026 if (value) {
2027 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2028 } else {
0f5c5d5c
SM
2029 BT_CPPLOGI_SPEC(lttng_live->logger,
2030 "Optional `{}` parameter is missing: defaulting to `{}`.",
2031 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
4164020e
SM
2032 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2033 }
2034
ef9e5f5d
SM
2035 component = std::move(lttng_live);
2036 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
d3e4dcd8
PP
2037}
2038
4164020e
SM
2039bt_component_class_initialize_method_status
2040lttng_live_component_init(bt_self_component_source *self_comp_src,
7d91f1ac 2041 bt_self_component_source_configuration *, const bt_value *params, void *)
f3bc2010 2042{
1e690349 2043 try {
ef9e5f5d 2044 lttng_live_component::UP lttng_live;
1e690349
SM
2045 bt_component_class_initialize_method_status ret;
2046 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2047 bt_self_component_add_port_status add_port_status;
2048
ef9e5f5d 2049 ret = lttng_live_component_create(params, self_comp_src, lttng_live);
1e690349 2050 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
ef9e5f5d 2051 return ret;
1e690349 2052 }
4164020e 2053
1e690349
SM
2054 add_port_status =
2055 bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2056 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2057 ret = (bt_component_class_initialize_method_status) add_port_status;
ef9e5f5d 2058 return ret;
1e690349 2059 }
4164020e 2060
ef9e5f5d 2061 bt_self_component_set_data(self_comp, lttng_live.release());
7cdc2bab 2062
ef9e5f5d 2063 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
1e690349
SM
2064 } catch (const std::bad_alloc&) {
2065 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2066 } catch (const bt2::Error&) {
2067 return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2068 }
d85ef162 2069}
This page took 0.227373 seconds and 4 git commands to generate.