826a1b532996e1fddc9fb988bf8c2fbeff99b09b
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Babeltrace CTF LTTng-live Client Component
9 */
10
11 #define BT_COMP_LOG_SELF_COMP self_comp
12 #define BT_LOG_OUTPUT_LEVEL log_level
13 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
14 #include "logging/comp-logging.h"
15
16 #include <inttypes.h>
17 #include <stdbool.h>
18 #include <unistd.h>
19
20 #include <glib.h>
21
22 #include "common/assert.h"
23 #include <babeltrace2/babeltrace.h>
24 #include "compat/compiler.h"
25
26 #include "plugins/common/muxing/muxing.h"
27 #include "plugins/common/param-validation/param-validation.h"
28
29 #include "data-stream.hpp"
30 #include "metadata.hpp"
31 #include "lttng-live.hpp"
32
33 #define MAX_QUERY_SIZE (256 * 1024)
34 #define URL_PARAM "url"
35 #define INPUTS_PARAM "inputs"
36 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
37 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
38 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
39 #define SESS_NOT_FOUND_ACTION_END_STR "end"
40
41 #define print_dbg(fmt, ...) BT_COMP_LOGD(fmt, ##__VA_ARGS__)
42
43 static const char *lttng_live_iterator_status_string(enum lttng_live_iterator_status status)
44 {
45 switch (status) {
46 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
47 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
48 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
49 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
50 case LTTNG_LIVE_ITERATOR_STATUS_END:
51 return "LTTNG_LIVE_ITERATOR_STATUS_END";
52 case LTTNG_LIVE_ITERATOR_STATUS_OK:
53 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
54 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
55 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
56 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
57 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
58 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
59 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
60 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
61 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
62 default:
63 bt_common_abort();
64 }
65 }
66
67 static const char *lttng_live_stream_state_string(enum lttng_live_stream_state state)
68 {
69 switch (state) {
70 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
71 return "ACTIVE_NO_DATA";
72 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
73 return "QUIESCENT_NO_DATA";
74 case LTTNG_LIVE_STREAM_QUIESCENT:
75 return "QUIESCENT";
76 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
77 return "ACTIVE_DATA";
78 case LTTNG_LIVE_STREAM_EOF:
79 return "EOF";
80 default:
81 return "ERROR";
82 }
83 }
84
85 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
86 enum lttng_live_stream_state new_state)
87 {
88 bt_self_component *self_comp = stream_iter->self_comp;
89 bt_logging_level log_level = stream_iter->log_level;
90
91 BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64
92 ", old-state=%s, new-state=%s",
93 stream_iter->viewer_stream_id, lttng_live_stream_state_string(stream_iter->state),
94 lttng_live_stream_state_string(new_state));
95
96 stream_iter->state = new_state;
97 }
98
/*
 * Logs, at the debug level, the state and the inactivity timestamps of the
 * live stream iterator `live_stream_iter` (a pointer expression).
 *
 * Relies on BT_COMP_LOGD(), which in this file uses the `self_comp` and
 * `log_level` variables of the calling scope.
 *
 * Expands to a single `do { ... } while (0)` statement WITHOUT a trailing
 * semicolon: the call site provides it, which keeps the macro usable as
 * the body of an unbraced `if`/`else`. The argument is parenthesized so
 * that any pointer expression can be passed.
 */
