src.ctf.lttng-live: remove unused variable in live_get_msg_ts_ns
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #define BT_COMP_LOG_SELF_COMP self_comp
12 #define BT_LOG_OUTPUT_LEVEL log_level
13 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
14 #include "logging/comp-logging.h"
15
16 #include <inttypes.h>
17 #include <stdbool.h>
18 #include <unistd.h>
19
20 #include <glib.h>
21
22 #include "common/assert.h"
23 #include <babeltrace2/babeltrace.h>
24 #include "compat/compiler.h"
25 #include <babeltrace2/types.h>
26
27 #include "plugins/common/muxing/muxing.h"
28 #include "plugins/common/param-validation/param-validation.h"
29
30 #include "data-stream.hpp"
31 #include "metadata.hpp"
32 #include "lttng-live.hpp"
33
/*
 * NOTE(review): not referenced in the visible part of this file;
 * presumably an upper bound on the size of a query -- confirm.
 */
#define MAX_QUERY_SIZE (256 * 1024)

/*
 * NOTE(review): judging by their names, the keys of parameters accepted
 * by this component -- confirm against the parameter validation code.
 */
#define URL_PARAM "url"
#define INPUTS_PARAM "inputs"
#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"

/*
 * NOTE(review): presumably the values accepted for the
 * `session-not-found-action` parameter -- confirm.
 */
#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
#define SESS_NOT_FOUND_ACTION_END_STR "end"

