Remove stdbool.h includes from C++ files
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #include <glib.h>
12 #include <inttypes.h>
13 #include <unistd.h>
14
15 #include <babeltrace2/babeltrace.h>
16
17 #define BT_COMP_LOG_SELF_COMP self_comp
18 #define BT_LOG_OUTPUT_LEVEL log_level
19 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
20 #include "logging/comp-logging.h"
21
22 #include "common/assert.h"
23 #include "compat/compiler.h"
24
25 #include "plugins/common/muxing/muxing.h"
26 #include "plugins/common/param-validation/param-validation.h"
27
28 #include "data-stream.hpp"
29 #include "lttng-live.hpp"
30 #include "metadata.hpp"
31
32 #define MAX_QUERY_SIZE (256 * 1024)
33 #define URL_PARAM "url"
34 #define INPUTS_PARAM "inputs"
35 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
36 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
37 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
38 #define SESS_NOT_FOUND_ACTION_END_STR "end"
39
40 #define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ##__VA_ARGS__)
41
42 static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
43 {
44 switch (status) {
45 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
46 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
47 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
48 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
49 case LTTNG_LIVE_ITERATOR_STATUS_END:
50 return "LTTNG_LIVE_ITERATOR_STATUS_END";
51 case LTTNG_LIVE_ITERATOR_STATUS_OK:
52 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
53 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
54 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
55 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
56 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
57 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
58 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
59 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
60 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
61 default:
62 bt_common_abort();
63 }
64 }
65
66 static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
67 {
68 switch (state) {
69 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
70 return "ACTIVE_NO_DATA";
71 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
72 return "QUIESCENT_NO_DATA";
73 case LTTNG_LIVE_STREAM_QUIESCENT:
74 return "QUIESCENT";
75 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
76 return "ACTIVE_DATA";
77 case LTTNG_LIVE_STREAM_EOF:
78 return "EOF";
79 default:
80 return "ERROR";
81 }
82 }
83
84 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
85 enum lttng_live_stream_state new_state)
86 {
87 bt_self_component *self_comp = stream_iter->self_comp;
88 bt_logging_level log_level = stream_iter->log_level;
89
90 BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
91 ", old-state=%s, new-state=%s",
92 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
93 lttng_live_stream_state_string(new_state));
94
95 stream_iter->state = new_state;
96 }
97
98 #define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
99 do { \
100 BT_COMP_LOGD("Live stream iterator state=%s, " \
101 "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", " \
102 "curr-inact-ts=%" PRId64, \
103 lttng_live_stream_state_string(live_stream_iter->state), \
104 live_stream_iter->last_inactivity_ts.is_set, \
105 live_stream_iter->last_inactivity_ts.value, \
106 live_stream_iter->current_inactivity_ts); \
107 } while (0);
108
109 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
110 {
111 bool ret;
112
113 if (!msg_iter) {
114 ret = false;
115 goto end;
116 }
117
118 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
119
120 end:
121 return ret;
122 }
123
124 static struct lttng_live_trace *
125 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
126 {
127 uint64_t trace_idx;
128 struct lttng_live_trace *ret_trace = NULL;
129
130 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
131 struct lttng_live_trace *trace =
132 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
133 if (trace->id == trace_id) {
134 ret_trace = trace;
135 goto end;
136 }
137 }
138
139 end:
140 return ret_trace;
141 }
142
143 static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
144 {
145 bt_logging_level log_level = trace->log_level;
146 bt_self_component *self_comp = trace->self_comp;
147
148 BT_COMP_LOGD("Destroying live trace: trace-id=%" PRIu64, trace->id);
149
150 BT_ASSERT(trace->stream_iterators);
151 g_ptr_array_free(trace->stream_iterators, TRUE);
152
153 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
154 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
155
156 lttng_live_metadata_fini(trace);
157 g_free(trace);
158 }
159
160 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
161 uint64_t trace_id)
162 {
163 struct lttng_live_trace *trace = NULL;
164 bt_logging_level log_level = session->log_level;
165 bt_self_component *self_comp = session->self_comp;
166
167 BT_COMP_LOGD("Creating live trace: "
168 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
169 session->id, trace_id);
170 trace = g_new0(struct lttng_live_trace, 1);
171 if (!trace) {
172 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace");
173 goto error;
174 }
175 trace->log_level = session->log_level;
176 trace->self_comp = session->self_comp;
177 trace->session = session;
178 trace->id = trace_id;
179 trace->trace_class = NULL;
180 trace->trace = NULL;
181 trace->stream_iterators =
182 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
183 BT_ASSERT(trace->stream_iterators);
184 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
185 g_ptr_array_add(session->traces, trace);
186
187 goto end;
188 error:
189 g_free(trace);
190 trace = NULL;
191 end:
192 return trace;
193 }
194
195 struct lttng_live_trace *
196 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
197 uint64_t trace_id)
198 {
199 struct lttng_live_trace *trace;
200
201 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
202 if (trace) {
203 goto end;
204 }
205
206 /* The session is the owner of the newly created trace. */
207 trace = lttng_live_create_trace(session, trace_id);
208
209 end:
210 return trace;
211 }
212
213 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
214 const char *hostname, const char *session_name)
215 {
216 int ret = 0;
217 struct lttng_live_session *session;
218 bt_logging_level log_level = lttng_live_msg_iter->log_level;
219 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
220
221 BT_COMP_LOGD("Adding live session: "
222 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
223 session_id, hostname, session_name);
224
225 session = g_new0(struct lttng_live_session, 1);
226 if (!session) {
227 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session");
228 goto error;
229 }
230
231 session->log_level = lttng_live_msg_iter->log_level;
232 session->self_comp = lttng_live_msg_iter->self_comp;
233 session->id = session_id;
234 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
235 BT_ASSERT(session->traces);
236 session->lttng_live_msg_iter = lttng_live_msg_iter;
237 session->new_streams_needed = true;
238 session->hostname = g_string_new(hostname);
239 BT_ASSERT(session->hostname);
240
241 session->session_name = g_string_new(session_name);
242 BT_ASSERT(session->session_name);
243
244 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
245 goto end;
246 error:
247 g_free(session);
248 ret = -1;
249 end:
250 return ret;
251 }
252
253 static void lttng_live_destroy_session(struct lttng_live_session *session)
254 {
255 bt_logging_level log_level;
256 bt_self_component *self_comp;
257
258 if (!session) {
259 goto end;
260 }
261
262 log_level = session->log_level;
263 self_comp = session->self_comp;
264 BT_COMP_LOGD("Destroying live session: "
265 "session-id=%" PRIu64 ", session-name=\"%s\"",
266 session->id, session->session_name->str);
267 if (session->id != -1ULL) {
268 if (lttng_live_session_detach(session)) {
269 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
270 /* Old relayd cannot detach sessions. */
271 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id);
272 }
273 }
274 session->id = -1ULL;
275 }
276
277 if (session->traces) {
278 g_ptr_array_free(session->traces, TRUE);
279 }
280
281 if (session->hostname) {
282 g_string_free(session->hostname, TRUE);
283 }
284 if (session->session_name) {
285 g_string_free(session->session_name, TRUE);
286 }
287 g_free(session);
288
289 end:
290 return;
291 }
292
293 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
294 {
295 if (!lttng_live_msg_iter) {
296 goto end;
297 }
298
299 if (lttng_live_msg_iter->sessions) {
300 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
301 }
302
303 if (lttng_live_msg_iter->viewer_connection) {
304 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
305 }
306 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
307 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
308
309 /* All stream iterators must be destroyed at this point. */
310 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
311 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
312
313 g_free(lttng_live_msg_iter);
314
315 end:
316 return;
317 }
318
319 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
320 {
321 struct lttng_live_msg_iter *lttng_live_msg_iter;
322
323 BT_ASSERT(self_msg_iter);
324
325 lttng_live_msg_iter =
326 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
327 BT_ASSERT(lttng_live_msg_iter);
328 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
329 }
330
331 static enum lttng_live_iterator_status
332 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
333 {
334 bt_logging_level log_level = lttng_live_stream->log_level;
335 bt_self_component *self_comp = lttng_live_stream->self_comp;
336
337 switch (lttng_live_stream->state) {
338 case LTTNG_LIVE_STREAM_QUIESCENT:
339 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
340 break;
341 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
342 /* Invalid state. */
343 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
344 bt_common_abort();
345 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
346 /* Invalid state. */
347 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
348 bt_common_abort();
349 case LTTNG_LIVE_STREAM_EOF:
350 break;
351 }
352 return LTTNG_LIVE_ITERATOR_STATUS_OK;
353 }
354
355 /*
356 * For active no data stream, fetch next index. As a result of that it can
357 * become either:
358 * - quiescent: won't have events for a bit,
359 * - have data: need to get that data and produce the event,
360 * - have no data on this stream at this point: need to retry (AGAIN) or return
361 * EOF.
362 */
363 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
364 struct lttng_live_msg_iter *lttng_live_msg_iter,
365 struct lttng_live_stream_iterator *lttng_live_stream)
366 {
367 bt_logging_level log_level = lttng_live_msg_iter->log_level;
368 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
369 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
370 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
371 struct packet_index index;
372
373 if (lttng_live_stream->trace->metadata_stream_state ==
374 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
375 BT_COMP_LOGD(
376 "Need to get an update for the metadata stream before proceeding further with this stream: "
377 "stream-name=\"%s\"",
378 lttng_live_stream->name->str);
379 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
380 goto end;
381 }
382
383 if (lttng_live_stream->trace->session->new_streams_needed) {
384 BT_COMP_LOGD(
385 "Need to get an update of all streams before proceeding further with this stream: "
386 "stream-name=\"%s\"",
387 lttng_live_stream->name->str);
388 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
389 goto end;
390 }
391
392 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
393 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
394 goto end;
395 }
396 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
397 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
398 goto end;
399 }
400
401 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
402
403 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
404 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
405 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
406
407 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
408 /*
409 * Because the stream is in the QUIESCENT_NO_DATA
410 * state, we can assert that the last_inactivity_ts was
411 * set and can be safely used in the `if` above.
412 */
413 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
414
415 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
416 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
417 } else {
418 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
419 }
420 goto end;
421 }
422
423 lttng_live_stream->base_offset = index.offset;
424 lttng_live_stream->offset = index.offset;
425 lttng_live_stream->len = index.packet_size / CHAR_BIT;
426
427 BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
428 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
429 ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
430 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
431 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
432
433 end:
434 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
435 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
436 }
437 return ret;
438 }
439
440 /*
441 * Creation of the message requires the ctf trace class to be created
442 * beforehand, but the live protocol gives us all streams (including metadata)
443 * at once. So we split it in three steps: getting streams, getting metadata
444 * (which creates the ctf trace class), and then creating the per-stream
445 * messages.
446 */
447 static enum lttng_live_iterator_status
448 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
449 struct lttng_live_session *session)
450 {
451 bt_logging_level log_level = lttng_live_msg_iter->log_level;
452 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
453 enum lttng_live_iterator_status status;
454 uint64_t trace_idx;
455
456 if (!session->attached) {
457 BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, session->id);
458 enum lttng_live_viewer_status attach_status =
459 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
460 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
461 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
462 /*
463 * Clear any causes appended in
464 * `lttng_live_attach_session()` as we want to
465 * return gracefully since the graph was
466 * cancelled.
467 */
468 bt_current_thread_clear_error();
469 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
470 } else {
471 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
472 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error attaching to LTTng live session");
473 }
474 goto end;
475 }
476 }
477
478 BT_COMP_LOGD("Updating all data streams: "
479 "session-id=%" PRIu64 ", session-name=\"%s\"",
480 session->id, session->session_name->str);
481
482 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
483 switch (status) {
484 case LTTNG_LIVE_ITERATOR_STATUS_OK:
485 break;
486 case LTTNG_LIVE_ITERATOR_STATUS_END:
487 /*
488 * We received a `_END` from the `_get_new_streams()` function,
489 * which means no more data will ever be received from the data
490 * streams of this session. But it's possible that the metadata
491 * is incomplete.
492 * The live protocol guarantees that we receive all the
493 * metadata needed before we receive data streams needing it.
494 * But it's possible to receive metadata NOT needed by
495 * data streams after the session was closed. For example, this
496 * could happen if a new event is registered and the session is
497 * stopped before any tracepoint for that event is actually
498 * fired.
499 */
500 BT_COMP_LOGD(
501 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
502 "session-id=%" PRIu64 ", session-name=\"%s\"",
503 session->id, session->session_name->str);
504 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
505 break;
506 default:
507 goto end;
508 }
509
510 BT_COMP_LOGD("Updating metadata stream for session: "
511 "session-id=%" PRIu64 ", session-name=\"%s\"",
512 session->id, session->session_name->str);
513
514 trace_idx = 0;
515 while (trace_idx < session->traces->len) {
516 struct lttng_live_trace *trace =
517 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
518
519 status = lttng_live_metadata_update(trace);
520 switch (status) {
521 case LTTNG_LIVE_ITERATOR_STATUS_END:
522 case LTTNG_LIVE_ITERATOR_STATUS_OK:
523 trace_idx++;
524 break;
525 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
526 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
527 goto end;
528 default:
529 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
530 "Error updating trace metadata: "
531 "stream-iter-status=%s, trace-id=%" PRIu64,
532 lttng_live_iterator_status_string(status), trace->id);
533 goto end;
534 }
535 }
536
537 /*
538 * Now that we have the metadata we can initialize the downstream
539 * iterator.
540 */
541 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
542
543 end:
544 return status;
545 }
546
547 static void
548 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
549 {
550 uint64_t session_idx, trace_idx;
551 bt_logging_level log_level = lttng_live_msg_iter->log_level;
552 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
553
554 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
555 struct lttng_live_session *session =
556 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
557 BT_COMP_LOGD("Force marking session as needing new streams: "
558 "session-id=%" PRIu64,
559 session->id);
560 session->new_streams_needed = true;
561 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
562 struct lttng_live_trace *trace =
563 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
564 BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
565 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
566 session->id, trace->id);
567
568 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
569
570 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
571 }
572 }
573 }
574
575 static enum lttng_live_iterator_status
576 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
577 {
578 enum lttng_live_iterator_status status;
579 enum lttng_live_viewer_status viewer_status;
580 bt_logging_level log_level = lttng_live_msg_iter->log_level;
581 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
582 uint64_t session_idx = 0, nr_sessions_opened = 0;
583 struct lttng_live_session *session;
584 enum session_not_found_action sess_not_found_act =
585 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
586
587 BT_COMP_LOGD("Update data and metadata of all sessions: "
588 "live-msg-iter-addr=%p",
589 lttng_live_msg_iter);
590 /*
591 * In a remotely distant future, we could add a "new
592 * session" flag to the protocol, which would tell us that we
593 * need to query for new sessions even though we have sessions
594 * currently ongoing.
595 */
596 if (lttng_live_msg_iter->sessions->len == 0) {
597 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
598 BT_COMP_LOGD(
599 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
600 status = LTTNG_LIVE_ITERATOR_STATUS_END;
601 goto end;
602 } else {
603 BT_COMP_LOGD(
604 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
605 /*
606 * Retry to create a viewer session for the requested
607 * session name.
608 */
609 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
610 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
611 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
612 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
613 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
614 "Error creating LTTng live viewer session");
615 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
616 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
617 } else {
618 bt_common_abort();
619 }
620 goto end;
621 }
622 }
623 }
624
625 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
626 session =
627 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
628 status = lttng_live_get_session(lttng_live_msg_iter, session);
629 switch (status) {
630 case LTTNG_LIVE_ITERATOR_STATUS_OK:
631 case LTTNG_LIVE_ITERATOR_STATUS_END:
632 /*
633 * A session returned `_END`. Other sessions may still
634 * be active so we override the status and continue
635 * looping if needed.
636 */
637 break;
638 default:
639 goto end;
640 }
641 if (!session->closed) {
642 nr_sessions_opened++;
643 }
644 }
645
646 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
647 status = LTTNG_LIVE_ITERATOR_STATUS_END;
648 } else {
649 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
650 }
651
652 end:
653 return status;
654 }
655
656 static enum lttng_live_iterator_status
657 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
658 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
659 uint64_t timestamp)
660 {
661 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
662 bt_logging_level log_level = lttng_live_msg_iter->log_level;
663 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
664 bt_message *msg = NULL;
665
666 BT_ASSERT(stream_iter->trace->clock_class);
667
668 BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
669 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
670 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
671
672 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
673 stream_iter->trace->clock_class, timestamp);
674 if (!msg) {
675 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error emitting message iterator inactivity message");
676 goto error;
677 }
678
679 *message = msg;
680 end:
681 return ret;
682
683 error:
684 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
685 bt_message_put_ref(msg);
686 goto end;
687 }
688
689 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
690 struct lttng_live_msg_iter *lttng_live_msg_iter,
691 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
692 {
693 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
694
695 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
696 return LTTNG_LIVE_ITERATOR_STATUS_OK;
697 }
698
699 /*
700 * Check if we already sent an inactivty message downstream for this
701 * `current_inactivity_ts` value.
702 */
703 if (lttng_live_stream->last_inactivity_ts.is_set &&
704 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
705 lttng_live_stream_iterator_set_state(lttng_live_stream,
706 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
707
708 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
709 goto end;
710 }
711
712 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
713 lttng_live_stream->current_inactivity_ts);
714
715 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
716 lttng_live_stream->last_inactivity_ts.is_set = true;
717 end:
718 return ret;
719 }
720
721 static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
722 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
723 {
724 const bt_clock_snapshot *clock_snapshot = NULL;
725 int ret = 0;
726 bt_logging_level log_level = lttng_live_msg_iter->log_level;
727 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
728
729 BT_ASSERT_DBG(msg);
730 BT_ASSERT_DBG(ts_ns);
731
732 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
733 "last-msg-ts=%" PRId64,
734 lttng_live_msg_iter, msg, last_msg_ts_ns);
735
736 switch (bt_message_get_type(msg)) {
737 case BT_MESSAGE_TYPE_EVENT:
738 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
739 break;
740 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
741 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
742 break;
743 case BT_MESSAGE_TYPE_PACKET_END:
744 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
745 break;
746 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
747 clock_snapshot =
748 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
749 break;
750 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
751 clock_snapshot =
752 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
753 break;
754 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
755 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
756 break;
757 default:
758 /* All the other messages have a higher priority */
759 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
760 *ts_ns = last_msg_ts_ns;
761 goto end;
762 }
763
764 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
765 if (ret) {
766 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
767 "Cannot get nanoseconds from Epoch of clock snapshot: "
768 "clock-snapshot-addr=%p",
769 clock_snapshot);
770 goto error;
771 }
772
773 goto end;
774
775 error:
776 ret = -1;
777
778 end:
779 if (ret == 0) {
780 BT_COMP_LOGD("Found message's timestamp: "
781 "iter-data-addr=%p, msg-addr=%p, "
782 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
783 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
784 }
785
786 return ret;
787 }
788
789 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
790 struct lttng_live_msg_iter *lttng_live_msg_iter,
791 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
792 {
793 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
794 bt_logging_level log_level = lttng_live_msg_iter->log_level;
795 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
796 enum ctf_msg_iter_status status;
797 uint64_t session_idx, trace_idx;
798
799 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
800 struct lttng_live_session *session =
801 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
802
803 if (session->new_streams_needed) {
804 BT_COMP_LOGD("Need an update for streams: "
805 "session-id=%" PRIu64,
806 session->id);
807 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
808 goto end;
809 }
810 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
811 struct lttng_live_trace *trace =
812 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
813 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
814 BT_COMP_LOGD("Need an update for metadata stream: "
815 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
816 session->id, trace->id);
817 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
818 goto end;
819 }
820 }
821 }
822
823 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
824 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
825 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
826 "Invalid state of live stream iterator"
827 "stream-iter-status=%s",
828 lttng_live_stream_state_string(lttng_live_stream->state));
829 goto end;
830 }
831
832 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
833 switch (status) {
834 case CTF_MSG_ITER_STATUS_EOF:
835 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
836 break;
837 case CTF_MSG_ITER_STATUS_OK:
838 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
839 break;
840 case CTF_MSG_ITER_STATUS_AGAIN:
841 /*
842 * Continue immediately (end of packet). The next
843 * get_index may return AGAIN to delay the following
844 * attempt.
845 */
846 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
847 break;
848 case CTF_MSG_ITER_STATUS_ERROR:
849 default:
850 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
851 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
852 "CTF message iterator failed to get next message: "
853 "msg-iter=%p, msg-iter-status=%s",
854 lttng_live_stream->msg_iter, ctf_msg_iter_status_string(status));
855 break;
856 }
857
858 end:
859 return ret;
860 }
861
862 static enum lttng_live_iterator_status
863 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
864 struct lttng_live_stream_iterator *stream_iter,
865 const bt_message **curr_msg)
866 {
867 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
868 bt_logging_level log_level = lttng_live_msg_iter->log_level;
869 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
870
871 BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
872 "viewer-stream-id=%" PRIu64,
873 stream_iter->name->str, stream_iter->viewer_stream_id);
874
875 /*
876 * The viewer has hung up on us so we are closing the stream. The
877 * `ctf_msg_iter` should simply realize that it needs to close the
878 * stream properly by emitting the necessary stream end message.
879 */
880 enum ctf_msg_iter_status status =
881 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
882
883 if (status == CTF_MSG_ITER_STATUS_ERROR) {
884 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
885 "Error getting the next message from CTF message iterator");
886 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
887 goto end;
888 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
889 BT_COMP_LOGI("Reached the end of the live stream iterator.");
890 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
891 goto end;
892 }
893
894 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
895
896 end:
897 return live_status;
898 }
899
900 /*
901 * helper function:
902 * handle_no_data_streams()
903 * retry:
904 * - for each ACTIVE_NO_DATA stream:
905 * - query relayd for stream data, or quiescence info.
906 * - if need metadata, get metadata, goto retry.
907 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
908 * - if quiescent, move to QUIESCENT streams
909 * - if fetched data, move to ACTIVE_DATA streams
910 * (at this point each stream either has data, or is quiescent)
911 *
912 *
913 * iterator_next:
914 * handle_new_streams_and_metadata()
915 * - query relayd for known streams, add them as ACTIVE_NO_DATA
916 * - query relayd for metadata
917 *
918 * call handle_active_no_data_streams()
919 *
920 * handle_quiescent_streams()
921 * - if at least one stream is ACTIVE_DATA:
922 * - peek stream event with lowest timestamp -> next_ts
923 * - for each quiescent stream
924 * - if next_ts >= quiescent end
925 * - set state to ACTIVE_NO_DATA
926 * - else
927 * - for each quiescent stream
928 * - set state to ACTIVE_NO_DATA
929 *
930 * call handle_active_no_data_streams()
931 *
932 * handle_active_data_streams()
933 * - if at least one stream is ACTIVE_DATA:
934 * - get stream event with lowest timestamp from heap
935 * - make that stream event the current message.
936 * - move this stream heap position to its next event
937 * - if we need to fetch data from relayd, move
938 * stream to ACTIVE_NO_DATA.
939 * - return OK
940 * - return AGAIN
941 *
942 * end criterion: ctrl-c on client. If relayd exits or the session
943 * closes on the relay daemon side, we keep on waiting for streams.
944 * Eventually handle --end timestamp (also an end criterion).
945 *
946 * When disconnected from relayd: try to re-connect endlessly.
947 */
948 static enum lttng_live_iterator_status
949 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
950 struct lttng_live_stream_iterator *stream_iter,
951 const bt_message **curr_msg)
952 {
953 bt_logging_level log_level = lttng_live_msg_iter->log_level;
954 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
955 enum lttng_live_iterator_status live_status;
956
957 BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
958 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
959 stream_iter->name->str, stream_iter->viewer_stream_id);
960
961 if (stream_iter->has_stream_hung_up) {
962 /*
963 * The stream has hung up and the stream was properly closed
964 * during the last call to the current function. Return _END
965 * status now so that this stream iterator is removed for the
966 * stream iterator list.
967 */
968 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
969 goto end;
970 }
971
972 retry:
973 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
974
975 /*
976 * Make sure we have the most recent metadata and possibly some new
977 * streams.
978 */
979 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
980 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
981 goto end;
982 }
983
984 live_status =
985 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
986 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
987 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
988 /*
989 * We overwrite `live_status` since `curr_msg` is
990 * likely set to a valid message in this function.
991 */
992 live_status =
993 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
994 }
995 goto end;
996 }
997
998 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
999 stream_iter, curr_msg);
1000 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1001 BT_ASSERT(!*curr_msg);
1002 goto end;
1003 }
1004 if (*curr_msg) {
1005 goto end;
1006 }
1007 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
1008 stream_iter, curr_msg);
1009 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1010 BT_ASSERT(!*curr_msg);
1011 }
1012
1013 end:
1014 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
1015 BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
1016 goto retry;
1017 }
1018
1019 BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, "
1020 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1021 lttng_live_iterator_status_string(live_status), stream_iter->name->str,
1022 stream_iter->viewer_stream_id);
1023
1024 return live_status;
1025 }
1026
1027 static bool is_discarded_packet_or_event_message(const bt_message *msg)
1028 {
1029 const enum bt_message_type msg_type = bt_message_get_type(msg);
1030
1031 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
1032 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
1033 }
1034
1035 static enum lttng_live_iterator_status
1036 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
1037 const bt_message *msg_in, bt_message **msg_out,
1038 uint64_t new_begin_ts)
1039 {
1040 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1041 enum bt_property_availability availability;
1042 const bt_clock_snapshot *clock_snapshot;
1043 uint64_t end_ts;
1044 uint64_t count;
1045
1046 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
1047 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1048
1049 availability = bt_message_discarded_packets_get_count(msg_in, &count);
1050 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1051
1052 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
1053 iter, stream, new_begin_ts, end_ts);
1054 if (!*msg_out) {
1055 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1056 goto end;
1057 }
1058
1059 bt_message_discarded_packets_set_count(*msg_out, count);
1060 end:
1061 return status;
1062 }
1063
1064 static enum lttng_live_iterator_status
1065 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
1066 const bt_message *msg_in, bt_message **msg_out,
1067 uint64_t new_begin_ts)
1068 {
1069 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1070 enum bt_property_availability availability;
1071 const bt_clock_snapshot *clock_snapshot;
1072 uint64_t end_ts;
1073 uint64_t count;
1074
1075 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
1076 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1077
1078 availability = bt_message_discarded_events_get_count(msg_in, &count);
1079 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1080
1081 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
1082 iter, stream, new_begin_ts, end_ts);
1083 if (!*msg_out) {
1084 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1085 goto end;
1086 }
1087
1088 bt_message_discarded_events_set_count(*msg_out, count);
1089 end:
1090 return status;
1091 }
1092
1093 static enum lttng_live_iterator_status
1094 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1095 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1096 const bt_message *late_msg)
1097 {
1098 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1099 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1100 const bt_clock_class *clock_class;
1101 const bt_stream_class *stream_class;
1102 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1103 int64_t last_inactivity_ts_ns;
1104 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1105 enum lttng_live_iterator_status adjust_status;
1106 bt_message *adjusted_message;
1107
1108 /*
1109 * The timestamp of the current message is before the last message sent
1110 * by this component. We CANNOT send it as is.
1111 *
1112 * The only expected scenario in which that could happen is the
1113 * following, everything else is a bug in this component, relay deamon,
1114 * or CTF parser.
1115 *
1116 * Expected scenario: The CTF message iterator emitted discarded
1117 * packets and discarded events with synthesized beginning and end
1118 * timestamps from the bounds of the last known packet and the newly
1119 * decoded packet header. The CTF message iterator is not aware of
1120 * stream inactivity beacons. Hence, we have to adjust the beginning
1121 * timestamp of those types of messages if a stream signalled its
1122 * inactivity up until _after_ the last known packet's beginning
1123 * timestamp.
1124 *
1125 * Otherwise, the monotonicity guarantee of message timestamps would
1126 * not be preserved.
1127 *
1128 * In short, the only scenario in which it's okay and fixable to
1129 * received a late message is when:
1130 * 1. the late message is a discarded packets or discarded events
1131 * message,
1132 * 2. this stream produced an inactivity message downstream, and
1133 * 3. the timestamp of the late message is within the inactivity
1134 * timespan we sent downstream through the inactivity message.
1135 */
1136
1137 BT_COMP_LOGD("Handling late message on live stream iterator: "
1138 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1139 stream_iter->name->str, stream_iter->viewer_stream_id);
1140
1141 if (!stream_iter->last_inactivity_ts.is_set) {
1142 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: "
1143 "have a late message when no inactivity message "
1144 "was ever sent for that stream.");
1145 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1146 goto end;
1147 }
1148
1149 if (!is_discarded_packet_or_event_message(late_msg)) {
1150 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1151 "Invalid live stream state: "
1152 "have a late message that is not a packet discarded or "
1153 "event discarded message: late-msg-type=%s",
1154 bt_common_message_type_string(bt_message_get_type(late_msg)));
1155 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1156 goto end;
1157 }
1158
1159 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1160 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1161
1162 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1163 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1164 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1165 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error converting last "
1166 "inactivity message timestamp to nanoseconds");
1167 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1168 goto end;
1169 }
1170
1171 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1172 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1173 "Invalid live stream state: "
1174 "have a late message that is none included in a stream "
1175 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
1176 "late-msg-ts-ns=%" PRIu64,
1177 last_inactivity_ts_ns, late_msg_ts_ns);
1178 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1179 goto end;
1180 }
1181
1182 /*
1183 * We now know that it's okay for this message to be late, we can now
1184 * adjust its timestamp to ensure monotonicity.
1185 */
1186 BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1187 "msg-new-ts-ns=%" PRIu64,
1188 bt_common_message_type_string(bt_message_get_type(late_msg)),
1189 stream_iter->last_inactivity_ts.value);
1190 switch (bt_message_get_type(late_msg)) {
1191 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1192 adjust_status = adjust_discarded_events_message(
1193 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1194 stream_iter->last_inactivity_ts.value);
1195 break;
1196 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1197 adjust_status = adjust_discarded_packets_message(
1198 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1199 stream_iter->last_inactivity_ts.value);
1200 break;
1201 default:
1202 bt_common_abort();
1203 }
1204
1205 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1206 stream_iter_status = adjust_status;
1207 goto end;
1208 }
1209
1210 BT_ASSERT_DBG(adjusted_message);
1211 stream_iter->current_msg = adjusted_message;
1212 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1213 bt_message_put_ref(late_msg);
1214
1215 end:
1216 return stream_iter_status;
1217 }
1218
1219 static enum lttng_live_iterator_status
1220 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1221 struct lttng_live_trace *live_trace,
1222 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1223 {
1224 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1225 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1226 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1227 enum lttng_live_iterator_status stream_iter_status;
1228 ;
1229 int64_t youngest_candidate_msg_ts = INT64_MAX;
1230 uint64_t stream_iter_idx;
1231
1232 BT_ASSERT_DBG(live_trace);
1233 BT_ASSERT_DBG(live_trace->stream_iterators);
1234
1235 BT_COMP_LOGD("Finding the next stream iterator for trace: "
1236 "trace-id=%" PRIu64,
1237 live_trace->id);
1238 /*
1239 * Update the current message of every stream iterators of this trace.
1240 * The current msg of every stream must have a timestamp equal or
1241 * larger than the last message returned by this iterator. We must
1242 * ensure monotonicity.
1243 */
1244 stream_iter_idx = 0;
1245 while (stream_iter_idx < live_trace->stream_iterators->len) {
1246 bool stream_iter_is_ended = false;
1247 struct lttng_live_stream_iterator *stream_iter =
1248 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1249 stream_iter_idx);
1250
1251 /*
1252 * If there is no current message for this stream, go fetch
1253 * one.
1254 */
1255 while (!stream_iter->current_msg) {
1256 const bt_message *msg = NULL;
1257 int64_t curr_msg_ts_ns = INT64_MAX;
1258
1259 stream_iter_status =
1260 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1261
1262 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1263 stream_iter_is_ended = true;
1264 break;
1265 }
1266
1267 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1268 goto end;
1269 }
1270
1271 BT_ASSERT_DBG(msg);
1272
1273 BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, "
1274 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1275 bt_common_message_type_string(bt_message_get_type(msg)),
1276 stream_iter->name->str, stream_iter->viewer_stream_id);
1277
1278 /*
1279 * Get the timestamp in nanoseconds from origin of this
1280 * messsage.
1281 */
1282 live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
1283 &curr_msg_ts_ns);
1284
1285 /*
1286 * Check if the message of the current live stream
1287 * iterator occurred at the exact same time or after the
1288 * last message returned by this component's message
1289 * iterator. If not, we need to handle it with care.
1290 */
1291 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1292 stream_iter->current_msg = msg;
1293 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1294 } else {
1295 /*
1296 * We received a message from the past. This
1297 * may be fixable but it can also be an error.
1298 */
1299 stream_iter_status =
1300 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1301 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1302 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1303 "Late message could not be handled correctly: "
1304 "lttng-live-msg-iter-addr=%p, "
1305 "stream-name=\"%s\", "
1306 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
1307 lttng_live_msg_iter, stream_iter->name->str,
1308 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
1309 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1310 goto end;
1311 }
1312 }
1313 }
1314
1315 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1316
1317 if (!stream_iter_is_ended) {
1318 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1319 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1320 /*
1321 * Update the current best candidate message
1322 * for the stream iterator of this live trace
1323 * to be forwarded downstream.
1324 */
1325 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1326 youngest_candidate_stream_iter = stream_iter;
1327 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1328 /*
1329 * Order the messages in an arbitrary but
1330 * deterministic way.
1331 */
1332 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1333 int ret = common_muxing_compare_messages(
1334 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1335 if (ret < 0) {
1336 /*
1337 * The `youngest_candidate_stream_iter->current_msg`
1338 * should go first. Update the next
1339 * iterator and the current timestamp.
1340 */
1341 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1342 youngest_candidate_stream_iter = stream_iter;
1343 } else if (ret == 0) {
1344 /*
1345 * Unable to pick which one should go
1346 * first.
1347 */
1348 BT_COMP_LOGW(
1349 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1350 "stream-iter-addr=%p"
1351 "stream-iter-addr=%p",
1352 stream_iter, youngest_candidate_stream_iter);
1353 }
1354 }
1355
1356 stream_iter_idx++;
1357 } else {
1358 /*
1359 * The live stream iterator has ended. That
1360 * iterator is removed from the array, but
1361 * there is no need to increment
1362 * stream_iter_idx as
1363 * g_ptr_array_remove_index_fast replaces the
1364 * removed element with the array's last
1365 * element.
1366 */
1367 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1368 }
1369 }
1370
1371 if (youngest_candidate_stream_iter) {
1372 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1373 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1374 } else {
1375 /*
1376 * The only case where we don't have a candidate for this trace
1377 * is if we reached the end of all the iterators.
1378 */
1379 BT_ASSERT(live_trace->stream_iterators->len == 0);
1380 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1381 }
1382
1383 end:
1384 return stream_iter_status;
1385 }
1386
1387 static enum lttng_live_iterator_status
1388 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1389 struct lttng_live_session *session,
1390 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1391 {
1392 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1393 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1394 enum lttng_live_iterator_status stream_iter_status;
1395 uint64_t trace_idx = 0;
1396 int64_t youngest_candidate_msg_ts = INT64_MAX;
1397 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1398
1399 BT_COMP_LOGD("Finding the next stream iterator for session: "
1400 "session-id=%" PRIu64,
1401 session->id);
1402 /*
1403 * Make sure we are attached to the session and look for new streams
1404 * and metadata.
1405 */
1406 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1407 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1408 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1409 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1410 goto end;
1411 }
1412
1413 BT_ASSERT_DBG(session->traces);
1414
1415 while (trace_idx < session->traces->len) {
1416 bool trace_is_ended = false;
1417 struct lttng_live_stream_iterator *stream_iter;
1418 struct lttng_live_trace *trace =
1419 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1420
1421 stream_iter_status =
1422 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1423 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1424 /*
1425 * All the live stream iterators for this trace are
1426 * ENDed. Remove the trace from this session.
1427 */
1428 trace_is_ended = true;
1429 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1430 goto end;
1431 }
1432
1433 if (!trace_is_ended) {
1434 BT_ASSERT_DBG(stream_iter);
1435
1436 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1437 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1438 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1439 youngest_candidate_stream_iter = stream_iter;
1440 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1441 /*
1442 * Order the messages in an arbitrary but
1443 * deterministic way.
1444 */
1445 int ret = common_muxing_compare_messages(
1446 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1447 if (ret < 0) {
1448 /*
1449 * The `youngest_candidate_stream_iter->current_msg`
1450 * should go first. Update the next iterator
1451 * and the current timestamp.
1452 */
1453 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1454 youngest_candidate_stream_iter = stream_iter;
1455 } else if (ret == 0) {
1456 /* Unable to pick which one should go first. */
1457 BT_COMP_LOGW(
1458 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1459 "stream-iter-addr=%p"
1460 "stream-iter-addr=%p",
1461 stream_iter, youngest_candidate_stream_iter);
1462 }
1463 }
1464 trace_idx++;
1465 } else {
1466 /*
1467 * trace_idx is not incremented since
1468 * g_ptr_array_remove_index_fast replaces the
1469 * element at trace_idx with the array's last element.
1470 */
1471 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1472 }
1473 }
1474 if (youngest_candidate_stream_iter) {
1475 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1476 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1477 } else {
1478 /*
1479 * The only cases where we don't have a candidate for this
1480 * trace is:
1481 * 1. if we reached the end of all the iterators of all the
1482 * traces of this session,
1483 * 2. if we never had live stream iterator in the first place.
1484 *
1485 * In either cases, we return END.
1486 */
1487 BT_ASSERT(session->traces->len == 0);
1488 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1489 }
1490 end:
1491 return stream_iter_status;
1492 }
1493
1494 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1495 {
1496 uint64_t i;
1497
1498 for (i = 0; i < count; i++) {
1499 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1500 }
1501 }
1502
1503 bt_message_iterator_class_next_method_status
1504 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1505 uint64_t capacity, uint64_t *count)
1506 {
1507 bt_message_iterator_class_next_method_status status;
1508 enum lttng_live_viewer_status viewer_status;
1509 struct lttng_live_msg_iter *lttng_live_msg_iter =
1510 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1511 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1512 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1513 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1514 enum lttng_live_iterator_status stream_iter_status;
1515 uint64_t session_idx;
1516
1517 *count = 0;
1518
1519 BT_ASSERT_DBG(lttng_live_msg_iter);
1520
1521 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1522 /*
1523 * The iterator was interrupted in a previous call to the
1524 * `_next()` method. We currently do not support generating
1525 * messages after such event. The babeltrace2 CLI should never
1526 * be running the graph after being interrupted. So this check
1527 * is to prevent other graph users from using this live
1528 * iterator in an messed up internal state.
1529 */
1530 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1531 BT_COMP_LOGE_APPEND_CAUSE(
1532 self_comp,
1533 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1534 goto end;
1535 }
1536
1537 /*
1538 * Clear all the invalid message reference that might be left over in
1539 * the output array.
1540 */
1541 memset(msgs, 0, capacity * sizeof(*msgs));
1542
1543 /*
1544 * If no session are exposed on the relay found at the url provided by
1545 * the user, session count will be 0. In this case, we return status
1546 * end to return gracefully.
1547 */
1548 if (lttng_live_msg_iter->sessions->len == 0) {
1549 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1550 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1551 goto end;
1552 } else {
1553 /*
1554 * The are no more active session for this session
1555 * name. Retry to create a viewer session for the
1556 * requested session name.
1557 */
1558 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1559 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1560 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1561 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1562 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1563 "Error creating LTTng live viewer session");
1564 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1565 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1566 } else {
1567 bt_common_abort();
1568 }
1569 goto end;
1570 }
1571 }
1572 }
1573
1574 if (lttng_live_msg_iter->active_stream_iter == 0) {
1575 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1576 }
1577
1578 /*
1579 * Here the muxing of message is done.
1580 *
1581 * We need to iterate over all the streams of all the traces of all the
1582 * viewer sessions in order to get the message with the smallest
1583 * timestamp. In this case, a session is a viewer session and there is
1584 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1585 * kernel). Each viewer session can have multiple traces, for example,
1586 * 64bit UST viewer sessions could have multiple per-pid traces.
1587 *
1588 * We iterate over the streams of each traces to update and see what is
1589 * their next message's timestamp. From those timestamps, we select the
1590 * message with the smallest timestamp as the best candidate message
1591 * for that trace and do the same thing across all the sessions.
1592 *
1593 * We then compare the timestamp of best candidate message of all the
1594 * sessions to pick the message with the smallest timestamp and we
1595 * return it.
1596 */
1597 while (*count < capacity) {
1598 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1599 *candidate_stream_iter = NULL;
1600 int64_t youngest_msg_ts_ns = INT64_MAX;
1601
1602 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1603 session_idx = 0;
1604 while (session_idx < lttng_live_msg_iter->sessions->len) {
1605 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1606 lttng_live_msg_iter->sessions, session_idx);
1607
1608 /* Find the best candidate message to send downstream. */
1609 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1610 &candidate_stream_iter);
1611
1612 /* If we receive an END status, it means that either:
1613 * - Those traces never had active streams (UST with no
1614 * data produced yet),
1615 * - All live stream iterators have ENDed.*/
1616 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1617 if (session->closed && session->traces->len == 0) {
1618 /*
1619 * Remove the session from the list.
1620 * session_idx is not modified since
1621 * g_ptr_array_remove_index_fast
1622 * replaces the the removed element with
1623 * the array's last element.
1624 */
1625 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1626 } else {
1627 session_idx++;
1628 }
1629 continue;
1630 }
1631
1632 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1633 goto return_status;
1634 }
1635
1636 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1637 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1638 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1639 youngest_stream_iter = candidate_stream_iter;
1640 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1641 /*
1642 * The currently selected message to be sent
1643 * downstream next has the exact same timestamp
1644 * that of the current candidate message. We
1645 * must break the tie in a predictable manner.
1646 */
1647 BT_COMP_LOGD_STR(
1648 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1649 /*
1650 * Order the messages in an arbitrary but
1651 * deterministic way.
1652 */
1653 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1654 youngest_stream_iter->current_msg);
1655 if (ret < 0) {
1656 /*
1657 * The `candidate_stream_iter->current_msg`
1658 * should go first. Update the next
1659 * iterator and the current timestamp.
1660 */
1661 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1662 youngest_stream_iter = candidate_stream_iter;
1663 } else if (ret == 0) {
1664 /* Unable to pick which one should go first. */
1665 BT_COMP_LOGW(
1666 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1667 "next-stream-iter-addr=%p"
1668 "candidate-stream-iter-addr=%p",
1669 youngest_stream_iter, candidate_stream_iter);
1670 }
1671 }
1672
1673 session_idx++;
1674 }
1675
1676 if (!youngest_stream_iter) {
1677 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1678 goto return_status;
1679 }
1680
1681 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1682 /* Ensure monotonicity. */
1683 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1684 youngest_stream_iter->current_msg_ts_ns);
1685
1686 /*
1687 * Insert the next message to the message batch. This will set
1688 * stream iterator current messsage to NULL so that next time
1689 * we fetch the next message of that stream iterator
1690 */
1691 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1692 (*count)++;
1693
1694 /* Update the last timestamp in nanoseconds sent downstream. */
1695 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1696 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1697
1698 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1699 }
1700
1701 return_status:
1702 switch (stream_iter_status) {
1703 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1704 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1705 /*
1706 * If we gathered messages, return _OK even if the graph was
1707 * interrupted. This allows for the components downstream to at
1708 * least get the thoses messages. If the graph was indeed
1709 * interrupted there should not be another _next() call as the
1710 * application will tear down the graph. This component class
1711 * doesn't support restarting after an interruption.
1712 */
1713 if (*count > 0) {
1714 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1715 } else {
1716 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1717 }
1718 break;
1719 case LTTNG_LIVE_ITERATOR_STATUS_END:
1720 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1721 break;
1722 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1723 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1724 "Memory error preparing the next batch of messages: "
1725 "live-iter-status=%s",
1726 lttng_live_iterator_status_string(stream_iter_status));
1727 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1728 break;
1729 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1730 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1731 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1732 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1733 "Error preparing the next batch of messages: "
1734 "live-iter-status=%s",
1735 lttng_live_iterator_status_string(stream_iter_status));
1736
1737 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1738 /* Put all existing messages on error. */
1739 put_messages(msgs, *count);
1740 break;
1741 default:
1742 bt_common_abort();
1743 }
1744
1745 end:
1746 return status;
1747 }
1748
1749 static struct lttng_live_msg_iter *
1750 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1751 bt_self_message_iterator *self_msg_it)
1752 {
1753 bt_self_component *self_comp = lttng_live_comp->self_comp;
1754 bt_logging_level log_level = lttng_live_comp->log_level;
1755
1756 struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1757 if (!lttng_live_msg_iter) {
1758 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter");
1759 goto end;
1760 }
1761
1762 lttng_live_msg_iter->log_level = lttng_live_comp->log_level;
1763 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1764 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1765 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1766
1767 lttng_live_msg_iter->active_stream_iter = 0;
1768 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1769 lttng_live_msg_iter->was_interrupted = false;
1770
1771 lttng_live_msg_iter->sessions =
1772 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1773 BT_ASSERT(lttng_live_msg_iter->sessions);
1774
1775 end:
1776 return lttng_live_msg_iter;
1777 }
1778
1779 bt_message_iterator_class_initialize_method_status
1780 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1781 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
1782 {
1783 bt_message_iterator_class_initialize_method_status status;
1784 struct lttng_live_component *lttng_live;
1785 struct lttng_live_msg_iter *lttng_live_msg_iter;
1786 enum lttng_live_viewer_status viewer_status;
1787 bt_logging_level log_level;
1788 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1789
1790 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1791 log_level = lttng_live->log_level;
1792 self_comp = lttng_live->self_comp;
1793
1794 /* There can be only one downstream iterator at the same time. */
1795 BT_ASSERT(!lttng_live->has_msg_iter);
1796 lttng_live->has_msg_iter = true;
1797
1798 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1799 if (!lttng_live_msg_iter) {
1800 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter");
1801 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1802 goto error;
1803 }
1804
1805 viewer_status = live_viewer_connection_create(
1806 self_comp, NULL, log_level, lttng_live->params.url->str, false, lttng_live_msg_iter,
1807 &lttng_live_msg_iter->viewer_connection);
1808 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1809 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1810 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer connection");
1811 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1812 /*
1813 * Interruption in the _iter_init() method is not
1814 * supported. Return an error.
1815 */
1816 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted while creating viewer connection");
1817 }
1818
1819 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1820 goto error;
1821 }
1822
1823 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1824 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1825 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1826 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer session");
1827 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1828 /*
1829 * Interruption in the _iter_init() method is not
1830 * supported. Return an error.
1831 */
1832 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted when creating viewer session");
1833 }
1834
1835 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1836 goto error;
1837 }
1838
1839 if (lttng_live_msg_iter->sessions->len == 0) {
1840 switch (lttng_live->params.sess_not_found_act) {
1841 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1842 BT_COMP_LOGI(
1843 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1844 "%s=\"%s\" component parameter: url=\"%s\"",
1845 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1846 lttng_live->params.url->str);
1847 break;
1848 case SESSION_NOT_FOUND_ACTION_FAIL:
1849 BT_COMP_LOGE_APPEND_CAUSE(
1850 self_comp,
1851 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1852 "component parameter: url =\"%s\"",
1853 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1854 lttng_live->params.url->str);
1855 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1856 goto error;
1857 case SESSION_NOT_FOUND_ACTION_END:
1858 BT_COMP_LOGI(
1859 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1860 "call because of %s=\"%s\" component parameter: "
1861 "url=\"%s\"",
1862 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1863 lttng_live->params.url->str);
1864 break;
1865 default:
1866 bt_common_abort();
1867 }
1868 }
1869
1870 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1871 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1872 goto end;
1873
1874 error:
1875 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1876 end:
1877 return status;
1878 }
1879
1880 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1881 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1882 bt_param_validation_value_descr::makeString()},
1883 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1884
1885 static bt_component_class_query_method_status
1886 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1887 bt_self_component_class *self_comp_class, bt_logging_level log_level)
1888 {
1889 bt_component_class_query_method_status status;
1890 const bt_value *url_value = NULL;
1891 const char *url;
1892 struct live_viewer_connection *viewer_connection = NULL;
1893 enum lttng_live_viewer_status viewer_status;
1894 enum bt_param_validation_status validation_status;
1895 gchar *validate_error = NULL;
1896
1897 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1898 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1899 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1900 goto error;
1901 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1902 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1903 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error);
1904 goto error;
1905 }
1906
1907 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1908 url = bt_value_string_get(url_value);
1909
1910 viewer_status = live_viewer_connection_create(NULL, self_comp_class, log_level, url, true, NULL,
1911 &viewer_connection);
1912 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1913 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1914 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to create viewer connection");
1915 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1916 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1917 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1918 } else {
1919 bt_common_abort();
1920 }
1921 goto error;
1922 }
1923
1924 status = live_viewer_connection_list_sessions(viewer_connection, result);
1925 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1926 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to list viewer sessions");
1927 goto error;
1928 }
1929
1930 goto end;
1931
1932 error:
1933 BT_VALUE_PUT_REF_AND_RESET(*result);
1934
1935 if (status >= 0) {
1936 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1937 }
1938
1939 end:
1940 if (viewer_connection) {
1941 live_viewer_connection_destroy(viewer_connection);
1942 }
1943
1944 g_free(validate_error);
1945
1946 return status;
1947 }
1948
1949 static bt_component_class_query_method_status
1950 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1951 bt_self_component_class *self_comp_class, bt_logging_level log_level)
1952 {
1953 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1954 const bt_value *input_type_value;
1955 const bt_value *input_value;
1956 double weight = 0;
1957 struct bt_common_lttng_live_url_parts parts = {};
1958
1959 /* Used by the logging macros */
1960 __attribute__((unused)) bt_self_component *self_comp = NULL;
1961
1962 *result = NULL;
1963 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1964 if (!input_type_value) {
1965 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `type` parameter.");
1966 goto error;
1967 }
1968
1969 if (!bt_value_is_string(input_type_value)) {
1970 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "`type` parameter is not a string value.");
1971 goto error;
1972 }
1973
1974 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1975 /* We don't handle file system paths */
1976 goto create_result;
1977 }
1978
1979 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1980 if (!input_value) {
1981 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `input` parameter.");
1982 goto error;
1983 }
1984
1985 if (!bt_value_is_string(input_value)) {
1986 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1987 "`input` parameter is not a string value.");
1988 goto error;
1989 }
1990
1991 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1992 if (parts.session_name) {
1993 /*
1994 * Looks pretty much like an LTTng live URL: we got the
1995 * session name part, which forms a complete URL.
1996 */
1997 weight = .75;
1998 }
1999
2000 create_result:
2001 *result = bt_value_real_create_init(weight);
2002 if (!*result) {
2003 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
2004 goto error;
2005 }
2006
2007 goto end;
2008
2009 error:
2010 if (status >= 0) {
2011 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
2012 }
2013
2014 BT_ASSERT(!*result);
2015
2016 end:
2017 bt_common_destroy_lttng_live_url_parts(&parts);
2018 return status;
2019 }
2020
2021 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
2022 bt_private_query_executor *priv_query_exec,
2023 const char *object, const bt_value *params,
2024 __attribute__((unused)) void *method_data,
2025 const bt_value **result)
2026 {
2027 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
2028 bt_self_component *self_comp = NULL;
2029 bt_self_component_class *self_comp_class =
2030 bt_self_component_class_source_as_self_component_class(comp_class);
2031 bt_logging_level log_level = bt_query_executor_get_logging_level(
2032 bt_private_query_executor_as_query_executor_const(priv_query_exec));
2033
2034 if (strcmp(object, "sessions") == 0) {
2035 status = lttng_live_query_list_sessions(params, result, self_comp_class, log_level);
2036 } else if (strcmp(object, "babeltrace.support-info") == 0) {
2037 status = lttng_live_query_support_info(params, result, self_comp_class, log_level);
2038 } else {
2039 BT_COMP_LOGI("Unknown query object `%s`", object);
2040 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
2041 goto end;
2042 }
2043
2044 end:
2045 return status;
2046 }
2047
2048 static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
2049 {
2050 if (!lttng_live) {
2051 return;
2052 }
2053 if (lttng_live->params.url) {
2054 g_string_free(lttng_live->params.url, TRUE);
2055 }
2056 g_free(lttng_live);
2057 }
2058
2059 void lttng_live_component_finalize(bt_self_component_source *component)
2060 {
2061 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
2062 bt_self_component_source_as_self_component(component));
2063
2064 if (!data) {
2065 return;
2066 }
2067 lttng_live_component_destroy_data(data);
2068 }
2069
2070 static enum session_not_found_action
2071 parse_session_not_found_action_param(const bt_value *no_session_param)
2072 {
2073 enum session_not_found_action action;
2074 const char *no_session_act_str = bt_value_string_get(no_session_param);
2075
2076 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2077 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2078 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2079 action = SESSION_NOT_FOUND_ACTION_FAIL;
2080 } else {
2081 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2082 action = SESSION_NOT_FOUND_ACTION_END;
2083 }
2084
2085 return action;
2086 }
2087
2088 static bt_param_validation_value_descr inputs_elem_descr =
2089 bt_param_validation_value_descr::makeString();
2090
2091 static const char *sess_not_found_action_choices[] = {
2092 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2093 SESS_NOT_FOUND_ACTION_FAIL_STR,
2094 SESS_NOT_FOUND_ACTION_END_STR,
2095 };
2096
2097 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
2098 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2099 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2100 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2101 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
2102 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2103
2104 static bt_component_class_initialize_method_status
2105 lttng_live_component_create(const bt_value *params, bt_logging_level log_level,
2106 bt_self_component *self_comp, struct lttng_live_component **component)
2107 {
2108 struct lttng_live_component *lttng_live = NULL;
2109 const bt_value *inputs_value;
2110 const bt_value *url_value;
2111 const bt_value *value;
2112 const char *url;
2113 enum bt_param_validation_status validation_status;
2114 gchar *validation_error = NULL;
2115 bt_component_class_initialize_method_status status;
2116
2117 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2118 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2119 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2120 goto error;
2121 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2122 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
2123 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2124 goto error;
2125 }
2126
2127 lttng_live = g_new0(struct lttng_live_component, 1);
2128 if (!lttng_live) {
2129 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2130 goto end;
2131 }
2132 lttng_live->log_level = log_level;
2133 lttng_live->self_comp = self_comp;
2134 lttng_live->max_query_size = MAX_QUERY_SIZE;
2135 lttng_live->has_msg_iter = false;
2136
2137 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2138 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2139 url = bt_value_string_get(url_value);
2140
2141 lttng_live->params.url = g_string_new(url);
2142 if (!lttng_live->params.url) {
2143 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2144 goto error;
2145 }
2146
2147 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2148 if (value) {
2149 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2150 } else {
2151 BT_COMP_LOGI("Optional `%s` parameter is missing: "
2152 "defaulting to `%s`.",
2153 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2154 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2155 }
2156
2157 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2158 goto end;
2159
2160 error:
2161 lttng_live_component_destroy_data(lttng_live);
2162 lttng_live = NULL;
2163 end:
2164 g_free(validation_error);
2165
2166 *component = lttng_live;
2167 return status;
2168 }
2169
2170 bt_component_class_initialize_method_status
2171 lttng_live_component_init(bt_self_component_source *self_comp_src,
2172 bt_self_component_source_configuration *, const bt_value *params, void *)
2173 {
2174 struct lttng_live_component *lttng_live;
2175 bt_component_class_initialize_method_status ret;
2176 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2177 bt_logging_level log_level =
2178 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
2179 bt_self_component_add_port_status add_port_status;
2180
2181 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
2182 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2183 goto error;
2184 }
2185
2186 add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2187 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2188 ret = (bt_component_class_initialize_method_status) add_port_status;
2189 goto end;
2190 }
2191
2192 bt_self_component_set_data(self_comp, lttng_live);
2193 goto end;
2194
2195 error:
2196 lttng_live_component_destroy_data(lttng_live);
2197 lttng_live = NULL;
2198 end:
2199 return ret;
2200 }
This page took 0.119267 seconds and 4 git commands to generate.