#define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter)                                              \
    do {                                                                                           \
        BT_COMP_LOGD("Live stream iterator state=%s, "                                             \
                     "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", "                  \
                     "curr-inact-ts=%" PRId64,                                                     \
                     lttng_live_stream_state_string((live_stream_iter)->state),                    \
                     (live_stream_iter)->last_inactivity_ts.is_set,                                \
                     (live_stream_iter)->last_inactivity_ts.value,                                 \
                     (live_stream_iter)->current_inactivity_ts);                                   \
    } while (0)
109
110 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
111 {
112 bool ret;
113
114 if (!msg_iter) {
115 ret = false;
116 goto end;
117 }
118
119 ret = bt_self_message_iterator_is_interrupted(msg_iter->self_msg_iter);
120
121 end:
122 return ret;
123 }
124
125 static struct lttng_live_trace *
126 lttng_live_session_borrow_trace_by_id(struct lttng_live_session *session, uint64_t trace_id)
127 {
128 uint64_t trace_idx;
129 struct lttng_live_trace *ret_trace = NULL;
130
131 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
132 struct lttng_live_trace *trace =
133 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
134 if (trace->id == trace_id) {
135 ret_trace = trace;
136 goto end;
137 }
138 }
139
140 end:
141 return ret_trace;
142 }
143
144 static void lttng_live_destroy_trace(struct lttng_live_trace *trace)
145 {
146 bt_logging_level log_level = trace->log_level;
147 bt_self_component *self_comp = trace->self_comp;
148
149 BT_COMP_LOGD("Destroying live trace: trace-id=%" PRIu64, trace->id);
150
151 BT_ASSERT(trace->stream_iterators);
152 g_ptr_array_free(trace->stream_iterators, TRUE);
153
154 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
155 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
156
157 lttng_live_metadata_fini(trace);
158 g_free(trace);
159 }
160
161 static struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
162 uint64_t trace_id)
163 {
164 struct lttng_live_trace *trace = NULL;
165 bt_logging_level log_level = session->log_level;
166 bt_self_component *self_comp = session->self_comp;
167
168 BT_COMP_LOGD("Creating live trace: "
169 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
170 session->id, trace_id);
171 trace = g_new0(struct lttng_live_trace, 1);
172 if (!trace) {
173 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live trace");
174 goto error;
175 }
176 trace->log_level = session->log_level;
177 trace->self_comp = session->self_comp;
178 trace->session = session;
179 trace->id = trace_id;
180 trace->trace_class = NULL;
181 trace->trace = NULL;
182 trace->stream_iterators =
183 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_stream_iterator_destroy);
184 BT_ASSERT(trace->stream_iterators);
185 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
186 g_ptr_array_add(session->traces, trace);
187
188 goto end;
189 error:
190 g_free(trace);
191 trace = NULL;
192 end:
193 return trace;
194 }
195
196 struct lttng_live_trace *
197 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
198 uint64_t trace_id)
199 {
200 struct lttng_live_trace *trace;
201
202 trace = lttng_live_session_borrow_trace_by_id(session, trace_id);
203 if (trace) {
204 goto end;
205 }
206
207 /* The session is the owner of the newly created trace. */
208 trace = lttng_live_create_trace(session, trace_id);
209
210 end:
211 return trace;
212 }
213
214 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint64_t session_id,
215 const char *hostname, const char *session_name)
216 {
217 int ret = 0;
218 struct lttng_live_session *session;
219 bt_logging_level log_level = lttng_live_msg_iter->log_level;
220 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
221
222 BT_COMP_LOGD("Adding live session: "
223 "session-id=%" PRIu64 ", hostname=\"%s\" session-name=\"%s\"",
224 session_id, hostname, session_name);
225
226 session = g_new0(struct lttng_live_session, 1);
227 if (!session) {
228 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate live session");
229 goto error;
230 }
231
232 session->log_level = lttng_live_msg_iter->log_level;
233 session->self_comp = lttng_live_msg_iter->self_comp;
234 session->id = session_id;
235 session->traces = g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_trace);
236 BT_ASSERT(session->traces);
237 session->lttng_live_msg_iter = lttng_live_msg_iter;
238 session->new_streams_needed = true;
239 session->hostname = g_string_new(hostname);
240 BT_ASSERT(session->hostname);
241
242 session->session_name = g_string_new(session_name);
243 BT_ASSERT(session->session_name);
244
245 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
246 goto end;
247 error:
248 g_free(session);
249 ret = -1;
250 end:
251 return ret;
252 }
253
254 static void lttng_live_destroy_session(struct lttng_live_session *session)
255 {
256 bt_logging_level log_level;
257 bt_self_component *self_comp;
258
259 if (!session) {
260 goto end;
261 }
262
263 log_level = session->log_level;
264 self_comp = session->self_comp;
265 BT_COMP_LOGD("Destroying live session: "
266 "session-id=%" PRIu64 ", session-name=\"%s\"",
267 session->id, session->session_name->str);
268 if (session->id != -1ULL) {
269 if (lttng_live_session_detach(session)) {
270 if (!lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
271 /* Old relayd cannot detach sessions. */
272 BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id);
273 }
274 }
275 session->id = -1ULL;
276 }
277
278 if (session->traces) {
279 g_ptr_array_free(session->traces, TRUE);
280 }
281
282 if (session->hostname) {
283 g_string_free(session->hostname, TRUE);
284 }
285 if (session->session_name) {
286 g_string_free(session->session_name, TRUE);
287 }
288 g_free(session);
289
290 end:
291 return;
292 }
293
294 static void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
295 {
296 if (!lttng_live_msg_iter) {
297 goto end;
298 }
299
300 if (lttng_live_msg_iter->sessions) {
301 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
302 }
303
304 if (lttng_live_msg_iter->viewer_connection) {
305 live_viewer_connection_destroy(lttng_live_msg_iter->viewer_connection);
306 }
307 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
308 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
309
310 /* All stream iterators must be destroyed at this point. */
311 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
312 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
313
314 g_free(lttng_live_msg_iter);
315
316 end:
317 return;
318 }
319
320 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
321 {
322 struct lttng_live_msg_iter *lttng_live_msg_iter;
323
324 BT_ASSERT(self_msg_iter);
325
326 lttng_live_msg_iter =
327 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_iter);
328 BT_ASSERT(lttng_live_msg_iter);
329 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
330 }
331
332 static enum lttng_live_iterator_status
333 lttng_live_iterator_next_check_stream_state(struct lttng_live_stream_iterator *lttng_live_stream)
334 {
335 bt_logging_level log_level = lttng_live_stream->log_level;
336 bt_self_component *self_comp = lttng_live_stream->self_comp;
337
338 switch (lttng_live_stream->state) {
339 case LTTNG_LIVE_STREAM_QUIESCENT:
340 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
341 break;
342 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
343 /* Invalid state. */
344 BT_COMP_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
345 bt_common_abort();
346 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
347 /* Invalid state. */
348 BT_COMP_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
349 bt_common_abort();
350 case LTTNG_LIVE_STREAM_EOF:
351 break;
352 }
353 return LTTNG_LIVE_ITERATOR_STATUS_OK;
354 }
355
356 /*
357 * For active no data stream, fetch next index. As a result of that it can
358 * become either:
359 * - quiescent: won't have events for a bit,
360 * - have data: need to get that data and produce the event,
361 * - have no data on this stream at this point: need to retry (AGAIN) or return
362 * EOF.
363 */
364 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
365 struct lttng_live_msg_iter *lttng_live_msg_iter,
366 struct lttng_live_stream_iterator *lttng_live_stream)
367 {
368 bt_logging_level log_level = lttng_live_msg_iter->log_level;
369 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
370 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
371 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
372 struct packet_index index;
373
374 if (lttng_live_stream->trace->metadata_stream_state ==
375 LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
376 BT_COMP_LOGD(
377 "Need to get an update for the metadata stream before proceeding further with this stream: "
378 "stream-name=\"%s\"",
379 lttng_live_stream->name->str);
380 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
381 goto end;
382 }
383
384 if (lttng_live_stream->trace->session->new_streams_needed) {
385 BT_COMP_LOGD(
386 "Need to get an update of all streams before proceeding further with this stream: "
387 "stream-name=\"%s\"",
388 lttng_live_stream->name->str);
389 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
390 goto end;
391 }
392
393 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
394 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
395 goto end;
396 }
397 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, &index);
398 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
399 goto end;
400 }
401
402 BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
403
404 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
405 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value,
406 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
407
408 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) {
409 /*
410 * Because the stream is in the QUIESCENT_NO_DATA
411 * state, we can assert that the last_inactivity_ts was
412 * set and can be safely used in the `if` above.
413 */
414 BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set);
415
416 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
417 LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream);
418 } else {
419 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
420 }
421 goto end;
422 }
423
424 lttng_live_stream->base_offset = index.offset;
425 lttng_live_stream->offset = index.offset;
426 lttng_live_stream->len = index.packet_size / CHAR_BIT;
427
428 BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", "
429 "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64
430 ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64,
431 lttng_live_stream->name->str, lttng_live_stream->viewer_stream_id,
432 lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
433
434 end:
435 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
436 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
437 }
438 return ret;
439 }
440
441 /*
442 * Creation of the message requires the ctf trace class to be created
443 * beforehand, but the live protocol gives us all streams (including metadata)
444 * at once. So we split it in three steps: getting streams, getting metadata
445 * (which creates the ctf trace class), and then creating the per-stream
446 * messages.
447 */
448 static enum lttng_live_iterator_status
449 lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
450 struct lttng_live_session *session)
451 {
452 bt_logging_level log_level = lttng_live_msg_iter->log_level;
453 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
454 enum lttng_live_iterator_status status;
455 uint64_t trace_idx;
456
457 if (!session->attached) {
458 BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, session->id);
459 enum lttng_live_viewer_status attach_status =
460 lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
461 if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
462 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
463 /*
464 * Clear any causes appended in
465 * `lttng_live_attach_session()` as we want to
466 * return gracefully since the graph was
467 * cancelled.
468 */
469 bt_current_thread_clear_error();
470 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
471 } else {
472 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
473 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error attaching to LTTng live session");
474 }
475 goto end;
476 }
477 }
478
479 BT_COMP_LOGD("Updating all data streams: "
480 "session-id=%" PRIu64 ", session-name=\"%s\"",
481 session->id, session->session_name->str);
482
483 status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
484 switch (status) {
485 case LTTNG_LIVE_ITERATOR_STATUS_OK:
486 break;
487 case LTTNG_LIVE_ITERATOR_STATUS_END:
488 /*
489 * We received a `_END` from the `_get_new_streams()` function,
490 * which means no more data will ever be received from the data
491 * streams of this session. But it's possible that the metadata
492 * is incomplete.
493 * The live protocol guarantees that we receive all the
494 * metadata needed before we receive data streams needing it.
495 * But it's possible to receive metadata NOT needed by
496 * data streams after the session was closed. For example, this
497 * could happen if a new event is registered and the session is
498 * stopped before any tracepoint for that event is actually
499 * fired.
500 */
501 BT_COMP_LOGD(
502 "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
503 "session-id=%" PRIu64 ", session-name=\"%s\"",
504 session->id, session->session_name->str);
505 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
506 break;
507 default:
508 goto end;
509 }
510
511 BT_COMP_LOGD("Updating metadata stream for session: "
512 "session-id=%" PRIu64 ", session-name=\"%s\"",
513 session->id, session->session_name->str);
514
515 trace_idx = 0;
516 while (trace_idx < session->traces->len) {
517 struct lttng_live_trace *trace =
518 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
519
520 status = lttng_live_metadata_update(trace);
521 switch (status) {
522 case LTTNG_LIVE_ITERATOR_STATUS_END:
523 case LTTNG_LIVE_ITERATOR_STATUS_OK:
524 trace_idx++;
525 break;
526 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
527 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
528 goto end;
529 default:
530 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
531 "Error updating trace metadata: "
532 "stream-iter-status=%s, trace-id=%" PRIu64,
533 lttng_live_iterator_status_string(status), trace->id);
534 goto end;
535 }
536 }
537
538 /*
539 * Now that we have the metadata we can initialize the downstream
540 * iterator.
541 */
542 status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter);
543
544 end:
545 return status;
546 }
547
548 static void
549 lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
550 {
551 uint64_t session_idx, trace_idx;
552 bt_logging_level log_level = lttng_live_msg_iter->log_level;
553 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
554
555 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
556 struct lttng_live_session *session =
557 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
558 BT_COMP_LOGD("Force marking session as needing new streams: "
559 "session-id=%" PRIu64,
560 session->id);
561 session->new_streams_needed = true;
562 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
563 struct lttng_live_trace *trace =
564 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
565 BT_COMP_LOGD("Force marking trace metadata state as needing an update: "
566 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
567 session->id, trace->id);
568
569 BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED);
570
571 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
572 }
573 }
574 }
575
576 static enum lttng_live_iterator_status
577 lttng_live_iterator_handle_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
578 {
579 enum lttng_live_iterator_status status;
580 enum lttng_live_viewer_status viewer_status;
581 bt_logging_level log_level = lttng_live_msg_iter->log_level;
582 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
583 uint64_t session_idx = 0, nr_sessions_opened = 0;
584 struct lttng_live_session *session;
585 enum session_not_found_action sess_not_found_act =
586 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
587
588 BT_COMP_LOGD("Update data and metadata of all sessions: "
589 "live-msg-iter-addr=%p",
590 lttng_live_msg_iter);
591 /*
592 * In a remotely distant future, we could add a "new
593 * session" flag to the protocol, which would tell us that we
594 * need to query for new sessions even though we have sessions
595 * currently ongoing.
596 */
597 if (lttng_live_msg_iter->sessions->len == 0) {
598 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
599 BT_COMP_LOGD(
600 "No session found. Exiting in accordance with the `session-not-found-action` parameter");
601 status = LTTNG_LIVE_ITERATOR_STATUS_END;
602 goto end;
603 } else {
604 BT_COMP_LOGD(
605 "No session found. Try creating a new one in accordance with the `session-not-found-action` parameter");
606 /*
607 * Retry to create a viewer session for the requested
608 * session name.
609 */
610 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
611 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
612 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
613 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
614 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
615 "Error creating LTTng live viewer session");
616 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
617 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
618 } else {
619 bt_common_abort();
620 }
621 goto end;
622 }
623 }
624 }
625
626 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
627 session =
628 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
629 status = lttng_live_get_session(lttng_live_msg_iter, session);
630 switch (status) {
631 case LTTNG_LIVE_ITERATOR_STATUS_OK:
632 case LTTNG_LIVE_ITERATOR_STATUS_END:
633 /*
634 * A session returned `_END`. Other sessions may still
635 * be active so we override the status and continue
636 * looping if needed.
637 */
638 break;
639 default:
640 goto end;
641 }
642 if (!session->closed) {
643 nr_sessions_opened++;
644 }
645 }
646
647 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE && nr_sessions_opened == 0) {
648 status = LTTNG_LIVE_ITERATOR_STATUS_END;
649 } else {
650 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
651 }
652
653 end:
654 return status;
655 }
656
657 static enum lttng_live_iterator_status
658 emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
659 struct lttng_live_stream_iterator *stream_iter, const bt_message **message,
660 uint64_t timestamp)
661 {
662 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
663 bt_logging_level log_level = lttng_live_msg_iter->log_level;
664 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
665 bt_message *msg = NULL;
666
667 BT_ASSERT(stream_iter->trace->clock_class);
668
669 BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64
670 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
671 stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
672
673 msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
674 stream_iter->trace->clock_class, timestamp);
675 if (!msg) {
676 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error emitting message iterator inactivity message");
677 goto error;
678 }
679
680 *message = msg;
681 end:
682 return ret;
683
684 error:
685 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
686 bt_message_put_ref(msg);
687 goto end;
688 }
689
690 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
691 struct lttng_live_msg_iter *lttng_live_msg_iter,
692 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
693 {
694 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
695
696 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
697 return LTTNG_LIVE_ITERATOR_STATUS_OK;
698 }
699
700 /*
701 * Check if we already sent an inactivty message downstream for this
702 * `current_inactivity_ts` value.
703 */
704 if (lttng_live_stream->last_inactivity_ts.is_set &&
705 lttng_live_stream->current_inactivity_ts == lttng_live_stream->last_inactivity_ts.value) {
706 lttng_live_stream_iterator_set_state(lttng_live_stream,
707 LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA);
708
709 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
710 goto end;
711 }
712
713 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message,
714 lttng_live_stream->current_inactivity_ts);
715
716 lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts;
717 lttng_live_stream->last_inactivity_ts.is_set = true;
718 end:
719 return ret;
720 }
721
722 static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
723 const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
724 {
725 const bt_clock_snapshot *clock_snapshot = NULL;
726 int ret = 0;
727 bt_logging_level log_level = lttng_live_msg_iter->log_level;
728 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
729
730 BT_ASSERT_DBG(msg);
731 BT_ASSERT_DBG(ts_ns);
732
733 BT_COMP_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
734 "last-msg-ts=%" PRId64,
735 lttng_live_msg_iter, msg, last_msg_ts_ns);
736
737 switch (bt_message_get_type(msg)) {
738 case BT_MESSAGE_TYPE_EVENT:
739 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
740 break;
741 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
742 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
743 break;
744 case BT_MESSAGE_TYPE_PACKET_END:
745 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
746 break;
747 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
748 clock_snapshot =
749 bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
750 break;
751 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
752 clock_snapshot =
753 bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
754 break;
755 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
756 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
757 break;
758 default:
759 /* All the other messages have a higher priority */
760 BT_COMP_LOGD_STR("Message has no timestamp: using the last message timestamp.");
761 *ts_ns = last_msg_ts_ns;
762 goto end;
763 }
764
765 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
766 if (ret) {
767 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
768 "Cannot get nanoseconds from Epoch of clock snapshot: "
769 "clock-snapshot-addr=%p",
770 clock_snapshot);
771 goto error;
772 }
773
774 goto end;
775
776 error:
777 ret = -1;
778
779 end:
780 if (ret == 0) {
781 BT_COMP_LOGD("Found message's timestamp: "
782 "iter-data-addr=%p, msg-addr=%p, "
783 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
784 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
785 }
786
787 return ret;
788 }
789
790 static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
791 struct lttng_live_msg_iter *lttng_live_msg_iter,
792 struct lttng_live_stream_iterator *lttng_live_stream, const bt_message **message)
793 {
794 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
795 bt_logging_level log_level = lttng_live_msg_iter->log_level;
796 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
797 enum ctf_msg_iter_status status;
798 uint64_t session_idx, trace_idx;
799
800 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
801 struct lttng_live_session *session =
802 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
803
804 if (session->new_streams_needed) {
805 BT_COMP_LOGD("Need an update for streams: "
806 "session-id=%" PRIu64,
807 session->id);
808 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
809 goto end;
810 }
811 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
812 struct lttng_live_trace *trace =
813 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
814 if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
815 BT_COMP_LOGD("Need an update for metadata stream: "
816 "session-id=%" PRIu64 ", trace-id=%" PRIu64,
817 session->id, trace->id);
818 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
819 goto end;
820 }
821 }
822 }
823
824 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
825 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
826 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
827 "Invalid state of live stream iterator"
828 "stream-iter-status=%s",
829 lttng_live_stream_state_string(lttng_live_stream->state));
830 goto end;
831 }
832
833 status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter, message);
834 switch (status) {
835 case CTF_MSG_ITER_STATUS_EOF:
836 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
837 break;
838 case CTF_MSG_ITER_STATUS_OK:
839 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
840 break;
841 case CTF_MSG_ITER_STATUS_AGAIN:
842 /*
843 * Continue immediately (end of packet). The next
844 * get_index may return AGAIN to delay the following
845 * attempt.
846 */
847 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
848 break;
849 case CTF_MSG_ITER_STATUS_ERROR:
850 default:
851 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
852 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
853 "CTF message iterator failed to get next message: "
854 "msg-iter=%p, msg-iter-status=%s",
855 lttng_live_stream->msg_iter, ctf_msg_iter_status_string(status));
856 break;
857 }
858
859 end:
860 return ret;
861 }
862
863 static enum lttng_live_iterator_status
864 lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
865 struct lttng_live_stream_iterator *stream_iter,
866 const bt_message **curr_msg)
867 {
868 enum lttng_live_iterator_status live_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
869 bt_logging_level log_level = lttng_live_msg_iter->log_level;
870 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
871
872 BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", "
873 "viewer-stream-id=%" PRIu64,
874 stream_iter->name->str, stream_iter->viewer_stream_id);
875
876 /*
877 * The viewer has hung up on us so we are closing the stream. The
878 * `ctf_msg_iter` should simply realize that it needs to close the
879 * stream properly by emitting the necessary stream end message.
880 */
881 enum ctf_msg_iter_status status =
882 ctf_msg_iter_get_next_message(stream_iter->msg_iter, curr_msg);
883
884 if (status == CTF_MSG_ITER_STATUS_ERROR) {
885 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
886 "Error getting the next message from CTF message iterator");
887 live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
888 goto end;
889 } else if (status == CTF_MSG_ITER_STATUS_EOF) {
890 BT_COMP_LOGI("Reached the end of the live stream iterator.");
891 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
892 goto end;
893 }
894
895 BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
896
897 end:
898 return live_status;
899 }
900
901 /*
902 * helper function:
903 * handle_no_data_streams()
904 * retry:
905 * - for each ACTIVE_NO_DATA stream:
906 * - query relayd for stream data, or quiescence info.
907 * - if need metadata, get metadata, goto retry.
908 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
909 * - if quiescent, move to QUIESCENT streams
910 * - if fetched data, move to ACTIVE_DATA streams
911 * (at this point each stream either has data, or is quiescent)
912 *
913 *
914 * iterator_next:
915 * handle_new_streams_and_metadata()
916 * - query relayd for known streams, add them as ACTIVE_NO_DATA
917 * - query relayd for metadata
918 *
919 * call handle_active_no_data_streams()
920 *
921 * handle_quiescent_streams()
922 * - if at least one stream is ACTIVE_DATA:
923 * - peek stream event with lowest timestamp -> next_ts
924 * - for each quiescent stream
925 * - if next_ts >= quiescent end
926 * - set state to ACTIVE_NO_DATA
927 * - else
928 * - for each quiescent stream
929 * - set state to ACTIVE_NO_DATA
930 *
931 * call handle_active_no_data_streams()
932 *
933 * handle_active_data_streams()
934 * - if at least one stream is ACTIVE_DATA:
935 * - get stream event with lowest timestamp from heap
936 * - make that stream event the current message.
937 * - move this stream heap position to its next event
938 * - if we need to fetch data from relayd, move
939 * stream to ACTIVE_NO_DATA.
940 * - return OK
941 * - return AGAIN
942 *
943 * end criterion: ctrl-c on client. If relayd exits or the session
944 * closes on the relay daemon side, we keep on waiting for streams.
945 * Eventually handle --end timestamp (also an end criterion).
946 *
947 * When disconnected from relayd: try to re-connect endlessly.
948 */
949 static enum lttng_live_iterator_status
950 lttng_live_iterator_next_msg_on_stream(struct lttng_live_msg_iter *lttng_live_msg_iter,
951 struct lttng_live_stream_iterator *stream_iter,
952 const bt_message **curr_msg)
953 {
954 bt_logging_level log_level = lttng_live_msg_iter->log_level;
955 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
956 enum lttng_live_iterator_status live_status;
957
958 BT_COMP_LOGD("Advancing live stream iterator until next message if possible: "
959 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
960 stream_iter->name->str, stream_iter->viewer_stream_id);
961
962 if (stream_iter->has_stream_hung_up) {
963 /*
964 * The stream has hung up and the stream was properly closed
965 * during the last call to the current function. Return _END
966 * status now so that this stream iterator is removed for the
967 * stream iterator list.
968 */
969 live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
970 goto end;
971 }
972
973 retry:
974 LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter);
975
976 /*
977 * Make sure we have the most recent metadata and possibly some new
978 * streams.
979 */
980 live_status = lttng_live_iterator_handle_new_streams_and_metadata(lttng_live_msg_iter);
981 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
982 goto end;
983 }
984
985 live_status =
986 lttng_live_iterator_next_handle_one_no_data_stream(lttng_live_msg_iter, stream_iter);
987 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
988 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
989 /*
990 * We overwrite `live_status` since `curr_msg` is
991 * likely set to a valid message in this function.
992 */
993 live_status =
994 lttng_live_iterator_close_stream(lttng_live_msg_iter, stream_iter, curr_msg);
995 }
996 goto end;
997 }
998
999 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(lttng_live_msg_iter,
1000 stream_iter, curr_msg);
1001 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1002 BT_ASSERT(!*curr_msg);
1003 goto end;
1004 }
1005 if (*curr_msg) {
1006 goto end;
1007 }
1008 live_status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live_msg_iter,
1009 stream_iter, curr_msg);
1010 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1011 BT_ASSERT(!*curr_msg);
1012 }
1013
1014 end:
1015 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
1016 BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams");
1017 goto retry;
1018 }
1019
1020 BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, "
1021 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1022 lttng_live_iterator_status_string(live_status), stream_iter->name->str,
1023 stream_iter->viewer_stream_id);
1024
1025 return live_status;
1026 }
1027
1028 static bool is_discarded_packet_or_event_message(const bt_message *msg)
1029 {
1030 const enum bt_message_type msg_type = bt_message_get_type(msg);
1031
1032 return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS ||
1033 msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS;
1034 }
1035
1036 static enum lttng_live_iterator_status
1037 adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
1038 const bt_message *msg_in, bt_message **msg_out,
1039 uint64_t new_begin_ts)
1040 {
1041 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1042 enum bt_property_availability availability;
1043 const bt_clock_snapshot *clock_snapshot;
1044 uint64_t end_ts;
1045 uint64_t count;
1046
1047 clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
1048 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1049
1050 availability = bt_message_discarded_packets_get_count(msg_in, &count);
1051 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1052
1053 *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots(
1054 iter, stream, new_begin_ts, end_ts);
1055 if (!*msg_out) {
1056 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1057 goto end;
1058 }
1059
1060 bt_message_discarded_packets_set_count(*msg_out, count);
1061 end:
1062 return status;
1063 }
1064
1065 static enum lttng_live_iterator_status
1066 adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
1067 const bt_message *msg_in, bt_message **msg_out,
1068 uint64_t new_begin_ts)
1069 {
1070 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1071 enum bt_property_availability availability;
1072 const bt_clock_snapshot *clock_snapshot;
1073 uint64_t end_ts;
1074 uint64_t count;
1075
1076 clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
1077 end_ts = bt_clock_snapshot_get_value(clock_snapshot);
1078
1079 availability = bt_message_discarded_events_get_count(msg_in, &count);
1080 BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
1081
1082 *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots(
1083 iter, stream, new_begin_ts, end_ts);
1084 if (!*msg_out) {
1085 status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
1086 goto end;
1087 }
1088
1089 bt_message_discarded_events_set_count(*msg_out, count);
1090 end:
1091 return status;
1092 }
1093
1094 static enum lttng_live_iterator_status
1095 handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
1096 struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
1097 const bt_message *late_msg)
1098 {
1099 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1100 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1101 const bt_clock_class *clock_class;
1102 const bt_stream_class *stream_class;
1103 enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
1104 int64_t last_inactivity_ts_ns;
1105 enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1106 enum lttng_live_iterator_status adjust_status;
1107 bt_message *adjusted_message;
1108
1109 /*
1110 * The timestamp of the current message is before the last message sent
1111 * by this component. We CANNOT send it as is.
1112 *
1113 * The only expected scenario in which that could happen is the
1114 * following, everything else is a bug in this component, relay deamon,
1115 * or CTF parser.
1116 *
1117 * Expected scenario: The CTF message iterator emitted discarded
1118 * packets and discarded events with synthesized beginning and end
1119 * timestamps from the bounds of the last known packet and the newly
1120 * decoded packet header. The CTF message iterator is not aware of
1121 * stream inactivity beacons. Hence, we have to adjust the beginning
1122 * timestamp of those types of messages if a stream signalled its
1123 * inactivity up until _after_ the last known packet's beginning
1124 * timestamp.
1125 *
1126 * Otherwise, the monotonicity guarantee of message timestamps would
1127 * not be preserved.
1128 *
1129 * In short, the only scenario in which it's okay and fixable to
1130 * received a late message is when:
1131 * 1. the late message is a discarded packets or discarded events
1132 * message,
1133 * 2. this stream produced an inactivity message downstream, and
1134 * 3. the timestamp of the late message is within the inactivity
1135 * timespan we sent downstream through the inactivity message.
1136 */
1137
1138 BT_COMP_LOGD("Handling late message on live stream iterator: "
1139 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1140 stream_iter->name->str, stream_iter->viewer_stream_id);
1141
1142 if (!stream_iter->last_inactivity_ts.is_set) {
1143 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: "
1144 "have a late message when no inactivity message "
1145 "was ever sent for that stream.");
1146 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1147 goto end;
1148 }
1149
1150 if (!is_discarded_packet_or_event_message(late_msg)) {
1151 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1152 "Invalid live stream state: "
1153 "have a late message that is not a packet discarded or "
1154 "event discarded message: late-msg-type=%s",
1155 bt_common_message_type_string(bt_message_get_type(late_msg)));
1156 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1157 goto end;
1158 }
1159
1160 stream_class = bt_stream_borrow_class_const(stream_iter->stream);
1161 clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
1162
1163 ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
1164 clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
1165 if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
1166 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error converting last "
1167 "inactivity message timestamp to nanoseconds");
1168 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1169 goto end;
1170 }
1171
1172 if (last_inactivity_ts_ns <= late_msg_ts_ns) {
1173 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1174 "Invalid live stream state: "
1175 "have a late message that is none included in a stream "
1176 "inactivity timespan: last-inactivity-ts-ns=%" PRIu64
1177 "late-msg-ts-ns=%" PRIu64,
1178 last_inactivity_ts_ns, late_msg_ts_ns);
1179 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1180 goto end;
1181 }
1182
1183 /*
1184 * We now know that it's okay for this message to be late, we can now
1185 * adjust its timestamp to ensure monotonicity.
1186 */
1187 BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
1188 "msg-new-ts-ns=%" PRIu64,
1189 bt_common_message_type_string(bt_message_get_type(late_msg)),
1190 stream_iter->last_inactivity_ts.value);
1191 switch (bt_message_get_type(late_msg)) {
1192 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1193 adjust_status = adjust_discarded_events_message(
1194 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1195 stream_iter->last_inactivity_ts.value);
1196 break;
1197 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1198 adjust_status = adjust_discarded_packets_message(
1199 lttng_live_msg_iter->self_msg_iter, stream_iter->stream, late_msg, &adjusted_message,
1200 stream_iter->last_inactivity_ts.value);
1201 break;
1202 default:
1203 bt_common_abort();
1204 }
1205
1206 if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1207 stream_iter_status = adjust_status;
1208 goto end;
1209 }
1210
1211 BT_ASSERT_DBG(adjusted_message);
1212 stream_iter->current_msg = adjusted_message;
1213 stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
1214 bt_message_put_ref(late_msg);
1215
1216 end:
1217 return stream_iter_status;
1218 }
1219
1220 static enum lttng_live_iterator_status
1221 next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
1222 struct lttng_live_trace *live_trace,
1223 struct lttng_live_stream_iterator **youngest_trace_stream_iter)
1224 {
1225 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1226 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1227 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1228 enum lttng_live_iterator_status stream_iter_status;
1229 ;
1230 int64_t youngest_candidate_msg_ts = INT64_MAX;
1231 uint64_t stream_iter_idx;
1232
1233 BT_ASSERT_DBG(live_trace);
1234 BT_ASSERT_DBG(live_trace->stream_iterators);
1235
1236 BT_COMP_LOGD("Finding the next stream iterator for trace: "
1237 "trace-id=%" PRIu64,
1238 live_trace->id);
1239 /*
1240 * Update the current message of every stream iterators of this trace.
1241 * The current msg of every stream must have a timestamp equal or
1242 * larger than the last message returned by this iterator. We must
1243 * ensure monotonicity.
1244 */
1245 stream_iter_idx = 0;
1246 while (stream_iter_idx < live_trace->stream_iterators->len) {
1247 bool stream_iter_is_ended = false;
1248 struct lttng_live_stream_iterator *stream_iter =
1249 (lttng_live_stream_iterator *) g_ptr_array_index(live_trace->stream_iterators,
1250 stream_iter_idx);
1251
1252 /*
1253 * If there is no current message for this stream, go fetch
1254 * one.
1255 */
1256 while (!stream_iter->current_msg) {
1257 const bt_message *msg = NULL;
1258 int64_t curr_msg_ts_ns = INT64_MAX;
1259
1260 stream_iter_status =
1261 lttng_live_iterator_next_msg_on_stream(lttng_live_msg_iter, stream_iter, &msg);
1262
1263 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1264 stream_iter_is_ended = true;
1265 break;
1266 }
1267
1268 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1269 goto end;
1270 }
1271
1272 BT_ASSERT_DBG(msg);
1273
1274 BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, "
1275 "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
1276 bt_common_message_type_string(bt_message_get_type(msg)),
1277 stream_iter->name->str, stream_iter->viewer_stream_id);
1278
1279 /*
1280 * Get the timestamp in nanoseconds from origin of this
1281 * messsage.
1282 */
1283 live_get_msg_ts_ns(lttng_live_msg_iter, msg, lttng_live_msg_iter->last_msg_ts_ns,
1284 &curr_msg_ts_ns);
1285
1286 /*
1287 * Check if the message of the current live stream
1288 * iterator occurred at the exact same time or after the
1289 * last message returned by this component's message
1290 * iterator. If not, we need to handle it with care.
1291 */
1292 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
1293 stream_iter->current_msg = msg;
1294 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
1295 } else {
1296 /*
1297 * We received a message from the past. This
1298 * may be fixable but it can also be an error.
1299 */
1300 stream_iter_status =
1301 handle_late_message(lttng_live_msg_iter, stream_iter, curr_msg_ts_ns, msg);
1302 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1303 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1304 "Late message could not be handled correctly: "
1305 "lttng-live-msg-iter-addr=%p, "
1306 "stream-name=\"%s\", "
1307 "curr-msg-ts=%" PRId64 ", last-msg-ts=%" PRId64,
1308 lttng_live_msg_iter, stream_iter->name->str,
1309 curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns);
1310 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1311 goto end;
1312 }
1313 }
1314 }
1315
1316 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1317
1318 if (!stream_iter_is_ended) {
1319 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1320 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1321 /*
1322 * Update the current best candidate message
1323 * for the stream iterator of this live trace
1324 * to be forwarded downstream.
1325 */
1326 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1327 youngest_candidate_stream_iter = stream_iter;
1328 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1329 /*
1330 * Order the messages in an arbitrary but
1331 * deterministic way.
1332 */
1333 BT_ASSERT_DBG(stream_iter != youngest_candidate_stream_iter);
1334 int ret = common_muxing_compare_messages(
1335 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1336 if (ret < 0) {
1337 /*
1338 * The `youngest_candidate_stream_iter->current_msg`
1339 * should go first. Update the next
1340 * iterator and the current timestamp.
1341 */
1342 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1343 youngest_candidate_stream_iter = stream_iter;
1344 } else if (ret == 0) {
1345 /*
1346 * Unable to pick which one should go
1347 * first.
1348 */
1349 BT_COMP_LOGW(
1350 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1351 "stream-iter-addr=%p"
1352 "stream-iter-addr=%p",
1353 stream_iter, youngest_candidate_stream_iter);
1354 }
1355 }
1356
1357 stream_iter_idx++;
1358 } else {
1359 /*
1360 * The live stream iterator has ended. That
1361 * iterator is removed from the array, but
1362 * there is no need to increment
1363 * stream_iter_idx as
1364 * g_ptr_array_remove_index_fast replaces the
1365 * removed element with the array's last
1366 * element.
1367 */
1368 g_ptr_array_remove_index_fast(live_trace->stream_iterators, stream_iter_idx);
1369 }
1370 }
1371
1372 if (youngest_candidate_stream_iter) {
1373 *youngest_trace_stream_iter = youngest_candidate_stream_iter;
1374 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1375 } else {
1376 /*
1377 * The only case where we don't have a candidate for this trace
1378 * is if we reached the end of all the iterators.
1379 */
1380 BT_ASSERT(live_trace->stream_iterators->len == 0);
1381 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1382 }
1383
1384 end:
1385 return stream_iter_status;
1386 }
1387
1388 static enum lttng_live_iterator_status
1389 next_stream_iterator_for_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
1390 struct lttng_live_session *session,
1391 struct lttng_live_stream_iterator **youngest_session_stream_iter)
1392 {
1393 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1394 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1395 enum lttng_live_iterator_status stream_iter_status;
1396 uint64_t trace_idx = 0;
1397 int64_t youngest_candidate_msg_ts = INT64_MAX;
1398 struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
1399
1400 BT_COMP_LOGD("Finding the next stream iterator for session: "
1401 "session-id=%" PRIu64,
1402 session->id);
1403 /*
1404 * Make sure we are attached to the session and look for new streams
1405 * and metadata.
1406 */
1407 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1408 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1409 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1410 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1411 goto end;
1412 }
1413
1414 BT_ASSERT_DBG(session->traces);
1415
1416 while (trace_idx < session->traces->len) {
1417 bool trace_is_ended = false;
1418 struct lttng_live_stream_iterator *stream_iter;
1419 struct lttng_live_trace *trace =
1420 (lttng_live_trace *) g_ptr_array_index(session->traces, trace_idx);
1421
1422 stream_iter_status =
1423 next_stream_iterator_for_trace(lttng_live_msg_iter, trace, &stream_iter);
1424 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1425 /*
1426 * All the live stream iterators for this trace are
1427 * ENDed. Remove the trace from this session.
1428 */
1429 trace_is_ended = true;
1430 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1431 goto end;
1432 }
1433
1434 if (!trace_is_ended) {
1435 BT_ASSERT_DBG(stream_iter);
1436
1437 if (G_UNLIKELY(youngest_candidate_stream_iter == NULL) ||
1438 stream_iter->current_msg_ts_ns < youngest_candidate_msg_ts) {
1439 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1440 youngest_candidate_stream_iter = stream_iter;
1441 } else if (stream_iter->current_msg_ts_ns == youngest_candidate_msg_ts) {
1442 /*
1443 * Order the messages in an arbitrary but
1444 * deterministic way.
1445 */
1446 int ret = common_muxing_compare_messages(
1447 stream_iter->current_msg, youngest_candidate_stream_iter->current_msg);
1448 if (ret < 0) {
1449 /*
1450 * The `youngest_candidate_stream_iter->current_msg`
1451 * should go first. Update the next iterator
1452 * and the current timestamp.
1453 */
1454 youngest_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1455 youngest_candidate_stream_iter = stream_iter;
1456 } else if (ret == 0) {
1457 /* Unable to pick which one should go first. */
1458 BT_COMP_LOGW(
1459 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1460 "stream-iter-addr=%p"
1461 "stream-iter-addr=%p",
1462 stream_iter, youngest_candidate_stream_iter);
1463 }
1464 }
1465 trace_idx++;
1466 } else {
1467 /*
1468 * trace_idx is not incremented since
1469 * g_ptr_array_remove_index_fast replaces the
1470 * element at trace_idx with the array's last element.
1471 */
1472 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1473 }
1474 }
1475 if (youngest_candidate_stream_iter) {
1476 *youngest_session_stream_iter = youngest_candidate_stream_iter;
1477 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1478 } else {
1479 /*
1480 * The only cases where we don't have a candidate for this
1481 * trace is:
1482 * 1. if we reached the end of all the iterators of all the
1483 * traces of this session,
1484 * 2. if we never had live stream iterator in the first place.
1485 *
1486 * In either cases, we return END.
1487 */
1488 BT_ASSERT(session->traces->len == 0);
1489 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1490 }
1491 end:
1492 return stream_iter_status;
1493 }
1494
1495 static inline void put_messages(bt_message_array_const msgs, uint64_t count)
1496 {
1497 uint64_t i;
1498
1499 for (i = 0; i < count; i++) {
1500 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1501 }
1502 }
1503
1504 bt_message_iterator_class_next_method_status
1505 lttng_live_msg_iter_next(bt_self_message_iterator *self_msg_it, bt_message_array_const msgs,
1506 uint64_t capacity, uint64_t *count)
1507 {
1508 bt_message_iterator_class_next_method_status status;
1509 enum lttng_live_viewer_status viewer_status;
1510 struct lttng_live_msg_iter *lttng_live_msg_iter =
1511 (struct lttng_live_msg_iter *) bt_self_message_iterator_get_data(self_msg_it);
1512 struct lttng_live_component *lttng_live = lttng_live_msg_iter->lttng_live_comp;
1513 bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
1514 bt_logging_level log_level = lttng_live_msg_iter->log_level;
1515 enum lttng_live_iterator_status stream_iter_status;
1516 uint64_t session_idx;
1517
1518 *count = 0;
1519
1520 BT_ASSERT_DBG(lttng_live_msg_iter);
1521
1522 if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
1523 /*
1524 * The iterator was interrupted in a previous call to the
1525 * `_next()` method. We currently do not support generating
1526 * messages after such event. The babeltrace2 CLI should never
1527 * be running the graph after being interrupted. So this check
1528 * is to prevent other graph users from using this live
1529 * iterator in an messed up internal state.
1530 */
1531 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1532 BT_COMP_LOGE_APPEND_CAUSE(
1533 self_comp,
1534 "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
1535 goto end;
1536 }
1537
1538 /*
1539 * Clear all the invalid message reference that might be left over in
1540 * the output array.
1541 */
1542 memset(msgs, 0, capacity * sizeof(*msgs));
1543
1544 /*
1545 * If no session are exposed on the relay found at the url provided by
1546 * the user, session count will be 0. In this case, we return status
1547 * end to return gracefully.
1548 */
1549 if (lttng_live_msg_iter->sessions->len == 0) {
1550 if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
1551 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1552 goto end;
1553 } else {
1554 /*
1555 * The are no more active session for this session
1556 * name. Retry to create a viewer session for the
1557 * requested session name.
1558 */
1559 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1560 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1561 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1562 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1563 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1564 "Error creating LTTng live viewer session");
1565 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1566 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1567 } else {
1568 bt_common_abort();
1569 }
1570 goto end;
1571 }
1572 }
1573 }
1574
1575 if (lttng_live_msg_iter->active_stream_iter == 0) {
1576 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1577 }
1578
1579 /*
1580 * Here the muxing of message is done.
1581 *
1582 * We need to iterate over all the streams of all the traces of all the
1583 * viewer sessions in order to get the message with the smallest
1584 * timestamp. In this case, a session is a viewer session and there is
1585 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1586 * kernel). Each viewer session can have multiple traces, for example,
1587 * 64bit UST viewer sessions could have multiple per-pid traces.
1588 *
1589 * We iterate over the streams of each traces to update and see what is
1590 * their next message's timestamp. From those timestamps, we select the
1591 * message with the smallest timestamp as the best candidate message
1592 * for that trace and do the same thing across all the sessions.
1593 *
1594 * We then compare the timestamp of best candidate message of all the
1595 * sessions to pick the message with the smallest timestamp and we
1596 * return it.
1597 */
1598 while (*count < capacity) {
1599 struct lttng_live_stream_iterator *youngest_stream_iter = NULL,
1600 *candidate_stream_iter = NULL;
1601 int64_t youngest_msg_ts_ns = INT64_MAX;
1602
1603 BT_ASSERT_DBG(lttng_live_msg_iter->sessions);
1604 session_idx = 0;
1605 while (session_idx < lttng_live_msg_iter->sessions->len) {
1606 struct lttng_live_session *session = (lttng_live_session *) g_ptr_array_index(
1607 lttng_live_msg_iter->sessions, session_idx);
1608
1609 /* Find the best candidate message to send downstream. */
1610 stream_iter_status = next_stream_iterator_for_session(lttng_live_msg_iter, session,
1611 &candidate_stream_iter);
1612
1613 /* If we receive an END status, it means that either:
1614 * - Those traces never had active streams (UST with no
1615 * data produced yet),
1616 * - All live stream iterators have ENDed.*/
1617 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1618 if (session->closed && session->traces->len == 0) {
1619 /*
1620 * Remove the session from the list.
1621 * session_idx is not modified since
1622 * g_ptr_array_remove_index_fast
1623 * replaces the the removed element with
1624 * the array's last element.
1625 */
1626 g_ptr_array_remove_index_fast(lttng_live_msg_iter->sessions, session_idx);
1627 } else {
1628 session_idx++;
1629 }
1630 continue;
1631 }
1632
1633 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1634 goto return_status;
1635 }
1636
1637 if (G_UNLIKELY(youngest_stream_iter == NULL) ||
1638 candidate_stream_iter->current_msg_ts_ns < youngest_msg_ts_ns) {
1639 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1640 youngest_stream_iter = candidate_stream_iter;
1641 } else if (candidate_stream_iter->current_msg_ts_ns == youngest_msg_ts_ns) {
1642 /*
1643 * The currently selected message to be sent
1644 * downstream next has the exact same timestamp
1645 * that of the current candidate message. We
1646 * must break the tie in a predictable manner.
1647 */
1648 BT_COMP_LOGD_STR(
1649 "Two of the next message candidates have the same timestamps, pick one deterministically.");
1650 /*
1651 * Order the messages in an arbitrary but
1652 * deterministic way.
1653 */
1654 int ret = common_muxing_compare_messages(candidate_stream_iter->current_msg,
1655 youngest_stream_iter->current_msg);
1656 if (ret < 0) {
1657 /*
1658 * The `candidate_stream_iter->current_msg`
1659 * should go first. Update the next
1660 * iterator and the current timestamp.
1661 */
1662 youngest_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1663 youngest_stream_iter = candidate_stream_iter;
1664 } else if (ret == 0) {
1665 /* Unable to pick which one should go first. */
1666 BT_COMP_LOGW(
1667 "Cannot deterministically pick next live stream message iterator because they have identical next messages: "
1668 "next-stream-iter-addr=%p"
1669 "candidate-stream-iter-addr=%p",
1670 youngest_stream_iter, candidate_stream_iter);
1671 }
1672 }
1673
1674 session_idx++;
1675 }
1676
1677 if (!youngest_stream_iter) {
1678 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1679 goto return_status;
1680 }
1681
1682 BT_ASSERT_DBG(youngest_stream_iter->current_msg);
1683 /* Ensure monotonicity. */
1684 BT_ASSERT_DBG(lttng_live_msg_iter->last_msg_ts_ns <=
1685 youngest_stream_iter->current_msg_ts_ns);
1686
1687 /*
1688 * Insert the next message to the message batch. This will set
1689 * stream iterator current messsage to NULL so that next time
1690 * we fetch the next message of that stream iterator
1691 */
1692 BT_MESSAGE_MOVE_REF(msgs[*count], youngest_stream_iter->current_msg);
1693 (*count)++;
1694
1695 /* Update the last timestamp in nanoseconds sent downstream. */
1696 lttng_live_msg_iter->last_msg_ts_ns = youngest_msg_ts_ns;
1697 youngest_stream_iter->current_msg_ts_ns = INT64_MAX;
1698
1699 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1700 }
1701
1702 return_status:
1703 switch (stream_iter_status) {
1704 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1705 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1706 /*
1707 * If we gathered messages, return _OK even if the graph was
1708 * interrupted. This allows for the components downstream to at
1709 * least get the thoses messages. If the graph was indeed
1710 * interrupted there should not be another _next() call as the
1711 * application will tear down the graph. This component class
1712 * doesn't support restarting after an interruption.
1713 */
1714 if (*count > 0) {
1715 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1716 } else {
1717 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
1718 }
1719 break;
1720 case LTTNG_LIVE_ITERATOR_STATUS_END:
1721 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1722 break;
1723 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1724 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1725 "Memory error preparing the next batch of messages: "
1726 "live-iter-status=%s",
1727 lttng_live_iterator_status_string(stream_iter_status));
1728 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1729 break;
1730 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1731 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1732 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1733 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1734 "Error preparing the next batch of messages: "
1735 "live-iter-status=%s",
1736 lttng_live_iterator_status_string(stream_iter_status));
1737
1738 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1739 /* Put all existing messages on error. */
1740 put_messages(msgs, *count);
1741 break;
1742 default:
1743 bt_common_abort();
1744 }
1745
1746 end:
1747 return status;
1748 }
1749
1750 static struct lttng_live_msg_iter *
1751 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
1752 bt_self_message_iterator *self_msg_it)
1753 {
1754 bt_self_component *self_comp = lttng_live_comp->self_comp;
1755 bt_logging_level log_level = lttng_live_comp->log_level;
1756
1757 struct lttng_live_msg_iter *lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1758 if (!lttng_live_msg_iter) {
1759 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to allocate lttng_live_msg_iter");
1760 goto end;
1761 }
1762
1763 lttng_live_msg_iter->log_level = lttng_live_comp->log_level;
1764 lttng_live_msg_iter->self_comp = lttng_live_comp->self_comp;
1765 lttng_live_msg_iter->lttng_live_comp = lttng_live_comp;
1766 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1767
1768 lttng_live_msg_iter->active_stream_iter = 0;
1769 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1770 lttng_live_msg_iter->was_interrupted = false;
1771
1772 lttng_live_msg_iter->sessions =
1773 g_ptr_array_new_with_free_func((GDestroyNotify) lttng_live_destroy_session);
1774 BT_ASSERT(lttng_live_msg_iter->sessions);
1775
1776 end:
1777 return lttng_live_msg_iter;
1778 }
1779
1780 bt_message_iterator_class_initialize_method_status
1781 lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
1782 bt_self_message_iterator_configuration *, bt_self_component_port_output *)
1783 {
1784 bt_message_iterator_class_initialize_method_status status;
1785 struct lttng_live_component *lttng_live;
1786 struct lttng_live_msg_iter *lttng_live_msg_iter;
1787 enum lttng_live_viewer_status viewer_status;
1788 bt_logging_level log_level;
1789 bt_self_component *self_comp = bt_self_message_iterator_borrow_component(self_msg_it);
1790
1791 lttng_live = (lttng_live_component *) bt_self_component_get_data(self_comp);
1792 log_level = lttng_live->log_level;
1793 self_comp = lttng_live->self_comp;
1794
1795 /* There can be only one downstream iterator at the same time. */
1796 BT_ASSERT(!lttng_live->has_msg_iter);
1797 lttng_live->has_msg_iter = true;
1798
1799 lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it);
1800 if (!lttng_live_msg_iter) {
1801 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter");
1802 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1803 goto error;
1804 }
1805
1806 viewer_status = live_viewer_connection_create(
1807 self_comp, NULL, log_level, lttng_live->params.url->str, false, lttng_live_msg_iter,
1808 &lttng_live_msg_iter->viewer_connection);
1809 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1810 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1811 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer connection");
1812 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1813 /*
1814 * Interruption in the _iter_init() method is not
1815 * supported. Return an error.
1816 */
1817 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted while creating viewer connection");
1818 }
1819
1820 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1821 goto error;
1822 }
1823
1824 viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
1825 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1826 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1827 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create viewer session");
1828 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1829 /*
1830 * Interruption in the _iter_init() method is not
1831 * supported. Return an error.
1832 */
1833 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Interrupted when creating viewer session");
1834 }
1835
1836 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1837 goto error;
1838 }
1839
1840 if (lttng_live_msg_iter->sessions->len == 0) {
1841 switch (lttng_live->params.sess_not_found_act) {
1842 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1843 BT_COMP_LOGI(
1844 "Unable to connect to the requested live viewer session. Keep trying to connect because of "
1845 "%s=\"%s\" component parameter: url=\"%s\"",
1846 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1847 lttng_live->params.url->str);
1848 break;
1849 case SESSION_NOT_FOUND_ACTION_FAIL:
1850 BT_COMP_LOGE_APPEND_CAUSE(
1851 self_comp,
1852 "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
1853 "component parameter: url =\"%s\"",
1854 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_FAIL_STR,
1855 lttng_live->params.url->str);
1856 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1857 goto error;
1858 case SESSION_NOT_FOUND_ACTION_END:
1859 BT_COMP_LOGI(
1860 "Unable to connect to the requested live viewer session. End gracefully at the first _next() "
1861 "call because of %s=\"%s\" component parameter: "
1862 "url=\"%s\"",
1863 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_END_STR,
1864 lttng_live->params.url->str);
1865 break;
1866 default:
1867 bt_common_abort();
1868 }
1869 }
1870
1871 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1872 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1873 goto end;
1874
1875 error:
1876 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1877 end:
1878 return status;
1879 }
1880
/*
 * Parameter validation descriptor used by
 * lttng_live_query_list_sessions(): the parameter map must contain a
 * mandatory string entry named URL_PARAM.
 */