/* Shorthand that forwards its arguments to BT_COMP_LOGD(). */
#define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ##__VA_ARGS__)
43
44 static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
45 {
46 switch (status) {
47 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
48 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
49 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
50 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
51 case LTTNG_LIVE_ITERATOR_STATUS_END:
52 return "LTTNG_LIVE_ITERATOR_STATUS_END";
53 case LTTNG_LIVE_ITERATOR_STATUS_OK:
54 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
55 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
56 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
57 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
58 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
59 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
60 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
61 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
62 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
63 default:
64 bt_common_abort();
65 }
66 }
67
68 static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
69 {
70 switch (state) {
71 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
72 return "ACTIVE_NO_DATA";
73 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
74 return "QUIESCENT_NO_DATA";
75 case LTTNG_LIVE_STREAM_QUIESCENT:
76 return "QUIESCENT";
77 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
78 return "ACTIVE_DATA";
79 case LTTNG_LIVE_STREAM_EOF:
80 return "EOF";
81 default:
82 return "ERROR";
83 }
84 }
85
86 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
87 enum lttng_live_stream_state new_state)
88 {
89 bt_self_component *self_comp = stream_iter->self_comp;
90 bt_logging_level log_level = stream_iter->log_level;
91
92 BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
93 ", old-state=%s, new-state=%s",
94 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
95 lttng_live_stream_state_string(new_state));
96
97 stream_iter->state = new_state;
98 }
99
/*
 * Logs, at the debug level, the state and the inactivity timestamps of
 * the live stream iterator `live_stream_iter`.
 *
 * This expands to a BT_COMP_LOGD() call, so `log_level` and `self_comp`
 * must be visible where the macro is used.
 *
 * The expansion is a single `do { } while (0)` statement WITHOUT a
 * trailing semicolon, so that `LTTNG_LIVE_LOGD_STREAM_ITER(iter);`
 * behaves like a regular statement (including inside an unbraced
 * `if`/`else`). The argument is parenthesized so that any pointer
 * expression can be passed.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \
    do { \
        BT_COMP_LOGD("Live stream iterator state=%s, " \
                     "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", " \
                     "curr-inact-ts=%" PRId64, \
                     lttng_live_stream_state_string((live_stream_iter)->state), \
                     (live_stream_iter)->last_inactivity_ts.is_set, \
                     (live_stream_iter)->last_inactivity_ts.value, \
                     (live_stream_iter)->current_inactivity_ts); \
    } while (0)
110
111 BT_HIDDEN
112 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
113 {
114 bool ret;
115
116 if (!msg_iter) {
117 ret = false;
118 goto end;
119 }
120
121 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
122
123 end:
124 return ret;
125 }
126
127 static struct lttng_live_trace *
128 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
129 {
130 uint64_t trace_idx;
131 struct lttng_live_trace *ret_trace = NULL;
132
133 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
134 struct lttng_live_trace *trace =
135 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
136 if (trace->id == trace_id) {
137 ret_trace = trace;
138 goto end;
139 }
140 }
141
142 end:
143 return ret_trace;
144 }
145
146 static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
147 {
148 bt_logging_level log_level = trace->log_level;
149 bt_self_component *self_comp = trace->self_comp;
150
151 BT_COMP_LOGD("Destroying live trace: trace-id=%" PRIu64, trace->id);
152
153 BT_ASSERT(trace->stream_iterators);
154 g_ptr_array_free(trace->stream_iterators, TRUE);
155
156 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
157 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
158
159 lttng_live_metadata_fini(trace);
160 g_free(trace);
161 }
162
163 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
164 uint64_t trace_id)
165 {
166 struct lttng_live_trace *trace = NULL;
167 bt_logging_level log_level = session->log_level;
168 bt_self_component *self_comp = session->self_comp;
169
170 BT_COMP_LOGD("Creating live trace: "
171 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
172 session->id, trace_id);
173 trace = g_new0(struct lttng_live_trace, 1);
174 if (!trace) {
175 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace");
176 goto error;
177 }
178 trace->log_level = session->log_level;
179 trace->self_comp = session->self_comp;
180 trace->session = session;
181 trace->id = trace_id;
182 trace->trace_class = NULL;
183 trace->trace = NULL;
184 trace->stream_iterators =
185 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
186 BT_ASSERT(trace->stream_iterators);
187 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
188 g_ptr_array_add(session->traces, trace);
189
190 goto end;
191 error:
192 g_free(trace);
193 trace = NULL;
194 end:
195 return trace;
196 }
197
198 BT_HIDDEN
199 struct lttng_live_trace *
200 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
201 uint64_t trace_id)
202 {
203 struct lttng_live_trace *trace;
204
205 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
206 if (trace) {
207 goto end;
208 }
209
210 /* The session is the owner of the newly created trace. */
211 trace = lttng_live_create_trace(session, trace_id);
212
213 end:
214 return trace;
215 }
216
217 BT_HIDDEN
218 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
219 const char *hostname, const char *session_name)
220 {
221 int ret = 0;
222 struct lttng_live_session *session;
223 bt_logging_level log_level = lttng_live_msg_iter->log_level;
224 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
225
226 BT_COMP_LOGD("Adding live session: "
227 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
228 session_id, hostname, session_name);
229
230 session = g_new0(struct lttng_live_session, 1);
231 if (!session) {
232 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session");
233 goto error;
234 }
235
236 session->log_level = lttng_live_msg_iter->log_level;
237 session->self_comp = lttng_live_msg_iter->self_comp;
238 session->id = session_id;
239 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
240 BT_ASSERT(session->traces);
241 session->lttng_live_msg_iter = lttng_live_msg_iter;
242 session->new_streams_needed = true;
243 session->hostname = g_string_new(hostname);
244 BT_ASSERT(session->hostname);
245
246 session->session_name = g_string_new(session_name);
247 BT_ASSERT(session->session_name);
248
249 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
250 goto end;
251 error:
252 g_free(session);
253 ret = -1;
254 end:
255 return ret;
256 }
257
258 static void lttng_live_destroy_session(struct lttng_live_session *session)
259 {
260 bt_logging_level log_level;
261 bt_self_component *self_comp;
262
263 if (!session) {
264 goto end;
265 }
266
267 log_level = session->log_level;
268 self_comp = session->self_comp;
269 BT_COMP_LOGD("Destroying live session: "
270 "session-id=%" PRIu64 ", session-name=\"%s\"",
271 session->id, session->session_name->str);
272 if (session->id != -1ULL) {
273 if (lttng_live_session_detach(session)) {
274 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
275 /* Old relayd cannot detach sessions. */
276 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id);
277 }
278 }
279 session->id = -1ULL;
280 }
281
282 if (session->traces) {
283 g_ptr_array_free(session->traces, TRUE);
284 }
285
286 if (session->hostname) {
287 g_string_free(session->hostname, TRUE);
288 }
289 if (session->session_name) {
290 g_string_free(session->session_name, TRUE);
291 }
292 g_free(session);
293
294 end:
295 return;
296 }
297
298 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
299 {
300 if (!lttng_live_msg_iter) {
301 goto end;
302 }
303
304 if (lttng_live_msg_iter->sessions) {
305 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
306 }
307
308 if (lttng_live_msg_iter->viewer_connection) {
309 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
310 }
311 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
312 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
313
314 /* All stream iterators must be destroyed at this point. */
315 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
316 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
317
318 g_free(lttng_live_msg_iter);
319
320 end:
321 return;
322 }
323
324 BT_HIDDEN
325 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
326 {
327 struct lttng_live_msg_iter *lttng_live_msg_iter;
328
329 BT_ASSERT(self_msg_iter);
330
331 lttng_live_msg_iter =
332 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
333 BT_ASSERT(lttng_live_msg_iter);
334 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
335 }
336
337 static enum lttng_live_iterator_status
338 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
339 {
340 bt_logging_level log_level = lttng_live_stream->log_level;
341 bt_self_component *self_comp = lttng_live_stream->self_comp;
342
343 switch (lttng_live_stream->state) {
344 case LTTNG_LIVE_STREAM_QUIESCENT:
345 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
346 break;
347 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
348 /* Invalid state. */
349 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
350 bt_common_abort();
351 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
352 /* Invalid state. */
353 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
354 bt_common_abort();
355 case LTTNG_LIVE_STREAM_EOF:
356 break;
357 }
358 return LTTNG_LIVE_ITERATOR_STATUS_OK;
359 }
360
361 /*
362 * For active no data stream, fetch next index. As a result of that it can
363 * become either:
364 * - quiescent: won't have events for a bit,
365 * - have data: need to get that data and produce the event,
366 * - have no data on this stream at this point: need to retry (AGAIN) or return
367 * EOF.
368 */
369 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
370 struct lttng_live_msg_iter *lttng_live_msg_iter,
371 struct lttng_live_stream_iterator *lttng_live_stream)
372 {
373 bt_logging_level log_level = lttng_live_msg_iter->log_level;
374 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
375 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
376 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
377 struct packet_index index;
378
379 if (lttng_live_stream->trace->metadata_stream_state ==
380 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
381 BT_COMP_LOGD(
382 "Need to get an update for the metadata stream before proceeding further with this stream: "
383 "stream-name=\"%s\"",
384 lttng_live_stream->name->str);
385 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
386 goto end;
387 }
388
389 if (lttng_live_stream->trace->session->new_streams_needed) {
390 BT_COMP_LOGD(
391 "Need to get an update of all streams before proceeding further with this stream: "
392 "stream-name=\"%s\"",
393 lttng_live_stream->name->str);
394 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
395 goto end;
396 }
397
398 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
399 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
400 goto end;
401 }
402 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
403 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
404 goto end;
405 }
406
407 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
408
409 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
410 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
411 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
412
413 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
414 /*
415 * Because the stream is in the QUIESCENT_NO_DATA
416 * state, we can assert that the last_inactivity_ts was
417 * set and can be safely used in the `if` above.
418 */
419 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
420
421 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
422 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
423 } else {
424 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
425 }
426 goto end;
427 }
428
429 lttng_live_stream->base_offset = index.offset;
430 lttng_live_stream->offset = index.offset;
431 lttng_live_stream->len = index.packet_size / CHAR_BIT;
432
433 BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
434 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
435 ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
436 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
437 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
438
439 end:
440 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
441 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
442 }
443 return ret;
444 }
445
446 /*
447 * Creation of the message requires the ctf trace class to be created
448 * beforehand, but the live protocol gives us all streams (including metadata)
449 * at once. So we split it in three steps: getting streams, getting metadata
450 * (which creates the ctf trace class), and then creating the per-stream
451 * messages.
452 */
453 static enum lttng_live_iterator_status
454 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
455 struct lttng_live_session *session)
456 {
457 bt_logging_level log_level = lttng_live_msg_iter->log_level;
458 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
459 enum lttng_live_iterator_status status;
460 uint64_t trace_idx;
461
462 if (!session->attached) {
463 BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, session->id);
464 enum lttng_live_viewer_status attach_status =
465 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
466 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
467 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
468 /*
469 * Clear any causes appended in
470 * `lttng_live_attach_session()` as we want to
471 * return gracefully since the graph was
472 * cancelled.
473 */
474 bt_current_thread_clear_error();
475 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
476 } else {
477 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
478 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error attaching to LTTng live session");
479 }
480 goto end;
481 }
482 }
483
484 BT_COMP_LOGD("Updating all data streams: "
485 "session-id=%" PRIu64 ", session-name=\"%s\"",
486 session->id, session->session_name->str);
487
488 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
489 switch (status) {
490 case LTTNG_LIVE_ITERATOR_STATUS_OK:
491 break;
492 case LTTNG_LIVE_ITERATOR_STATUS_END:
493 /*
494 * We received a `_END` from the `_get_new_streams()` function,
495 * which means no more data will ever be received from the data
496 * streams of this session. But it's possible that the metadata
497 * is incomplete.
498 * The live protocol guarantees that we receive all the
499 * metadata needed before we receive data streams needing it.
500 * But it's possible to receive metadata NOT needed by
501 * data streams after the session was closed. For example, this
502 * could happen if a new event is registered and the session is
503 * stopped before any tracepoint for that event is actually
504 * fired.
505 */
506 BT_COMP_LOGD(
507 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
508 "session-id=%" PRIu64 ", session-name=\"%s\"",
509 session->id, session->session_name->str);
510 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
511 break;
512 default:
513 goto end;
514 }
515
516 BT_COMP_LOGD("Updating metadata stream for session: "
517 "session-id=%" PRIu64 ", session-name=\"%s\"",
518 session->id, session->session_name->str);
519
520 trace_idx = 0;
521 while (trace_idx < session->traces->len) {
522 struct lttng_live_trace *trace =
523 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
524
525 status = lttng_live_metadata_update(trace);
526 switch (status) {
527 case LTTNG_LIVE_ITERATOR_STATUS_END:
528 case LTTNG_LIVE_ITERATOR_STATUS_OK:
529 trace_idx++;
530 break;
531 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
532 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
533 goto end;
534 default:
535 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
536 "Error updating trace metadata: "
537 "stream-iter-status=%s, trace-id=%" PRIu64,
538 lttng_live_iterator_status_string(status), trace->id);
539 goto end;
540 }
541 }
542
543 /*
544 * Now that we have the metadata we can initialize the downstream
545 * iterator.
546 */
547 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
548
549 end:
550 return status;
551 }
552
553 static void
554 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
555 {
556 uint64_t session_idx, trace_idx;
557 bt_logging_level log_level = lttng_live_msg_iter->log_level;
558 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
559
560 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
561 struct lttng_live_session *session =
562 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
563 BT_COMP_LOGD("Force marking session as needing new streams: "
564 "session-id=%" PRIu64,
565 session->id);
566 session->new_streams_needed = true;
567 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
568 struct lttng_live_trace *trace =
569 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
570 BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
571 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
572 session->id, trace->id);
573
574 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
575
576 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
577 }
578 }
579 }
580
581 static enum lttng_live_iterator_status
582 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
583 {
584 enum lttng_live_iterator_status status;
585 enum lttng_live_viewer_status viewer_status;
586 bt_logging_level log_level = lttng_live_msg_iter->log_level;
587 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
588 uint64_t session_idx = 0, nr_sessions_opened = 0;
589 struct lttng_live_session *session;
590 enum session_not_found_action sess_not_found_act =
591 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
592
593 BT_COMP_LOGD("Update data and metadata of all sessions: "
594 "live-msg-iter-addr=%p",
595 lttng_live_msg_iter);
596 /*
597 * In a remotely distant future, we could add a "new
598 * session" flag to the protocol, which would tell us that we
599 * need to query for new sessions even though we have sessions
600 * currently ongoing.
601 */
602 if (lttng_live_msg_iter->sessions->len == 0) {
603 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
604 BT_COMP_LOGD(
605 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
606 status = LTTNG_LIVE_ITERATOR_STATUS_END;
607 goto end;
608 } else {
609 BT_COMP_LOGD(
610 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
611 /*
612 * Retry to create a viewer session for the requested
613 * session name.
614 */
615 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
616 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
617 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
618 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
619 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
620 "Error creating LTTng live viewer session");
621 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
622 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
623 } else {
624 bt_common_abort();
625 }
626 goto end;
627 }
628 }
629 }
630
631 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
632 session =
633 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
634 status = lttng_live_get_session(lttng_live_msg_iter, session);
635 switch (status) {
636 case LTTNG_LIVE_ITERATOR_STATUS_OK:
637 case LTTNG_LIVE_ITERATOR_STATUS_END:
638 /*
639 * A session returned `_END`. Other sessions may still
640 * be active so we override the status and continue
641 * looping if needed.
642 */
643 break;
644 default:
645 goto end;
646 }
647 if (!session->closed) {
648 nr_sessions_opened++;
649 }
650 }
651
652 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
653 status = LTTNG_LIVE_ITERATOR_STATUS_END;
654 } else {
655 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
656 }
657
658 end:
659 return status;
660 }
661
662 static enum lttng_live_iterator_status
663 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
664 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
665 uint64_t timestamp)
666 {
667 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
668 bt_logging_level log_level = lttng_live_msg_iter->log_level;
669 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
670 bt_message *msg = NULL;
671
672 BT_ASSERT(stream_iter->trace->clock_class);
673
674 BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
675 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
676 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
677
678 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
679 stream_iter->trace->clock_class, timestamp);
680 if (!msg) {
681 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error emitting message iterator inactivity message");
682 goto error;
683 }
684
685 *message = msg;
686 end:
687 return ret;
688
689 error:
690 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
691 bt_message_put_ref(msg);
692 goto end;
693 }
694
695 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
696 struct lttng_live_msg_iter *lttng_live_msg_iter,
697 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
698 {
699 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
700
701 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
702 return LTTNG_LIVE_ITERATOR_STATUS_OK;
703 }
704
705 /*
706 * Check if we already sent an inactivty message downstream for this
707 * `current_inactivity_ts` value.
708 */
709 if (lttng_live_stream->last_inactivity_ts.is_set &&
710 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
711 lttng_live_stream_iterator_set_state(lttng_live_stream,
712 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
713
714 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
715 goto end;
716 }
717
718 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
719 lttng_live_stream->current_inactivity_ts);
720
721 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
722 lttng_live_stream->last_inactivity_ts.is_set = true;
723 end:
724 return ret;
725 }
726
727 static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
728 struct lttng_live_msg_iter *lttng_live_msg_iter,
729 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
730 {
731 const bt_clock_snapshot *clock_snapshot = NULL;
732 int ret = 0;
733 bt_logging_level log_level = lttng_live_msg_iter->log_level;
734 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
735
736 BT_ASSERT_DBG(msg);
737 BT_ASSERT_DBG(ts_ns);
738
739 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
740 "last-msg-ts=%" PRId64,
741 lttng_live_msg_iter, msg, last_msg_ts_ns);
742
743 switch (bt_message_get_type(msg)) {
744 case BT_MESSAGE_TYPE_EVENT:
745 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
746 break;
747 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
748 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
749 break;
750 case BT_MESSAGE_TYPE_PACKET_END:
751 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
752 break;
753 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
754 clock_snapshot =
755 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
756 break;
757 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
758 clock_snapshot =
759 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
760 break;
761 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
762 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
763 break;
764 default:
765 /* All the other messages have a higher priority */
766 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
767 *ts_ns = last_msg_ts_ns;
768 goto end;
769 }
770
771 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
772 if (ret) {
773 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
774 "Cannot get nanoseconds from Epoch of clock snapshot: "
775 "clock-snapshot-addr=%p",
776 clock_snapshot);
777 goto error;
778 }
779
780 goto end;
781
782 error:
783 ret = -1;
784
785 end:
786 if (ret == 0) {
787 BT_COMP_LOGD("Found message's timestamp: "
788 "iter-data-addr=%p, msg-addr=%p, "
789 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
790 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
791 }
792
793 return ret;
794 }
795
796 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
797 struct lttng_live_msg_iter *lttng_live_msg_iter,
798 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
799 {
800 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
801 bt_logging_level log_level = lttng_live_msg_iter->log_level;
802 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
803 enum ctf_msg_iter_status status;
804 uint64_t session_idx, trace_idx;
805
806 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
807 struct lttng_live_session *session =
808 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
809
810 if (session->new_streams_needed) {
811 BT_COMP_LOGD("Need an update for streams: "
812 "session-id=%" PRIu64,
813 session->id);
814 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
815 goto end;
816 }
817 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
818 struct lttng_live_trace *trace =
819 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
820 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
821 BT_COMP_LOGD("Need an update for metadata stream: "
822 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
823 session->id, trace->id);
824 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
825 goto end;
826 }
827 }
828 }
829
830 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
831 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
832 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
833 "Invalid state of live stream iterator"
834 "stream-iter-status=%s",
835 lttng_live_stream_state_string(lttng_live_stream->state));
836 goto end;
837 }
838
839 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
840 switch (status) {
841 case CTF_MSG_ITER_STATUS_EOF:
842 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
843 break;
844 case CTF_MSG_ITER_STATUS_OK:
845 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
846 break;
847 case CTF_MSG_ITER_STATUS_AGAIN:
848 /*
849 * Continue immediately (end of packet). The next
850 * get_index may return AGAIN to delay the following
851 * attempt.
852 */
853 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
854 break;
855 case CTF_MSG_ITER_STATUS_ERROR:
856 default:
857 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
858 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
859 "CTF message iterator failed to get next message: "
860 "msg-iter=%p, msg-iter-status=%s",
861 lttng_live_stream->msg_iter, ctf_msg_iter_status_string(status));
862 break;
863 }
864
865 end:
866 return ret;
867 }
868
869 static enum lttng_live_iterator_status
870 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
871 struct lttng_live_stream_iterator *stream_iter,
872 const bt_message **curr_msg)
873 {
874 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
875 bt_logging_level log_level = lttng_live_msg_iter->log_level;
876 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
877
878 BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
879 "viewer-stream-id=%" PRIu64,
880 stream_iter->name->str, stream_iter->viewer_stream_id);
881
882 /*
883 * The viewer has hung up on us so we are closing the stream. The
884 * `ctf_msg_iter` should simply realize that it needs to close the
885 * stream properly by emitting the necessary stream end message.
886 */
887 enum ctf_msg_iter_status status =
888 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
889
890 if (status == CTF_MSG_ITER_STATUS_ERROR) {
891 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
892 "Error getting the next message from CTF message iterator");
893 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
894 goto end;
895 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
896 BT_COMP_LOGI("Reached the end of the live stream iterator.");
897 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
898 goto end;
899 }
900
901 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
902
903 end:
904 return live_status;
905 }
906
907 /*
908 * helper function:
909 * handle_no_data_streams()
910 * retry:
911 * - for each ACTIVE_NO_DATA stream:
912 * - query relayd for stream data, or quiescence info.
913 * - if need metadata, get metadata, goto retry.
914 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
915 * - if quiescent, move to QUIESCENT streams
916 * - if fetched data, move to ACTIVE_DATA streams
917 * (at this point each stream either has data, or is quiescent)
918 *
919 *
920 * iterator_next:
921 * handle_new_streams_and_metadata()
922 * - query relayd for known streams, add them as ACTIVE_NO_DATA
923 * - query relayd for metadata
924 *
925 * call handle_active_no_data_streams()
926 *
927 * handle_quiescent_streams()
928 * - if at least one stream is ACTIVE_DATA:
929 * - peek stream event with lowest timestamp -> next_ts
930 * - for each quiescent stream
931 * - if next_ts >= quiescent end
932 * - set state to ACTIVE_NO_DATA
933 * - else
934 * - for each quiescent stream
935 * - set state to ACTIVE_NO_DATA
936 *
937 * call handle_active_no_data_streams()
938 *
939 * handle_active_data_streams()
940 * - if at least one stream is ACTIVE_DATA:
941 * - get stream event with lowest timestamp from heap
942 * - make that stream event the current message.
943 * - move this stream heap position to its next event
944 * - if we need to fetch data from relayd, move
945 * stream to ACTIVE_NO_DATA.
946 * - return OK
947 * - return AGAIN
948 *
949 * end criterion: ctrl-c on client. If relayd exits or the session
950 * closes on the relay daemon side, we keep on waiting for streams.
951 * Eventually handle --end timestamp (also an end criterion).
952 *
953 * When disconnected from relayd: try to re-connect endlessly.
954 */
955 static enum lttng_live_iterator_status
956 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
957 struct lttng_live_stream_iterator *stream_iter,
958 const bt_message **curr_msg)
959 {
960 bt_logging_level log_level = lttng_live_msg_iter->log_level;
961 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
962 enum lttng_live_iterator_status live_status;
963
964 BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
965 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
966 stream_iter->name->str, stream_iter->viewer_stream_id);
967
968 if (stream_iter->has_stream_hung_up) {
969 /*
970 * The stream has hung up and the stream was properly closed
971 * during the last call to the current function. Return _END
972 * status now so that this stream iterator is removed for the
973 * stream iterator list.
974 */
975 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
976 goto end;
977 }
978
979 retry:
980 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
981
982 /*
983 * Make sure we have the most recent metadata and possibly some new
984 * streams.
985 */
986 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
987 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
988 goto end;
989 }
990
991 live_status =
992 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
993 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
994 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
995 /*
996 * We overwrite `live_status` since `curr_msg` is
997 * likely set to a valid message in this function.
998 */
999 live_status =
1000 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
1001 }
1002 goto end;
1003 }
1004
1005 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
1006 stream_iter, curr_msg);
1007 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1008 BT_ASSERT(!*curr_msg);
1009 goto end;
1010 }
1011 if (*curr_msg) {
1012 goto end;
1013 }
1014 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
1015 stream_iter, curr_msg);
1016 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1017 BT_ASSERT(!*curr_msg);
1018 }
1019
1020 end:
1021 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
1022 BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
1023 goto retry;
1024 }
1025
1026 BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, "
1027 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1028 lttng_live_iterator_status_string(live_status), stream_iter->name->str,
1029 stream_iter->viewer_stream_id);
1030
1031 return live_status;
1032 }
1033
1034 static bool is_discarded_packet_or_event_message(const bt_message *msg)
1035 {
1036 const enum bt_message_type msg_type = bt_message_get_type(msg);
1037
1038 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
1039 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
1040 }
1041
1042 static enum lttng_live_iterator_status
1043 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
1044 const bt_message *msg_in, bt_message **msg_out,
1045 uint64_t new_begin_ts)
1046 {
1047 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1048 enum bt_property_availability availability;
1049 const bt_clock_snapshot *clock_snapshot;
1050 uint64_t end_ts;
1051 uint64_t count;
1052
1053 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
1054 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1055
1056 availability = bt_message_discarded_packets_get_count(msg_in, &count);
1057 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1058
1059 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
1060 iter, stream, new_begin_ts, end_ts);
1061 if (!*msg_out) {
1062 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1063 goto end;
1064 }
1065
1066 bt_message_discarded_packets_set_count(*msg_out, count);
1067 end:
1068 return status;
1069 }
1070
1071 static enum lttng_live_iterator_status
1072 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
1073 const bt_message *msg_in, bt_message **msg_out,
1074 uint64_t new_begin_ts)
1075 {
1076 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1077 enum bt_property_availability availability;
1078 const bt_clock_snapshot *clock_snapshot;
1079 uint64_t end_ts;
1080 uint64_t count;
1081
1082 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
1083 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1084
1085 availability = bt_message_discarded_events_get_count(msg_in, &count);
1086 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1087
1088 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
1089 iter, stream, new_begin_ts, end_ts);
1090 if (!*msg_out) {
1091 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1092 goto end;
1093 }
1094
1095 bt_message_discarded_events_set_count(*msg_out, count);
1096 end:
1097 return status;
1098 }
1099
1100 static enum lttng_live_iterator_status
1101 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1102 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1103 const bt_message *late_msg)
1104 {
1105 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1106 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1107 const bt_clock_class *clock_class;
1108 const bt_stream_class *stream_class;
1109 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1110 int64_t last_inactivity_ts_ns;
1111 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1112 enum lttng_live_iterator_status adjust_status;
1113 bt_message *adjusted_message;
1114
1115 /*
1116 * The timestamp of the current message is before the last message sent
1117 * by this component. We CANNOT send it as is.
1118 *
1119 * The only expected scenario in which that could happen is the
1120 * following, everything else is a bug in this component, relay deamon,
1121 * or CTF parser.
1122 *
1123 * Expected scenario: The CTF message iterator emitted discarded
1124 * packets and discarded events with synthesized beginning and end
1125 * timestamps from the bounds of the last known packet and the newly
1126 * decoded packet header. The CTF message iterator is not aware of
1127 * stream inactivity beacons. Hence, we have to adjust the beginning
1128 * timestamp of those types of messages if a stream signalled its
1129 * inactivity up until _after_ the last known packet's beginning
1130 * timestamp.
1131 *
1132 * Otherwise, the monotonicity guarantee of message timestamps would
1133 * not be preserved.
1134 *
1135 * In short, the only scenario in which it's okay and fixable to
1136 * received a late message is when:
1137 * 1. the late message is a discarded packets or discarded events
1138 * message,
1139 * 2. this stream produced an inactivity message downstream, and
1140 * 3. the timestamp of the late message is within the inactivity
1141 * timespan we sent downstream through the inactivity message.
1142 */
1143
1144 BT_COMP_LOGD("Handling late message on live stream iterator: "
1145 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1146 stream_iter->name->str, stream_iter->viewer_stream_id);
1147
1148 if (!stream_iter->last_inactivity_ts.is_set) {
1149 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: "
1150 "have a late message when no inactivity message "
1151 "was ever sent for that stream.");
1152 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1153 goto end;
1154 }
1155
1156 if (!is_discarded_packet_or_event_message(late_msg)) {
1157 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1158 "Invalid live stream state: "
1159 "have a late message that is not a packet discarded or "
1160 "event discarded message: late-msg-type=%s",
1161 bt_common_message_type_string(bt_message_get_type(late_msg)));
1162 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1163 goto end;
1164 }
1165
1166 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1167 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1168
1169 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1170 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1171 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1172 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error converting last "
1173 "inactivity message timestamp to nanoseconds");
1174 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1175 goto end;
1176 }
1177
1178 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1179 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1180 "Invalid live stream state: "
1181 "have a late message that is none included in a stream "
1182 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
1183 "late-msg-ts-ns=%" PRIu64,
1184 last_inactivity_ts_ns, late_msg_ts_ns);
1185 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1186 goto end;
1187 }
1188
1189 /*
1190 * We now know that it's okay for this message to be late, we can now
1191 * adjust its timestamp to ensure monotonicity.
1192 */
1193 BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1194 "msg-new-ts-ns=%" PRIu64,
1195 bt_common_message_type_string(bt_message_get_type(late_msg)),
1196 stream_iter->last_inactivity_ts.value);
1197 switch (bt_message_get_type(late_msg)) {
1198 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1199 adjust_status = adjust_discarded_events_message(
1200 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1201 stream_iter->last_inactivity_ts.value);
1202 break;
1203 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1204 adjust_status = adjust_discarded_packets_message(
1205 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1206 stream_iter->last_inactivity_ts.value);
1207 break;
1208 default:
1209 bt_common_abort();
1210 }
1211
1212 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1213 stream_iter_status = adjust_status;
1214 goto end;
1215 }
1216
1217 BT_ASSERT_DBG(adjusted_message);
1218 stream_iter->current_msg = adjusted_message;
1219 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1220 bt_message_put_ref(late_msg);
1221
1222 end:
1223 return stream_iter_status;
1224 }
1225
1226 static enum lttng_live_iterator_status
1227 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1228 struct lttng_live_trace *live_trace,
1229 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1230 {
1231 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1232 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1233 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1234 enum lttng_live_iterator_status stream_iter_status;
1235 ;
1236 int64_t youngest_candidate_msg_ts = INT64_MAX;
1237 uint64_t stream_iter_idx;
1238
1239 BT_ASSERT_DBG(live_trace);
1240 BT_ASSERT_DBG(live_trace->stream_iterators);
1241
1242 BT_COMP_LOGD("Finding the next stream iterator for trace: "
1243 "trace-id=%" PRIu64,
1244 live_trace->id);
1245 /*
1246 * Update the current message of every stream iterators of this trace.
1247 * The current msg of every stream must have a timestamp equal or
1248 * larger than the last message returned by this iterator. We must
1249 * ensure monotonicity.
1250 */
1251 stream_iter_idx = 0;
1252 while (stream_iter_idx < live_trace->stream_iterators->len) {
1253 bool stream_iter_is_ended = false;
1254 struct lttng_live_stream_iterator *stream_iter =
1255 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1256 stream_iter_idx);
1257
1258 /*
1259 * If there is no current message for this stream, go fetch
1260 * one.
1261 */
1262 while (!stream_iter->current_msg) {
1263 const bt_message *msg = NULL;
1264 int64_t curr_msg_ts_ns = INT64_MAX;
1265
1266 stream_iter_status =
1267 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1268
1269 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1270 stream_iter_is_ended = true;
1271 break;
1272 }
1273
1274 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1275 goto end;
1276 }
1277
1278 BT_ASSERT_DBG(msg);
1279
1280 BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, "
1281 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1282 bt_common_message_type_string(bt_message_get_type(msg)),
1283 stream_iter->name->str, stream_iter->viewer_stream_id);
1284
1285 /*
1286 * Get the timestamp in nanoseconds from origin of this
1287 * messsage.
1288 */
1289 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, msg,
1290 lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
1291
1292 /*
1293 * Check if the message of the current live stream
1294 * iterator occurred at the exact same time or after the
1295 * last message returned by this component's message
1296 * iterator. If not, we need to handle it with care.
1297 */
1298 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1299 stream_iter->current_msg = msg;
1300 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1301 } else {
1302 /*
1303 * We received a message from the past. This
1304 * may be fixable but it can also be an error.
1305 */
1306 stream_iter_status =
1307 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1308 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1309 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1310 "Late message could not be handled correctly: "
1311 "lttng-live-msg-iter-addr=%p, "
1312 "stream-name=\"%s\", "
1313 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
1314 lttng_live_msg_iter, stream_iter->name->str,
1315 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
1316 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1317 goto end;
1318 }
1319 }
1320 }
1321
1322 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1323
1324 if (!stream_iter_is_ended) {
1325 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1326 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1327 /*
1328 * Update the current best candidate message
1329 * for the stream iterator of this live trace
1330 * to be forwarded downstream.
1331 */
1332 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1333 youngest_candidate_stream_iter = stream_iter;
1334 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1335 /*
1336 * Order the messages in an arbitrary but
1337 * deterministic way.
1338 */
1339 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1340 int ret = common_muxing_compare_messages(
1341 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1342 if (ret < 0) {
1343 /*
1344 * The `youngest_candidate_stream_iter->current_msg`
1345 * should go first. Update the next
1346 * iterator and the current timestamp.
1347 */
1348 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1349 youngest_candidate_stream_iter = stream_iter;
1350 } else if (ret == 0) {
1351 /*
1352 * Unable to pick which one should go
1353 * first.
1354 */
1355 BT_COMP_LOGW(
1356 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1357 "stream-iter-addr=%p"
1358 "stream-iter-addr=%p",
1359 stream_iter, youngest_candidate_stream_iter);
1360 }
1361 }
1362
1363 stream_iter_idx++;
1364 } else {
1365 /*
1366 * The live stream iterator has ended. That
1367 * iterator is removed from the array, but
1368 * there is no need to increment
1369 * stream_iter_idx as
1370 * g_ptr_array_remove_index_fast replaces the
1371 * removed element with the array's last
1372 * element.
1373 */
1374 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1375 }
1376 }
1377
1378 if (youngest_candidate_stream_iter) {
1379 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1380 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1381 } else {
1382 /*
1383 * The only case where we don't have a candidate for this trace
1384 * is if we reached the end of all the iterators.
1385 */
1386 BT_ASSERT(live_trace->stream_iterators->len == 0);
1387 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1388 }
1389
1390 end:
1391 return stream_iter_status;
1392 }
1393
1394 static enum lttng_live_iterator_status
1395 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1396 struct lttng_live_session *session,
1397 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1398 {
1399 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1400 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1401 enum lttng_live_iterator_status stream_iter_status;
1402 uint64_t trace_idx = 0;
1403 int64_t youngest_candidate_msg_ts = INT64_MAX;
1404 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1405
1406 BT_COMP_LOGD("Finding the next stream iterator for session: "
1407 "session-id=%" PRIu64,
1408 session->id);
1409 /*
1410 * Make sure we are attached to the session and look for new streams
1411 * and metadata.
1412 */
1413 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1414 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1415 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1416 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1417 goto end;
1418 }
1419
1420 BT_ASSERT_DBG(session->traces);
1421
1422 while (trace_idx < session->traces->len) {
1423 bool trace_is_ended = false;
1424 struct lttng_live_stream_iterator *stream_iter;
1425 struct lttng_live_trace *trace =
1426 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1427
1428 stream_iter_status =
1429 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1430 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1431 /*
1432 * All the live stream iterators for this trace are
1433 * ENDed. Remove the trace from this session.
1434 */
1435 trace_is_ended = true;
1436 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1437 goto end;
1438 }
1439
1440 if (!trace_is_ended) {
1441 BT_ASSERT_DBG(stream_iter);
1442
1443 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1444 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1445 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1446 youngest_candidate_stream_iter = stream_iter;
1447 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1448 /*
1449 * Order the messages in an arbitrary but
1450 * deterministic way.
1451 */
1452 int ret = common_muxing_compare_messages(
1453 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1454 if (ret < 0) {
1455 /*
1456 * The `youngest_candidate_stream_iter->current_msg`
1457 * should go first. Update the next iterator
1458 * and the current timestamp.
1459 */
1460 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1461 youngest_candidate_stream_iter = stream_iter;
1462 } else if (ret == 0) {
1463 /* Unable to pick which one should go first. */
1464 BT_COMP_LOGW(
1465 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1466 "stream-iter-addr=%p"
1467 "stream-iter-addr=%p",
1468 stream_iter, youngest_candidate_stream_iter);
1469 }
1470 }
1471 trace_idx++;
1472 } else {
1473 /*
1474 * trace_idx is not incremented since
1475 * g_ptr_array_remove_index_fast replaces the
1476 * element at trace_idx with the array's last element.
1477 */
1478 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1479 }
1480 }
1481 if (youngest_candidate_stream_iter) {
1482 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1483 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1484 } else {
1485 /*
1486 * The only cases where we don't have a candidate for this
1487 * trace is:
1488 * 1. if we reached the end of all the iterators of all the
1489 * traces of this session,
1490 * 2. if we never had live stream iterator in the first place.
1491 *
1492 * In either cases, we return END.
1493 */
1494 BT_ASSERT(session->traces->len == 0);
1495 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1496 }
1497 end:
1498 return stream_iter_status;
1499 }
1500
1501 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1502 {
1503 uint64_t i;
1504
1505 for (i = 0; i < count; i++) {
1506 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1507 }
1508 }
1509
1510 BT_HIDDEN
1511 bt_message_iterator_class_next_method_status
1512 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1513 uint64_t capacity, uint64_t *count)
1514 {
1515 bt_message_iterator_class_next_method_status status;
1516 enum lttng_live_viewer_status viewer_status;
1517 struct lttng_live_msg_iter *lttng_live_msg_iter =
1518 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1519 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1520 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1521 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1522 enum lttng_live_iterator_status stream_iter_status;
1523 uint64_t session_idx;
1524
1525 *count = 0;
1526
1527 BT_ASSERT_DBG(lttng_live_msg_iter);
1528
1529 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1530 /*
1531 * The iterator was interrupted in a previous call to the
1532 * `_next()` method. We currently do not support generating
1533 * messages after such event. The babeltrace2 CLI should never
1534 * be running the graph after being interrupted. So this check
1535 * is to prevent other graph users from using this live
1536 * iterator in an messed up internal state.
1537 */
1538 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1539 BT_COMP_LOGE_APPEND_CAUSE(
1540 self_comp,
1541 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1542 goto end;
1543 }
1544
1545 /*
1546 * Clear all the invalid message reference that might be left over in
1547 * the output array.
1548 */
1549 memset(msgs, 0, capacity * sizeof(*msgs));
1550
1551 /*
1552 * If no session are exposed on the relay found at the url provided by
1553 * the user, session count will be 0. In this case, we return status
1554 * end to return gracefully.
1555 */
1556 if (lttng_live_msg_iter->sessions->len == 0) {
1557 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1558 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1559 goto end;
1560 } else {
1561 /*
1562 * The are no more active session for this session
1563 * name. Retry to create a viewer session for the
1564 * requested session name.
1565 */
1566 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1567 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1568 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1569 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1570 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1571 "Error creating LTTng live viewer session");
1572 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1573 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1574 } else {
1575 bt_common_abort();
1576 }
1577 goto end;
1578 }
1579 }
1580 }
1581
1582 if (lttng_live_msg_iter->active_stream_iter == 0) {
1583 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1584 }
1585
1586 /*
1587 * Here the muxing of message is done.
1588 *
1589 * We need to iterate over all the streams of all the traces of all the
1590 * viewer sessions in order to get the message with the smallest
1591 * timestamp. In this case, a session is a viewer session and there is
1592 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1593 * kernel). Each viewer session can have multiple traces, for example,
1594 * 64bit UST viewer sessions could have multiple per-pid traces.
1595 *
1596 * We iterate over the streams of each traces to update and see what is
1597 * their next message's timestamp. From those timestamps, we select the
1598 * message with the smallest timestamp as the best candidate message
1599 * for that trace and do the same thing across all the sessions.
1600 *
1601 * We then compare the timestamp of best candidate message of all the
1602 * sessions to pick the message with the smallest timestamp and we
1603 * return it.
1604 */
1605 while (*count < capacity) {
1606 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1607 *candidate_stream_iter = NULL;
1608 int64_t youngest_msg_ts_ns = INT64_MAX;
1609
1610 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1611 session_idx = 0;
1612 while (session_idx < lttng_live_msg_iter->sessions->len) {
1613 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1614 lttng_live_msg_iter->sessions, session_idx);
1615
1616 /* Find the best candidate message to send downstream. */
1617 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1618 &candidate_stream_iter);
1619
1620 /* If we receive an END status, it means that either:
1621 * - Those traces never had active streams (UST with no
1622 * data produced yet),
1623 * - All live stream iterators have ENDed.*/
1624 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1625 if (session->closed && session->traces->len == 0) {
1626 /*
1627 * Remove the session from the list.
1628 * session_idx is not modified since
1629 * g_ptr_array_remove_index_fast
1630 * replaces the the removed element with
1631 * the array's last element.
1632 */
1633 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1634 } else {
1635 session_idx++;
1636 }
1637 continue;
1638 }
1639
1640 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1641 goto return_status;
1642 }
1643
1644 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1645 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1646 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1647 youngest_stream_iter = candidate_stream_iter;
1648 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1649 /*
1650 * The currently selected message to be sent
1651 * downstream next has the exact same timestamp
1652 * that of the current candidate message. We
1653 * must break the tie in a predictable manner.
1654 */
1655 BT_COMP_LOGD_STR(
1656 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1657 /*
1658 * Order the messages in an arbitrary but
1659 * deterministic way.
1660 */
1661 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1662 youngest_stream_iter->current_msg);
1663 if (ret < 0) {
1664 /*
1665 * The `candidate_stream_iter->current_msg`
1666 * should go first. Update the next
1667 * iterator and the current timestamp.
1668 */
1669 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1670 youngest_stream_iter = candidate_stream_iter;
1671 } else if (ret == 0) {
1672 /* Unable to pick which one should go first. */
1673 BT_COMP_LOGW(
1674 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1675 "next-stream-iter-addr=%p"
1676 "candidate-stream-iter-addr=%p",
1677 youngest_stream_iter, candidate_stream_iter);
1678 }
1679 }
1680
1681 session_idx++;
1682 }
1683
1684 if (!youngest_stream_iter) {
1685 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1686 goto return_status;
1687 }
1688
1689 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1690 /* Ensure monotonicity. */
1691 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1692 youngest_stream_iter->current_msg_ts_ns);
1693
1694 /*
1695 * Insert the next message to the message batch. This will set
1696 * stream iterator current messsage to NULL so that next time
1697 * we fetch the next message of that stream iterator
1698 */
1699 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1700 (*count)++;
1701
1702 /* Update the last timestamp in nanoseconds sent downstream. */
1703 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1704 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1705
1706 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1707 }
1708
1709 return_status:
1710 switch (stream_iter_status) {
1711 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1712 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1713 /*
1714 * If we gathered messages, return _OK even if the graph was
1715 * interrupted. This allows for the components downstream to at
1716 * least get the thoses messages. If the graph was indeed
1717 * interrupted there should not be another _next() call as the
1718 * application will tear down the graph. This component class
1719 * doesn't support restarting after an interruption.
1720 */
1721 if (*count > 0) {
1722 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1723 } else {
1724 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1725 }
1726 break;
1727 case LTTNG_LIVE_ITERATOR_STATUS_END:
1728 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1729 break;
1730 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1731 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1732 "Memory error preparing the next batch of messages: "
1733 "live-iter-status=%s",
1734 lttng_live_iterator_status_string(stream_iter_status));
1735 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1736 break;
1737 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1738 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1739 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1740 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1741 "Error preparing the next batch of messages: "
1742 "live-iter-status=%s",
1743 lttng_live_iterator_status_string(stream_iter_status));
1744
1745 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1746 /* Put all existing messages on error. */
1747 put_messages(msgs, *count);
1748 break;
1749 default:
1750 bt_common_abort();
1751 }
1752
1753 end:
1754 return status;
1755 }
1756
1757 static struct lttng_live_msg_iter *
1758 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1759 bt_self_message_iterator *self_msg_it)
1760 {
1761 bt_self_component *self_comp = lttng_live_comp->self_comp;
1762 bt_logging_level log_level = lttng_live_comp->log_level;
1763
1764 struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1765 if (!lttng_live_msg_iter) {
1766 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter");
1767 goto end;
1768 }
1769
1770 lttng_live_msg_iter->log_level = lttng_live_comp->log_level;
1771 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1772 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1773 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1774
1775 lttng_live_msg_iter->active_stream_iter = 0;
1776 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1777 lttng_live_msg_iter->was_interrupted = false;
1778
1779 lttng_live_msg_iter->sessions =
1780 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1781 BT_ASSERT(lttng_live_msg_iter->sessions);
1782
1783 end:
1784 return lttng_live_msg_iter;
1785 }
1786
1787 BT_HIDDEN
1788 bt_message_iterator_class_initialize_method_status
1789 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1790 bt_self_message_iterator_configuration *config,
1791 bt_self_component_port_output *self_port)
1792 {
1793 bt_message_iterator_class_initialize_method_status status;
1794 struct lttng_live_component *lttng_live;
1795 struct lttng_live_msg_iter *lttng_live_msg_iter;
1796 enum lttng_live_viewer_status viewer_status;
1797 bt_logging_level log_level;
1798 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1799
1800 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1801 log_level = lttng_live->log_level;
1802 self_comp = lttng_live->self_comp;
1803
1804 /* There can be only one downstream iterator at the same time. */
1805 BT_ASSERT(!lttng_live->has_msg_iter);
1806 lttng_live->has_msg_iter = true;
1807
1808 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1809 if (!lttng_live_msg_iter) {
1810 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter");
1811 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1812 goto error;
1813 }
1814
1815 viewer_status = live_viewer_connection_create(
1816 self_comp, NULL, log_level, lttng_live->params.url->str, false, lttng_live_msg_iter,
1817 &lttng_live_msg_iter->viewer_connection);
1818 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1819 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1820 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer connection");
1821 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1822 /*
1823 * Interruption in the _iter_init() method is not
1824 * supported. Return an error.
1825 */
1826 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted while creating viewer connection");
1827 }
1828
1829 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1830 goto error;
1831 }
1832
1833 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1834 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1835 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1836 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer session");
1837 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1838 /*
1839 * Interruption in the _iter_init() method is not
1840 * supported. Return an error.
1841 */
1842 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted when creating viewer session");
1843 }
1844
1845 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1846 goto error;
1847 }
1848
1849 if (lttng_live_msg_iter->sessions->len == 0) {
1850 switch (lttng_live->params.sess_not_found_act) {
1851 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1852 BT_COMP_LOGI(
1853 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1854 "%s=\"%s\" component parameter: url=\"%s\"",
1855 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1856 lttng_live->params.url->str);
1857 break;
1858 case SESSION_NOT_FOUND_ACTION_FAIL:
1859 BT_COMP_LOGE_APPEND_CAUSE(
1860 self_comp,
1861 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1862 "component parameter: url =\"%s\"",
1863 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1864 lttng_live->params.url->str);
1865 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1866 goto error;
1867 case SESSION_NOT_FOUND_ACTION_END:
1868 BT_COMP_LOGI(
1869 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1870 "call because of %s=\"%s\" component parameter: "
1871 "url=\"%s\"",
1872 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1873 lttng_live->params.url->str);
1874 break;
1875 default:
1876 bt_common_abort();
1877 }
1878 }
1879
1880 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1881 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1882 goto end;
1883
1884 error:
1885 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1886 end:
1887 return status;
1888 }
1889
1890 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
1891 {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
1892 bt_param_validation_value_descr::makeString()},
1893 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1894
1895 static bt_component_class_query_method_status
1896 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1897 bt_self_component_class *self_comp_class, bt_logging_level log_level)
1898 {
1899 bt_component_class_query_method_status status;
1900 const bt_value *url_value = NULL;
1901 const char *url;
1902 struct live_viewer_connection *viewer_connection = NULL;
1903 enum lttng_live_viewer_status viewer_status;
1904 enum bt_param_validation_status validation_status;
1905 gchar *validate_error = NULL;
1906
1907 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1908 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1909 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1910 goto error;
1911 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1912 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1913 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error);
1914 goto error;
1915 }
1916
1917 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1918 url = bt_value_string_get(url_value);
1919
1920 viewer_status = live_viewer_connection_create(NULL, self_comp_class, log_level, url, true, NULL,
1921 &viewer_connection);
1922 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1923 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1924 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to create viewer connection");
1925 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1926 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1927 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1928 } else {
1929 bt_common_abort();
1930 }
1931 goto error;
1932 }
1933
1934 status = live_viewer_connection_list_sessions(viewer_connection, result);
1935 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1936 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to list viewer sessions");
1937 goto error;
1938 }
1939
1940 goto end;
1941
1942 error:
1943 BT_VALUE_PUT_REF_AND_RESET(*result);
1944
1945 if (status >= 0) {
1946 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1947 }
1948
1949 end:
1950 if (viewer_connection) {
1951 live_viewer_connection_destroy(viewer_connection);
1952 }
1953
1954 g_free(validate_error);
1955
1956 return status;
1957 }
1958
1959 static bt_component_class_query_method_status
1960 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1961 bt_self_component_class *self_comp_class, bt_logging_level log_level)
1962 {
1963 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1964 const bt_value *input_type_value;
1965 const bt_value *input_value;
1966 double weight = 0;
1967 struct bt_common_lttng_live_url_parts parts = {0};
1968
1969 /* Used by the logging macros */
1970 __attribute__((unused)) bt_self_component *self_comp = NULL;
1971
1972 *result = NULL;
1973 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1974 if (!input_type_value) {
1975 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `type` parameter.");
1976 goto error;
1977 }
1978
1979 if (!bt_value_is_string(input_type_value)) {
1980 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "`type` parameter is not a string value.");
1981 goto error;
1982 }
1983
1984 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1985 /* We don't handle file system paths */
1986 goto create_result;
1987 }
1988
1989 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1990 if (!input_value) {
1991 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `input` parameter.");
1992 goto error;
1993 }
1994
1995 if (!bt_value_is_string(input_value)) {
1996 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1997 "`input` parameter is not a string value.");
1998 goto error;
1999 }
2000
2001 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
2002 if (parts.session_name) {
2003 /*
2004 * Looks pretty much like an LTTng live URL: we got the
2005 * session name part, which forms a complete URL.
2006 */
2007 weight = .75;
2008 }
2009
2010 create_result:
2011 *result = bt_value_real_create_init(weight);
2012 if (!*result) {
2013 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
2014 goto error;
2015 }
2016
2017 goto end;
2018
2019 error:
2020 if (status >= 0) {
2021 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
2022 }
2023
2024 BT_ASSERT(!*result);
2025
2026 end:
2027 bt_common_destroy_lttng_live_url_parts(&parts);
2028 return status;
2029 }
2030
2031 BT_HIDDEN
2032 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
2033 bt_private_query_executor *priv_query_exec,
2034 const char *object, const bt_value *params,
2035 __attribute__((unused)) void *method_data,
2036 const bt_value **result)
2037 {
2038 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
2039 bt_self_component *self_comp = NULL;
2040 bt_self_component_class *self_comp_class =
2041 bt_self_component_class_source_as_self_component_class(comp_class);
2042 bt_logging_level log_level = bt_query_executor_get_logging_level(
2043 bt_private_query_executor_as_query_executor_const(priv_query_exec));
2044
2045 if (strcmp(object, "sessions") == 0) {
2046 status = lttng_live_query_list_sessions(params, result, self_comp_class, log_level);
2047 } else if (strcmp(object, "babeltrace.support-info") == 0) {
2048 status = lttng_live_query_support_info(params, result, self_comp_class, log_level);
2049 } else {
2050 BT_COMP_LOGI("Unknown query object `%s`", object);
2051 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
2052 goto end;
2053 }
2054
2055 end:
2056 return status;
2057 }
2058
2059 static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
2060 {
2061 if (!lttng_live) {
2062 return;
2063 }
2064 if (lttng_live->params.url) {
2065 g_string_free(lttng_live->params.url, TRUE);
2066 }
2067 g_free(lttng_live);
2068 }
2069
2070 BT_HIDDEN
2071 void lttng_live_component_finalize(bt_self_component_source *component)
2072 {
2073 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
2074 bt_self_component_source_as_self_component(component));
2075
2076 if (!data) {
2077 return;
2078 }
2079 lttng_live_component_destroy_data(data);
2080 }
2081
2082 static enum session_not_found_action
2083 parse_session_not_found_action_param(const bt_value *no_session_param)
2084 {
2085 enum session_not_found_action action;
2086 const char *no_session_act_str = bt_value_string_get(no_session_param);
2087
2088 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2089 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2090 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2091 action = SESSION_NOT_FOUND_ACTION_FAIL;
2092 } else {
2093 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2094 action = SESSION_NOT_FOUND_ACTION_END;
2095 }
2096
2097 return action;
2098 }
2099
2100 static bt_param_validation_value_descr inputs_elem_descr =
2101 bt_param_validation_value_descr::makeString();
2102
2103 static const char *sess_not_found_action_choices[] = {
2104 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
2105 SESS_NOT_FOUND_ACTION_FAIL_STR,
2106 SESS_NOT_FOUND_ACTION_END_STR,
2107 };
2108
2109 static struct bt_param_validation_map_value_entry_descr params_descr[] = {
2110 {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
2111 bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
2112 {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
2113 bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
2114 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2115
2116 static bt_component_class_initialize_method_status
2117 lttng_live_component_create(const bt_value *params, bt_logging_level log_level,
2118 bt_self_component *self_comp, struct lttng_live_component **component)
2119 {
2120 struct lttng_live_component *lttng_live = NULL;
2121 const bt_value *inputs_value;
2122 const bt_value *url_value;
2123 const bt_value *value;
2124 const char *url;
2125 enum bt_param_validation_status validation_status;
2126 gchar *validation_error = NULL;
2127 bt_component_class_initialize_method_status status;
2128
2129 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2130 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2131 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2132 goto error;
2133 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2134 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
2135 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2136 goto error;
2137 }
2138
2139 lttng_live = g_new0(struct lttng_live_component, 1);
2140 if (!lttng_live) {
2141 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2142 goto end;
2143 }
2144 lttng_live->log_level = log_level;
2145 lttng_live->self_comp = self_comp;
2146 lttng_live->max_query_size = MAX_QUERY_SIZE;
2147 lttng_live->has_msg_iter = false;
2148
2149 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2150 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2151 url = bt_value_string_get(url_value);
2152
2153 lttng_live->params.url = g_string_new(url);
2154 if (!lttng_live->params.url) {
2155 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2156 goto error;
2157 }
2158
2159 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2160 if (value) {
2161 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2162 } else {
2163 BT_COMP_LOGI("Optional `%s` parameter is missing: "
2164 "defaulting to `%s`.",
2165 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2166 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2167 }
2168
2169 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2170 goto end;
2171
2172 error:
2173 lttng_live_component_destroy_data(lttng_live);
2174 lttng_live = NULL;
2175 end:
2176 g_free(validation_error);
2177
2178 *component = lttng_live;
2179 return status;
2180 }
2181
2182 BT_HIDDEN
2183 bt_component_class_initialize_method_status
2184 lttng_live_component_init(bt_self_component_source *self_comp_src,
2185 bt_self_component_source_configuration *config, const bt_value *params,
2186 __attribute__((unused)) void *init_method_data)
2187 {
2188 struct lttng_live_component *lttng_live;
2189 bt_component_class_initialize_method_status ret;
2190 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2191 bt_logging_level log_level =
2192 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
2193 bt_self_component_add_port_status add_port_status;
2194
2195 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
2196 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2197 goto error;
2198 }
2199
2200 add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2201 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2202 ret = (bt_component_class_initialize_method_status) add_port_status;
2203 goto end;
2204 }
2205
2206 bt_self_component_set_data(self_comp, lttng_live);
2207 goto end;
2208
2209 error:
2210 lttng_live_component_destroy_data(lttng_live);
2211 lttng_live = NULL;
2212 end:
2213 return ret;
2214 }
This page took 0.149848 seconds and 4 git commands to generate.