static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
    {URL_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
     bt_param_validation_value_descr::makeString()},
    BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
1885
1886 static bt_component_class_query_method_status
1887 lttng_live_query_list_sessions(const bt_value *params, const bt_value **result,
1888 bt_self_component_class *self_comp_class, bt_logging_level log_level)
1889 {
1890 bt_component_class_query_method_status status;
1891 const bt_value *url_value = NULL;
1892 const char *url;
1893 struct live_viewer_connection *viewer_connection = NULL;
1894 enum lttng_live_viewer_status viewer_status;
1895 enum bt_param_validation_status validation_status;
1896 gchar *validate_error = NULL;
1897
1898 validation_status = bt_param_validation_validate(params, list_sessions_params, &validate_error);
1899 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
1900 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
1901 goto error;
1902 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
1903 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1904 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "%s", validate_error);
1905 goto error;
1906 }
1907
1908 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1909 url = bt_value_string_get(url_value);
1910
1911 viewer_status = live_viewer_connection_create(NULL, self_comp_class, log_level, url, true, NULL,
1912 &viewer_connection);
1913 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1914 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1915 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to create viewer connection");
1916 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1917 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1918 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
1919 } else {
1920 bt_common_abort();
1921 }
1922 goto error;
1923 }
1924
1925 status = live_viewer_connection_list_sessions(viewer_connection, result);
1926 if (status != BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK) {
1927 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Failed to list viewer sessions");
1928 goto error;
1929 }
1930
1931 goto end;
1932
1933 error:
1934 BT_VALUE_PUT_REF_AND_RESET(*result);
1935
1936 if (status >= 0) {
1937 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
1938 }
1939
1940 end:
1941 if (viewer_connection) {
1942 live_viewer_connection_destroy(viewer_connection);
1943 }
1944
1945 g_free(validate_error);
1946
1947 return status;
1948 }
1949
1950 static bt_component_class_query_method_status
1951 lttng_live_query_support_info(const bt_value *params, const bt_value **result,
1952 bt_self_component_class *self_comp_class, bt_logging_level log_level)
1953 {
1954 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
1955 const bt_value *input_type_value;
1956 const bt_value *input_value;
1957 double weight = 0;
1958 struct bt_common_lttng_live_url_parts parts = {};
1959
1960 /* Used by the logging macros */
1961 __attribute__((unused)) bt_self_component *self_comp = NULL;
1962
1963 *result = NULL;
1964 input_type_value = bt_value_map_borrow_entry_value_const(params, "type");
1965 if (!input_type_value) {
1966 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `type` parameter.");
1967 goto error;
1968 }
1969
1970 if (!bt_value_is_string(input_type_value)) {
1971 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "`type` parameter is not a string value.");
1972 goto error;
1973 }
1974
1975 if (strcmp(bt_value_string_get(input_type_value), "string") != 0) {
1976 /* We don't handle file system paths */
1977 goto create_result;
1978 }
1979
1980 input_value = bt_value_map_borrow_entry_value_const(params, "input");
1981 if (!input_value) {
1982 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Missing expected `input` parameter.");
1983 goto error;
1984 }
1985
1986 if (!bt_value_is_string(input_value)) {
1987 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
1988 "`input` parameter is not a string value.");
1989 goto error;
1990 }
1991
1992 parts = bt_common_parse_lttng_live_url(bt_value_string_get(input_value), NULL, 0);
1993 if (parts.session_name) {
1994 /*
1995 * Looks pretty much like an LTTng live URL: we got the
1996 * session name part, which forms a complete URL.
1997 */
1998 weight = .75;
1999 }
2000
2001 create_result:
2002 *result = bt_value_real_create_init(weight);
2003 if (!*result) {
2004 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
2005 goto error;
2006 }
2007
2008 goto end;
2009
2010 error:
2011 if (status >= 0) {
2012 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
2013 }
2014
2015 BT_ASSERT(!*result);
2016
2017 end:
2018 bt_common_destroy_lttng_live_url_parts(&parts);
2019 return status;
2020 }
2021
2022 bt_component_class_query_method_status lttng_live_query(bt_self_component_class_source *comp_class,
2023 bt_private_query_executor *priv_query_exec,
2024 const char *object, const bt_value *params,
2025 __attribute__((unused)) void *method_data,
2026 const bt_value **result)
2027 {
2028 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
2029 bt_self_component *self_comp = NULL;
2030 bt_self_component_class *self_comp_class =
2031 bt_self_component_class_source_as_self_component_class(comp_class);
2032 bt_logging_level log_level = bt_query_executor_get_logging_level(
2033 bt_private_query_executor_as_query_executor_const(priv_query_exec));
2034
2035 if (strcmp(object, "sessions") == 0) {
2036 status = lttng_live_query_list_sessions(params, result, self_comp_class, log_level);
2037 } else if (strcmp(object, "babeltrace.support-info") == 0) {
2038 status = lttng_live_query_support_info(params, result, self_comp_class, log_level);
2039 } else {
2040 BT_COMP_LOGI("Unknown query object `%s`", object);
2041 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_UNKNOWN_OBJECT;
2042 goto end;
2043 }
2044
2045 end:
2046 return status;
2047 }
2048
2049 static void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
2050 {
2051 if (!lttng_live) {
2052 return;
2053 }
2054 if (lttng_live->params.url) {
2055 g_string_free(lttng_live->params.url, TRUE);
2056 }
2057 g_free(lttng_live);
2058 }
2059
2060 void lttng_live_component_finalize(bt_self_component_source *component)
2061 {
2062 lttng_live_component *data = (lttng_live_component *) bt_self_component_get_data(
2063 bt_self_component_source_as_self_component(component));
2064
2065 if (!data) {
2066 return;
2067 }
2068 lttng_live_component_destroy_data(data);
2069 }
2070
2071 static enum session_not_found_action
2072 parse_session_not_found_action_param(const bt_value *no_session_param)
2073 {
2074 enum session_not_found_action action;
2075 const char *no_session_act_str = bt_value_string_get(no_session_param);
2076
2077 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
2078 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
2079 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
2080 action = SESSION_NOT_FOUND_ACTION_FAIL;
2081 } else {
2082 BT_ASSERT(strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0);
2083 action = SESSION_NOT_FOUND_ACTION_END;
2084 }
2085
2086 return action;
2087 }
2088
/*
 * Validation descriptor of a single element of the INPUTS_PARAM array:
 * a string value.
 */
static bt_param_validation_value_descr inputs_elem_descr =
    bt_param_validation_value_descr::makeString();
2091
/*
 * Action names handed to the string validation descriptor of the
 * SESS_NOT_FOUND_ACTION_PARAM parameter (presumably the only values the
 * validation accepts -- confirm against the param-validation API).
 */
static const char *sess_not_found_action_choices[] = {
    SESS_NOT_FOUND_ACTION_CONTINUE_STR,
    SESS_NOT_FOUND_ACTION_FAIL_STR,
    SESS_NOT_FOUND_ACTION_END_STR,
};
2097
/*
 * Validation descriptor of the component initialization parameters,
 * used by lttng_live_component_create():
 *
 *   INPUTS_PARAM                 mandatory array of strings; the `1, 1`
 *                                arguments are presumably the minimum
 *                                and maximum length (exactly one
 *                                element) -- confirm against the
 *                                param-validation API
 *   SESS_NOT_FOUND_ACTION_PARAM  optional string described with
 *                                `sess_not_found_action_choices`
 */
static struct bt_param_validation_map_value_entry_descr params_descr[] = {
    {INPUTS_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY,
     bt_param_validation_value_descr::makeArray(1, 1, inputs_elem_descr)},
    {SESS_NOT_FOUND_ACTION_PARAM, BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL,
     bt_param_validation_value_descr::makeString(sess_not_found_action_choices)},
    BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END};
2104
2105 static bt_component_class_initialize_method_status
2106 lttng_live_component_create(const bt_value *params, bt_logging_level log_level,
2107 bt_self_component *self_comp, struct lttng_live_component **component)
2108 {
2109 struct lttng_live_component *lttng_live = NULL;
2110 const bt_value *inputs_value;
2111 const bt_value *url_value;
2112 const bt_value *value;
2113 const char *url;
2114 enum bt_param_validation_status validation_status;
2115 gchar *validation_error = NULL;
2116 bt_component_class_initialize_method_status status;
2117
2118 validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
2119 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
2120 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2121 goto error;
2122 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
2123 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validation_error);
2124 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
2125 goto error;
2126 }
2127
2128 lttng_live = g_new0(struct lttng_live_component, 1);
2129 if (!lttng_live) {
2130 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2131 goto end;
2132 }
2133 lttng_live->log_level = log_level;
2134 lttng_live->self_comp = self_comp;
2135 lttng_live->max_query_size = MAX_QUERY_SIZE;
2136 lttng_live->has_msg_iter = false;
2137
2138 inputs_value = bt_value_map_borrow_entry_value_const(params, INPUTS_PARAM);
2139 url_value = bt_value_array_borrow_element_by_index_const(inputs_value, 0);
2140 url = bt_value_string_get(url_value);
2141
2142 lttng_live->params.url = g_string_new(url);
2143 if (!lttng_live->params.url) {
2144 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
2145 goto error;
2146 }
2147
2148 value = bt_value_map_borrow_entry_value_const(params, SESS_NOT_FOUND_ACTION_PARAM);
2149 if (value) {
2150 lttng_live->params.sess_not_found_act = parse_session_not_found_action_param(value);
2151 } else {
2152 BT_COMP_LOGI("Optional `%s` parameter is missing: "
2153 "defaulting to `%s`.",
2154 SESS_NOT_FOUND_ACTION_PARAM, SESS_NOT_FOUND_ACTION_CONTINUE_STR);
2155 lttng_live->params.sess_not_found_act = SESSION_NOT_FOUND_ACTION_CONTINUE;
2156 }
2157
2158 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
2159 goto end;
2160
2161 error:
2162 lttng_live_component_destroy_data(lttng_live);
2163 lttng_live = NULL;
2164 end:
2165 g_free(validation_error);
2166
2167 *component = lttng_live;
2168 return status;
2169 }
2170
2171 bt_component_class_initialize_method_status
2172 lttng_live_component_init(bt_self_component_source *self_comp_src,
2173 bt_self_component_source_configuration *, const bt_value *params, void *)
2174 {
2175 struct lttng_live_component *lttng_live;
2176 bt_component_class_initialize_method_status ret;
2177 bt_self_component *self_comp = bt_self_component_source_as_self_component(self_comp_src);
2178 bt_logging_level log_level =
2179 bt_component_get_logging_level(bt_self_component_as_component(self_comp));
2180 bt_self_component_add_port_status add_port_status;
2181
2182 ret = lttng_live_component_create(params, log_level, self_comp, &lttng_live);
2183 if (ret != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
2184 goto error;
2185 }
2186
2187 add_port_status = bt_self_component_source_add_output_port(self_comp_src, "out", NULL, NULL);
2188 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
2189 ret = (bt_component_class_initialize_method_status) add_port_status;
2190 goto end;
2191 }
2192
2193 bt_self_component_set_data(self_comp, lttng_live);
2194 goto end;
2195
2196 error:
2197 lttng_live_component_destroy_data(lttng_live);
2198 lttng_live = NULL;
2199 end:
2200 return ret;
2201 }
This page took 0.118884 seconds and 3 git commands to generate.